Show More
@@ -0,0 +1,27 b'' | |||||
|
1 | import logging | |||
|
2 | from logging import INFO, DEBUG, WARN, ERROR, FATAL | |||
|
3 | ||||
|
4 | import zmq | |||
|
5 | from zmq.log.handlers import PUBHandler | |||
|
6 | ||||
|
7 | class EnginePUBHandler(PUBHandler): | |||
|
8 | """A simple PUBHandler subclass that sets root_topic""" | |||
|
9 | engine=None | |||
|
10 | ||||
|
11 | def __init__(self, engine, *args, **kwargs): | |||
|
12 | PUBHandler.__init__(self,*args, **kwargs) | |||
|
13 | self.engine = engine | |||
|
14 | ||||
|
15 | @property | |||
|
16 | def root_topic(self): | |||
|
17 | """this is a property, in case the handler is created | |||
|
18 | before the engine gets registered with an id""" | |||
|
19 | if isinstance(getattr(self.engine, 'id', None), int): | |||
|
20 | return "engine.%i"%self.engine.id | |||
|
21 | else: | |||
|
22 | return "engine" | |||
|
23 | ||||
|
24 | ||||
|
25 | logger = logging.getLogger('ipzmq') | |||
|
26 | logger.setLevel(logging.DEBUG) | |||
|
27 |
@@ -0,0 +1,167 b'' | |||||
|
1 | # encoding: utf-8 | |||
|
2 | # -*- test-case-name: IPython.kernel.test.test_newserialized -*- | |||
|
3 | ||||
|
4 | """Refactored serialization classes and interfaces.""" | |||
|
5 | ||||
|
6 | __docformat__ = "restructuredtext en" | |||
|
7 | ||||
|
8 | # Tell nose to skip this module | |||
|
9 | __test__ = {} | |||
|
10 | ||||
|
11 | #------------------------------------------------------------------------------- | |||
|
12 | # Copyright (C) 2008 The IPython Development Team | |||
|
13 | # | |||
|
14 | # Distributed under the terms of the BSD License. The full license is in | |||
|
15 | # the file COPYING, distributed as part of this software. | |||
|
16 | #------------------------------------------------------------------------------- | |||
|
17 | ||||
|
18 | #------------------------------------------------------------------------------- | |||
|
19 | # Imports | |||
|
20 | #------------------------------------------------------------------------------- | |||
|
21 | ||||
|
22 | import cPickle as pickle | |||
|
23 | ||||
|
24 | # from twisted.python import components | |||
|
25 | # from zope.interface import Interface, implements | |||
|
26 | ||||
|
27 | try: | |||
|
28 | import numpy | |||
|
29 | except ImportError: | |||
|
30 | pass | |||
|
31 | ||||
|
32 | from IPython.kernel.error import SerializationError | |||
|
33 | ||||
|
34 | #----------------------------------------------------------------------------- | |||
|
35 | # Classes and functions | |||
|
36 | #----------------------------------------------------------------------------- | |||
|
37 | ||||
|
38 | class ISerialized: | |||
|
39 | ||||
|
40 | def getData(): | |||
|
41 | """""" | |||
|
42 | ||||
|
43 | def getDataSize(units=10.0**6): | |||
|
44 | """""" | |||
|
45 | ||||
|
46 | def getTypeDescriptor(): | |||
|
47 | """""" | |||
|
48 | ||||
|
49 | def getMetadata(): | |||
|
50 | """""" | |||
|
51 | ||||
|
52 | ||||
|
53 | class IUnSerialized: | |||
|
54 | ||||
|
55 | def getObject(): | |||
|
56 | """""" | |||
|
57 | ||||
|
58 | class Serialized(object): | |||
|
59 | ||||
|
60 | # implements(ISerialized) | |||
|
61 | ||||
|
62 | def __init__(self, data, typeDescriptor, metadata={}): | |||
|
63 | self.data = data | |||
|
64 | self.typeDescriptor = typeDescriptor | |||
|
65 | self.metadata = metadata | |||
|
66 | ||||
|
67 | def getData(self): | |||
|
68 | return self.data | |||
|
69 | ||||
|
70 | def getDataSize(self, units=10.0**6): | |||
|
71 | return len(self.data)/units | |||
|
72 | ||||
|
73 | def getTypeDescriptor(self): | |||
|
74 | return self.typeDescriptor | |||
|
75 | ||||
|
76 | def getMetadata(self): | |||
|
77 | return self.metadata | |||
|
78 | ||||
|
79 | ||||
|
80 | class UnSerialized(object): | |||
|
81 | ||||
|
82 | # implements(IUnSerialized) | |||
|
83 | ||||
|
84 | def __init__(self, obj): | |||
|
85 | self.obj = obj | |||
|
86 | ||||
|
87 | def getObject(self): | |||
|
88 | return self.obj | |||
|
89 | ||||
|
90 | ||||
|
91 | class SerializeIt(object): | |||
|
92 | ||||
|
93 | # implements(ISerialized) | |||
|
94 | ||||
|
95 | def __init__(self, unSerialized): | |||
|
96 | self.data = None | |||
|
97 | self.obj = unSerialized.getObject() | |||
|
98 | if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray): | |||
|
99 | if len(self.obj) == 0: # length 0 arrays can't be reconstructed | |||
|
100 | raise SerializationError("You cannot send a length 0 array") | |||
|
101 | self.obj = numpy.ascontiguousarray(self.obj, dtype=None) | |||
|
102 | self.typeDescriptor = 'ndarray' | |||
|
103 | self.metadata = {'shape':self.obj.shape, | |||
|
104 | 'dtype':self.obj.dtype.str} | |||
|
105 | elif isinstance(self.obj, str): | |||
|
106 | self.typeDescriptor = 'bytes' | |||
|
107 | self.metadata = {} | |||
|
108 | elif isinstance(self.obj, buffer): | |||
|
109 | self.typeDescriptor = 'buffer' | |||
|
110 | self.metadata = {} | |||
|
111 | else: | |||
|
112 | self.typeDescriptor = 'pickle' | |||
|
113 | self.metadata = {} | |||
|
114 | self._generateData() | |||
|
115 | ||||
|
116 | def _generateData(self): | |||
|
117 | if self.typeDescriptor == 'ndarray': | |||
|
118 | self.data = numpy.getbuffer(self.obj) | |||
|
119 | elif self.typeDescriptor in ('bytes', 'buffer'): | |||
|
120 | self.data = self.obj | |||
|
121 | elif self.typeDescriptor == 'pickle': | |||
|
122 | self.data = pickle.dumps(self.obj, 2) | |||
|
123 | else: | |||
|
124 | raise SerializationError("Really wierd serialization error.") | |||
|
125 | del self.obj | |||
|
126 | ||||
|
127 | def getData(self): | |||
|
128 | return self.data | |||
|
129 | ||||
|
130 | def getDataSize(self, units=10.0**6): | |||
|
131 | return 1.0*len(self.data)/units | |||
|
132 | ||||
|
133 | def getTypeDescriptor(self): | |||
|
134 | return self.typeDescriptor | |||
|
135 | ||||
|
136 | def getMetadata(self): | |||
|
137 | return self.metadata | |||
|
138 | ||||
|
139 | ||||
|
140 | class UnSerializeIt(UnSerialized): | |||
|
141 | ||||
|
142 | # implements(IUnSerialized) | |||
|
143 | ||||
|
144 | def __init__(self, serialized): | |||
|
145 | self.serialized = serialized | |||
|
146 | ||||
|
147 | def getObject(self): | |||
|
148 | typeDescriptor = self.serialized.getTypeDescriptor() | |||
|
149 | if globals().has_key('numpy') and typeDescriptor == 'ndarray': | |||
|
150 | result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype']) | |||
|
151 | result.shape = self.serialized.metadata['shape'] | |||
|
152 | # This is a hack to make the array writable. We are working with | |||
|
153 | # the numpy folks to address this issue. | |||
|
154 | result = result.copy() | |||
|
155 | elif typeDescriptor == 'pickle': | |||
|
156 | result = pickle.loads(self.serialized.getData()) | |||
|
157 | elif typeDescriptor in ('bytes', 'buffer'): | |||
|
158 | result = self.serialized.getData() | |||
|
159 | else: | |||
|
160 | raise SerializationError("Really wierd serialization error.") | |||
|
161 | return result | |||
|
162 | ||||
|
163 | def serialize(obj): | |||
|
164 | return SerializeIt(UnSerialized(obj)) | |||
|
165 | ||||
|
166 | def unserialize(serialized): | |||
|
167 | return UnSerializeIt(serialized).getObject() |
1 | NO CONTENT: new file 100644 |
|
NO CONTENT: new file 100644 |
This diff has been collapsed as it changes many lines, (562 lines changed) Show them Hide them | |||||
@@ -0,0 +1,562 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """A semi-synchronous Client for the ZMQ controller""" | |||
|
3 | ||||
|
4 | import time | |||
|
5 | import threading | |||
|
6 | ||||
|
7 | from functools import wraps | |||
|
8 | ||||
|
9 | from IPython.external.decorator import decorator | |||
|
10 | ||||
|
11 | import streamsession as ss | |||
|
12 | import zmq | |||
|
13 | ||||
|
14 | from remotenamespace import RemoteNamespace | |||
|
15 | from view import DirectView | |||
|
16 | ||||
|
17 | def _push(ns): | |||
|
18 | globals().update(ns) | |||
|
19 | ||||
|
20 | def _pull(keys): | |||
|
21 | g = globals() | |||
|
22 | if isinstance(keys, (list,tuple)): | |||
|
23 | return map(g.get, keys) | |||
|
24 | else: | |||
|
25 | return g.get(keys) | |||
|
26 | ||||
|
27 | def _clear(): | |||
|
28 | globals().clear() | |||
|
29 | ||||
|
30 | def execute(code): | |||
|
31 | exec code in globals() | |||
|
32 | ||||
|
33 | # decorators for methods: | |||
|
34 | @decorator | |||
|
35 | def spinfirst(f,self,*args,**kwargs): | |||
|
36 | self.spin() | |||
|
37 | return f(self, *args, **kwargs) | |||
|
38 | ||||
|
39 | @decorator | |||
|
40 | def defaultblock(f, self, *args, **kwargs): | |||
|
41 | block = kwargs.get('block',None) | |||
|
42 | block = self.block if block is None else block | |||
|
43 | saveblock = self.block | |||
|
44 | self.block = block | |||
|
45 | ret = f(self, *args, **kwargs) | |||
|
46 | self.block = saveblock | |||
|
47 | return ret | |||
|
48 | ||||
|
49 | ||||
|
50 | # @decorator | |||
|
51 | # def checktargets(f): | |||
|
52 | # @wraps(f) | |||
|
53 | # def checked_method(self, *args, **kwargs): | |||
|
54 | # self._build_targets(kwargs['targets']) | |||
|
55 | # return f(self, *args, **kwargs) | |||
|
56 | # return checked_method | |||
|
57 | ||||
|
58 | ||||
|
59 | # class _ZMQEventLoopThread(threading.Thread): | |||
|
60 | # | |||
|
61 | # def __init__(self, loop): | |||
|
62 | # self.loop = loop | |||
|
63 | # threading.Thread.__init__(self) | |||
|
64 | # | |||
|
65 | # def run(self): | |||
|
66 | # self.loop.start() | |||
|
67 | # | |||
|
68 | class Client(object): | |||
|
69 | """A semi-synchronous client to the IPython ZMQ controller | |||
|
70 | ||||
|
71 | Attributes | |||
|
72 | ---------- | |||
|
73 | ids : set | |||
|
74 | a set of engine IDs | |||
|
75 | requesting the ids attribute always synchronizes | |||
|
76 | the registration state. To request ids without synchronization, | |||
|
77 | use _ids | |||
|
78 | ||||
|
79 | history : list of msg_ids | |||
|
80 | a list of msg_ids, keeping track of all the execution | |||
|
81 | messages you have submitted | |||
|
82 | ||||
|
83 | outstanding : set of msg_ids | |||
|
84 | a set of msg_ids that have been submitted, but whose | |||
|
85 | results have not been received | |||
|
86 | ||||
|
87 | results : dict | |||
|
88 | a dict of all our results, keyed by msg_id | |||
|
89 | ||||
|
90 | block : bool | |||
|
91 | determines default behavior when block not specified | |||
|
92 | in execution methods | |||
|
93 | ||||
|
94 | Methods | |||
|
95 | ------- | |||
|
96 | spin : flushes incoming results and registration state changes | |||
|
97 | control methods spin, and requesting `ids` also ensures up to date | |||
|
98 | ||||
|
99 | barrier : wait on one or more msg_ids | |||
|
100 | ||||
|
101 | execution methods: apply/apply_bound/apply_to | |||
|
102 | legacy: execute, run | |||
|
103 | ||||
|
104 | control methods: queue_status, get_result | |||
|
105 | ||||
|
106 | """ | |||
|
107 | ||||
|
108 | ||||
|
109 | _connected=False | |||
|
110 | _engines=None | |||
|
111 | registration_socket=None | |||
|
112 | controller_socket=None | |||
|
113 | notification_socket=None | |||
|
114 | queue_socket=None | |||
|
115 | task_socket=None | |||
|
116 | block = False | |||
|
117 | outstanding=None | |||
|
118 | results = None | |||
|
119 | history = None | |||
|
120 | ||||
|
121 | def __init__(self, addr, context=None, username=None): | |||
|
122 | if context is None: | |||
|
123 | context = zmq.Context() | |||
|
124 | self.context = context | |||
|
125 | self.addr = addr | |||
|
126 | if username is None: | |||
|
127 | self.session = ss.StreamSession() | |||
|
128 | else: | |||
|
129 | self.session = ss.StreamSession(username) | |||
|
130 | self.registration_socket = self.context.socket(zmq.PAIR) | |||
|
131 | self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
|
132 | self.registration_socket.connect(addr) | |||
|
133 | self._engines = {} | |||
|
134 | self._ids = set() | |||
|
135 | self.outstanding=set() | |||
|
136 | self.results = {} | |||
|
137 | self.history = [] | |||
|
138 | self._connect() | |||
|
139 | ||||
|
140 | self._notification_handlers = {'registration_notification' : self._register_engine, | |||
|
141 | 'unregistration_notification' : self._unregister_engine, | |||
|
142 | } | |||
|
143 | self._queue_handlers = {'execute_reply' : self._handle_execute_reply, | |||
|
144 | 'apply_reply' : self._handle_apply_reply} | |||
|
145 | ||||
|
146 | ||||
|
147 | @property | |||
|
148 | def ids(self): | |||
|
149 | self._flush_notifications() | |||
|
150 | return self._ids | |||
|
151 | ||||
|
152 | def _update_engines(self, engines): | |||
|
153 | for k,v in engines.iteritems(): | |||
|
154 | eid = int(k) | |||
|
155 | self._engines[eid] = v | |||
|
156 | self._ids.add(eid) | |||
|
157 | ||||
|
158 | def _build_targets(self, targets): | |||
|
159 | if targets is None: | |||
|
160 | targets = self._ids | |||
|
161 | elif isinstance(targets, str): | |||
|
162 | if targets.lower() == 'all': | |||
|
163 | targets = self._ids | |||
|
164 | else: | |||
|
165 | raise TypeError("%r not valid str target, must be 'all'"%(targets)) | |||
|
166 | elif isinstance(targets, int): | |||
|
167 | targets = [targets] | |||
|
168 | return [self._engines[t] for t in targets], list(targets) | |||
|
169 | ||||
|
170 | def _connect(self): | |||
|
171 | """setup all our socket connections to the controller""" | |||
|
172 | if self._connected: | |||
|
173 | return | |||
|
174 | self._connected=True | |||
|
175 | self.session.send(self.registration_socket, 'connection_request') | |||
|
176 | msg = self.session.recv(self.registration_socket,mode=0)[-1] | |||
|
177 | msg = ss.Message(msg) | |||
|
178 | content = msg.content | |||
|
179 | if content.status == 'ok': | |||
|
180 | if content.queue: | |||
|
181 | self.queue_socket = self.context.socket(zmq.PAIR) | |||
|
182 | self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
|
183 | self.queue_socket.connect(content.queue) | |||
|
184 | if content.task: | |||
|
185 | self.task_socket = self.context.socket(zmq.PAIR) | |||
|
186 | self.task_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
|
187 | self.task_socket.connect(content.task) | |||
|
188 | if content.notification: | |||
|
189 | self.notification_socket = self.context.socket(zmq.SUB) | |||
|
190 | self.notification_socket.connect(content.notification) | |||
|
191 | self.notification_socket.setsockopt(zmq.SUBSCRIBE, "") | |||
|
192 | if content.controller: | |||
|
193 | self.controller_socket = self.context.socket(zmq.PAIR) | |||
|
194 | self.controller_socket.setsockopt(zmq.IDENTITY, self.session.session) | |||
|
195 | self.controller_socket.connect(content.controller) | |||
|
196 | self._update_engines(dict(content.engines)) | |||
|
197 | ||||
|
198 | else: | |||
|
199 | self._connected = False | |||
|
200 | raise Exception("Failed to connect!") | |||
|
201 | ||||
|
202 | #### handlers and callbacks for incoming messages ####### | |||
|
203 | def _register_engine(self, msg): | |||
|
204 | content = msg['content'] | |||
|
205 | eid = content['id'] | |||
|
206 | d = {eid : content['queue']} | |||
|
207 | self._update_engines(d) | |||
|
208 | self._ids.add(int(eid)) | |||
|
209 | ||||
|
210 | def _unregister_engine(self, msg): | |||
|
211 | # print 'unregister',msg | |||
|
212 | content = msg['content'] | |||
|
213 | eid = int(content['id']) | |||
|
214 | if eid in self._ids: | |||
|
215 | self._ids.remove(eid) | |||
|
216 | self._engines.pop(eid) | |||
|
217 | ||||
|
218 | def _handle_execute_reply(self, msg): | |||
|
219 | # msg_id = msg['msg_id'] | |||
|
220 | parent = msg['parent_header'] | |||
|
221 | msg_id = parent['msg_id'] | |||
|
222 | if msg_id not in self.outstanding: | |||
|
223 | print "got unknown result: %s"%msg_id | |||
|
224 | else: | |||
|
225 | self.outstanding.remove(msg_id) | |||
|
226 | self.results[msg_id] = ss.unwrap_exception(msg['content']) | |||
|
227 | ||||
|
228 | def _handle_apply_reply(self, msg): | |||
|
229 | # print msg | |||
|
230 | # msg_id = msg['msg_id'] | |||
|
231 | parent = msg['parent_header'] | |||
|
232 | msg_id = parent['msg_id'] | |||
|
233 | if msg_id not in self.outstanding: | |||
|
234 | print "got unknown result: %s"%msg_id | |||
|
235 | else: | |||
|
236 | self.outstanding.remove(msg_id) | |||
|
237 | content = msg['content'] | |||
|
238 | if content['status'] == 'ok': | |||
|
239 | self.results[msg_id] = ss.unserialize_object(msg['buffers']) | |||
|
240 | else: | |||
|
241 | ||||
|
242 | self.results[msg_id] = ss.unwrap_exception(content) | |||
|
243 | ||||
|
244 | def _flush_notifications(self): | |||
|
245 | "flush incoming notifications of engine registrations" | |||
|
246 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) | |||
|
247 | while msg is not None: | |||
|
248 | msg = msg[-1] | |||
|
249 | msg_type = msg['msg_type'] | |||
|
250 | handler = self._notification_handlers.get(msg_type, None) | |||
|
251 | if handler is None: | |||
|
252 | raise Exception("Unhandled message type: %s"%msg.msg_type) | |||
|
253 | else: | |||
|
254 | handler(msg) | |||
|
255 | msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK) | |||
|
256 | ||||
|
257 | def _flush_results(self, sock): | |||
|
258 | "flush incoming task or queue results" | |||
|
259 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |||
|
260 | while msg is not None: | |||
|
261 | msg = msg[-1] | |||
|
262 | msg_type = msg['msg_type'] | |||
|
263 | handler = self._queue_handlers.get(msg_type, None) | |||
|
264 | if handler is None: | |||
|
265 | raise Exception("Unhandled message type: %s"%msg.msg_type) | |||
|
266 | else: | |||
|
267 | handler(msg) | |||
|
268 | msg = self.session.recv(sock, mode=zmq.NOBLOCK) | |||
|
269 | ||||
|
270 | ###### get/setitem ######## | |||
|
271 | ||||
|
272 | def __getitem__(self, key): | |||
|
273 | if isinstance(key, int): | |||
|
274 | if key not in self.ids: | |||
|
275 | raise IndexError("No such engine: %i"%key) | |||
|
276 | return DirectView(self, key) | |||
|
277 | ||||
|
278 | if isinstance(key, slice): | |||
|
279 | indices = range(len(self.ids))[key] | |||
|
280 | ids = sorted(self._ids) | |||
|
281 | key = [ ids[i] for i in indices ] | |||
|
282 | # newkeys = sorted(self._ids)[thekeys[k]] | |||
|
283 | ||||
|
284 | if isinstance(key, (tuple, list, xrange)): | |||
|
285 | _,targets = self._build_targets(list(key)) | |||
|
286 | return DirectView(self, targets) | |||
|
287 | else: | |||
|
288 | raise TypeError("key by int/iterable of ints only, not %s"%(type(key))) | |||
|
289 | ||||
|
290 | ############ begin real methods ############# | |||
|
291 | ||||
|
292 | def spin(self): | |||
|
293 | """flush incoming notifications and execution results.""" | |||
|
294 | if self.notification_socket: | |||
|
295 | self._flush_notifications() | |||
|
296 | if self.queue_socket: | |||
|
297 | self._flush_results(self.queue_socket) | |||
|
298 | if self.task_socket: | |||
|
299 | self._flush_results(self.task_socket) | |||
|
300 | ||||
|
301 | @spinfirst | |||
|
302 | def queue_status(self, targets=None, verbose=False): | |||
|
303 | """fetch the status of engine queues | |||
|
304 | ||||
|
305 | Parameters | |||
|
306 | ---------- | |||
|
307 | targets : int/str/list of ints/strs | |||
|
308 | the engines on which to execute | |||
|
309 | default : all | |||
|
310 | verbose : bool | |||
|
311 | whether to return | |||
|
312 | ||||
|
313 | """ | |||
|
314 | targets = self._build_targets(targets)[1] | |||
|
315 | content = dict(targets=targets) | |||
|
316 | self.session.send(self.controller_socket, "queue_request", content=content) | |||
|
317 | idents,msg = self.session.recv(self.controller_socket, 0) | |||
|
318 | return msg['content'] | |||
|
319 | ||||
|
320 | @spinfirst | |||
|
321 | def clear(self, targets=None): | |||
|
322 | """clear the namespace in target(s)""" | |||
|
323 | pass | |||
|
324 | ||||
|
325 | @spinfirst | |||
|
326 | def abort(self, targets=None): | |||
|
327 | """abort the Queues of target(s)""" | |||
|
328 | pass | |||
|
329 | ||||
|
330 | @defaultblock | |||
|
331 | def execute(self, code, targets='all', block=None): | |||
|
332 | """executes `code` on `targets` in blocking or nonblocking manner. | |||
|
333 | ||||
|
334 | Parameters | |||
|
335 | ---------- | |||
|
336 | code : str | |||
|
337 | the code string to be executed | |||
|
338 | targets : int/str/list of ints/strs | |||
|
339 | the engines on which to execute | |||
|
340 | default : all | |||
|
341 | block : bool | |||
|
342 | whether or not to wait until done | |||
|
343 | """ | |||
|
344 | # block = self.block if block is None else block | |||
|
345 | # saveblock = self.block | |||
|
346 | # self.block = block | |||
|
347 | result = self.apply(execute, (code,), targets=targets, block=block, bound=True) | |||
|
348 | # self.block = saveblock | |||
|
349 | return result | |||
|
350 | ||||
|
351 | def run(self, code, block=None): | |||
|
352 | """runs `code` on an engine. | |||
|
353 | ||||
|
354 | Calls to this are load-balanced. | |||
|
355 | ||||
|
356 | Parameters | |||
|
357 | ---------- | |||
|
358 | code : str | |||
|
359 | the code string to be executed | |||
|
360 | block : bool | |||
|
361 | whether or not to wait until done | |||
|
362 | ||||
|
363 | """ | |||
|
364 | result = self.apply(execute, (code,), targets=None, block=block, bound=False) | |||
|
365 | return result | |||
|
366 | ||||
|
367 | # a = time.time() | |||
|
368 | # content = dict(code=code) | |||
|
369 | # b = time.time() | |||
|
370 | # msg = self.session.send(self.task_socket, 'execute_request', | |||
|
371 | # content=content) | |||
|
372 | # c = time.time() | |||
|
373 | # msg_id = msg['msg_id'] | |||
|
374 | # self.outstanding.add(msg_id) | |||
|
375 | # self.history.append(msg_id) | |||
|
376 | # d = time.time() | |||
|
377 | # if block: | |||
|
378 | # self.barrier(msg_id) | |||
|
379 | # return self.results[msg_id] | |||
|
380 | # else: | |||
|
381 | # return msg_id | |||
|
382 | ||||
|
383 | def _apply_balanced(self, f, args, kwargs, bound=True, block=None): | |||
|
384 | """the underlying method for applying functions in a load balanced | |||
|
385 | manner.""" | |||
|
386 | block = block if block is not None else self.block | |||
|
387 | ||||
|
388 | bufs = ss.pack_apply_message(f,args,kwargs) | |||
|
389 | content = dict(bound=bound) | |||
|
390 | msg = self.session.send(self.task_socket, "apply_request", | |||
|
391 | content=content, buffers=bufs) | |||
|
392 | msg_id = msg['msg_id'] | |||
|
393 | self.outstanding.add(msg_id) | |||
|
394 | self.history.append(msg_id) | |||
|
395 | if block: | |||
|
396 | self.barrier(msg_id) | |||
|
397 | return self.results[msg_id] | |||
|
398 | else: | |||
|
399 | return msg_id | |||
|
400 | ||||
|
401 | def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None): | |||
|
402 | """Then underlying method for applying functions to specific engines.""" | |||
|
403 | block = block if block is not None else self.block | |||
|
404 | queues,targets = self._build_targets(targets) | |||
|
405 | ||||
|
406 | bufs = ss.pack_apply_message(f,args,kwargs) | |||
|
407 | content = dict(bound=bound) | |||
|
408 | msg_ids = [] | |||
|
409 | for queue in queues: | |||
|
410 | msg = self.session.send(self.queue_socket, "apply_request", | |||
|
411 | content=content, buffers=bufs,ident=queue) | |||
|
412 | msg_id = msg['msg_id'] | |||
|
413 | self.outstanding.add(msg_id) | |||
|
414 | self.history.append(msg_id) | |||
|
415 | msg_ids.append(msg_id) | |||
|
416 | if block: | |||
|
417 | self.barrier(msg_ids) | |||
|
418 | else: | |||
|
419 | if len(msg_ids) == 1: | |||
|
420 | return msg_ids[0] | |||
|
421 | else: | |||
|
422 | return msg_ids | |||
|
423 | if len(msg_ids) == 1: | |||
|
424 | return self.results[msg_ids[0]] | |||
|
425 | else: | |||
|
426 | result = {} | |||
|
427 | for target,mid in zip(targets, msg_ids): | |||
|
428 | result[target] = self.results[mid] | |||
|
429 | return result | |||
|
430 | ||||
|
431 | def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None): | |||
|
432 | """calls f(*args, **kwargs) on a remote engine(s), returning the result. | |||
|
433 | ||||
|
434 | if self.block is False: | |||
|
435 | returns msg_id or list of msg_ids | |||
|
436 | else: | |||
|
437 | returns actual result of f(*args, **kwargs) | |||
|
438 | """ | |||
|
439 | args = args if args is not None else [] | |||
|
440 | kwargs = kwargs if kwargs is not None else {} | |||
|
441 | if targets is None: | |||
|
442 | return self._apply_balanced(f,args,kwargs,bound=bound, block=block) | |||
|
443 | else: | |||
|
444 | return self._apply_direct(f, args, kwargs, | |||
|
445 | bound=bound,block=block, targets=targets) | |||
|
446 | ||||
|
447 | # def apply_bound(self, f, *args, **kwargs): | |||
|
448 | # """calls f(*args, **kwargs) on a remote engine. This does get | |||
|
449 | # executed in an engine's namespace. The controller selects the | |||
|
450 | # target engine via 0MQ XREQ load balancing. | |||
|
451 | # | |||
|
452 | # if self.block is False: | |||
|
453 | # returns msg_id | |||
|
454 | # else: | |||
|
455 | # returns actual result of f(*args, **kwargs) | |||
|
456 | # """ | |||
|
457 | # return self._apply(f, args, kwargs, bound=True) | |||
|
458 | # | |||
|
459 | # | |||
|
460 | # def apply_to(self, targets, f, *args, **kwargs): | |||
|
461 | # """calls f(*args, **kwargs) on a specific engine. | |||
|
462 | # | |||
|
463 | # if self.block is False: | |||
|
464 | # returns msg_id | |||
|
465 | # else: | |||
|
466 | # returns actual result of f(*args, **kwargs) | |||
|
467 | # | |||
|
468 | # The target's namespace is not used here. | |||
|
469 | # Use apply_bound_to() to access target's globals. | |||
|
470 | # """ | |||
|
471 | # return self._apply_to(False, targets, f, args, kwargs) | |||
|
472 | # | |||
|
473 | # def apply_bound_to(self, targets, f, *args, **kwargs): | |||
|
474 | # """calls f(*args, **kwargs) on a specific engine. | |||
|
475 | # | |||
|
476 | # if self.block is False: | |||
|
477 | # returns msg_id | |||
|
478 | # else: | |||
|
479 | # returns actual result of f(*args, **kwargs) | |||
|
480 | # | |||
|
481 | # This method has access to the target's globals | |||
|
482 | # | |||
|
483 | # """ | |||
|
484 | # return self._apply_to(f, args, kwargs) | |||
|
485 | # | |||
|
486 | def push(self, ns, targets=None, block=None): | |||
|
487 | """push the contents of `ns` into the namespace on `target`""" | |||
|
488 | if not isinstance(ns, dict): | |||
|
489 | raise TypeError("Must be a dict, not %s"%type(ns)) | |||
|
490 | result = self.apply(_push, (ns,), targets=targets, block=block,bound=True) | |||
|
491 | return result | |||
|
492 | ||||
|
493 | @spinfirst | |||
|
494 | def pull(self, keys, targets=None, block=True): | |||
|
495 | """pull objects from `target`'s namespace by `keys`""" | |||
|
496 | ||||
|
497 | result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True) | |||
|
498 | return result | |||
|
499 | ||||
|
500 | def barrier(self, msg_ids=None, timeout=-1): | |||
|
501 | """waits on one or more `msg_ids`, for up to `timeout` seconds. | |||
|
502 | ||||
|
503 | Parameters | |||
|
504 | ---------- | |||
|
505 | msg_ids : int, str, or list of ints and/or strs | |||
|
506 | ints are indices to self.history | |||
|
507 | strs are msg_ids | |||
|
508 | default: wait on all outstanding messages | |||
|
509 | timeout : float | |||
|
510 | a time in seconds, after which to give up. | |||
|
511 | default is -1, which means no timeout | |||
|
512 | ||||
|
513 | Returns | |||
|
514 | ------- | |||
|
515 | True : when all msg_ids are done | |||
|
516 | False : timeout reached, msg_ids still outstanding | |||
|
517 | """ | |||
|
518 | tic = time.time() | |||
|
519 | if msg_ids is None: | |||
|
520 | theids = self.outstanding | |||
|
521 | else: | |||
|
522 | if isinstance(msg_ids, (int, str)): | |||
|
523 | msg_ids = [msg_ids] | |||
|
524 | theids = set() | |||
|
525 | for msg_id in msg_ids: | |||
|
526 | if isinstance(msg_id, int): | |||
|
527 | msg_id = self.history[msg_id] | |||
|
528 | theids.add(msg_id) | |||
|
529 | self.spin() | |||
|
530 | while theids.intersection(self.outstanding): | |||
|
531 | if timeout >= 0 and ( time.time()-tic ) > timeout: | |||
|
532 | break | |||
|
533 | time.sleep(1e-3) | |||
|
534 | self.spin() | |||
|
535 | return len(theids.intersection(self.outstanding)) == 0 | |||
|
536 | ||||
|
537 | @spinfirst | |||
|
538 | def get_results(self, msg_ids,status_only=False): | |||
|
539 | """returns the result of the execute or task request with `msg_id`""" | |||
|
540 | if not isinstance(msg_ids, (list,tuple)): | |||
|
541 | msg_ids = [msg_ids] | |||
|
542 | theids = [] | |||
|
543 | for msg_id in msg_ids: | |||
|
544 | if isinstance(msg_id, int): | |||
|
545 | msg_id = self.history[msg_id] | |||
|
546 | theids.append(msg_id) | |||
|
547 | ||||
|
548 | content = dict(msg_ids=theids, status_only=status_only) | |||
|
549 | msg = self.session.send(self.controller_socket, "result_request", content=content) | |||
|
550 | zmq.select([self.controller_socket], [], []) | |||
|
551 | idents,msg = self.session.recv(self.controller_socket, zmq.NOBLOCK) | |||
|
552 | ||||
|
553 | # while True: | |||
|
554 | # try: | |||
|
555 | # except zmq.ZMQError: | |||
|
556 | # time.sleep(1e-3) | |||
|
557 | # continue | |||
|
558 | # else: | |||
|
559 | # break | |||
|
560 | return msg['content'] | |||
|
561 | ||||
|
562 | No newline at end of file |
This diff has been collapsed as it changes many lines, (770 lines changed) Show them Hide them | |||||
@@ -0,0 +1,770 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | # encoding: utf-8 | |||
|
3 | ||||
|
4 | """The IPython Controller with 0MQ | |||
|
5 | This is the master object that handles connections from engines, clients, and | |||
|
6 | """ | |||
|
7 | #----------------------------------------------------------------------------- | |||
|
8 | # Copyright (C) 2008-2009 The IPython Development Team | |||
|
9 | # | |||
|
10 | # Distributed under the terms of the BSD License. The full license is in | |||
|
11 | # the file COPYING, distributed as part of this software. | |||
|
12 | #----------------------------------------------------------------------------- | |||
|
13 | ||||
|
14 | #----------------------------------------------------------------------------- | |||
|
15 | # Imports | |||
|
16 | #----------------------------------------------------------------------------- | |||
|
17 | from datetime import datetime | |||
|
18 | ||||
|
19 | import zmq | |||
|
20 | from zmq.eventloop import zmqstream, ioloop | |||
|
21 | import uuid | |||
|
22 | ||||
|
23 | # internal: | |||
|
24 | from streamsession import Message, wrap_exception # default_unpacker as unpack, default_packer as pack | |||
|
25 | from IPython.zmq.log import logger # a Logger object | |||
|
26 | ||||
|
27 | # from messages import json # use the same import switches | |||
|
28 | ||||
|
29 | #----------------------------------------------------------------------------- | |||
|
30 | # Code | |||
|
31 | #----------------------------------------------------------------------------- | |||
|
32 | ||||
|
33 | class ReverseDict(dict): | |||
|
34 | """simple double-keyed subset of dict methods.""" | |||
|
35 | ||||
|
36 | def __init__(self, *args, **kwargs): | |||
|
37 | dict.__init__(self, *args, **kwargs) | |||
|
38 | self.reverse = dict() | |||
|
39 | for key, value in self.iteritems(): | |||
|
40 | self.reverse[value] = key | |||
|
41 | ||||
|
42 | def __getitem__(self, key): | |||
|
43 | try: | |||
|
44 | return dict.__getitem__(self, key) | |||
|
45 | except KeyError: | |||
|
46 | return self.reverse[key] | |||
|
47 | ||||
|
48 | def __setitem__(self, key, value): | |||
|
49 | if key in self.reverse: | |||
|
50 | raise KeyError("Can't have key %r on both sides!"%key) | |||
|
51 | dict.__setitem__(self, key, value) | |||
|
52 | self.reverse[value] = key | |||
|
53 | ||||
|
54 | def pop(self, key): | |||
|
55 | value = dict.pop(self, key) | |||
|
56 | self.d1.pop(value) | |||
|
57 | return value | |||
|
58 | ||||
|
59 | ||||
|
60 | class EngineConnector(object): | |||
|
61 | """A simple object for accessing the various zmq connections of an object. | |||
|
62 | Attributes are: | |||
|
63 | id (int): engine ID | |||
|
64 | uuid (str): uuid (unused?) | |||
|
65 | queue (str): identity of queue's XREQ socket | |||
|
66 | registration (str): identity of registration XREQ socket | |||
|
67 | heartbeat (str): identity of heartbeat XREQ socket | |||
|
68 | """ | |||
|
69 | id=0 | |||
|
70 | queue=None | |||
|
71 | control=None | |||
|
72 | registration=None | |||
|
73 | heartbeat=None | |||
|
74 | pending=None | |||
|
75 | ||||
|
76 | def __init__(self, id, queue, registration, control, heartbeat=None): | |||
|
77 | logger.info("engine::Engine Connected: %i"%id) | |||
|
78 | self.id = id | |||
|
79 | self.queue = queue | |||
|
80 | self.registration = registration | |||
|
81 | self.control = control | |||
|
82 | self.heartbeat = heartbeat | |||
|
83 | ||||
|
84 | class Controller(object): | |||
|
85 | """The IPython Controller with 0MQ connections | |||
|
86 | ||||
|
87 | Parameters | |||
|
88 | ========== | |||
|
89 | loop: zmq IOLoop instance | |||
|
90 | session: StreamSession object | |||
|
91 | <removed> context: zmq context for creating new connections (?) | |||
|
92 | registrar: ZMQStream for engine registration requests (XREP) | |||
|
93 | clientele: ZMQStream for client connections (XREP) | |||
|
94 | not used for jobs, only query/control commands | |||
|
95 | queue: ZMQStream for monitoring the command queue (SUB) | |||
|
96 | heartbeat: HeartMonitor object checking the pulse of the engines | |||
|
97 | db_stream: connection to db for out of memory logging of commands | |||
|
98 | NotImplemented | |||
|
99 | queue_addr: zmq connection address of the XREP socket for the queue | |||
|
100 | hb_addr: zmq connection address of the PUB socket for heartbeats | |||
|
101 | task_addr: zmq connection address of the XREQ socket for task queue | |||
|
102 | """ | |||
|
103 | # internal data structures: | |||
|
104 | ids=None # engine IDs | |||
|
105 | keytable=None | |||
|
106 | engines=None | |||
|
107 | clients=None | |||
|
108 | hearts=None | |||
|
109 | pending=None | |||
|
110 | results=None | |||
|
111 | tasks=None | |||
|
112 | completed=None | |||
|
113 | mia=None | |||
|
114 | incoming_registrations=None | |||
|
115 | registration_timeout=None | |||
|
116 | ||||
|
117 | #objects from constructor: | |||
|
118 | loop=None | |||
|
119 | registrar=None | |||
|
120 | clientelle=None | |||
|
121 | queue=None | |||
|
122 | heartbeat=None | |||
|
123 | notifier=None | |||
|
124 | db=None | |||
|
125 | client_addr=None | |||
|
126 | engine_addrs=None | |||
|
127 | ||||
|
128 | ||||
|
129 | def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs): | |||
|
130 | """ | |||
|
131 | # universal: | |||
|
132 | loop: IOLoop for creating future connections | |||
|
133 | session: streamsession for sending serialized data | |||
|
134 | # engine: | |||
|
135 | queue: ZMQStream for monitoring queue messages | |||
|
136 | registrar: ZMQStream for engine registration | |||
|
137 | heartbeat: HeartMonitor object for tracking engines | |||
|
138 | # client: | |||
|
139 | clientele: ZMQStream for client connections | |||
|
140 | # extra: | |||
|
141 | db: ZMQStream for db connection (NotImplemented) | |||
|
142 | engine_addrs: zmq address/protocol dict for engine connections | |||
|
143 | client_addrs: zmq address/protocol dict for client connections | |||
|
144 | """ | |||
|
145 | self.ids = set() | |||
|
146 | self.keytable={} | |||
|
147 | self.incoming_registrations={} | |||
|
148 | self.engines = {} | |||
|
149 | self.by_ident = {} | |||
|
150 | self.clients = {} | |||
|
151 | self.hearts = {} | |||
|
152 | self.mia = set() | |||
|
153 | ||||
|
154 | # self.sockets = {} | |||
|
155 | self.loop = loop | |||
|
156 | self.session = session | |||
|
157 | self.registrar = registrar | |||
|
158 | self.clientele = clientele | |||
|
159 | self.queue = queue | |||
|
160 | self.heartbeat = heartbeat | |||
|
161 | self.notifier = notifier | |||
|
162 | self.db = db | |||
|
163 | ||||
|
164 | self.client_addrs = client_addrs | |||
|
165 | assert isinstance(client_addrs['queue'], str) | |||
|
166 | # self.hb_addrs = hb_addrs | |||
|
167 | self.engine_addrs = engine_addrs | |||
|
168 | assert isinstance(engine_addrs['queue'], str) | |||
|
169 | assert len(engine_addrs['heartbeat']) == 2 | |||
|
170 | ||||
|
171 | ||||
|
172 | # register our callbacks | |||
|
173 | self.registrar.on_recv(self.dispatch_register_request) | |||
|
174 | self.clientele.on_recv(self.dispatch_client_msg) | |||
|
175 | self.queue.on_recv(self.dispatch_queue_traffic) | |||
|
176 | ||||
|
177 | if heartbeat is not None: | |||
|
178 | heartbeat.add_heart_failure_handler(self.handle_heart_failure) | |||
|
179 | heartbeat.add_new_heart_handler(self.handle_new_heart) | |||
|
180 | ||||
|
181 | if self.db is not None: | |||
|
182 | self.db.on_recv(self.dispatch_db) | |||
|
183 | ||||
|
184 | self.client_handlers = {'queue_request': self.queue_status, | |||
|
185 | 'result_request': self.get_results, | |||
|
186 | 'purge_request': self.purge_results, | |||
|
187 | 'resubmit_request': self.resubmit_task, | |||
|
188 | } | |||
|
189 | ||||
|
190 | self.registrar_handlers = {'registration_request' : self.register_engine, | |||
|
191 | 'unregistration_request' : self.unregister_engine, | |||
|
192 | 'connection_request': self.connection_request, | |||
|
193 | ||||
|
194 | } | |||
|
195 | # | |||
|
196 | # this is the stuff that will move to DB: | |||
|
197 | self.results = {} # completed results | |||
|
198 | self.pending = {} # pending messages, keyed by msg_id | |||
|
199 | self.queues = {} # pending msg_ids keyed by engine_id | |||
|
200 | self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id | |||
|
201 | self.completed = {} # completed msg_ids keyed by engine_id | |||
|
202 | self.registration_timeout = max(5000, 2*self.heartbeat.period) | |||
|
203 | ||||
|
204 | logger.info("controller::created controller") | |||
|
205 | ||||
|
206 | def _new_id(self): | |||
|
207 | """gemerate a new ID""" | |||
|
208 | newid = 0 | |||
|
209 | incoming = [id[0] for id in self.incoming_registrations.itervalues()] | |||
|
210 | # print newid, self.ids, self.incoming_registrations | |||
|
211 | while newid in self.ids or newid in incoming: | |||
|
212 | newid += 1 | |||
|
213 | return newid | |||
|
214 | ||||
|
215 | ||||
|
216 | #----------------------------------------------------------------------------- | |||
|
217 | # message validation | |||
|
218 | #----------------------------------------------------------------------------- | |||
|
219 | def _validate_targets(self, targets): | |||
|
220 | """turn any valid targets argument into a list of integer ids""" | |||
|
221 | if targets is None: | |||
|
222 | # default to all | |||
|
223 | targets = self.ids | |||
|
224 | ||||
|
225 | if isinstance(targets, (int,str,unicode)): | |||
|
226 | # only one target specified | |||
|
227 | targets = [targets] | |||
|
228 | _targets = [] | |||
|
229 | for t in targets: | |||
|
230 | # map raw identities to ids | |||
|
231 | if isinstance(t, (str,unicode)): | |||
|
232 | t = self.by_ident.get(t, t) | |||
|
233 | _targets.append(t) | |||
|
234 | targets = _targets | |||
|
235 | bad_targets = [ t for t in targets if t not in self.ids ] | |||
|
236 | if bad_targets: | |||
|
237 | raise IndexError("No Such Engine: %r"%bad_targets) | |||
|
238 | if not targets: | |||
|
239 | raise IndexError("No Engines Registered") | |||
|
240 | return targets | |||
|
241 | ||||
|
242 | def _validate_client_msg(self, msg): | |||
|
243 | """validates and unpacks headers of a message. Returns False if invalid, | |||
|
244 | (ident, header, parent, content)""" | |||
|
245 | client_id = msg[0] | |||
|
246 | try: | |||
|
247 | msg = self.session.unpack_message(msg[1:], content=True) | |||
|
248 | except: | |||
|
249 | logger.error("client::Invalid Message %s"%msg) | |||
|
250 | return False | |||
|
251 | ||||
|
252 | msg_type = msg.get('msg_type', None) | |||
|
253 | if msg_type is None: | |||
|
254 | return False | |||
|
255 | header = msg.get('header') | |||
|
256 | # session doesn't handle split content for now: | |||
|
257 | return client_id, msg | |||
|
258 | ||||
|
259 | ||||
|
260 | #----------------------------------------------------------------------------- | |||
|
261 | # dispatch methods (1 per socket) | |||
|
262 | #----------------------------------------------------------------------------- | |||
|
263 | ||||
|
264 | def dispatch_register_request(self, msg): | |||
|
265 | """""" | |||
|
266 | logger.debug("registration::dispatch_register_request(%s)"%msg) | |||
|
267 | idents,msg = self.session.feed_identities(msg) | |||
|
268 | print idents,msg, len(msg) | |||
|
269 | try: | |||
|
270 | msg = self.session.unpack_message(msg,content=True) | |||
|
271 | except Exception, e: | |||
|
272 | logger.error("registration::got bad registration message: %s"%msg) | |||
|
273 | raise e | |||
|
274 | return | |||
|
275 | ||||
|
276 | msg_type = msg['msg_type'] | |||
|
277 | content = msg['content'] | |||
|
278 | ||||
|
279 | handler = self.registrar_handlers.get(msg_type, None) | |||
|
280 | if handler is None: | |||
|
281 | logger.error("registration::got bad registration message: %s"%msg) | |||
|
282 | else: | |||
|
283 | handler(idents, msg) | |||
|
284 | ||||
|
285 | def dispatch_queue_traffic(self, msg): | |||
|
286 | """all ME and Task queue messages come through here""" | |||
|
287 | logger.debug("queue traffic: %s"%msg[:2]) | |||
|
288 | switch = msg[0] | |||
|
289 | idents, msg = self.session.feed_identities(msg[1:]) | |||
|
290 | if switch == 'in': | |||
|
291 | self.save_queue_request(idents, msg) | |||
|
292 | elif switch == 'out': | |||
|
293 | self.save_queue_result(idents, msg) | |||
|
294 | elif switch == 'intask': | |||
|
295 | self.save_task_request(idents, msg) | |||
|
296 | elif switch == 'outtask': | |||
|
297 | self.save_task_result(idents, msg) | |||
|
298 | elif switch == 'tracktask': | |||
|
299 | self.save_task_destination(idents, msg) | |||
|
300 | else: | |||
|
301 | logger.error("Invalid message topic: %s"%switch) | |||
|
302 | ||||
|
303 | ||||
|
304 | def dispatch_client_msg(self, msg): | |||
|
305 | """Route messages from clients""" | |||
|
306 | idents, msg = self.session.feed_identities(msg) | |||
|
307 | client_id = idents[0] | |||
|
308 | try: | |||
|
309 | msg = self.session.unpack_message(msg, content=True) | |||
|
310 | except: | |||
|
311 | content = wrap_exception() | |||
|
312 | logger.error("Bad Client Message: %s"%msg) | |||
|
313 | self.session.send(self.clientele, "controller_error", ident=client_id, | |||
|
314 | content=content) | |||
|
315 | return | |||
|
316 | ||||
|
317 | # print client_id, header, parent, content | |||
|
318 | #switch on message type: | |||
|
319 | msg_type = msg['msg_type'] | |||
|
320 | logger.info("client:: client %s requested %s"%(client_id, msg_type)) | |||
|
321 | handler = self.client_handlers.get(msg_type, None) | |||
|
322 | try: | |||
|
323 | assert handler is not None, "Bad Message Type: %s"%msg_type | |||
|
324 | except: | |||
|
325 | content = wrap_exception() | |||
|
326 | logger.error("Bad Message Type: %s"%msg_type) | |||
|
327 | self.session.send(self.clientele, "controller_error", ident=client_id, | |||
|
328 | content=content) | |||
|
329 | return | |||
|
330 | else: | |||
|
331 | handler(client_id, msg) | |||
|
332 | ||||
|
333 | def dispatch_db(self, msg): | |||
|
334 | """""" | |||
|
335 | raise NotImplementedError | |||
|
336 | ||||
|
337 | #--------------------------------------------------------------------------- | |||
|
338 | # handler methods (1 per event) | |||
|
339 | #--------------------------------------------------------------------------- | |||
|
340 | ||||
|
341 | #----------------------- Heartbeat -------------------------------------- | |||
|
342 | ||||
|
343 | def handle_new_heart(self, heart): | |||
|
344 | """handler to attach to heartbeater. | |||
|
345 | Called when a new heart starts to beat. | |||
|
346 | Triggers completion of registration.""" | |||
|
347 | logger.debug("heartbeat::handle_new_heart(%r)"%heart) | |||
|
348 | if heart not in self.incoming_registrations: | |||
|
349 | logger.info("heartbeat::ignoring new heart: %r"%heart) | |||
|
350 | else: | |||
|
351 | self.finish_registration(heart) | |||
|
352 | ||||
|
353 | ||||
|
354 | def handle_heart_failure(self, heart): | |||
|
355 | """handler to attach to heartbeater. | |||
|
356 | called when a previously registered heart fails to respond to beat request. | |||
|
357 | triggers unregistration""" | |||
|
358 | logger.debug("heartbeat::handle_heart_failure(%r)"%heart) | |||
|
359 | eid = self.hearts.get(heart, None) | |||
|
360 | if eid is None: | |||
|
361 | logger.info("heartbeat::ignoring heart failure %r"%heart) | |||
|
362 | else: | |||
|
363 | self.unregister_engine(heart, dict(content=dict(id=eid))) | |||
|
364 | ||||
|
365 | #----------------------- MUX Queue Traffic ------------------------------ | |||
|
366 | ||||
|
367 | def save_queue_request(self, idents, msg): | |||
|
368 | queue_id, client_id = idents[:2] | |||
|
369 | ||||
|
370 | try: | |||
|
371 | msg = self.session.unpack_message(msg, content=False) | |||
|
372 | except: | |||
|
373 | logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg)) | |||
|
374 | return | |||
|
375 | ||||
|
376 | eid = self.by_ident.get(queue_id, None) | |||
|
377 | if eid is None: | |||
|
378 | logger.error("queue::target %r not registered"%queue_id) | |||
|
379 | logger.debug("queue:: valid are: %s"%(self.by_ident.keys())) | |||
|
380 | return | |||
|
381 | ||||
|
382 | header = msg['header'] | |||
|
383 | msg_id = header['msg_id'] | |||
|
384 | info = dict(submit=datetime.now(), | |||
|
385 | received=None, | |||
|
386 | engine=(eid, queue_id)) | |||
|
387 | self.pending[msg_id] = ( msg, info ) | |||
|
388 | self.queues[eid][0].append(msg_id) | |||
|
389 | ||||
|
390 | def save_queue_result(self, idents, msg): | |||
|
391 | client_id, queue_id = idents[:2] | |||
|
392 | ||||
|
393 | try: | |||
|
394 | msg = self.session.unpack_message(msg, content=False) | |||
|
395 | except: | |||
|
396 | logger.error("queue::engine %r sent invalid message to %r: %s"%( | |||
|
397 | queue_id,client_id, msg)) | |||
|
398 | return | |||
|
399 | ||||
|
400 | eid = self.by_ident.get(queue_id, None) | |||
|
401 | if eid is None: | |||
|
402 | logger.error("queue::unknown engine %r is sending a reply: "%queue_id) | |||
|
403 | logger.debug("queue:: %s"%msg[2:]) | |||
|
404 | return | |||
|
405 | ||||
|
406 | parent = msg['parent_header'] | |||
|
407 | if not parent: | |||
|
408 | return | |||
|
409 | msg_id = parent['msg_id'] | |||
|
410 | self.results[msg_id] = msg | |||
|
411 | if msg_id in self.pending: | |||
|
412 | self.pending.pop(msg_id) | |||
|
413 | self.queues[eid][0].remove(msg_id) | |||
|
414 | self.completed[eid].append(msg_id) | |||
|
415 | else: | |||
|
416 | logger.debug("queue:: unknown msg finished %s"%msg_id) | |||
|
417 | ||||
|
418 | #--------------------- Task Queue Traffic ------------------------------ | |||
|
419 | ||||
|
420 | def save_task_request(self, idents, msg): | |||
|
421 | client_id = idents[0] | |||
|
422 | ||||
|
423 | try: | |||
|
424 | msg = self.session.unpack_message(msg, content=False) | |||
|
425 | except: | |||
|
426 | logger.error("task::client %r sent invalid task message: %s"%( | |||
|
427 | client_id, msg)) | |||
|
428 | return | |||
|
429 | ||||
|
430 | header = msg['header'] | |||
|
431 | msg_id = header['msg_id'] | |||
|
432 | self.mia.add(msg_id) | |||
|
433 | self.pending[msg_id] = msg | |||
|
434 | if not self.tasks.has_key(client_id): | |||
|
435 | self.tasks[client_id] = [] | |||
|
436 | self.tasks[client_id].append(msg_id) | |||
|
437 | ||||
|
438 | def save_task_result(self, idents, msg): | |||
|
439 | client_id = idents[0] | |||
|
440 | try: | |||
|
441 | msg = self.session.unpack_message(msg, content=False) | |||
|
442 | except: | |||
|
443 | logger.error("task::invalid task result message send to %r: %s"%( | |||
|
444 | client_id, msg)) | |||
|
445 | return | |||
|
446 | ||||
|
447 | parent = msg['parent_header'] | |||
|
448 | if not parent: | |||
|
449 | # print msg | |||
|
450 | # logger.warn("") | |||
|
451 | return | |||
|
452 | msg_id = parent['msg_id'] | |||
|
453 | self.results[msg_id] = msg | |||
|
454 | if msg_id in self.pending: | |||
|
455 | self.pending.pop(msg_id) | |||
|
456 | if msg_id in self.mia: | |||
|
457 | self.mia.remove(msg_id) | |||
|
458 | else: | |||
|
459 | logger.debug("task:: unknown task %s finished"%msg_id) | |||
|
460 | ||||
|
461 | def save_task_destination(self, idents, msg): | |||
|
462 | try: | |||
|
463 | msg = self.session.unpack_message(msg, content=True) | |||
|
464 | except: | |||
|
465 | logger.error("task::invalid task tracking message") | |||
|
466 | return | |||
|
467 | content = msg['content'] | |||
|
468 | print content | |||
|
469 | msg_id = content['msg_id'] | |||
|
470 | engine_uuid = content['engine_id'] | |||
|
471 | for eid,queue_id in self.keytable.iteritems(): | |||
|
472 | if queue_id == engine_uuid: | |||
|
473 | break | |||
|
474 | ||||
|
475 | logger.info("task:: task %s arrived on %s"%(msg_id, eid)) | |||
|
476 | if msg_id in self.mia: | |||
|
477 | self.mia.remove(msg_id) | |||
|
478 | else: | |||
|
479 | logger.debug("task::task %s not listed as MIA?!"%(msg_id)) | |||
|
480 | self.tasks[engine_uuid].append(msg_id) | |||
|
481 | ||||
|
482 | def mia_task_request(self, idents, msg): | |||
|
483 | client_id = idents[0] | |||
|
484 | content = dict(mia=self.mia,status='ok') | |||
|
485 | self.session.send('mia_reply', content=content, idents=client_id) | |||
|
486 | ||||
|
487 | ||||
|
488 | ||||
|
489 | #-------------------- Registration ----------------------------- | |||
|
490 | ||||
|
491 | def connection_request(self, client_id, msg): | |||
|
492 | """reply with connection addresses for clients""" | |||
|
493 | logger.info("client::client %s connected"%client_id) | |||
|
494 | content = dict(status='ok') | |||
|
495 | content.update(self.client_addrs) | |||
|
496 | jsonable = {} | |||
|
497 | for k,v in self.keytable.iteritems(): | |||
|
498 | jsonable[str(k)] = v | |||
|
499 | content['engines'] = jsonable | |||
|
500 | self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id) | |||
|
501 | ||||
|
502 | def register_engine(self, reg, msg): | |||
|
503 | """register an engine""" | |||
|
504 | content = msg['content'] | |||
|
505 | try: | |||
|
506 | queue = content['queue'] | |||
|
507 | except KeyError: | |||
|
508 | logger.error("registration::queue not specified") | |||
|
509 | return | |||
|
510 | heart = content.get('heartbeat', None) | |||
|
511 | """register a new engine, and create the socket(s) necessary""" | |||
|
512 | eid = self._new_id() | |||
|
513 | # print (eid, queue, reg, heart) | |||
|
514 | ||||
|
515 | logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart)) | |||
|
516 | ||||
|
517 | content = dict(id=eid,status='ok') | |||
|
518 | content.update(self.engine_addrs) | |||
|
519 | # check if requesting available IDs: | |||
|
520 | if queue in self.by_ident: | |||
|
521 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} | |||
|
522 | elif heart in self.hearts: # need to check unique hearts? | |||
|
523 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} | |||
|
524 | else: | |||
|
525 | for h, pack in self.incoming_registrations.iteritems(): | |||
|
526 | if heart == h: | |||
|
527 | content = {'status': 'error', 'reason': "heart_id %r in use"%heart} | |||
|
528 | break | |||
|
529 | elif queue == pack[1]: | |||
|
530 | content = {'status': 'error', 'reason': "queue_id %r in use"%queue} | |||
|
531 | break | |||
|
532 | ||||
|
533 | msg = self.session.send(self.registrar, "registration_reply", | |||
|
534 | content=content, | |||
|
535 | ident=reg) | |||
|
536 | ||||
|
537 | if content['status'] == 'ok': | |||
|
538 | if heart in self.heartbeat.hearts: | |||
|
539 | # already beating | |||
|
540 | self.incoming_registrations[heart] = (eid,queue,reg,None) | |||
|
541 | self.finish_registration(heart) | |||
|
542 | else: | |||
|
543 | purge = lambda : self._purge_stalled_registration(heart) | |||
|
544 | dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop) | |||
|
545 | dc.start() | |||
|
546 | self.incoming_registrations[heart] = (eid,queue,reg,dc) | |||
|
547 | else: | |||
|
548 | logger.error("registration::registration %i failed: %s"%(eid, content['reason'])) | |||
|
549 | return eid | |||
|
550 | ||||
|
551 | def unregister_engine(self, ident, msg): | |||
|
552 | try: | |||
|
553 | eid = msg['content']['id'] | |||
|
554 | except: | |||
|
555 | logger.error("registration::bad engine id for unregistration: %s"%ident) | |||
|
556 | return | |||
|
557 | logger.info("registration::unregister_engine(%s)"%eid) | |||
|
558 | content=dict(id=eid, queue=self.engines[eid].queue) | |||
|
559 | self.ids.remove(eid) | |||
|
560 | self.keytable.pop(eid) | |||
|
561 | ec = self.engines.pop(eid) | |||
|
562 | self.hearts.pop(ec.heartbeat) | |||
|
563 | self.by_ident.pop(ec.queue) | |||
|
564 | self.completed.pop(eid) | |||
|
565 | for msg_id in self.queues.pop(eid)[0]: | |||
|
566 | msg = self.pending.pop(msg_id) | |||
|
567 | ############## TODO: HANDLE IT ################ | |||
|
568 | ||||
|
569 | if self.notifier: | |||
|
570 | self.session.send(self.notifier, "unregistration_notification", content=content) | |||
|
571 | ||||
|
572 | def finish_registration(self, heart): | |||
|
573 | try: | |||
|
574 | (eid,queue,reg,purge) = self.incoming_registrations.pop(heart) | |||
|
575 | except KeyError: | |||
|
576 | logger.error("registration::tried to finish nonexistant registration") | |||
|
577 | return | |||
|
578 | logger.info("registration::finished registering engine %i:%r"%(eid,queue)) | |||
|
579 | if purge is not None: | |||
|
580 | purge.stop() | |||
|
581 | control = queue | |||
|
582 | self.ids.add(eid) | |||
|
583 | self.keytable[eid] = queue | |||
|
584 | self.engines[eid] = EngineConnector(eid, queue, reg, control, heart) | |||
|
585 | self.by_ident[queue] = eid | |||
|
586 | self.queues[eid] = ([],[]) | |||
|
587 | self.completed[eid] = list() | |||
|
588 | self.hearts[heart] = eid | |||
|
589 | content = dict(id=eid, queue=self.engines[eid].queue) | |||
|
590 | if self.notifier: | |||
|
591 | self.session.send(self.notifier, "registration_notification", content=content) | |||
|
592 | ||||
|
593 | def _purge_stalled_registration(self, heart): | |||
|
594 | if heart in self.incoming_registrations: | |||
|
595 | eid = self.incoming_registrations.pop(heart)[0] | |||
|
596 | logger.info("registration::purging stalled registration: %i"%eid) | |||
|
597 | else: | |||
|
598 | pass | |||
|
599 | ||||
|
600 | #------------------- Client Requests ------------------------------- | |||
|
601 | ||||
|
602 | def check_load(self, client_id, msg): | |||
|
603 | content = msg['content'] | |||
|
604 | try: | |||
|
605 | targets = content['targets'] | |||
|
606 | targets = self._validate_targets(targets) | |||
|
607 | except: | |||
|
608 | content = wrap_exception() | |||
|
609 | self.session.send(self.clientele, "controller_error", | |||
|
610 | content=content, ident=client_id) | |||
|
611 | return | |||
|
612 | ||||
|
613 | content = dict(status='ok') | |||
|
614 | # loads = {} | |||
|
615 | for t in targets: | |||
|
616 | content[str(t)] = len(self.queues[t]) | |||
|
617 | self.session.send(self.clientele, "load_reply", content=content, ident=client_id) | |||
|
618 | ||||
|
619 | ||||
|
620 | def queue_status(self, client_id, msg): | |||
|
621 | """handle queue_status request""" | |||
|
622 | content = msg['content'] | |||
|
623 | targets = content['targets'] | |||
|
624 | try: | |||
|
625 | targets = self._validate_targets(targets) | |||
|
626 | except: | |||
|
627 | content = wrap_exception() | |||
|
628 | self.session.send(self.clientele, "controller_error", | |||
|
629 | content=content, ident=client_id) | |||
|
630 | return | |||
|
631 | verbose = msg.get('verbose', False) | |||
|
632 | content = dict() | |||
|
633 | for t in targets: | |||
|
634 | queue = self.queues[t] | |||
|
635 | completed = self.completed[t] | |||
|
636 | if not verbose: | |||
|
637 | queue = len(queue) | |||
|
638 | completed = len(completed) | |||
|
639 | content[str(t)] = {'queue': queue, 'completed': completed } | |||
|
640 | # pending | |||
|
641 | self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) | |||
|
642 | ||||
|
643 | def job_status(self, client_id, msg): | |||
|
644 | """handle queue_status request""" | |||
|
645 | content = msg['content'] | |||
|
646 | msg_ids = content['msg_ids'] | |||
|
647 | try: | |||
|
648 | targets = self._validate_targets(targets) | |||
|
649 | except: | |||
|
650 | content = wrap_exception() | |||
|
651 | self.session.send(self.clientele, "controller_error", | |||
|
652 | content=content, ident=client_id) | |||
|
653 | return | |||
|
654 | verbose = msg.get('verbose', False) | |||
|
655 | content = dict() | |||
|
656 | for t in targets: | |||
|
657 | queue = self.queues[t] | |||
|
658 | completed = self.completed[t] | |||
|
659 | if not verbose: | |||
|
660 | queue = len(queue) | |||
|
661 | completed = len(completed) | |||
|
662 | content[str(t)] = {'queue': queue, 'completed': completed } | |||
|
663 | # pending | |||
|
664 | self.session.send(self.clientele, "queue_reply", content=content, ident=client_id) | |||
|
665 | ||||
|
666 | def purge_results(self, client_id, msg): | |||
|
667 | content = msg['content'] | |||
|
668 | msg_ids = content.get('msg_ids', []) | |||
|
669 | reply = dict(status='ok') | |||
|
670 | if msg_ids == 'all': | |||
|
671 | self.results = {} | |||
|
672 | else: | |||
|
673 | for msg_id in msg_ids: | |||
|
674 | if msg_id in self.results: | |||
|
675 | self.results.pop(msg_id) | |||
|
676 | else: | |||
|
677 | if msg_id in self.pending: | |||
|
678 | reply = dict(status='error', reason="msg pending: %r"%msg_id) | |||
|
679 | else: | |||
|
680 | reply = dict(status='error', reason="No such msg: %r"%msg_id) | |||
|
681 | break | |||
|
682 | eids = content.get('engine_ids', []) | |||
|
683 | for eid in eids: | |||
|
684 | if eid not in self.engines: | |||
|
685 | reply = dict(status='error', reason="No such engine: %i"%eid) | |||
|
686 | break | |||
|
687 | msg_ids = self.completed.pop(eid) | |||
|
688 | for msg_id in msg_ids: | |||
|
689 | self.results.pop(msg_id) | |||
|
690 | ||||
|
691 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) | |||
|
692 | ||||
|
693 | def resubmit_task(self, client_id, msg, buffers): | |||
|
694 | content = msg['content'] | |||
|
695 | header = msg['header'] | |||
|
696 | ||||
|
697 | ||||
|
698 | msg_ids = content.get('msg_ids', []) | |||
|
699 | reply = dict(status='ok') | |||
|
700 | if msg_ids == 'all': | |||
|
701 | self.results = {} | |||
|
702 | else: | |||
|
703 | for msg_id in msg_ids: | |||
|
704 | if msg_id in self.results: | |||
|
705 | self.results.pop(msg_id) | |||
|
706 | else: | |||
|
707 | if msg_id in self.pending: | |||
|
708 | reply = dict(status='error', reason="msg pending: %r"%msg_id) | |||
|
709 | else: | |||
|
710 | reply = dict(status='error', reason="No such msg: %r"%msg_id) | |||
|
711 | break | |||
|
712 | eids = content.get('engine_ids', []) | |||
|
713 | for eid in eids: | |||
|
714 | if eid not in self.engines: | |||
|
715 | reply = dict(status='error', reason="No such engine: %i"%eid) | |||
|
716 | break | |||
|
717 | msg_ids = self.completed.pop(eid) | |||
|
718 | for msg_id in msg_ids: | |||
|
719 | self.results.pop(msg_id) | |||
|
720 | ||||
|
721 | self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id) | |||
|
722 | ||||
|
723 | def get_results(self, client_id, msg): | |||
|
724 | """get the result of 1 or more messages""" | |||
|
725 | content = msg['content'] | |||
|
726 | msg_ids = set(content['msg_ids']) | |||
|
727 | statusonly = content.get('status_only', False) | |||
|
728 | pending = [] | |||
|
729 | completed = [] | |||
|
730 | content = dict(status='ok') | |||
|
731 | content['pending'] = pending | |||
|
732 | content['completed'] = completed | |||
|
733 | for msg_id in msg_ids: | |||
|
734 | if msg_id in self.pending: | |||
|
735 | pending.append(msg_id) | |||
|
736 | elif msg_id in self.results: | |||
|
737 | completed.append(msg_id) | |||
|
738 | if not statusonly: | |||
|
739 | content[msg_id] = self.results[msg_id]['content'] | |||
|
740 | else: | |||
|
741 | content = dict(status='error') | |||
|
742 | content['reason'] = 'no such message: '+msg_id | |||
|
743 | break | |||
|
744 | self.session.send(self.clientele, "result_reply", content=content, | |||
|
745 | parent=msg, ident=client_id) | |||
|
746 | ||||
|
747 | ||||
|
748 | ||||
|
749 | ############ OLD METHODS for Python Relay Controller ################### | |||
|
750 | def _validate_engine_msg(self, msg): | |||
|
751 | """validates and unpacks headers of a message. Returns False if invalid, | |||
|
752 | (ident, message)""" | |||
|
753 | ident = msg[0] | |||
|
754 | try: | |||
|
755 | msg = self.session.unpack_message(msg[1:], content=False) | |||
|
756 | except: | |||
|
757 | logger.error("engine.%s::Invalid Message %s"%(ident, msg)) | |||
|
758 | return False | |||
|
759 | ||||
|
760 | try: | |||
|
761 | eid = msg.header.username | |||
|
762 | assert self.engines.has_key(eid) | |||
|
763 | except: | |||
|
764 | logger.error("engine::Invalid Engine ID %s"%(ident)) | |||
|
765 | return False | |||
|
766 | ||||
|
767 | return eid, msg | |||
|
768 | ||||
|
769 | ||||
|
770 | No newline at end of file |
@@ -0,0 +1,143 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """A simple engine that talks to a controller over 0MQ. | |||
|
3 | it handles registration, etc. and launches a kernel | |||
|
4 | connected to the Controller's queue(s). | |||
|
5 | """ | |||
|
6 | import sys | |||
|
7 | import time | |||
|
8 | import traceback | |||
|
9 | import uuid | |||
|
10 | ||||
|
11 | import zmq | |||
|
12 | from zmq.eventloop import ioloop, zmqstream | |||
|
13 | ||||
|
14 | from streamsession import Message, StreamSession | |||
|
15 | from client import Client | |||
|
16 | import streamkernel as kernel | |||
|
17 | import heartmonitor | |||
|
18 | # import taskthread | |||
|
19 | # from log import logger | |||
|
20 | ||||
|
21 | ||||
|
22 | def printer(*msg): | |||
|
23 | print msg | |||
|
24 | ||||
|
25 | class Engine(object): | |||
|
26 | """IPython engine""" | |||
|
27 | ||||
|
28 | id=None | |||
|
29 | context=None | |||
|
30 | loop=None | |||
|
31 | session=None | |||
|
32 | queue_id=None | |||
|
33 | control_id=None | |||
|
34 | heart_id=None | |||
|
35 | registrar=None | |||
|
36 | heart=None | |||
|
37 | kernel=None | |||
|
38 | ||||
|
39 | def __init__(self, context, loop, session, registrar, client, queue_id=None, heart_id=None): | |||
|
40 | self.context = context | |||
|
41 | self.loop = loop | |||
|
42 | self.session = session | |||
|
43 | self.registrar = registrar | |||
|
44 | self.client = client | |||
|
45 | self.queue_id = queue_id or str(uuid.uuid4()) | |||
|
46 | self.heart_id = heart_id or self.queue_id | |||
|
47 | self.registrar.on_send(printer) | |||
|
48 | ||||
|
49 | def register(self): | |||
|
50 | ||||
|
51 | content = dict(queue=self.queue_id, heartbeat=self.heart_id) | |||
|
52 | self.registrar.on_recv(self.complete_registration) | |||
|
53 | self.session.send(self.registrar, "registration_request",content=content) | |||
|
54 | ||||
|
55 | def complete_registration(self, msg): | |||
|
56 | # print msg | |||
|
57 | idents,msg = self.session.feed_identities(msg) | |||
|
58 | msg = Message(self.session.unpack_message(msg)) | |||
|
59 | if msg.content.status == 'ok': | |||
|
60 | self.session.username = str(msg.content.id) | |||
|
61 | queue_addr = msg.content.queue | |||
|
62 | if queue_addr: | |||
|
63 | queue = self.context.socket(zmq.PAIR) | |||
|
64 | queue.setsockopt(zmq.IDENTITY, self.queue_id) | |||
|
65 | queue.connect(str(queue_addr)) | |||
|
66 | self.queue = zmqstream.ZMQStream(queue, self.loop) | |||
|
67 | ||||
|
68 | control_addr = msg.content.control | |||
|
69 | if control_addr: | |||
|
70 | control = self.context.socket(zmq.PAIR) | |||
|
71 | control.setsockopt(zmq.IDENTITY, self.queue_id) | |||
|
72 | control.connect(str(control_addr)) | |||
|
73 | self.control = zmqstream.ZMQStream(control, self.loop) | |||
|
74 | ||||
|
75 | task_addr = msg.content.task | |||
|
76 | print task_addr | |||
|
77 | if task_addr: | |||
|
78 | # task as stream: | |||
|
79 | task = self.context.socket(zmq.PAIR) | |||
|
80 | task.connect(str(task_addr)) | |||
|
81 | self.task_stream = zmqstream.ZMQStream(task, self.loop) | |||
|
82 | # TaskThread: | |||
|
83 | # mon_addr = msg.content.monitor | |||
|
84 | # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.queue_id) | |||
|
85 | # task.connect_in(str(task_addr)) | |||
|
86 | # task.connect_out(str(mon_addr)) | |||
|
87 | # self.task_stream = taskthread.QueueStream(*task.queues) | |||
|
88 | # task.start() | |||
|
89 | ||||
|
90 | hbs = msg.content.heartbeat | |||
|
91 | self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.heart_id) | |||
|
92 | self.heart.start() | |||
|
93 | # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start() | |||
|
94 | # placeholder for now: | |||
|
95 | pub = self.context.socket(zmq.PUB) | |||
|
96 | pub = zmqstream.ZMQStream(pub, self.loop) | |||
|
97 | # create and start the kernel | |||
|
98 | self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client) | |||
|
99 | self.kernel.start() | |||
|
100 | else: | |||
|
101 | # logger.error("Registration Failed: %s"%msg) | |||
|
102 | raise Exception("Registration Failed: %s"%msg) | |||
|
103 | ||||
|
104 | # logger.info("engine::completed registration with id %s"%self.session.username) | |||
|
105 | ||||
|
106 | print msg | |||
|
107 | ||||
|
108 | def unregister(self): | |||
|
109 | self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username))) | |||
|
110 | time.sleep(1) | |||
|
111 | sys.exit(0) | |||
|
112 | ||||
|
113 | def start(self): | |||
|
114 | print "registering" | |||
|
115 | self.register() | |||
|
116 | ||||
|
117 | ||||
|
118 | if __name__ == '__main__': | |||
|
119 | ||||
|
120 | loop = ioloop.IOLoop.instance() | |||
|
121 | session = StreamSession() | |||
|
122 | ctx = zmq.Context() | |||
|
123 | ||||
|
124 | ip = '127.0.0.1' | |||
|
125 | reg_port = 10101 | |||
|
126 | connection = ('tcp://%s' % ip) + ':%i' | |||
|
127 | reg_conn = connection % reg_port | |||
|
128 | print reg_conn | |||
|
129 | print >>sys.__stdout__, "Starting the engine..." | |||
|
130 | ||||
|
131 | reg = ctx.socket(zmq.PAIR) | |||
|
132 | reg.connect(reg_conn) | |||
|
133 | reg = zmqstream.ZMQStream(reg, loop) | |||
|
134 | client = Client(reg_conn) | |||
|
135 | if len(sys.argv) > 1: | |||
|
136 | queue_id=sys.argv[1] | |||
|
137 | else: | |||
|
138 | queue_id = None | |||
|
139 | ||||
|
140 | e = Engine(ctx, loop, session, reg, client, queue_id) | |||
|
141 | dc = ioloop.DelayedCallback(e.start, 500, loop) | |||
|
142 | dc.start() | |||
|
143 | loop.start() No newline at end of file |
@@ -0,0 +1,169 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """ | |||
|
3 | A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB, | |||
|
4 | and hearts are tracked based on their XREQ identities. | |||
|
5 | """ | |||
|
6 | ||||
|
7 | import time | |||
|
8 | import uuid | |||
|
9 | ||||
|
10 | import zmq | |||
|
11 | from zmq.devices import ProcessDevice | |||
|
12 | from zmq.eventloop import ioloop, zmqstream | |||
|
13 | ||||
|
14 | #internal | |||
|
15 | from IPython.zmq.log import logger | |||
|
16 | ||||
|
17 | class Heart(object): | |||
|
18 | """A basic heart object for responding to a HeartMonitor. | |||
|
19 | This is a simple wrapper with defaults for the most common | |||
|
20 | Device model for responding to heartbeats. | |||
|
21 | ||||
|
22 | It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using | |||
|
23 | SUB/XREQ for in/out. | |||
|
24 | ||||
|
25 | You can specify the XREQ's IDENTITY via the optional heart_id argument.""" | |||
|
26 | device=None | |||
|
27 | id=None | |||
|
28 | def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None): | |||
|
29 | self.device = ProcessDevice(zmq.FORWARDER, in_type, out_type) | |||
|
30 | self.device.connect_in(in_addr) | |||
|
31 | self.device.connect_out(out_addr) | |||
|
32 | if in_type == zmq.SUB: | |||
|
33 | self.device.setsockopt_in(zmq.SUBSCRIBE, "") | |||
|
34 | if heart_id is None: | |||
|
35 | heart_id = str(uuid.uuid4()) | |||
|
36 | self.device.setsockopt_out(zmq.IDENTITY, heart_id) | |||
|
37 | self.id = heart_id | |||
|
38 | ||||
|
39 | def start(self): | |||
|
40 | return self.device.start() | |||
|
41 | ||||
|
42 | class HeartMonitor(object): | |||
|
43 | """A basic HeartMonitor class | |||
|
44 | pingstream: a PUB stream | |||
|
45 | pongstream: an XREP stream | |||
|
46 | period: the period of the heartbeat in milliseconds""" | |||
|
47 | loop=None | |||
|
48 | pingstream=None | |||
|
49 | pongstream=None | |||
|
50 | period=None | |||
|
51 | hearts=None | |||
|
52 | on_probation=None | |||
|
53 | last_ping=None | |||
|
54 | ||||
|
55 | def __init__(self, loop, pingstream, pongstream, period=1000): | |||
|
56 | self.loop = loop | |||
|
57 | self.period = period | |||
|
58 | ||||
|
59 | self.pingstream = pingstream | |||
|
60 | self.pongstream = pongstream | |||
|
61 | self.pongstream.on_recv(self.handle_pong) | |||
|
62 | ||||
|
63 | self.hearts = set() | |||
|
64 | self.responses = set() | |||
|
65 | self.on_probation = set() | |||
|
66 | self.lifetime = 0 | |||
|
67 | self.tic = time.time() | |||
|
68 | ||||
|
69 | self._new_handlers = set() | |||
|
70 | self._failure_handlers = set() | |||
|
71 | ||||
|
72 | def start(self): | |||
|
73 | self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop) | |||
|
74 | self.caller.start() | |||
|
75 | ||||
|
76 | def add_new_heart_handler(self, handler): | |||
|
77 | """add a new handler for new hearts""" | |||
|
78 | logger.debug("heartbeat::new_heart_handler: %s"%handler) | |||
|
79 | self._new_handlers.add(handler) | |||
|
80 | ||||
|
81 | def add_heart_failure_handler(self, handler): | |||
|
82 | """add a new handler for heart failure""" | |||
|
83 | logger.debug("heartbeat::new heart failure handler: %s"%handler) | |||
|
84 | self._failure_handlers.add(handler) | |||
|
85 | ||||
|
86 | def _flush(self): | |||
|
87 | """override IOLoop triggers""" | |||
|
88 | while True: | |||
|
89 | try: | |||
|
90 | msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK) | |||
|
91 | logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled") | |||
|
92 | except zmq.ZMQError: | |||
|
93 | return | |||
|
94 | else: | |||
|
95 | self.handle_pong(msg) | |||
|
96 | # print '.' | |||
|
97 | ||||
|
98 | ||||
|
99 | def beat(self): | |||
|
100 | self._flush() | |||
|
101 | self.last_ping = self.lifetime | |||
|
102 | ||||
|
103 | toc = time.time() | |||
|
104 | self.lifetime += toc-self.tic | |||
|
105 | self.tic = toc | |||
|
106 | logger.debug("heartbeat::%s"%self.lifetime) | |||
|
107 | goodhearts = self.hearts.intersection(self.responses) | |||
|
108 | missed_beats = self.hearts.difference(goodhearts) | |||
|
109 | heartfailures = self.on_probation.intersection(missed_beats) | |||
|
110 | newhearts = self.responses.difference(goodhearts) | |||
|
111 | map(self.handle_new_heart, newhearts) | |||
|
112 | map(self.handle_heart_failure, heartfailures) | |||
|
113 | self.on_probation = missed_beats.intersection(self.hearts) | |||
|
114 | self.responses = set() | |||
|
115 | # print self.on_probation, self.hearts | |||
|
116 | # logger.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) | |||
|
117 | self.pingstream.send(str(self.lifetime)) | |||
|
118 | ||||
|
119 | def handle_new_heart(self, heart): | |||
|
120 | if self._new_handlers: | |||
|
121 | for handler in self._new_handlers: | |||
|
122 | handler(heart) | |||
|
123 | else: | |||
|
124 | logger.info("heartbeat::yay, got new heart %s!"%heart) | |||
|
125 | self.hearts.add(heart) | |||
|
126 | ||||
|
127 | def handle_heart_failure(self, heart): | |||
|
128 | if self._failure_handlers: | |||
|
129 | for handler in self._failure_handlers: | |||
|
130 | try: | |||
|
131 | handler(heart) | |||
|
132 | except Exception, e: | |||
|
133 | print e | |||
|
134 | logger.error("heartbeat::Bad Handler! %s"%handler) | |||
|
135 | pass | |||
|
136 | else: | |||
|
137 | logger.info("heartbeat::Heart %s failed :("%heart) | |||
|
138 | self.hearts.remove(heart) | |||
|
139 | ||||
|
140 | ||||
|
141 | def handle_pong(self, msg): | |||
|
142 | "a heart just beat" | |||
|
143 | if msg[1] == str(self.lifetime): | |||
|
144 | delta = time.time()-self.tic | |||
|
145 | logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) | |||
|
146 | self.responses.add(msg[0]) | |||
|
147 | elif msg[1] == str(self.last_ping): | |||
|
148 | delta = time.time()-self.tic + (self.lifetime-self.last_ping) | |||
|
149 | logger.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) | |||
|
150 | self.responses.add(msg[0]) | |||
|
151 | else: | |||
|
152 | logger.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% | |||
|
153 | (msg[1],self.lifetime)) | |||
|
154 | ||||
|
155 | ||||
|
156 | if __name__ == '__main__': | |||
|
157 | loop = ioloop.IOLoop.instance() | |||
|
158 | context = zmq.Context() | |||
|
159 | pub = context.socket(zmq.PUB) | |||
|
160 | pub.bind('tcp://127.0.0.1:5555') | |||
|
161 | xrep = context.socket(zmq.XREP) | |||
|
162 | xrep.bind('tcp://127.0.0.1:5556') | |||
|
163 | ||||
|
164 | outstream = zmqstream.ZMQStream(pub, loop) | |||
|
165 | instream = zmqstream.ZMQStream(xrep, loop) | |||
|
166 | ||||
|
167 | hb = HeartMonitor(loop, outstream, instream) | |||
|
168 | ||||
|
169 | loop.start() |
@@ -0,0 +1,95 b'' | |||||
|
1 | """RemoteNamespace object, for dict style interaction with a remote | |||
|
2 | execution kernel.""" | |||
|
3 | ||||
|
4 | from functools import wraps | |||
|
5 | from IPython.external.decorator import decorator | |||
|
6 | ||||
|
7 | def _clear(): | |||
|
8 | globals().clear() | |||
|
9 | ||||
|
10 | @decorator | |||
|
11 | def spinfirst(f): | |||
|
12 | @wraps(f) | |||
|
13 | def spun_method(self, *args, **kwargs): | |||
|
14 | self.spin() | |||
|
15 | return f(self, *args, **kwargs) | |||
|
16 | return spun_method | |||
|
17 | ||||
|
18 | @decorator | |||
|
19 | def myblock(f, self, *args, **kwargs): | |||
|
20 | block = self.client.block | |||
|
21 | self.client.block = self.block | |||
|
22 | ret = f(self, *args, **kwargs) | |||
|
23 | self.client.block = block | |||
|
24 | return ret | |||
|
25 | ||||
|
26 | class RemoteNamespace(object): | |||
|
27 | """A RemoteNamespace object, providing dictionary | |||
|
28 | access to an engine via an IPython.zmq.client object. | |||
|
29 | ||||
|
30 | ||||
|
31 | """ | |||
|
32 | client = None | |||
|
33 | queue = None | |||
|
34 | id = None | |||
|
35 | block = False | |||
|
36 | ||||
|
37 | def __init__(self, client, id): | |||
|
38 | self.client = client | |||
|
39 | self.id = id | |||
|
40 | self.block = client.block # initial state is same as client | |||
|
41 | ||||
|
42 | def __repr__(self): | |||
|
43 | return "<RemoteNamespace[%i]>"%self.id | |||
|
44 | ||||
|
45 | @myblock | |||
|
46 | def apply(self, f, *args, **kwargs): | |||
|
47 | """call f(*args, **kwargs) in remote namespace | |||
|
48 | ||||
|
49 | This method has no access to the user namespace""" | |||
|
50 | return self.client.apply_to(self.id, f, *args, **kwargs) | |||
|
51 | ||||
|
52 | @myblock | |||
|
53 | def apply_bound(self, f, *args, **kwargs): | |||
|
54 | """call `f(*args, **kwargs)` in remote namespace. | |||
|
55 | ||||
|
56 | `f` will have access to the user namespace as globals().""" | |||
|
57 | return self.client.apply_bound_to(self.id, f, *args, **kwargs) | |||
|
58 | ||||
|
59 | @myblock | |||
|
60 | def update(self, ns): | |||
|
61 | """update remote namespace with dict `ns`""" | |||
|
62 | return self.client.push(self.id, ns, self.block) | |||
|
63 | ||||
|
64 | def get(self, key_s): | |||
|
65 | """get object(s) by `key_s` from remote namespace | |||
|
66 | will return one object if it is a key. | |||
|
67 | It also takes a list of keys, and will return a list of objects.""" | |||
|
68 | return self.client.pull(self.id, key_s, self.block) | |||
|
69 | ||||
|
70 | push = update | |||
|
71 | pull = get | |||
|
72 | ||||
|
73 | def __getitem__(self, key): | |||
|
74 | return self.get(key) | |||
|
75 | ||||
|
76 | def __setitem__(self,key,value): | |||
|
77 | self.update({key:value}) | |||
|
78 | ||||
|
79 | def clear(self): | |||
|
80 | """clear the remote namespace""" | |||
|
81 | return self.client.apply_bound_to(self.id, _clear) | |||
|
82 | ||||
|
83 | @decorator | |||
|
84 | def withme(self, toapply): | |||
|
85 | """for use as a decorator, this turns a function into | |||
|
86 | one that executes remotely.""" | |||
|
87 | @wraps(toapply) | |||
|
88 | def applied(self, *args, **kwargs): | |||
|
89 | return self.apply_bound(self, toapply, *args, **kwargs) | |||
|
90 | return applied | |||
|
91 | ||||
|
92 | ||||
|
93 | ||||
|
94 | ||||
|
95 |
@@ -0,0 +1,482 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """ | |||
|
3 | Kernel adapted from kernel.py to use ZMQ Streams | |||
|
4 | """ | |||
|
5 | ||||
|
6 | import __builtin__ | |||
|
7 | import sys | |||
|
8 | import time | |||
|
9 | import traceback | |||
|
10 | from signal import SIGTERM, SIGKILL | |||
|
11 | ||||
|
12 | from code import CommandCompiler | |||
|
13 | ||||
|
14 | import zmq | |||
|
15 | from zmq.eventloop import ioloop, zmqstream | |||
|
16 | ||||
|
17 | from streamsession import StreamSession, Message, extract_header, serialize_object,\ | |||
|
18 | unpack_apply_message | |||
|
19 | from IPython.zmq.completer import KernelCompleter | |||
|
20 | ||||
|
21 | class OutStream(object): | |||
|
22 | """A file like object that publishes the stream to a 0MQ PUB socket.""" | |||
|
23 | ||||
|
24 | def __init__(self, session, pub_socket, name, max_buffer=200): | |||
|
25 | self.session = session | |||
|
26 | self.pub_socket = pub_socket | |||
|
27 | self.name = name | |||
|
28 | self._buffer = [] | |||
|
29 | self._buffer_len = 0 | |||
|
30 | self.max_buffer = max_buffer | |||
|
31 | self.parent_header = {} | |||
|
32 | ||||
|
33 | def set_parent(self, parent): | |||
|
34 | self.parent_header = extract_header(parent) | |||
|
35 | ||||
|
36 | def close(self): | |||
|
37 | self.pub_socket = None | |||
|
38 | ||||
|
39 | def flush(self): | |||
|
40 | if self.pub_socket is None: | |||
|
41 | raise ValueError(u'I/O operation on closed file') | |||
|
42 | else: | |||
|
43 | if self._buffer: | |||
|
44 | data = ''.join(self._buffer) | |||
|
45 | content = {u'name':self.name, u'data':data} | |||
|
46 | # msg = self.session.msg(u'stream', content=content, | |||
|
47 | # parent=self.parent_header) | |||
|
48 | msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header) | |||
|
49 | # print>>sys.__stdout__, Message(msg) | |||
|
50 | # self.pub_socket.send_json(msg) | |||
|
51 | self._buffer_len = 0 | |||
|
52 | self._buffer = [] | |||
|
53 | ||||
|
54 | def isattr(self): | |||
|
55 | return False | |||
|
56 | ||||
|
57 | def next(self): | |||
|
58 | raise IOError('Read not supported on a write only stream.') | |||
|
59 | ||||
|
60 | def read(self, size=None): | |||
|
61 | raise IOError('Read not supported on a write only stream.') | |||
|
62 | ||||
|
63 | readline=read | |||
|
64 | ||||
|
65 | def write(self, s): | |||
|
66 | if self.pub_socket is None: | |||
|
67 | raise ValueError('I/O operation on closed file') | |||
|
68 | else: | |||
|
69 | self._buffer.append(s) | |||
|
70 | self._buffer_len += len(s) | |||
|
71 | self._maybe_send() | |||
|
72 | ||||
|
73 | def _maybe_send(self): | |||
|
74 | if '\n' in self._buffer[-1]: | |||
|
75 | self.flush() | |||
|
76 | if self._buffer_len > self.max_buffer: | |||
|
77 | self.flush() | |||
|
78 | ||||
|
79 | def writelines(self, sequence): | |||
|
80 | if self.pub_socket is None: | |||
|
81 | raise ValueError('I/O operation on closed file') | |||
|
82 | else: | |||
|
83 | for s in sequence: | |||
|
84 | self.write(s) | |||
|
85 | ||||
|
86 | ||||
|
87 | class DisplayHook(object): | |||
|
88 | ||||
|
89 | def __init__(self, session, pub_socket): | |||
|
90 | self.session = session | |||
|
91 | self.pub_socket = pub_socket | |||
|
92 | self.parent_header = {} | |||
|
93 | ||||
|
94 | def __call__(self, obj): | |||
|
95 | if obj is None: | |||
|
96 | return | |||
|
97 | ||||
|
98 | __builtin__._ = obj | |||
|
99 | # msg = self.session.msg(u'pyout', {u'data':repr(obj)}, | |||
|
100 | # parent=self.parent_header) | |||
|
101 | # self.pub_socket.send_json(msg) | |||
|
102 | self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header) | |||
|
103 | ||||
|
104 | def set_parent(self, parent): | |||
|
105 | self.parent_header = extract_header(parent) | |||
|
106 | ||||
|
107 | ||||
|
108 | class RawInput(object): | |||
|
109 | ||||
|
110 | def __init__(self, session, socket): | |||
|
111 | self.session = session | |||
|
112 | self.socket = socket | |||
|
113 | ||||
|
114 | def __call__(self, prompt=None): | |||
|
115 | msg = self.session.msg(u'raw_input') | |||
|
116 | self.socket.send_json(msg) | |||
|
117 | while True: | |||
|
118 | try: | |||
|
119 | reply = self.socket.recv_json(zmq.NOBLOCK) | |||
|
120 | except zmq.ZMQError, e: | |||
|
121 | if e.errno == zmq.EAGAIN: | |||
|
122 | pass | |||
|
123 | else: | |||
|
124 | raise | |||
|
125 | else: | |||
|
126 | break | |||
|
127 | return reply[u'content'][u'data'] | |||
|
128 | ||||
|
129 | ||||
|
130 | class Kernel(object): | |||
|
131 | ||||
|
132 | def __init__(self, session, control_stream, reply_stream, pub_stream, | |||
|
133 | task_stream=None, client=None): | |||
|
134 | self.session = session | |||
|
135 | self.control_stream = control_stream | |||
|
136 | self.reply_stream = reply_stream | |||
|
137 | self.task_stream = task_stream | |||
|
138 | self.pub_stream = pub_stream | |||
|
139 | self.client = client | |||
|
140 | self.user_ns = {} | |||
|
141 | self.history = [] | |||
|
142 | self.compiler = CommandCompiler() | |||
|
143 | self.completer = KernelCompleter(self.user_ns) | |||
|
144 | self.aborted = set() | |||
|
145 | ||||
|
146 | # Build dict of handlers for message types | |||
|
147 | self.queue_handlers = {} | |||
|
148 | self.control_handlers = {} | |||
|
149 | for msg_type in ['execute_request', 'complete_request', 'apply_request']: | |||
|
150 | self.queue_handlers[msg_type] = getattr(self, msg_type) | |||
|
151 | ||||
|
152 | for msg_type in ['kill_request', 'abort_request']: | |||
|
153 | self.control_handlers[msg_type] = getattr(self, msg_type) | |||
|
154 | ||||
|
155 | #-------------------- control handlers ----------------------------- | |||
|
156 | ||||
|
157 | def abort_queue(self, stream): | |||
|
158 | while True: | |||
|
159 | try: | |||
|
160 | msg = self.session.recv(stream, zmq.NOBLOCK,content=True) | |||
|
161 | except zmq.ZMQError, e: | |||
|
162 | if e.errno == zmq.EAGAIN: | |||
|
163 | break | |||
|
164 | else: | |||
|
165 | return | |||
|
166 | else: | |||
|
167 | if msg is None: | |||
|
168 | return | |||
|
169 | else: | |||
|
170 | idents,msg = msg | |||
|
171 | ||||
|
172 | # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part." | |||
|
173 | # msg = self.reply_socket.recv_json() | |||
|
174 | print>>sys.__stdout__, "Aborting:" | |||
|
175 | print>>sys.__stdout__, Message(msg) | |||
|
176 | msg_type = msg['msg_type'] | |||
|
177 | reply_type = msg_type.split('_')[0] + '_reply' | |||
|
178 | # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg) | |||
|
179 | # self.reply_socket.send(ident,zmq.SNDMORE) | |||
|
180 | # self.reply_socket.send_json(reply_msg) | |||
|
181 | reply_msg = self.session.send(stream, reply_type, | |||
|
182 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |||
|
183 | print>>sys.__stdout__, Message(reply_msg) | |||
|
184 | # We need to wait a bit for requests to come in. This can probably | |||
|
185 | # be set shorter for true asynchronous clients. | |||
|
186 | time.sleep(0.05) | |||
|
187 | ||||
|
188 | def abort_request(self, stream, ident, parent): | |||
|
189 | msg_ids = parent['content'].get('msg_ids', None) | |||
|
190 | if not msg_ids: | |||
|
191 | self.abort_queue(self.task_stream) | |||
|
192 | self.abort_queue(self.reply_stream) | |||
|
193 | for mid in msg_ids: | |||
|
194 | self.aborted.add(mid) | |||
|
195 | ||||
|
196 | content = dict(status='ok') | |||
|
197 | self.session.send(stream, 'abort_reply', content=content, parent=parent, | |||
|
198 | ident=ident) | |||
|
199 | ||||
|
200 | def kill_request(self, stream, idents, parent): | |||
|
201 | self.abort_queue(self.reply_stream) | |||
|
202 | if self.task_stream: | |||
|
203 | self.abort_queue(self.task_stream) | |||
|
204 | msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent, | |||
|
205 | content = dict(status='ok')) | |||
|
206 | # we can know that a message is done if we *don't* use streams, but | |||
|
207 | # use a socket directly with MessageTracker | |||
|
208 | time.sleep(1) | |||
|
209 | os.kill(os.getpid(), SIGTERM) | |||
|
210 | time.sleep(.25) | |||
|
211 | os.kill(os.getpid(), SIGKILL) | |||
|
212 | ||||
|
213 | def dispatch_control(self, msg): | |||
|
214 | idents,msg = self.session.feed_identities(msg, copy=False) | |||
|
215 | msg = self.session.unpack_message(msg, content=True, copy=False) | |||
|
216 | ||||
|
217 | header = msg['header'] | |||
|
218 | msg_id = header['msg_id'] | |||
|
219 | ||||
|
220 | handler = self.control_handlers.get(msg['msg_type'], None) | |||
|
221 | if handler is None: | |||
|
222 | print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg | |||
|
223 | else: | |||
|
224 | handler(stream, idents, msg) | |||
|
225 | ||||
|
226 | def flush_control(self): | |||
|
227 | while any(zmq.select([self.control_socket],[],[],1e-4)): | |||
|
228 | try: | |||
|
229 | msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False) | |||
|
230 | except zmq.ZMQError, e: | |||
|
231 | if e.errno != zmq.EAGAIN: | |||
|
232 | raise e | |||
|
233 | return | |||
|
234 | else: | |||
|
235 | self.dispatch_control(msg) | |||
|
236 | ||||
|
237 | ||||
|
238 | #-------------------- queue helpers ------------------------------ | |||
|
239 | ||||
|
240 | def check_dependencies(self, dependencies): | |||
|
241 | if not dependencies: | |||
|
242 | return True | |||
|
243 | if len(dependencies) == 2 and dependencies[0] in 'any all'.split(): | |||
|
244 | anyorall = dependencies[0] | |||
|
245 | dependencies = dependencies[1] | |||
|
246 | else: | |||
|
247 | anyorall = 'all' | |||
|
248 | results = self.client.get_results(dependencies,status_only=True) | |||
|
249 | if results['status'] != 'ok': | |||
|
250 | return False | |||
|
251 | ||||
|
252 | if anyorall == 'any': | |||
|
253 | if not results['completed']: | |||
|
254 | return False | |||
|
255 | else: | |||
|
256 | if results['pending']: | |||
|
257 | return False | |||
|
258 | ||||
|
259 | return True | |||
|
260 | ||||
|
261 | #-------------------- queue handlers ----------------------------- | |||
|
262 | ||||
|
263 | def execute_request(self, stream, ident, parent): | |||
|
264 | try: | |||
|
265 | code = parent[u'content'][u'code'] | |||
|
266 | except: | |||
|
267 | print>>sys.__stderr__, "Got bad msg: " | |||
|
268 | print>>sys.__stderr__, Message(parent) | |||
|
269 | return | |||
|
270 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |||
|
271 | # self.pub_stream.send(pyin_msg) | |||
|
272 | self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | |||
|
273 | try: | |||
|
274 | comp_code = self.compiler(code, '<zmq-kernel>') | |||
|
275 | # allow for not overriding displayhook | |||
|
276 | if hasattr(sys.displayhook, 'set_parent'): | |||
|
277 | sys.displayhook.set_parent(parent) | |||
|
278 | exec comp_code in self.user_ns, self.user_ns | |||
|
279 | except: | |||
|
280 | # result = u'error' | |||
|
281 | etype, evalue, tb = sys.exc_info() | |||
|
282 | tb = traceback.format_exception(etype, evalue, tb) | |||
|
283 | exc_content = { | |||
|
284 | u'status' : u'error', | |||
|
285 | u'traceback' : tb, | |||
|
286 | u'etype' : unicode(etype), | |||
|
287 | u'evalue' : unicode(evalue) | |||
|
288 | } | |||
|
289 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |||
|
290 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | |||
|
291 | reply_content = exc_content | |||
|
292 | else: | |||
|
293 | reply_content = {'status' : 'ok'} | |||
|
294 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | |||
|
295 | # self.reply_socket.send(ident, zmq.SNDMORE) | |||
|
296 | # self.reply_socket.send_json(reply_msg) | |||
|
297 | reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident) | |||
|
298 | # print>>sys.__stdout__, Message(reply_msg) | |||
|
299 | if reply_msg['content']['status'] == u'error': | |||
|
300 | self.abort_queue() | |||
|
301 | ||||
|
302 | def complete_request(self, stream, ident, parent): | |||
|
303 | matches = {'matches' : self.complete(parent), | |||
|
304 | 'status' : 'ok'} | |||
|
305 | completion_msg = self.session.send(stream, 'complete_reply', | |||
|
306 | matches, parent, ident) | |||
|
307 | # print >> sys.__stdout__, completion_msg | |||
|
308 | ||||
|
309 | def complete(self, msg): | |||
|
310 | return self.completer.complete(msg.content.line, msg.content.text) | |||
|
311 | ||||
|
312 | def apply_request(self, stream, ident, parent): | |||
|
313 | try: | |||
|
314 | content = parent[u'content'] | |||
|
315 | bufs = parent[u'buffers'] | |||
|
316 | msg_id = parent['header']['msg_id'] | |||
|
317 | bound = content.get('bound', False) | |||
|
318 | except: | |||
|
319 | print>>sys.__stderr__, "Got bad msg: " | |||
|
320 | print>>sys.__stderr__, Message(parent) | |||
|
321 | return | |||
|
322 | # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) | |||
|
323 | # self.pub_stream.send(pyin_msg) | |||
|
324 | # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent) | |||
|
325 | try: | |||
|
326 | # allow for not overriding displayhook | |||
|
327 | if hasattr(sys.displayhook, 'set_parent'): | |||
|
328 | sys.displayhook.set_parent(parent) | |||
|
329 | # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns | |||
|
330 | if bound: | |||
|
331 | working = self.user_ns | |||
|
332 | suffix = str(msg_id).replace("-","") | |||
|
333 | prefix = "_" | |||
|
334 | ||||
|
335 | else: | |||
|
336 | working = dict() | |||
|
337 | suffix = prefix = "" | |||
|
338 | f,args,kwargs = unpack_apply_message(bufs, working, copy=False) | |||
|
339 | # if f.fun | |||
|
340 | fname = prefix+f.func_name.strip('<>')+suffix | |||
|
341 | argname = prefix+"args"+suffix | |||
|
342 | kwargname = prefix+"kwargs"+suffix | |||
|
343 | resultname = prefix+"result"+suffix | |||
|
344 | ||||
|
345 | ns = { fname : f, argname : args, kwargname : kwargs } | |||
|
346 | # print ns | |||
|
347 | working.update(ns) | |||
|
348 | code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname) | |||
|
349 | exec code in working, working | |||
|
350 | result = working.get(resultname) | |||
|
351 | # clear the namespace | |||
|
352 | if bound: | |||
|
353 | for key in ns.iterkeys(): | |||
|
354 | self.user_ns.pop(key) | |||
|
355 | else: | |||
|
356 | del working | |||
|
357 | ||||
|
358 | packed_result,buf = serialize_object(result) | |||
|
359 | result_buf = [packed_result]+buf | |||
|
360 | except: | |||
|
361 | result = u'error' | |||
|
362 | etype, evalue, tb = sys.exc_info() | |||
|
363 | tb = traceback.format_exception(etype, evalue, tb) | |||
|
364 | exc_content = { | |||
|
365 | u'status' : u'error', | |||
|
366 | u'traceback' : tb, | |||
|
367 | u'etype' : unicode(etype), | |||
|
368 | u'evalue' : unicode(evalue) | |||
|
369 | } | |||
|
370 | # exc_msg = self.session.msg(u'pyerr', exc_content, parent) | |||
|
371 | self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent) | |||
|
372 | reply_content = exc_content | |||
|
373 | result_buf = [] | |||
|
374 | else: | |||
|
375 | reply_content = {'status' : 'ok'} | |||
|
376 | # reply_msg = self.session.msg(u'execute_reply', reply_content, parent) | |||
|
377 | # self.reply_socket.send(ident, zmq.SNDMORE) | |||
|
378 | # self.reply_socket.send_json(reply_msg) | |||
|
379 | reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf) | |||
|
380 | # print>>sys.__stdout__, Message(reply_msg) | |||
|
381 | if reply_msg['content']['status'] == u'error': | |||
|
382 | self.abort_queue() | |||
|
383 | ||||
|
384 | def dispatch_queue(self, stream, msg): | |||
|
385 | self.flush_control() | |||
|
386 | idents,msg = self.session.feed_identities(msg, copy=False) | |||
|
387 | msg = self.session.unpack_message(msg, content=True, copy=False) | |||
|
388 | ||||
|
389 | header = msg['header'] | |||
|
390 | msg_id = header['msg_id'] | |||
|
391 | dependencies = header.get('dependencies', []) | |||
|
392 | ||||
|
393 | if self.check_aborted(msg_id): | |||
|
394 | return self.abort_reply(stream, msg) | |||
|
395 | if not self.check_dependencies(dependencies): | |||
|
396 | return self.unmet_dependencies(stream, msg) | |||
|
397 | ||||
|
398 | handler = self.queue_handlers.get(msg['msg_type'], None) | |||
|
399 | if handler is None: | |||
|
400 | print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg | |||
|
401 | else: | |||
|
402 | handler(stream, idents, msg) | |||
|
403 | ||||
|
404 | def start(self): | |||
|
405 | #### stream mode: | |||
|
406 | if self.control_stream: | |||
|
407 | self.control_stream.on_recv(self.dispatch_control, copy=False) | |||
|
408 | if self.reply_stream: | |||
|
409 | self.reply_stream.on_recv(lambda msg: | |||
|
410 | self.dispatch_queue(self.reply_stream, msg), copy=False) | |||
|
411 | if self.task_stream: | |||
|
412 | self.task_stream.on_recv(lambda msg: | |||
|
413 | self.dispatch_queue(self.task_stream, msg), copy=False) | |||
|
414 | ||||
|
415 | #### while True mode: | |||
|
416 | # while True: | |||
|
417 | # idle = True | |||
|
418 | # try: | |||
|
419 | # msg = self.reply_stream.socket.recv_multipart( | |||
|
420 | # zmq.NOBLOCK, copy=False) | |||
|
421 | # except zmq.ZMQError, e: | |||
|
422 | # if e.errno != zmq.EAGAIN: | |||
|
423 | # raise e | |||
|
424 | # else: | |||
|
425 | # idle=False | |||
|
426 | # self.dispatch_queue(self.reply_stream, msg) | |||
|
427 | # | |||
|
428 | # if not self.task_stream.empty(): | |||
|
429 | # idle=False | |||
|
430 | # msg = self.task_stream.recv_multipart() | |||
|
431 | # self.dispatch_queue(self.task_stream, msg) | |||
|
432 | # if idle: | |||
|
433 | # # don't busywait | |||
|
434 | # time.sleep(1e-3) | |||
|
435 | ||||
|
436 | ||||
|
437 | def main(): | |||
|
438 | raise Exception("Don't run me anymore") | |||
|
439 | loop = ioloop.IOLoop.instance() | |||
|
440 | c = zmq.Context() | |||
|
441 | ||||
|
442 | ip = '127.0.0.1' | |||
|
443 | port_base = 5575 | |||
|
444 | connection = ('tcp://%s' % ip) + ':%i' | |||
|
445 | rep_conn = connection % port_base | |||
|
446 | pub_conn = connection % (port_base+1) | |||
|
447 | ||||
|
448 | print >>sys.__stdout__, "Starting the kernel..." | |||
|
449 | # print >>sys.__stdout__, "XREQ Channel:", rep_conn | |||
|
450 | # print >>sys.__stdout__, "PUB Channel:", pub_conn | |||
|
451 | ||||
|
452 | session = StreamSession(username=u'kernel') | |||
|
453 | ||||
|
454 | reply_socket = c.socket(zmq.XREQ) | |||
|
455 | reply_socket.connect(rep_conn) | |||
|
456 | ||||
|
457 | pub_socket = c.socket(zmq.PUB) | |||
|
458 | pub_socket.connect(pub_conn) | |||
|
459 | ||||
|
460 | stdout = OutStream(session, pub_socket, u'stdout') | |||
|
461 | stderr = OutStream(session, pub_socket, u'stderr') | |||
|
462 | sys.stdout = stdout | |||
|
463 | sys.stderr = stderr | |||
|
464 | ||||
|
465 | display_hook = DisplayHook(session, pub_socket) | |||
|
466 | sys.displayhook = display_hook | |||
|
467 | reply_stream = zmqstream.ZMQStream(reply_socket,loop) | |||
|
468 | pub_stream = zmqstream.ZMQStream(pub_socket,loop) | |||
|
469 | kernel = Kernel(session, reply_stream, pub_stream) | |||
|
470 | ||||
|
471 | # For debugging convenience, put sleep and a string in the namespace, so we | |||
|
472 | # have them every time we start. | |||
|
473 | kernel.user_ns['sleep'] = time.sleep | |||
|
474 | kernel.user_ns['s'] = 'Test string' | |||
|
475 | ||||
|
476 | print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate." | |||
|
477 | kernel.start() | |||
|
478 | loop.start() | |||
|
479 | ||||
|
480 | ||||
|
481 | if __name__ == '__main__': | |||
|
482 | main() |
@@ -0,0 +1,443 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """edited session.py to work with streams, and move msg_type to the header | |||
|
3 | """ | |||
|
4 | ||||
|
5 | ||||
|
6 | import os | |||
|
7 | import sys | |||
|
8 | import traceback | |||
|
9 | import pprint | |||
|
10 | import uuid | |||
|
11 | ||||
|
12 | import zmq | |||
|
13 | from zmq.utils import jsonapi | |||
|
14 | from zmq.eventloop.zmqstream import ZMQStream | |||
|
15 | ||||
|
16 | from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence | |||
|
17 | from IPython.zmq.newserialized import serialize, unserialize | |||
|
18 | ||||
|
19 | try: | |||
|
20 | import cPickle | |||
|
21 | pickle = cPickle | |||
|
22 | except: | |||
|
23 | cPickle = None | |||
|
24 | import pickle | |||
|
25 | ||||
|
26 | # packer priority: jsonlib[2], cPickle, simplejson/json, pickle | |||
|
27 | json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__ | |||
|
28 | if json_name in ('jsonlib', 'jsonlib2'): | |||
|
29 | use_json = True | |||
|
30 | elif json_name: | |||
|
31 | if cPickle is None: | |||
|
32 | use_json = True | |||
|
33 | else: | |||
|
34 | use_json = False | |||
|
35 | else: | |||
|
36 | use_json = False | |||
|
37 | ||||
|
38 | if use_json: | |||
|
39 | default_packer = jsonapi.dumps | |||
|
40 | default_unpacker = jsonapi.loads | |||
|
41 | else: | |||
|
42 | default_packer = lambda o: pickle.dumps(o,-1) | |||
|
43 | default_unpacker = pickle.loads | |||
|
44 | ||||
|
45 | ||||
|
46 | DELIM="<IDS|MSG>" | |||
|
47 | ||||
|
48 | def wrap_exception(): | |||
|
49 | etype, evalue, tb = sys.exc_info() | |||
|
50 | tb = traceback.format_exception(etype, evalue, tb) | |||
|
51 | exc_content = { | |||
|
52 | u'status' : u'error', | |||
|
53 | u'traceback' : tb, | |||
|
54 | u'etype' : unicode(etype), | |||
|
55 | u'evalue' : unicode(evalue) | |||
|
56 | } | |||
|
57 | return exc_content | |||
|
58 | ||||
|
59 | class KernelError(Exception): | |||
|
60 | pass | |||
|
61 | ||||
|
62 | def unwrap_exception(content): | |||
|
63 | err = KernelError(content['etype'], content['evalue']) | |||
|
64 | err.evalue = content['evalue'] | |||
|
65 | err.etype = content['etype'] | |||
|
66 | err.traceback = ''.join(content['traceback']) | |||
|
67 | return err | |||
|
68 | ||||
|
69 | ||||
|
70 | class Message(object): | |||
|
71 | """A simple message object that maps dict keys to attributes. | |||
|
72 | ||||
|
73 | A Message can be created from a dict and a dict from a Message instance | |||
|
74 | simply by calling dict(msg_obj).""" | |||
|
75 | ||||
|
76 | def __init__(self, msg_dict): | |||
|
77 | dct = self.__dict__ | |||
|
78 | for k, v in dict(msg_dict).iteritems(): | |||
|
79 | if isinstance(v, dict): | |||
|
80 | v = Message(v) | |||
|
81 | dct[k] = v | |||
|
82 | ||||
|
83 | # Having this iterator lets dict(msg_obj) work out of the box. | |||
|
84 | def __iter__(self): | |||
|
85 | return iter(self.__dict__.iteritems()) | |||
|
86 | ||||
|
87 | def __repr__(self): | |||
|
88 | return repr(self.__dict__) | |||
|
89 | ||||
|
90 | def __str__(self): | |||
|
91 | return pprint.pformat(self.__dict__) | |||
|
92 | ||||
|
93 | def __contains__(self, k): | |||
|
94 | return k in self.__dict__ | |||
|
95 | ||||
|
96 | def __getitem__(self, k): | |||
|
97 | return self.__dict__[k] | |||
|
98 | ||||
|
99 | ||||
|
100 | def msg_header(msg_id, msg_type, username, session): | |||
|
101 | return locals() | |||
|
102 | # return { | |||
|
103 | # 'msg_id' : msg_id, | |||
|
104 | # 'msg_type': msg_type, | |||
|
105 | # 'username' : username, | |||
|
106 | # 'session' : session | |||
|
107 | # } | |||
|
108 | ||||
|
109 | ||||
|
110 | def extract_header(msg_or_header): | |||
|
111 | """Given a message or header, return the header.""" | |||
|
112 | if not msg_or_header: | |||
|
113 | return {} | |||
|
114 | try: | |||
|
115 | # See if msg_or_header is the entire message. | |||
|
116 | h = msg_or_header['header'] | |||
|
117 | except KeyError: | |||
|
118 | try: | |||
|
119 | # See if msg_or_header is just the header | |||
|
120 | h = msg_or_header['msg_id'] | |||
|
121 | except KeyError: | |||
|
122 | raise | |||
|
123 | else: | |||
|
124 | h = msg_or_header | |||
|
125 | if not isinstance(h, dict): | |||
|
126 | h = dict(h) | |||
|
127 | return h | |||
|
128 | ||||
|
129 | def rekey(dikt): | |||
|
130 | """rekey a dict that has been forced to use str keys where there should be | |||
|
131 | ints by json. This belongs in the jsonutil added by fperez.""" | |||
|
132 | for k in dikt.iterkeys(): | |||
|
133 | if isinstance(k, str): | |||
|
134 | ik=fk=None | |||
|
135 | try: | |||
|
136 | ik = int(k) | |||
|
137 | except ValueError: | |||
|
138 | try: | |||
|
139 | fk = float(k) | |||
|
140 | except ValueError: | |||
|
141 | continue | |||
|
142 | if ik is not None: | |||
|
143 | nk = ik | |||
|
144 | else: | |||
|
145 | nk = fk | |||
|
146 | if nk in dikt: | |||
|
147 | raise KeyError("already have key %r"%nk) | |||
|
148 | dikt[nk] = dikt.pop(k) | |||
|
149 | return dikt | |||
|
150 | ||||
|
151 | def serialize_object(obj, threshold=64e-6): | |||
|
152 | """serialize an object into a list of sendable buffers. | |||
|
153 | ||||
|
154 | Returns: (pmd, bufs) | |||
|
155 | where pmd is the pickled metadata wrapper, and bufs | |||
|
156 | is a list of data buffers""" | |||
|
157 | # threshold is 100 B | |||
|
158 | databuffers = [] | |||
|
159 | if isinstance(obj, (list, tuple)): | |||
|
160 | clist = canSequence(obj) | |||
|
161 | slist = map(serialize, clist) | |||
|
162 | for s in slist: | |||
|
163 | if s.getDataSize() > threshold: | |||
|
164 | databuffers.append(s.getData()) | |||
|
165 | s.data = None | |||
|
166 | return pickle.dumps(slist,-1), databuffers | |||
|
167 | elif isinstance(obj, dict): | |||
|
168 | sobj = {} | |||
|
169 | for k in sorted(obj.iterkeys()): | |||
|
170 | s = serialize(can(obj[k])) | |||
|
171 | if s.getDataSize() > threshold: | |||
|
172 | databuffers.append(s.getData()) | |||
|
173 | s.data = None | |||
|
174 | sobj[k] = s | |||
|
175 | return pickle.dumps(sobj,-1),databuffers | |||
|
176 | else: | |||
|
177 | s = serialize(can(obj)) | |||
|
178 | if s.getDataSize() > threshold: | |||
|
179 | databuffers.append(s.getData()) | |||
|
180 | s.data = None | |||
|
181 | return pickle.dumps(s,-1),databuffers | |||
|
182 | ||||
|
183 | ||||
|
184 | def unserialize_object(bufs): | |||
|
185 | """reconstruct an object serialized by serialize_object from data buffers""" | |||
|
186 | bufs = list(bufs) | |||
|
187 | sobj = pickle.loads(bufs.pop(0)) | |||
|
188 | if isinstance(sobj, (list, tuple)): | |||
|
189 | for s in sobj: | |||
|
190 | if s.data is None: | |||
|
191 | s.data = bufs.pop(0) | |||
|
192 | return uncanSequence(map(unserialize, sobj)) | |||
|
193 | elif isinstance(sobj, dict): | |||
|
194 | newobj = {} | |||
|
195 | for k in sorted(sobj.iterkeys()): | |||
|
196 | s = sobj[k] | |||
|
197 | if s.data is None: | |||
|
198 | s.data = bufs.pop(0) | |||
|
199 | newobj[k] = uncan(unserialize(s)) | |||
|
200 | return newobj | |||
|
201 | else: | |||
|
202 | if sobj.data is None: | |||
|
203 | sobj.data = bufs.pop(0) | |||
|
204 | return uncan(unserialize(sobj)) | |||
|
205 | ||||
|
206 | def pack_apply_message(f, args, kwargs, threshold=64e-6): | |||
|
207 | """pack up a function, args, and kwargs to be sent over the wire | |||
|
208 | as a series of buffers. Any object whose data is larger than `threshold` | |||
|
209 | will not have their data copied (currently only numpy arrays support zero-copy)""" | |||
|
210 | msg = [pickle.dumps(can(f),-1)] | |||
|
211 | databuffers = [] # for large objects | |||
|
212 | sargs, bufs = serialize_object(args,threshold) | |||
|
213 | msg.append(sargs) | |||
|
214 | databuffers.extend(bufs) | |||
|
215 | skwargs, bufs = serialize_object(kwargs,threshold) | |||
|
216 | msg.append(skwargs) | |||
|
217 | databuffers.extend(bufs) | |||
|
218 | msg.extend(databuffers) | |||
|
219 | return msg | |||
|
220 | ||||
|
221 | def unpack_apply_message(bufs, g=None, copy=True): | |||
|
222 | """unpack f,args,kwargs from buffers packed by pack_apply_message() | |||
|
223 | Returns: original f,args,kwargs""" | |||
|
224 | bufs = list(bufs) # allow us to pop | |||
|
225 | assert len(bufs) >= 3, "not enough buffers!" | |||
|
226 | if not copy: | |||
|
227 | for i in range(3): | |||
|
228 | bufs[i] = bufs[i].bytes | |||
|
229 | cf = pickle.loads(bufs.pop(0)) | |||
|
230 | sargs = list(pickle.loads(bufs.pop(0))) | |||
|
231 | skwargs = dict(pickle.loads(bufs.pop(0))) | |||
|
232 | # print sargs, skwargs | |||
|
233 | f = cf.getFunction(g) | |||
|
234 | for sa in sargs: | |||
|
235 | if sa.data is None: | |||
|
236 | m = bufs.pop(0) | |||
|
237 | if sa.getTypeDescriptor() in ('buffer', 'ndarray'): | |||
|
238 | if copy: | |||
|
239 | sa.data = buffer(m) | |||
|
240 | else: | |||
|
241 | sa.data = m.buffer | |||
|
242 | else: | |||
|
243 | if copy: | |||
|
244 | sa.data = m | |||
|
245 | else: | |||
|
246 | sa.data = m.bytes | |||
|
247 | ||||
|
248 | args = uncanSequence(map(unserialize, sargs), g) | |||
|
249 | kwargs = {} | |||
|
250 | for k in sorted(skwargs.iterkeys()): | |||
|
251 | sa = skwargs[k] | |||
|
252 | if sa.data is None: | |||
|
253 | sa.data = bufs.pop(0) | |||
|
254 | kwargs[k] = uncan(unserialize(sa), g) | |||
|
255 | ||||
|
256 | return f,args,kwargs | |||
|
257 | ||||
|
258 | class StreamSession(object): | |||
|
259 | """tweaked version of IPython.zmq.session.Session, for development in Parallel""" | |||
|
260 | ||||
|
261 | def __init__(self, username=None, session=None, packer=None, unpacker=None): | |||
|
262 | if username is None: | |||
|
263 | username = os.environ.get('USER','username') | |||
|
264 | self.username = username | |||
|
265 | if session is None: | |||
|
266 | self.session = str(uuid.uuid4()) | |||
|
267 | else: | |||
|
268 | self.session = session | |||
|
269 | self.msg_id = str(uuid.uuid4()) | |||
|
270 | if packer is None: | |||
|
271 | self.pack = default_packer | |||
|
272 | else: | |||
|
273 | if not callable(packer): | |||
|
274 | raise TypeError("packer must be callable, not %s"%type(packer)) | |||
|
275 | self.pack = packer | |||
|
276 | ||||
|
277 | if unpacker is None: | |||
|
278 | self.unpack = default_unpacker | |||
|
279 | else: | |||
|
280 | if not callable(unpacker): | |||
|
281 | raise TypeError("unpacker must be callable, not %s"%type(unpacker)) | |||
|
282 | self.unpack = unpacker | |||
|
283 | ||||
|
284 | self.none = self.pack({}) | |||
|
285 | ||||
|
286 | def msg_header(self, msg_type): | |||
|
287 | h = msg_header(self.msg_id, msg_type, self.username, self.session) | |||
|
288 | self.msg_id = str(uuid.uuid4()) | |||
|
289 | return h | |||
|
290 | ||||
|
291 | def msg(self, msg_type, content=None, parent=None, subheader=None): | |||
|
292 | msg = {} | |||
|
293 | msg['header'] = self.msg_header(msg_type) | |||
|
294 | msg['msg_id'] = msg['header']['msg_id'] | |||
|
295 | msg['parent_header'] = {} if parent is None else extract_header(parent) | |||
|
296 | msg['msg_type'] = msg_type | |||
|
297 | msg['content'] = {} if content is None else content | |||
|
298 | sub = {} if subheader is None else subheader | |||
|
299 | msg['header'].update(sub) | |||
|
300 | return msg | |||
|
301 | ||||
|
302 | def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None): | |||
|
303 | """send a message via stream""" | |||
|
304 | msg = self.msg(msg_type, content, parent, subheader) | |||
|
305 | buffers = [] if buffers is None else buffers | |||
|
306 | to_send = [] | |||
|
307 | if isinstance(ident, list): | |||
|
308 | # accept list of idents | |||
|
309 | to_send.extend(ident) | |||
|
310 | elif ident is not None: | |||
|
311 | to_send.append(ident) | |||
|
312 | to_send.append(DELIM) | |||
|
313 | to_send.append(self.pack(msg['header'])) | |||
|
314 | to_send.append(self.pack(msg['parent_header'])) | |||
|
315 | # if parent is None: | |||
|
316 | # to_send.append(self.none) | |||
|
317 | # else: | |||
|
318 | # to_send.append(self.pack(dict(parent))) | |||
|
319 | if content is None: | |||
|
320 | content = self.none | |||
|
321 | elif isinstance(content, dict): | |||
|
322 | content = self.pack(content) | |||
|
323 | elif isinstance(content, str): | |||
|
324 | # content is already packed, as in a relayed message | |||
|
325 | pass | |||
|
326 | else: | |||
|
327 | raise TypeError("Content incorrect type: %s"%type(content)) | |||
|
328 | to_send.append(content) | |||
|
329 | flag = 0 | |||
|
330 | if buffers: | |||
|
331 | flag = zmq.SNDMORE | |||
|
332 | stream.send_multipart(to_send, flag, copy=False) | |||
|
333 | for b in buffers[:-1]: | |||
|
334 | stream.send(b, flag, copy=False) | |||
|
335 | if buffers: | |||
|
336 | stream.send(buffers[-1], copy=False) | |||
|
337 | omsg = Message(msg) | |||
|
338 | return omsg | |||
|
339 | ||||
|
340 | def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True): | |||
|
341 | """receives and unpacks a message | |||
|
342 | returns [idents], msg""" | |||
|
343 | if isinstance(socket, ZMQStream): | |||
|
344 | socket = socket.socket | |||
|
345 | try: | |||
|
346 | msg = socket.recv_multipart(mode) | |||
|
347 | except zmq.ZMQError, e: | |||
|
348 | if e.errno == zmq.EAGAIN: | |||
|
349 | # We can convert EAGAIN to None as we know in this case | |||
|
350 | # recv_json won't return None. | |||
|
351 | return None | |||
|
352 | else: | |||
|
353 | raise | |||
|
354 | # return an actual Message object | |||
|
355 | # determine the number of idents by trying to unpack them. | |||
|
356 | # this is terrible: | |||
|
357 | idents, msg = self.feed_identities(msg, copy) | |||
|
358 | try: | |||
|
359 | return idents, self.unpack_message(msg, content=content, copy=copy) | |||
|
360 | except Exception, e: | |||
|
361 | print idents, msg | |||
|
362 | # TODO: handle it | |||
|
363 | raise e | |||
|
364 | ||||
|
365 | def feed_identities(self, msg, copy=True): | |||
|
366 | """This is a completely horrible thing, but it strips the zmq | |||
|
367 | ident prefixes off of a message. It will break if any identities | |||
|
368 | are unpackable by self.unpack.""" | |||
|
369 | msg = list(msg) | |||
|
370 | idents = [] | |||
|
371 | while len(msg) > 3: | |||
|
372 | if copy: | |||
|
373 | s = msg[0] | |||
|
374 | else: | |||
|
375 | s = msg[0].bytes | |||
|
376 | if s == DELIM: | |||
|
377 | msg.pop(0) | |||
|
378 | break | |||
|
379 | else: | |||
|
380 | idents.append(s) | |||
|
381 | msg.pop(0) | |||
|
382 | ||||
|
383 | return idents, msg | |||
|
384 | ||||
|
385 | def unpack_message(self, msg, content=True, copy=True): | |||
|
386 | """return a message object from the format | |||
|
387 | sent by self.send. | |||
|
388 | ||||
|
389 | parameters: | |||
|
390 | ||||
|
391 | content : bool (True) | |||
|
392 | whether to unpack the content dict (True), | |||
|
393 | or leave it serialized (False) | |||
|
394 | ||||
|
395 | copy : bool (True) | |||
|
396 | whether to return the bytes (True), | |||
|
397 | or the non-copying Message object in each place (False) | |||
|
398 | ||||
|
399 | """ | |||
|
400 | if not len(msg) >= 3: | |||
|
401 | raise TypeError("malformed message, must have at least 3 elements") | |||
|
402 | message = {} | |||
|
403 | if not copy: | |||
|
404 | for i in range(3): | |||
|
405 | msg[i] = msg[i].bytes | |||
|
406 | message['header'] = self.unpack(msg[0]) | |||
|
407 | message['msg_type'] = message['header']['msg_type'] | |||
|
408 | message['parent_header'] = self.unpack(msg[1]) | |||
|
409 | if content: | |||
|
410 | message['content'] = self.unpack(msg[2]) | |||
|
411 | else: | |||
|
412 | message['content'] = msg[2] | |||
|
413 | ||||
|
414 | # message['buffers'] = msg[3:] | |||
|
415 | # else: | |||
|
416 | # message['header'] = self.unpack(msg[0].bytes) | |||
|
417 | # message['msg_type'] = message['header']['msg_type'] | |||
|
418 | # message['parent_header'] = self.unpack(msg[1].bytes) | |||
|
419 | # if content: | |||
|
420 | # message['content'] = self.unpack(msg[2].bytes) | |||
|
421 | # else: | |||
|
422 | # message['content'] = msg[2].bytes | |||
|
423 | ||||
|
424 | message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ] | |||
|
425 | return message | |||
|
426 | ||||
|
427 | ||||
|
428 | ||||
|
429 | def test_msg2obj(): | |||
|
430 | am = dict(x=1) | |||
|
431 | ao = Message(am) | |||
|
432 | assert ao.x == am['x'] | |||
|
433 | ||||
|
434 | am['y'] = dict(z=1) | |||
|
435 | ao = Message(am) | |||
|
436 | assert ao.y.z == am['y']['z'] | |||
|
437 | ||||
|
438 | k1, k2 = 'y', 'z' | |||
|
439 | assert ao[k1][k2] == am[k1][k2] | |||
|
440 | ||||
|
441 | am2 = dict(ao) | |||
|
442 | assert am['x'] == am2['x'] | |||
|
443 | assert am['y']['z'] == am2['y']['z'] |
@@ -0,0 +1,141 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """Views""" | |||
|
3 | ||||
|
4 | from IPython.external.decorator import decorator | |||
|
5 | ||||
|
6 | ||||
|
7 | @decorator | |||
|
8 | def myblock(f, self, *args, **kwargs): | |||
|
9 | block = self.client.block | |||
|
10 | self.client.block = self.block | |||
|
11 | ret = f(self, *args, **kwargs) | |||
|
12 | self.client.block = block | |||
|
13 | return ret | |||
|
14 | ||||
|
15 | class View(object): | |||
|
16 | """Base View class""" | |||
|
17 | _targets = None | |||
|
18 | block=None | |||
|
19 | ||||
|
20 | def __init__(self, client, targets): | |||
|
21 | self.client = client | |||
|
22 | self._targets = targets | |||
|
23 | self.block = client.block | |||
|
24 | ||||
|
25 | def __repr__(self): | |||
|
26 | strtargets = str(self._targets) | |||
|
27 | if len(strtargets) > 16: | |||
|
28 | strtargets = strtargets[:12]+'...]' | |||
|
29 | return "<%s %s>"%(self.__class__.__name__, strtargets) | |||
|
30 | ||||
|
31 | @property | |||
|
32 | def results(self): | |||
|
33 | return self.client.results | |||
|
34 | ||||
|
35 | @property | |||
|
36 | def targets(self): | |||
|
37 | return self._targets | |||
|
38 | ||||
|
39 | @targets.setter | |||
|
40 | def targets(self, value): | |||
|
41 | raise TypeError("Cannot set my targets argument after construction!") | |||
|
42 | ||||
|
43 | def apply(self, f, *args, **kwargs): | |||
|
44 | """calls f(*args, **kwargs) on remote engines, returning the result. | |||
|
45 | ||||
|
46 | This method does not involve the engine's namespace. | |||
|
47 | ||||
|
48 | if self.block is False: | |||
|
49 | returns msg_id | |||
|
50 | else: | |||
|
51 | returns actual result of f(*args, **kwargs) | |||
|
52 | """ | |||
|
53 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False) | |||
|
54 | ||||
|
55 | def apply_async(self, f, *args, **kwargs): | |||
|
56 | """calls f(*args, **kwargs) on remote engines in a nonblocking manner. | |||
|
57 | ||||
|
58 | This method does not involve the engine's namespace. | |||
|
59 | ||||
|
60 | returns msg_id | |||
|
61 | """ | |||
|
62 | return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False) | |||
|
63 | ||||
|
64 | def apply_sync(self, f, *args, **kwargs): | |||
|
65 | """calls f(*args, **kwargs) on remote engines in a blocking manner, | |||
|
66 | returning the result. | |||
|
67 | ||||
|
68 | This method does not involve the engine's namespace. | |||
|
69 | ||||
|
70 | returns: actual result of f(*args, **kwargs) | |||
|
71 | """ | |||
|
72 | return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False) | |||
|
73 | ||||
|
74 | def apply_bound(self, f, *args, **kwargs): | |||
|
75 | """calls f(*args, **kwargs) bound to engine namespace(s). | |||
|
76 | ||||
|
77 | if self.block is False: | |||
|
78 | returns msg_id | |||
|
79 | else: | |||
|
80 | returns actual result of f(*args, **kwargs) | |||
|
81 | ||||
|
82 | This method has access to the targets' globals | |||
|
83 | ||||
|
84 | """ | |||
|
85 | return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True) | |||
|
86 | ||||
|
87 | def apply_async_bound(self, f, *args, **kwargs): | |||
|
88 | """calls f(*args, **kwargs) bound to engine namespace(s) | |||
|
89 | in a nonblocking manner. | |||
|
90 | ||||
|
91 | returns: msg_id | |||
|
92 | ||||
|
93 | This method has access to the targets' globals | |||
|
94 | ||||
|
95 | """ | |||
|
96 | return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True) | |||
|
97 | ||||
|
98 | def apply_sync_bound(self, f, *args, **kwargs): | |||
|
99 | """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result. | |||
|
100 | ||||
|
101 | returns: actual result of f(*args, **kwargs) | |||
|
102 | ||||
|
103 | This method has access to the targets' globals | |||
|
104 | ||||
|
105 | """ | |||
|
106 | return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True) | |||
|
107 | ||||
|
108 | ||||
|
109 | class DirectView(View): | |||
|
110 | """Direct Multiplexer View""" | |||
|
111 | ||||
|
112 | def update(self, ns): | |||
|
113 | """update remote namespace with dict `ns`""" | |||
|
114 | return self.client.push(ns, targets=self.targets, block=self.block) | |||
|
115 | ||||
|
116 | def get(self, key_s): | |||
|
117 | """get object(s) by `key_s` from remote namespace | |||
|
118 | will return one object if it is a key. | |||
|
119 | It also takes a list of keys, and will return a list of objects.""" | |||
|
120 | # block = block if block is not None else self.block | |||
|
121 | return self.client.pull(key_s, block=self.block, targets=self.targets) | |||
|
122 | ||||
|
123 | push = update | |||
|
124 | pull = get | |||
|
125 | ||||
|
126 | def __getitem__(self, key): | |||
|
127 | return self.get(key) | |||
|
128 | ||||
|
129 | def __setitem__(self,key,value): | |||
|
130 | self.update({key:value}) | |||
|
131 | ||||
|
132 | def clear(self): | |||
|
133 | """clear the remote namespace""" | |||
|
134 | return self.client.clear(targets=self.targets,block=self.block) | |||
|
135 | ||||
|
136 | def abort(self): | |||
|
137 | return self.client.abort(targets=self.targets,block=self.block) | |||
|
138 | ||||
|
139 | class LoadBalancedView(View): | |||
|
140 | _targets=None | |||
|
141 | No newline at end of file |
@@ -0,0 +1,95 b'' | |||||
|
1 | # encoding: utf-8 | |||
|
2 | ||||
|
3 | """Pickle related utilities. Perhaps this should be called 'can'.""" | |||
|
4 | ||||
|
5 | __docformat__ = "restructuredtext en" | |||
|
6 | ||||
|
7 | #------------------------------------------------------------------------------- | |||
|
8 | # Copyright (C) 2008 The IPython Development Team | |||
|
9 | # | |||
|
10 | # Distributed under the terms of the BSD License. The full license is in | |||
|
11 | # the file COPYING, distributed as part of this software. | |||
|
12 | #------------------------------------------------------------------------------- | |||
|
13 | ||||
|
14 | #------------------------------------------------------------------------------- | |||
|
15 | # Imports | |||
|
16 | #------------------------------------------------------------------------------- | |||
|
17 | ||||
|
18 | from types import FunctionType | |||
|
19 | ||||
|
20 | # contents of codeutil should either be in here, or codeutil belongs in IPython/util | |||
|
21 | from IPython.kernel import codeutil | |||
|
22 | ||||
|
23 | class CannedObject(object): | |||
|
24 | pass | |||
|
25 | ||||
|
26 | class CannedFunction(CannedObject): | |||
|
27 | ||||
|
28 | def __init__(self, f): | |||
|
29 | self._checkType(f) | |||
|
30 | self.code = f.func_code | |||
|
31 | ||||
|
32 | def _checkType(self, obj): | |||
|
33 | assert isinstance(obj, FunctionType), "Not a function type" | |||
|
34 | ||||
|
35 | def getFunction(self, g=None): | |||
|
36 | if g is None: | |||
|
37 | g = globals() | |||
|
38 | newFunc = FunctionType(self.code, g) | |||
|
39 | return newFunc | |||
|
40 | ||||
|
41 | def can(obj): | |||
|
42 | if isinstance(obj, FunctionType): | |||
|
43 | return CannedFunction(obj) | |||
|
44 | elif isinstance(obj,dict): | |||
|
45 | return canDict(obj) | |||
|
46 | elif isinstance(obj, (list,tuple)): | |||
|
47 | return canSequence(obj) | |||
|
48 | else: | |||
|
49 | return obj | |||
|
50 | ||||
|
51 | def canDict(obj): | |||
|
52 | if isinstance(obj, dict): | |||
|
53 | newobj = {} | |||
|
54 | for k, v in obj.iteritems(): | |||
|
55 | newobj[k] = can(v) | |||
|
56 | return newobj | |||
|
57 | else: | |||
|
58 | return obj | |||
|
59 | ||||
|
60 | def canSequence(obj): | |||
|
61 | if isinstance(obj, (list, tuple)): | |||
|
62 | t = type(obj) | |||
|
63 | return t([can(i) for i in obj]) | |||
|
64 | else: | |||
|
65 | return obj | |||
|
66 | ||||
|
67 | def uncan(obj, g=None): | |||
|
68 | if isinstance(obj, CannedFunction): | |||
|
69 | return obj.getFunction(g) | |||
|
70 | elif isinstance(obj,dict): | |||
|
71 | return uncanDict(obj) | |||
|
72 | elif isinstance(obj, (list,tuple)): | |||
|
73 | return uncanSequence(obj) | |||
|
74 | else: | |||
|
75 | return obj | |||
|
76 | ||||
|
77 | def uncanDict(obj, g=None): | |||
|
78 | if isinstance(obj, dict): | |||
|
79 | newobj = {} | |||
|
80 | for k, v in obj.iteritems(): | |||
|
81 | newobj[k] = uncan(v,g) | |||
|
82 | return newobj | |||
|
83 | else: | |||
|
84 | return obj | |||
|
85 | ||||
|
86 | def uncanSequence(obj, g=None): | |||
|
87 | if isinstance(obj, (list, tuple)): | |||
|
88 | t = type(obj) | |||
|
89 | return t([uncan(i,g) for i in obj]) | |||
|
90 | else: | |||
|
91 | return obj | |||
|
92 | ||||
|
93 | ||||
|
94 | def rebindFunctionGlobals(f, glbls): | |||
|
95 | return FunctionType(f.func_code, glbls) |
@@ -0,0 +1,100 b'' | |||||
|
1 | """Thread for popping Tasks from zmq to Python Queue""" | |||
|
2 | ||||
|
3 | ||||
|
4 | import time | |||
|
5 | from threading import Thread | |||
|
6 | ||||
|
7 | try: | |||
|
8 | from queue import Queue | |||
|
9 | except: | |||
|
10 | from Queue import Queue | |||
|
11 | ||||
|
12 | import zmq | |||
|
13 | from zmq.core.poll import _poll as poll | |||
|
14 | from zmq.devices import ThreadDevice | |||
|
15 | from IPython.zmq import streamsession as ss | |||
|
16 | ||||
|
17 | ||||
|
18 | class QueueStream(object): | |||
|
19 | def __init__(self, in_queue, out_queue): | |||
|
20 | self.in_queue = in_queue | |||
|
21 | self.out_queue = out_queue | |||
|
22 | ||||
|
23 | def send_multipart(self, *args, **kwargs): | |||
|
24 | while self.out_queue.full(): | |||
|
25 | time.sleep(1e-3) | |||
|
26 | self.out_queue.put(('send_multipart', args, kwargs)) | |||
|
27 | ||||
|
28 | def send(self, *args, **kwargs): | |||
|
29 | while self.out_queue.full(): | |||
|
30 | time.sleep(1e-3) | |||
|
31 | self.out_queue.put(('send', args, kwargs)) | |||
|
32 | ||||
|
33 | def recv_multipart(self): | |||
|
34 | return self.in_queue.get() | |||
|
35 | ||||
|
36 | def empty(self): | |||
|
37 | return self.in_queue.empty() | |||
|
38 | ||||
|
39 | class TaskThread(ThreadDevice): | |||
|
40 | """Class for popping Tasks from C-ZMQ->Python Queue""" | |||
|
41 | max_qsize = 100 | |||
|
42 | in_socket = None | |||
|
43 | out_socket = None | |||
|
44 | # queue = None | |||
|
45 | ||||
|
46 | def __init__(self, queue_type, mon_type, engine_id, max_qsize=100): | |||
|
47 | ThreadDevice.__init__(self, 0, queue_type, mon_type) | |||
|
48 | self.session = ss.StreamSession(username='TaskNotifier[%s]'%engine_id) | |||
|
49 | self.engine_id = engine_id | |||
|
50 | self.in_queue = Queue(max_qsize) | |||
|
51 | self.out_queue = Queue(max_qsize) | |||
|
52 | self.max_qsize = max_qsize | |||
|
53 | ||||
|
54 | @property | |||
|
55 | def queues(self): | |||
|
56 | return self.in_queue, self.out_queue | |||
|
57 | ||||
|
58 | @property | |||
|
59 | def can_recv(self): | |||
|
60 | # print self.in_queue.full(), poll((self.queue_socket, zmq.POLLIN),1e-3) | |||
|
61 | return (not self.in_queue.full()) and poll([(self.queue_socket, zmq.POLLIN)], 1e-3 ) | |||
|
62 | ||||
|
63 | @property | |||
|
64 | def can_send(self): | |||
|
65 | return not self.out_queue.empty() | |||
|
66 | ||||
|
67 | def run(self): | |||
|
68 | print 'running' | |||
|
69 | self.queue_socket,self.mon_socket = self._setup_sockets() | |||
|
70 | print 'setup' | |||
|
71 | ||||
|
72 | while True: | |||
|
73 | while not self.can_send and not self.can_recv: | |||
|
74 | # print 'idle' | |||
|
75 | # nothing to do, wait | |||
|
76 | time.sleep(1e-3) | |||
|
77 | while self.can_send: | |||
|
78 | # flush out queue | |||
|
79 | print 'flushing...' | |||
|
80 | meth, args, kwargs = self.out_queue.get() | |||
|
81 | getattr(self.queue_socket, meth)(*args, **kwargs) | |||
|
82 | print 'flushed' | |||
|
83 | ||||
|
84 | if self.can_recv: | |||
|
85 | print 'recving' | |||
|
86 | # get another job from zmq | |||
|
87 | msg = self.queue_socket.recv_multipart(0, copy=False) | |||
|
88 | # put it in the Queue | |||
|
89 | self.in_queue.put(msg) | |||
|
90 | idents,msg = self.session.feed_identities(msg, copy=False) | |||
|
91 | msg = self.session.unpack_message(msg, content=False, copy=False) | |||
|
92 | # notify the Controller that we got it | |||
|
93 | self.mon_socket.send('tracktask', zmq.SNDMORE) | |||
|
94 | header = msg['header'] | |||
|
95 | msg_id = header['msg_id'] | |||
|
96 | content = dict(engine_id=self.engine_id, msg_id = msg_id) | |||
|
97 | self.session.send(self.mon_socket, 'task_receipt', content=content) | |||
|
98 | print 'recvd' | |||
|
99 | ||||
|
100 | No newline at end of file |
1 | NO CONTENT: new file 100644 |
|
NO CONTENT: new file 100644 |
@@ -0,0 +1,82 b'' | |||||
|
1 | ||||
|
2 | import os | |||
|
3 | import uuid | |||
|
4 | import zmq | |||
|
5 | ||||
|
6 | from zmq.tests import BaseZMQTestCase | |||
|
7 | ||||
|
8 | from IPython.zmq.tests import SessionTestCase | |||
|
9 | from IPython.zmq import streamsession as ss | |||
|
10 | ||||
|
11 | class SessionTestCase(BaseZMQTestCase): | |||
|
12 | ||||
|
13 | def setUp(self): | |||
|
14 | BaseZMQTestCase.setUp(self) | |||
|
15 | self.session = ss.StreamSession() | |||
|
16 | ||||
|
17 | class TestSession(SessionTestCase): | |||
|
18 | ||||
|
19 | def test_msg(self): | |||
|
20 | """message format""" | |||
|
21 | msg = self.session.msg('execute') | |||
|
22 | thekeys = set('header msg_id parent_header msg_type content'.split()) | |||
|
23 | s = set(msg.keys()) | |||
|
24 | self.assertEquals(s, thekeys) | |||
|
25 | self.assertTrue(isinstance(msg['content'],dict)) | |||
|
26 | self.assertTrue(isinstance(msg['header'],dict)) | |||
|
27 | self.assertTrue(isinstance(msg['parent_header'],dict)) | |||
|
28 | self.assertEquals(msg['msg_type'], 'execute') | |||
|
29 | ||||
|
30 | ||||
|
31 | ||||
|
32 | def test_args(self): | |||
|
33 | """initialization arguments for StreamSession""" | |||
|
34 | s = ss.StreamSession() | |||
|
35 | self.assertTrue(s.pack is ss.default_packer) | |||
|
36 | self.assertTrue(s.unpack is ss.default_unpacker) | |||
|
37 | self.assertEquals(s.username, os.environ.get('USER', 'username')) | |||
|
38 | ||||
|
39 | s = ss.StreamSession(username=None) | |||
|
40 | self.assertEquals(s.username, os.environ.get('USER', 'username')) | |||
|
41 | ||||
|
42 | self.assertRaises(TypeError, ss.StreamSession, packer='hi') | |||
|
43 | self.assertRaises(TypeError, ss.StreamSession, unpacker='hi') | |||
|
44 | u = str(uuid.uuid4()) | |||
|
45 | s = ss.StreamSession(username='carrot', session=u) | |||
|
46 | self.assertEquals(s.session, u) | |||
|
47 | self.assertEquals(s.username, 'carrot') | |||
|
48 | ||||
|
49 | ||||
|
50 | def test_rekey(self): | |||
|
51 | """rekeying dict around json str keys""" | |||
|
52 | d = {'0': uuid.uuid4(), 0:uuid.uuid4()} | |||
|
53 | self.assertRaises(KeyError, ss.rekey, d) | |||
|
54 | ||||
|
55 | d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()} | |||
|
56 | d2 = {0:d['0'],1:d[1],'asdf':d['asdf']} | |||
|
57 | rd = ss.rekey(d) | |||
|
58 | self.assertEquals(d2,rd) | |||
|
59 | ||||
|
60 | d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()} | |||
|
61 | d2 = {1.5:d['1.5'],1:d['1']} | |||
|
62 | rd = ss.rekey(d) | |||
|
63 | self.assertEquals(d2,rd) | |||
|
64 | ||||
|
65 | d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()} | |||
|
66 | self.assertRaises(KeyError, ss.rekey, d) | |||
|
67 | ||||
|
68 | def test_unique_msg_ids(self): | |||
|
69 | """test that messages receive unique ids""" | |||
|
70 | ids = set() | |||
|
71 | for i in range(2**12): | |||
|
72 | h = self.session.msg_header('test') | |||
|
73 | msg_id = h['msg_id'] | |||
|
74 | self.assertTrue(msg_id not in ids) | |||
|
75 | ids.add(msg_id) | |||
|
76 | ||||
|
77 | def test_feed_identities(self): | |||
|
78 | """scrub the front for zmq IDENTITIES""" | |||
|
79 | theids = "engine client other".split() | |||
|
80 | content = dict(code='whoda',stuff=object()) | |||
|
81 | themsg = self.session.msg('execute',content=content) | |||
|
82 | pmsg = theids |
@@ -0,0 +1,23 b'' | |||||
|
1 | """setup the ports""" | |||
|
2 | config = { | |||
|
3 | 'interface': 'tcp://127.0.0.1', | |||
|
4 | 'regport': 10101, | |||
|
5 | 'heartport': 10102, | |||
|
6 | ||||
|
7 | 'cqueueport': 10211, | |||
|
8 | 'equeueport': 10111, | |||
|
9 | ||||
|
10 | 'ctaskport': 10221, | |||
|
11 | 'etaskport': 10121, | |||
|
12 | ||||
|
13 | 'ccontrolport': 10231, | |||
|
14 | 'econtrolport': 10131, | |||
|
15 | ||||
|
16 | 'clientport': 10201, | |||
|
17 | 'notifierport': 10202, | |||
|
18 | ||||
|
19 | 'logport': 20201 | |||
|
20 | } | |||
|
21 | ||||
|
22 | ||||
|
23 |
@@ -0,0 +1,139 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """A script to launch a controller with all its queues and connect it to a logger""" | |||
|
3 | ||||
|
4 | import time | |||
|
5 | import logging | |||
|
6 | ||||
|
7 | import zmq | |||
|
8 | from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue | |||
|
9 | from zmq.eventloop import ioloop | |||
|
10 | from zmq.eventloop.zmqstream import ZMQStream | |||
|
11 | from zmq.log import handlers | |||
|
12 | ||||
|
13 | from IPython.zmq import log | |||
|
14 | from IPython.zmq.parallel import controller, heartmonitor, streamsession as session | |||
|
15 | ||||
|
16 | ||||
|
17 | ||||
|
18 | ||||
|
19 | def setup(): | |||
|
20 | """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat""" | |||
|
21 | ctx = zmq.Context(1) | |||
|
22 | loop = ioloop.IOLoop.instance() | |||
|
23 | ||||
|
24 | # port config | |||
|
25 | # config={} | |||
|
26 | execfile('config.py', globals()) | |||
|
27 | iface = config['interface'] | |||
|
28 | logport = config['logport'] | |||
|
29 | rport = config['regport'] | |||
|
30 | cport = config['clientport'] | |||
|
31 | cqport = config['cqueueport'] | |||
|
32 | eqport = config['equeueport'] | |||
|
33 | ctport = config['ctaskport'] | |||
|
34 | etport = config['etaskport'] | |||
|
35 | ccport = config['ccontrolport'] | |||
|
36 | ecport = config['econtrolport'] | |||
|
37 | hport = config['heartport'] | |||
|
38 | nport = config['notifierport'] | |||
|
39 | ||||
|
40 | # setup logging | |||
|
41 | lsock = ctx.socket(zmq.PUB) | |||
|
42 | lsock.connect('%s:%i'%(iface,logport)) | |||
|
43 | # connected=False | |||
|
44 | # while not connected: | |||
|
45 | # try: | |||
|
46 | # except: | |||
|
47 | # logport = logport + 1 | |||
|
48 | # else: | |||
|
49 | # connected=True | |||
|
50 | # | |||
|
51 | handler = handlers.PUBHandler(lsock) | |||
|
52 | handler.setLevel(logging.DEBUG) | |||
|
53 | handler.root_topic = "controller" | |||
|
54 | log.logger.addHandler(handler) | |||
|
55 | time.sleep(.5) | |||
|
56 | ||||
|
57 | ### Engine connections ### | |||
|
58 | ||||
|
59 | # Engine registrar socket | |||
|
60 | reg = ZMQStream(ctx.socket(zmq.XREP), loop) | |||
|
61 | reg.bind("%s:%i"%(iface, rport)) | |||
|
62 | ||||
|
63 | # heartbeat | |||
|
64 | hpub = ctx.socket(zmq.PUB) | |||
|
65 | hpub.bind("%s:%i"%(iface, hport)) | |||
|
66 | hrep = ctx.socket(zmq.XREP) | |||
|
67 | hrep.bind("%s:%i"%(iface, hport+1)) | |||
|
68 | ||||
|
69 | hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500) | |||
|
70 | hb.start() | |||
|
71 | ||||
|
72 | ### Client connections ### | |||
|
73 | # Clientele socket | |||
|
74 | c = ZMQStream(ctx.socket(zmq.XREP), loop) | |||
|
75 | c.bind("%s:%i"%(iface, cport)) | |||
|
76 | ||||
|
77 | n = ZMQStream(ctx.socket(zmq.PUB), loop) | |||
|
78 | n.bind("%s:%i"%(iface, nport)) | |||
|
79 | ||||
|
80 | thesession = session.StreamSession(username="controller") | |||
|
81 | ||||
|
82 | ||||
|
83 | ||||
|
84 | # build and launch the queue | |||
|
85 | sub = ctx.socket(zmq.SUB) | |||
|
86 | sub.setsockopt(zmq.SUBSCRIBE, "") | |||
|
87 | monport = sub.bind_to_random_port(iface) | |||
|
88 | sub = ZMQStream(sub, loop) | |||
|
89 | ||||
|
90 | # Multiplexer Queue (in a Process) | |||
|
91 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out') | |||
|
92 | q.bind_in("%s:%i"%(iface, cqport)) | |||
|
93 | q.bind_out("%s:%i"%(iface, eqport)) | |||
|
94 | q.connect_mon("%s:%i"%(iface, monport)) | |||
|
95 | q.daemon=True | |||
|
96 | q.start() | |||
|
97 | ||||
|
98 | # Control Queue (in a Process) | |||
|
99 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol') | |||
|
100 | q.bind_in("%s:%i"%(iface, ccport)) | |||
|
101 | q.bind_out("%s:%i"%(iface, ecport)) | |||
|
102 | q.connect_mon("%s:%i"%(iface, monport)) | |||
|
103 | q.daemon=True | |||
|
104 | q.start() | |||
|
105 | ||||
|
106 | # Task Queue (in a Process) | |||
|
107 | q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask') | |||
|
108 | q.bind_in("%s:%i"%(iface, ctport)) | |||
|
109 | q.bind_out("%s:%i"%(iface, etport)) | |||
|
110 | q.connect_mon("%s:%i"%(iface, monport)) | |||
|
111 | q.daemon=True | |||
|
112 | q.start() | |||
|
113 | ||||
|
114 | time.sleep(.25) | |||
|
115 | ||||
|
116 | # build connection dicts | |||
|
117 | engine_addrs = { | |||
|
118 | 'control' : "%s:%i"%(iface, ecport), | |||
|
119 | 'queue': "%s:%i"%(iface, eqport), | |||
|
120 | 'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)), | |||
|
121 | 'task' : "%s:%i"%(iface, etport), | |||
|
122 | 'monitor' : "%s:%i"%(iface, monport), | |||
|
123 | } | |||
|
124 | ||||
|
125 | client_addrs = { | |||
|
126 | 'control' : "%s:%i"%(iface, ccport), | |||
|
127 | 'controller': "%s:%i"%(iface, cport), | |||
|
128 | 'queue': "%s:%i"%(iface, cqport), | |||
|
129 | 'task' : "%s:%i"%(iface, ctport), | |||
|
130 | 'notification': "%s:%i"%(iface, nport) | |||
|
131 | } | |||
|
132 | con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs) | |||
|
133 | ||||
|
134 | return loop | |||
|
135 | ||||
|
136 | ||||
|
137 | if __name__ == '__main__': | |||
|
138 | loop = setup() | |||
|
139 | loop.start() No newline at end of file |
@@ -0,0 +1,85 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | import time | |||
|
3 | import zmq | |||
|
4 | from zmq.eventloop import ioloop | |||
|
5 | from zmq.eventloop.zmqstream import ZMQStream | |||
|
6 | from IPython.zmq import streamsession as session | |||
|
7 | Message = session.Message | |||
|
8 | # from IPython.zmq.messages import send_message_pickle as send_message | |||
|
9 | import uuid | |||
|
10 | ||||
|
11 | thesession = session.StreamSession() | |||
|
12 | ||||
|
13 | max_messages=10000 | |||
|
14 | printstep=1000 | |||
|
15 | ||||
|
16 | counter = dict(count=0, engines=1) | |||
|
17 | ||||
|
18 | def poit(msg): | |||
|
19 | print "POIT" | |||
|
20 | print msg | |||
|
21 | ||||
|
22 | def count(msg): | |||
|
23 | count = counter["count"] = counter["count"]+1 | |||
|
24 | if not count % printstep: | |||
|
25 | print "#########################" | |||
|
26 | print count, time.time()-counter['tic'] | |||
|
27 | ||||
|
28 | def unpack_and_print(msg): | |||
|
29 | global msg_counter | |||
|
30 | msg_counter += 1 | |||
|
31 | print msg | |||
|
32 | try: | |||
|
33 | msg = thesession.unpack_message(msg[-3:]) | |||
|
34 | except Exception, e: | |||
|
35 | print e | |||
|
36 | # pass | |||
|
37 | print msg | |||
|
38 | ||||
|
39 | ||||
|
40 | ctx = zmq.Context() | |||
|
41 | ||||
|
42 | loop = ioloop.IOLoop() | |||
|
43 | sock = ctx.socket(zmq.XREQ) | |||
|
44 | queue = ZMQStream(ctx.socket(zmq.XREQ), loop) | |||
|
45 | client = ZMQStream(sock, loop) | |||
|
46 | client.on_send(poit) | |||
|
47 | def check_engines(msg): | |||
|
48 | # client.on_recv(unpack_and_print) | |||
|
49 | queue.on_recv(count) | |||
|
50 | idents = msg[:-3] | |||
|
51 | msg = thesession.unpack_message(msg[-3:]) | |||
|
52 | msg = Message(msg) | |||
|
53 | print msg | |||
|
54 | queue.connect(str(msg.content.queue)) | |||
|
55 | engines = dict(msg.content.engines) | |||
|
56 | # global tic | |||
|
57 | N=max_messages | |||
|
58 | if engines: | |||
|
59 | tic = time.time() | |||
|
60 | counter['tic']= tic | |||
|
61 | for i in xrange(N/len(engines)): | |||
|
62 | for eid,key in engines.iteritems(): | |||
|
63 | thesession.send(queue, "execute_request", dict(code='id=%i'%(int(eid)+i)),ident=str(key)) | |||
|
64 | toc = time.time() | |||
|
65 | print "#####################################" | |||
|
66 | print N, toc-tic | |||
|
67 | print "#####################################" | |||
|
68 | ||||
|
69 | ||||
|
70 | ||||
|
71 | ||||
|
72 | client.on_recv(check_engines) | |||
|
73 | ||||
|
74 | sock.connect('tcp://127.0.0.1:10102') | |||
|
75 | sock.setsockopt(zmq.IDENTITY, thesession.username) | |||
|
76 | # stream = ZMQStream() | |||
|
77 | # header = dict(msg_id = uuid.uuid4().bytes, msg_type='relay', id=0) | |||
|
78 | parent = dict(targets=2) | |||
|
79 | # content = "GARBAGE" | |||
|
80 | thesession.send(client, "connection_request") | |||
|
81 | ||||
|
82 | # send_message(client, (header, content)) | |||
|
83 | # print thesession.recv(client, 0) | |||
|
84 | ||||
|
85 | loop.start() |
@@ -0,0 +1,62 b'' | |||||
|
1 | #!/usr/bin/env python | |||
|
2 | """A simple log process that prints messages incoming from""" | |||
|
3 | ||||
|
4 | # | |||
|
5 | # Copyright (c) 2010 Min Ragan-Kelley | |||
|
6 | # | |||
|
7 | # This file is part of pyzmq. | |||
|
8 | # | |||
|
9 | # pyzmq is free software; you can redistribute it and/or modify it under | |||
|
10 | # the terms of the Lesser GNU General Public License as published by | |||
|
11 | # the Free Software Foundation; either version 3 of the License, or | |||
|
12 | # (at your option) any later version. | |||
|
13 | # | |||
|
14 | # pyzmq is distributed in the hope that it will be useful, | |||
|
15 | # but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
|
16 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
|
17 | # Lesser GNU General Public License for more details. | |||
|
18 | # | |||
|
19 | # You should have received a copy of the Lesser GNU General Public License | |||
|
20 | # along with this program. If not, see <http://www.gnu.org/licenses/>. | |||
|
21 | ||||
|
22 | import zmq | |||
|
23 | logport = 20201 | |||
|
24 | def main(topics, addrs): | |||
|
25 | ||||
|
26 | context = zmq.Context() | |||
|
27 | socket = context.socket(zmq.SUB) | |||
|
28 | for topic in topics: | |||
|
29 | socket.setsockopt(zmq.SUBSCRIBE, topic) | |||
|
30 | if addrs: | |||
|
31 | for addr in addrs: | |||
|
32 | print "Connecting to: ", addr | |||
|
33 | socket.connect(addr) | |||
|
34 | else: | |||
|
35 | socket.bind('tcp://127.0.0.1:%i'%logport) | |||
|
36 | ||||
|
37 | while True: | |||
|
38 | # topic = socket.recv() | |||
|
39 | # print topic | |||
|
40 | topic, msg = socket.recv_multipart() | |||
|
41 | # msg = socket.recv_pyobj() | |||
|
42 | print "%s | %s " % (topic, msg), | |||
|
43 | ||||
|
44 | if __name__ == '__main__': | |||
|
45 | import sys | |||
|
46 | topics = [] | |||
|
47 | addrs = [] | |||
|
48 | for arg in sys.argv[1:]: | |||
|
49 | if '://' in arg: | |||
|
50 | addrs.append(arg) | |||
|
51 | else: | |||
|
52 | topics.append(arg) | |||
|
53 | if not topics: | |||
|
54 | # default to everything | |||
|
55 | topics = [''] | |||
|
56 | if len(addrs) < 1: | |||
|
57 | print "binding instead of connecting" | |||
|
58 | # addrs = ['tcp://127.0.0.1:%i'%p for p in range(logport,logport+10)] | |||
|
59 | # print "usage: display.py <address> [ <topic> <address>...]" | |||
|
60 | # raise SystemExit | |||
|
61 | ||||
|
62 | main(topics, addrs) |
General Comments 0
You need to be logged in to leave comments.
Login now