##// END OF EJS Templates
prep newparallel for rebase...
MinRK -
Show More
@@ -0,0 +1,27 b''
1 import logging
2 from logging import INFO, DEBUG, WARN, ERROR, FATAL
3
4 import zmq
5 from zmq.log.handlers import PUBHandler
6
7 class EnginePUBHandler(PUBHandler):
8 """A simple PUBHandler subclass that sets root_topic"""
9 engine=None
10
11 def __init__(self, engine, *args, **kwargs):
12 PUBHandler.__init__(self,*args, **kwargs)
13 self.engine = engine
14
15 @property
16 def root_topic(self):
17 """this is a property, in case the handler is created
18 before the engine gets registered with an id"""
19 if isinstance(getattr(self.engine, 'id', None), int):
20 return "engine.%i"%self.engine.id
21 else:
22 return "engine"
23
24
25 logger = logging.getLogger('ipzmq')
26 logger.setLevel(logging.DEBUG)
27
@@ -0,0 +1,167 b''
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_newserialized -*-
3
4 """Refactored serialization classes and interfaces."""
5
6 __docformat__ = "restructuredtext en"
7
8 # Tell nose to skip this module
9 __test__ = {}
10
11 #-------------------------------------------------------------------------------
12 # Copyright (C) 2008 The IPython Development Team
13 #
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
16 #-------------------------------------------------------------------------------
17
18 #-------------------------------------------------------------------------------
19 # Imports
20 #-------------------------------------------------------------------------------
21
22 import cPickle as pickle
23
24 # from twisted.python import components
25 # from zope.interface import Interface, implements
26
27 try:
28 import numpy
29 except ImportError:
30 pass
31
32 from IPython.kernel.error import SerializationError
33
34 #-----------------------------------------------------------------------------
35 # Classes and functions
36 #-----------------------------------------------------------------------------
37
38 class ISerialized:
39
40 def getData():
41 """"""
42
43 def getDataSize(units=10.0**6):
44 """"""
45
46 def getTypeDescriptor():
47 """"""
48
49 def getMetadata():
50 """"""
51
52
53 class IUnSerialized:
54
55 def getObject():
56 """"""
57
58 class Serialized(object):
59
60 # implements(ISerialized)
61
62 def __init__(self, data, typeDescriptor, metadata={}):
63 self.data = data
64 self.typeDescriptor = typeDescriptor
65 self.metadata = metadata
66
67 def getData(self):
68 return self.data
69
70 def getDataSize(self, units=10.0**6):
71 return len(self.data)/units
72
73 def getTypeDescriptor(self):
74 return self.typeDescriptor
75
76 def getMetadata(self):
77 return self.metadata
78
79
80 class UnSerialized(object):
81
82 # implements(IUnSerialized)
83
84 def __init__(self, obj):
85 self.obj = obj
86
87 def getObject(self):
88 return self.obj
89
90
91 class SerializeIt(object):
92
93 # implements(ISerialized)
94
95 def __init__(self, unSerialized):
96 self.data = None
97 self.obj = unSerialized.getObject()
98 if globals().has_key('numpy') and isinstance(self.obj, numpy.ndarray):
99 if len(self.obj) == 0: # length 0 arrays can't be reconstructed
100 raise SerializationError("You cannot send a length 0 array")
101 self.obj = numpy.ascontiguousarray(self.obj, dtype=None)
102 self.typeDescriptor = 'ndarray'
103 self.metadata = {'shape':self.obj.shape,
104 'dtype':self.obj.dtype.str}
105 elif isinstance(self.obj, str):
106 self.typeDescriptor = 'bytes'
107 self.metadata = {}
108 elif isinstance(self.obj, buffer):
109 self.typeDescriptor = 'buffer'
110 self.metadata = {}
111 else:
112 self.typeDescriptor = 'pickle'
113 self.metadata = {}
114 self._generateData()
115
116 def _generateData(self):
117 if self.typeDescriptor == 'ndarray':
118 self.data = numpy.getbuffer(self.obj)
119 elif self.typeDescriptor in ('bytes', 'buffer'):
120 self.data = self.obj
121 elif self.typeDescriptor == 'pickle':
122 self.data = pickle.dumps(self.obj, 2)
123 else:
124 raise SerializationError("Really wierd serialization error.")
125 del self.obj
126
127 def getData(self):
128 return self.data
129
130 def getDataSize(self, units=10.0**6):
131 return 1.0*len(self.data)/units
132
133 def getTypeDescriptor(self):
134 return self.typeDescriptor
135
136 def getMetadata(self):
137 return self.metadata
138
139
140 class UnSerializeIt(UnSerialized):
141
142 # implements(IUnSerialized)
143
144 def __init__(self, serialized):
145 self.serialized = serialized
146
147 def getObject(self):
148 typeDescriptor = self.serialized.getTypeDescriptor()
149 if globals().has_key('numpy') and typeDescriptor == 'ndarray':
150 result = numpy.frombuffer(self.serialized.getData(), dtype = self.serialized.metadata['dtype'])
151 result.shape = self.serialized.metadata['shape']
152 # This is a hack to make the array writable. We are working with
153 # the numpy folks to address this issue.
154 result = result.copy()
155 elif typeDescriptor == 'pickle':
156 result = pickle.loads(self.serialized.getData())
157 elif typeDescriptor in ('bytes', 'buffer'):
158 result = self.serialized.getData()
159 else:
160 raise SerializationError("Really wierd serialization error.")
161 return result
162
163 def serialize(obj):
164 return SerializeIt(UnSerialized(obj))
165
166 def unserialize(serialized):
167 return UnSerializeIt(serialized).getObject()
1 NO CONTENT: new file 100644
This diff has been collapsed as it changes many lines, (562 lines changed) Show them Hide them
@@ -0,0 +1,562 b''
1 #!/usr/bin/env python
2 """A semi-synchronous Client for the ZMQ controller"""
3
4 import time
5 import threading
6
7 from functools import wraps
8
9 from IPython.external.decorator import decorator
10
11 import streamsession as ss
12 import zmq
13
14 from remotenamespace import RemoteNamespace
15 from view import DirectView
16
17 def _push(ns):
18 globals().update(ns)
19
20 def _pull(keys):
21 g = globals()
22 if isinstance(keys, (list,tuple)):
23 return map(g.get, keys)
24 else:
25 return g.get(keys)
26
27 def _clear():
28 globals().clear()
29
30 def execute(code):
31 exec code in globals()
32
33 # decorators for methods:
34 @decorator
35 def spinfirst(f,self,*args,**kwargs):
36 self.spin()
37 return f(self, *args, **kwargs)
38
39 @decorator
40 def defaultblock(f, self, *args, **kwargs):
41 block = kwargs.get('block',None)
42 block = self.block if block is None else block
43 saveblock = self.block
44 self.block = block
45 ret = f(self, *args, **kwargs)
46 self.block = saveblock
47 return ret
48
49
50 # @decorator
51 # def checktargets(f):
52 # @wraps(f)
53 # def checked_method(self, *args, **kwargs):
54 # self._build_targets(kwargs['targets'])
55 # return f(self, *args, **kwargs)
56 # return checked_method
57
58
59 # class _ZMQEventLoopThread(threading.Thread):
60 #
61 # def __init__(self, loop):
62 # self.loop = loop
63 # threading.Thread.__init__(self)
64 #
65 # def run(self):
66 # self.loop.start()
67 #
68 class Client(object):
69 """A semi-synchronous client to the IPython ZMQ controller
70
71 Attributes
72 ----------
73 ids : set
74 a set of engine IDs
75 requesting the ids attribute always synchronizes
76 the registration state. To request ids without synchronization,
77 use _ids
78
79 history : list of msg_ids
80 a list of msg_ids, keeping track of all the execution
81 messages you have submitted
82
83 outstanding : set of msg_ids
84 a set of msg_ids that have been submitted, but whose
85 results have not been received
86
87 results : dict
88 a dict of all our results, keyed by msg_id
89
90 block : bool
91 determines default behavior when block not specified
92 in execution methods
93
94 Methods
95 -------
96 spin : flushes incoming results and registration state changes
97 control methods spin, and requesting `ids` also ensures up to date
98
99 barrier : wait on one or more msg_ids
100
101 execution methods: apply/apply_bound/apply_to
102 legacy: execute, run
103
104 control methods: queue_status, get_result
105
106 """
107
108
109 _connected=False
110 _engines=None
111 registration_socket=None
112 controller_socket=None
113 notification_socket=None
114 queue_socket=None
115 task_socket=None
116 block = False
117 outstanding=None
118 results = None
119 history = None
120
121 def __init__(self, addr, context=None, username=None):
122 if context is None:
123 context = zmq.Context()
124 self.context = context
125 self.addr = addr
126 if username is None:
127 self.session = ss.StreamSession()
128 else:
129 self.session = ss.StreamSession(username)
130 self.registration_socket = self.context.socket(zmq.PAIR)
131 self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
132 self.registration_socket.connect(addr)
133 self._engines = {}
134 self._ids = set()
135 self.outstanding=set()
136 self.results = {}
137 self.history = []
138 self._connect()
139
140 self._notification_handlers = {'registration_notification' : self._register_engine,
141 'unregistration_notification' : self._unregister_engine,
142 }
143 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
144 'apply_reply' : self._handle_apply_reply}
145
146
147 @property
148 def ids(self):
149 self._flush_notifications()
150 return self._ids
151
152 def _update_engines(self, engines):
153 for k,v in engines.iteritems():
154 eid = int(k)
155 self._engines[eid] = v
156 self._ids.add(eid)
157
158 def _build_targets(self, targets):
159 if targets is None:
160 targets = self._ids
161 elif isinstance(targets, str):
162 if targets.lower() == 'all':
163 targets = self._ids
164 else:
165 raise TypeError("%r not valid str target, must be 'all'"%(targets))
166 elif isinstance(targets, int):
167 targets = [targets]
168 return [self._engines[t] for t in targets], list(targets)
169
170 def _connect(self):
171 """setup all our socket connections to the controller"""
172 if self._connected:
173 return
174 self._connected=True
175 self.session.send(self.registration_socket, 'connection_request')
176 msg = self.session.recv(self.registration_socket,mode=0)[-1]
177 msg = ss.Message(msg)
178 content = msg.content
179 if content.status == 'ok':
180 if content.queue:
181 self.queue_socket = self.context.socket(zmq.PAIR)
182 self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session)
183 self.queue_socket.connect(content.queue)
184 if content.task:
185 self.task_socket = self.context.socket(zmq.PAIR)
186 self.task_socket.setsockopt(zmq.IDENTITY, self.session.session)
187 self.task_socket.connect(content.task)
188 if content.notification:
189 self.notification_socket = self.context.socket(zmq.SUB)
190 self.notification_socket.connect(content.notification)
191 self.notification_socket.setsockopt(zmq.SUBSCRIBE, "")
192 if content.controller:
193 self.controller_socket = self.context.socket(zmq.PAIR)
194 self.controller_socket.setsockopt(zmq.IDENTITY, self.session.session)
195 self.controller_socket.connect(content.controller)
196 self._update_engines(dict(content.engines))
197
198 else:
199 self._connected = False
200 raise Exception("Failed to connect!")
201
202 #### handlers and callbacks for incoming messages #######
203 def _register_engine(self, msg):
204 content = msg['content']
205 eid = content['id']
206 d = {eid : content['queue']}
207 self._update_engines(d)
208 self._ids.add(int(eid))
209
210 def _unregister_engine(self, msg):
211 # print 'unregister',msg
212 content = msg['content']
213 eid = int(content['id'])
214 if eid in self._ids:
215 self._ids.remove(eid)
216 self._engines.pop(eid)
217
218 def _handle_execute_reply(self, msg):
219 # msg_id = msg['msg_id']
220 parent = msg['parent_header']
221 msg_id = parent['msg_id']
222 if msg_id not in self.outstanding:
223 print "got unknown result: %s"%msg_id
224 else:
225 self.outstanding.remove(msg_id)
226 self.results[msg_id] = ss.unwrap_exception(msg['content'])
227
228 def _handle_apply_reply(self, msg):
229 # print msg
230 # msg_id = msg['msg_id']
231 parent = msg['parent_header']
232 msg_id = parent['msg_id']
233 if msg_id not in self.outstanding:
234 print "got unknown result: %s"%msg_id
235 else:
236 self.outstanding.remove(msg_id)
237 content = msg['content']
238 if content['status'] == 'ok':
239 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
240 else:
241
242 self.results[msg_id] = ss.unwrap_exception(content)
243
244 def _flush_notifications(self):
245 "flush incoming notifications of engine registrations"
246 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
247 while msg is not None:
248 msg = msg[-1]
249 msg_type = msg['msg_type']
250 handler = self._notification_handlers.get(msg_type, None)
251 if handler is None:
252 raise Exception("Unhandled message type: %s"%msg.msg_type)
253 else:
254 handler(msg)
255 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
256
257 def _flush_results(self, sock):
258 "flush incoming task or queue results"
259 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
260 while msg is not None:
261 msg = msg[-1]
262 msg_type = msg['msg_type']
263 handler = self._queue_handlers.get(msg_type, None)
264 if handler is None:
265 raise Exception("Unhandled message type: %s"%msg.msg_type)
266 else:
267 handler(msg)
268 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
269
270 ###### get/setitem ########
271
272 def __getitem__(self, key):
273 if isinstance(key, int):
274 if key not in self.ids:
275 raise IndexError("No such engine: %i"%key)
276 return DirectView(self, key)
277
278 if isinstance(key, slice):
279 indices = range(len(self.ids))[key]
280 ids = sorted(self._ids)
281 key = [ ids[i] for i in indices ]
282 # newkeys = sorted(self._ids)[thekeys[k]]
283
284 if isinstance(key, (tuple, list, xrange)):
285 _,targets = self._build_targets(list(key))
286 return DirectView(self, targets)
287 else:
288 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
289
290 ############ begin real methods #############
291
292 def spin(self):
293 """flush incoming notifications and execution results."""
294 if self.notification_socket:
295 self._flush_notifications()
296 if self.queue_socket:
297 self._flush_results(self.queue_socket)
298 if self.task_socket:
299 self._flush_results(self.task_socket)
300
301 @spinfirst
302 def queue_status(self, targets=None, verbose=False):
303 """fetch the status of engine queues
304
305 Parameters
306 ----------
307 targets : int/str/list of ints/strs
308 the engines on which to execute
309 default : all
310 verbose : bool
311 whether to return
312
313 """
314 targets = self._build_targets(targets)[1]
315 content = dict(targets=targets)
316 self.session.send(self.controller_socket, "queue_request", content=content)
317 idents,msg = self.session.recv(self.controller_socket, 0)
318 return msg['content']
319
320 @spinfirst
321 def clear(self, targets=None):
322 """clear the namespace in target(s)"""
323 pass
324
325 @spinfirst
326 def abort(self, targets=None):
327 """abort the Queues of target(s)"""
328 pass
329
330 @defaultblock
331 def execute(self, code, targets='all', block=None):
332 """executes `code` on `targets` in blocking or nonblocking manner.
333
334 Parameters
335 ----------
336 code : str
337 the code string to be executed
338 targets : int/str/list of ints/strs
339 the engines on which to execute
340 default : all
341 block : bool
342 whether or not to wait until done
343 """
344 # block = self.block if block is None else block
345 # saveblock = self.block
346 # self.block = block
347 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
348 # self.block = saveblock
349 return result
350
351 def run(self, code, block=None):
352 """runs `code` on an engine.
353
354 Calls to this are load-balanced.
355
356 Parameters
357 ----------
358 code : str
359 the code string to be executed
360 block : bool
361 whether or not to wait until done
362
363 """
364 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
365 return result
366
367 # a = time.time()
368 # content = dict(code=code)
369 # b = time.time()
370 # msg = self.session.send(self.task_socket, 'execute_request',
371 # content=content)
372 # c = time.time()
373 # msg_id = msg['msg_id']
374 # self.outstanding.add(msg_id)
375 # self.history.append(msg_id)
376 # d = time.time()
377 # if block:
378 # self.barrier(msg_id)
379 # return self.results[msg_id]
380 # else:
381 # return msg_id
382
383 def _apply_balanced(self, f, args, kwargs, bound=True, block=None):
384 """the underlying method for applying functions in a load balanced
385 manner."""
386 block = block if block is not None else self.block
387
388 bufs = ss.pack_apply_message(f,args,kwargs)
389 content = dict(bound=bound)
390 msg = self.session.send(self.task_socket, "apply_request",
391 content=content, buffers=bufs)
392 msg_id = msg['msg_id']
393 self.outstanding.add(msg_id)
394 self.history.append(msg_id)
395 if block:
396 self.barrier(msg_id)
397 return self.results[msg_id]
398 else:
399 return msg_id
400
401 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
402 """Then underlying method for applying functions to specific engines."""
403 block = block if block is not None else self.block
404 queues,targets = self._build_targets(targets)
405
406 bufs = ss.pack_apply_message(f,args,kwargs)
407 content = dict(bound=bound)
408 msg_ids = []
409 for queue in queues:
410 msg = self.session.send(self.queue_socket, "apply_request",
411 content=content, buffers=bufs,ident=queue)
412 msg_id = msg['msg_id']
413 self.outstanding.add(msg_id)
414 self.history.append(msg_id)
415 msg_ids.append(msg_id)
416 if block:
417 self.barrier(msg_ids)
418 else:
419 if len(msg_ids) == 1:
420 return msg_ids[0]
421 else:
422 return msg_ids
423 if len(msg_ids) == 1:
424 return self.results[msg_ids[0]]
425 else:
426 result = {}
427 for target,mid in zip(targets, msg_ids):
428 result[target] = self.results[mid]
429 return result
430
431 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None):
432 """calls f(*args, **kwargs) on a remote engine(s), returning the result.
433
434 if self.block is False:
435 returns msg_id or list of msg_ids
436 else:
437 returns actual result of f(*args, **kwargs)
438 """
439 args = args if args is not None else []
440 kwargs = kwargs if kwargs is not None else {}
441 if targets is None:
442 return self._apply_balanced(f,args,kwargs,bound=bound, block=block)
443 else:
444 return self._apply_direct(f, args, kwargs,
445 bound=bound,block=block, targets=targets)
446
447 # def apply_bound(self, f, *args, **kwargs):
448 # """calls f(*args, **kwargs) on a remote engine. This does get
449 # executed in an engine's namespace. The controller selects the
450 # target engine via 0MQ XREQ load balancing.
451 #
452 # if self.block is False:
453 # returns msg_id
454 # else:
455 # returns actual result of f(*args, **kwargs)
456 # """
457 # return self._apply(f, args, kwargs, bound=True)
458 #
459 #
460 # def apply_to(self, targets, f, *args, **kwargs):
461 # """calls f(*args, **kwargs) on a specific engine.
462 #
463 # if self.block is False:
464 # returns msg_id
465 # else:
466 # returns actual result of f(*args, **kwargs)
467 #
468 # The target's namespace is not used here.
469 # Use apply_bound_to() to access target's globals.
470 # """
471 # return self._apply_to(False, targets, f, args, kwargs)
472 #
473 # def apply_bound_to(self, targets, f, *args, **kwargs):
474 # """calls f(*args, **kwargs) on a specific engine.
475 #
476 # if self.block is False:
477 # returns msg_id
478 # else:
479 # returns actual result of f(*args, **kwargs)
480 #
481 # This method has access to the target's globals
482 #
483 # """
484 # return self._apply_to(f, args, kwargs)
485 #
486 def push(self, ns, targets=None, block=None):
487 """push the contents of `ns` into the namespace on `target`"""
488 if not isinstance(ns, dict):
489 raise TypeError("Must be a dict, not %s"%type(ns))
490 result = self.apply(_push, (ns,), targets=targets, block=block,bound=True)
491 return result
492
493 @spinfirst
494 def pull(self, keys, targets=None, block=True):
495 """pull objects from `target`'s namespace by `keys`"""
496
497 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
498 return result
499
500 def barrier(self, msg_ids=None, timeout=-1):
501 """waits on one or more `msg_ids`, for up to `timeout` seconds.
502
503 Parameters
504 ----------
505 msg_ids : int, str, or list of ints and/or strs
506 ints are indices to self.history
507 strs are msg_ids
508 default: wait on all outstanding messages
509 timeout : float
510 a time in seconds, after which to give up.
511 default is -1, which means no timeout
512
513 Returns
514 -------
515 True : when all msg_ids are done
516 False : timeout reached, msg_ids still outstanding
517 """
518 tic = time.time()
519 if msg_ids is None:
520 theids = self.outstanding
521 else:
522 if isinstance(msg_ids, (int, str)):
523 msg_ids = [msg_ids]
524 theids = set()
525 for msg_id in msg_ids:
526 if isinstance(msg_id, int):
527 msg_id = self.history[msg_id]
528 theids.add(msg_id)
529 self.spin()
530 while theids.intersection(self.outstanding):
531 if timeout >= 0 and ( time.time()-tic ) > timeout:
532 break
533 time.sleep(1e-3)
534 self.spin()
535 return len(theids.intersection(self.outstanding)) == 0
536
537 @spinfirst
538 def get_results(self, msg_ids,status_only=False):
539 """returns the result of the execute or task request with `msg_id`"""
540 if not isinstance(msg_ids, (list,tuple)):
541 msg_ids = [msg_ids]
542 theids = []
543 for msg_id in msg_ids:
544 if isinstance(msg_id, int):
545 msg_id = self.history[msg_id]
546 theids.append(msg_id)
547
548 content = dict(msg_ids=theids, status_only=status_only)
549 msg = self.session.send(self.controller_socket, "result_request", content=content)
550 zmq.select([self.controller_socket], [], [])
551 idents,msg = self.session.recv(self.controller_socket, zmq.NOBLOCK)
552
553 # while True:
554 # try:
555 # except zmq.ZMQError:
556 # time.sleep(1e-3)
557 # continue
558 # else:
559 # break
560 return msg['content']
561
562 No newline at end of file
This diff has been collapsed as it changes many lines, (770 lines changed) Show them Hide them
@@ -0,0 +1,770 b''
1 #!/usr/bin/env python
2 # encoding: utf-8
3
4 """The IPython Controller with 0MQ
5 This is the master object that handles connections from engines, clients, and
6 """
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
13
14 #-----------------------------------------------------------------------------
15 # Imports
16 #-----------------------------------------------------------------------------
17 from datetime import datetime
18
19 import zmq
20 from zmq.eventloop import zmqstream, ioloop
21 import uuid
22
23 # internal:
24 from streamsession import Message, wrap_exception # default_unpacker as unpack, default_packer as pack
25 from IPython.zmq.log import logger # a Logger object
26
27 # from messages import json # use the same import switches
28
29 #-----------------------------------------------------------------------------
30 # Code
31 #-----------------------------------------------------------------------------
32
33 class ReverseDict(dict):
34 """simple double-keyed subset of dict methods."""
35
36 def __init__(self, *args, **kwargs):
37 dict.__init__(self, *args, **kwargs)
38 self.reverse = dict()
39 for key, value in self.iteritems():
40 self.reverse[value] = key
41
42 def __getitem__(self, key):
43 try:
44 return dict.__getitem__(self, key)
45 except KeyError:
46 return self.reverse[key]
47
48 def __setitem__(self, key, value):
49 if key in self.reverse:
50 raise KeyError("Can't have key %r on both sides!"%key)
51 dict.__setitem__(self, key, value)
52 self.reverse[value] = key
53
54 def pop(self, key):
55 value = dict.pop(self, key)
56 self.d1.pop(value)
57 return value
58
59
60 class EngineConnector(object):
61 """A simple object for accessing the various zmq connections of an object.
62 Attributes are:
63 id (int): engine ID
64 uuid (str): uuid (unused?)
65 queue (str): identity of queue's XREQ socket
66 registration (str): identity of registration XREQ socket
67 heartbeat (str): identity of heartbeat XREQ socket
68 """
69 id=0
70 queue=None
71 control=None
72 registration=None
73 heartbeat=None
74 pending=None
75
76 def __init__(self, id, queue, registration, control, heartbeat=None):
77 logger.info("engine::Engine Connected: %i"%id)
78 self.id = id
79 self.queue = queue
80 self.registration = registration
81 self.control = control
82 self.heartbeat = heartbeat
83
84 class Controller(object):
85 """The IPython Controller with 0MQ connections
86
87 Parameters
88 ==========
89 loop: zmq IOLoop instance
90 session: StreamSession object
91 <removed> context: zmq context for creating new connections (?)
92 registrar: ZMQStream for engine registration requests (XREP)
93 clientele: ZMQStream for client connections (XREP)
94 not used for jobs, only query/control commands
95 queue: ZMQStream for monitoring the command queue (SUB)
96 heartbeat: HeartMonitor object checking the pulse of the engines
97 db_stream: connection to db for out of memory logging of commands
98 NotImplemented
99 queue_addr: zmq connection address of the XREP socket for the queue
100 hb_addr: zmq connection address of the PUB socket for heartbeats
101 task_addr: zmq connection address of the XREQ socket for task queue
102 """
103 # internal data structures:
104 ids=None # engine IDs
105 keytable=None
106 engines=None
107 clients=None
108 hearts=None
109 pending=None
110 results=None
111 tasks=None
112 completed=None
113 mia=None
114 incoming_registrations=None
115 registration_timeout=None
116
117 #objects from constructor:
118 loop=None
119 registrar=None
120 clientelle=None
121 queue=None
122 heartbeat=None
123 notifier=None
124 db=None
125 client_addr=None
126 engine_addrs=None
127
128
129 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
130 """
131 # universal:
132 loop: IOLoop for creating future connections
133 session: streamsession for sending serialized data
134 # engine:
135 queue: ZMQStream for monitoring queue messages
136 registrar: ZMQStream for engine registration
137 heartbeat: HeartMonitor object for tracking engines
138 # client:
139 clientele: ZMQStream for client connections
140 # extra:
141 db: ZMQStream for db connection (NotImplemented)
142 engine_addrs: zmq address/protocol dict for engine connections
143 client_addrs: zmq address/protocol dict for client connections
144 """
145 self.ids = set()
146 self.keytable={}
147 self.incoming_registrations={}
148 self.engines = {}
149 self.by_ident = {}
150 self.clients = {}
151 self.hearts = {}
152 self.mia = set()
153
154 # self.sockets = {}
155 self.loop = loop
156 self.session = session
157 self.registrar = registrar
158 self.clientele = clientele
159 self.queue = queue
160 self.heartbeat = heartbeat
161 self.notifier = notifier
162 self.db = db
163
164 self.client_addrs = client_addrs
165 assert isinstance(client_addrs['queue'], str)
166 # self.hb_addrs = hb_addrs
167 self.engine_addrs = engine_addrs
168 assert isinstance(engine_addrs['queue'], str)
169 assert len(engine_addrs['heartbeat']) == 2
170
171
172 # register our callbacks
173 self.registrar.on_recv(self.dispatch_register_request)
174 self.clientele.on_recv(self.dispatch_client_msg)
175 self.queue.on_recv(self.dispatch_queue_traffic)
176
177 if heartbeat is not None:
178 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
179 heartbeat.add_new_heart_handler(self.handle_new_heart)
180
181 if self.db is not None:
182 self.db.on_recv(self.dispatch_db)
183
184 self.client_handlers = {'queue_request': self.queue_status,
185 'result_request': self.get_results,
186 'purge_request': self.purge_results,
187 'resubmit_request': self.resubmit_task,
188 }
189
190 self.registrar_handlers = {'registration_request' : self.register_engine,
191 'unregistration_request' : self.unregister_engine,
192 'connection_request': self.connection_request,
193
194 }
195 #
196 # this is the stuff that will move to DB:
197 self.results = {} # completed results
198 self.pending = {} # pending messages, keyed by msg_id
199 self.queues = {} # pending msg_ids keyed by engine_id
200 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
201 self.completed = {} # completed msg_ids keyed by engine_id
202 self.registration_timeout = max(5000, 2*self.heartbeat.period)
203
204 logger.info("controller::created controller")
205
206 def _new_id(self):
207 """gemerate a new ID"""
208 newid = 0
209 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
210 # print newid, self.ids, self.incoming_registrations
211 while newid in self.ids or newid in incoming:
212 newid += 1
213 return newid
214
215
216 #-----------------------------------------------------------------------------
217 # message validation
218 #-----------------------------------------------------------------------------
219 def _validate_targets(self, targets):
220 """turn any valid targets argument into a list of integer ids"""
221 if targets is None:
222 # default to all
223 targets = self.ids
224
225 if isinstance(targets, (int,str,unicode)):
226 # only one target specified
227 targets = [targets]
228 _targets = []
229 for t in targets:
230 # map raw identities to ids
231 if isinstance(t, (str,unicode)):
232 t = self.by_ident.get(t, t)
233 _targets.append(t)
234 targets = _targets
235 bad_targets = [ t for t in targets if t not in self.ids ]
236 if bad_targets:
237 raise IndexError("No Such Engine: %r"%bad_targets)
238 if not targets:
239 raise IndexError("No Engines Registered")
240 return targets
241
242 def _validate_client_msg(self, msg):
243 """validates and unpacks headers of a message. Returns False if invalid,
244 (ident, header, parent, content)"""
245 client_id = msg[0]
246 try:
247 msg = self.session.unpack_message(msg[1:], content=True)
248 except:
249 logger.error("client::Invalid Message %s"%msg)
250 return False
251
252 msg_type = msg.get('msg_type', None)
253 if msg_type is None:
254 return False
255 header = msg.get('header')
256 # session doesn't handle split content for now:
257 return client_id, msg
258
259
260 #-----------------------------------------------------------------------------
261 # dispatch methods (1 per socket)
262 #-----------------------------------------------------------------------------
263
264 def dispatch_register_request(self, msg):
265 """"""
266 logger.debug("registration::dispatch_register_request(%s)"%msg)
267 idents,msg = self.session.feed_identities(msg)
268 print idents,msg, len(msg)
269 try:
270 msg = self.session.unpack_message(msg,content=True)
271 except Exception, e:
272 logger.error("registration::got bad registration message: %s"%msg)
273 raise e
274 return
275
276 msg_type = msg['msg_type']
277 content = msg['content']
278
279 handler = self.registrar_handlers.get(msg_type, None)
280 if handler is None:
281 logger.error("registration::got bad registration message: %s"%msg)
282 else:
283 handler(idents, msg)
284
285 def dispatch_queue_traffic(self, msg):
286 """all ME and Task queue messages come through here"""
287 logger.debug("queue traffic: %s"%msg[:2])
288 switch = msg[0]
289 idents, msg = self.session.feed_identities(msg[1:])
290 if switch == 'in':
291 self.save_queue_request(idents, msg)
292 elif switch == 'out':
293 self.save_queue_result(idents, msg)
294 elif switch == 'intask':
295 self.save_task_request(idents, msg)
296 elif switch == 'outtask':
297 self.save_task_result(idents, msg)
298 elif switch == 'tracktask':
299 self.save_task_destination(idents, msg)
300 else:
301 logger.error("Invalid message topic: %s"%switch)
302
303
304 def dispatch_client_msg(self, msg):
305 """Route messages from clients"""
306 idents, msg = self.session.feed_identities(msg)
307 client_id = idents[0]
308 try:
309 msg = self.session.unpack_message(msg, content=True)
310 except:
311 content = wrap_exception()
312 logger.error("Bad Client Message: %s"%msg)
313 self.session.send(self.clientele, "controller_error", ident=client_id,
314 content=content)
315 return
316
317 # print client_id, header, parent, content
318 #switch on message type:
319 msg_type = msg['msg_type']
320 logger.info("client:: client %s requested %s"%(client_id, msg_type))
321 handler = self.client_handlers.get(msg_type, None)
322 try:
323 assert handler is not None, "Bad Message Type: %s"%msg_type
324 except:
325 content = wrap_exception()
326 logger.error("Bad Message Type: %s"%msg_type)
327 self.session.send(self.clientele, "controller_error", ident=client_id,
328 content=content)
329 return
330 else:
331 handler(client_id, msg)
332
333 def dispatch_db(self, msg):
334 """"""
335 raise NotImplementedError
336
337 #---------------------------------------------------------------------------
338 # handler methods (1 per event)
339 #---------------------------------------------------------------------------
340
341 #----------------------- Heartbeat --------------------------------------
342
343 def handle_new_heart(self, heart):
344 """handler to attach to heartbeater.
345 Called when a new heart starts to beat.
346 Triggers completion of registration."""
347 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
348 if heart not in self.incoming_registrations:
349 logger.info("heartbeat::ignoring new heart: %r"%heart)
350 else:
351 self.finish_registration(heart)
352
353
354 def handle_heart_failure(self, heart):
355 """handler to attach to heartbeater.
356 called when a previously registered heart fails to respond to beat request.
357 triggers unregistration"""
358 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
359 eid = self.hearts.get(heart, None)
360 if eid is None:
361 logger.info("heartbeat::ignoring heart failure %r"%heart)
362 else:
363 self.unregister_engine(heart, dict(content=dict(id=eid)))
364
365 #----------------------- MUX Queue Traffic ------------------------------
366
367 def save_queue_request(self, idents, msg):
368 queue_id, client_id = idents[:2]
369
370 try:
371 msg = self.session.unpack_message(msg, content=False)
372 except:
373 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
374 return
375
376 eid = self.by_ident.get(queue_id, None)
377 if eid is None:
378 logger.error("queue::target %r not registered"%queue_id)
379 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
380 return
381
382 header = msg['header']
383 msg_id = header['msg_id']
384 info = dict(submit=datetime.now(),
385 received=None,
386 engine=(eid, queue_id))
387 self.pending[msg_id] = ( msg, info )
388 self.queues[eid][0].append(msg_id)
389
390 def save_queue_result(self, idents, msg):
391 client_id, queue_id = idents[:2]
392
393 try:
394 msg = self.session.unpack_message(msg, content=False)
395 except:
396 logger.error("queue::engine %r sent invalid message to %r: %s"%(
397 queue_id,client_id, msg))
398 return
399
400 eid = self.by_ident.get(queue_id, None)
401 if eid is None:
402 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
403 logger.debug("queue:: %s"%msg[2:])
404 return
405
406 parent = msg['parent_header']
407 if not parent:
408 return
409 msg_id = parent['msg_id']
410 self.results[msg_id] = msg
411 if msg_id in self.pending:
412 self.pending.pop(msg_id)
413 self.queues[eid][0].remove(msg_id)
414 self.completed[eid].append(msg_id)
415 else:
416 logger.debug("queue:: unknown msg finished %s"%msg_id)
417
418 #--------------------- Task Queue Traffic ------------------------------
419
420 def save_task_request(self, idents, msg):
421 client_id = idents[0]
422
423 try:
424 msg = self.session.unpack_message(msg, content=False)
425 except:
426 logger.error("task::client %r sent invalid task message: %s"%(
427 client_id, msg))
428 return
429
430 header = msg['header']
431 msg_id = header['msg_id']
432 self.mia.add(msg_id)
433 self.pending[msg_id] = msg
434 if not self.tasks.has_key(client_id):
435 self.tasks[client_id] = []
436 self.tasks[client_id].append(msg_id)
437
438 def save_task_result(self, idents, msg):
439 client_id = idents[0]
440 try:
441 msg = self.session.unpack_message(msg, content=False)
442 except:
443 logger.error("task::invalid task result message send to %r: %s"%(
444 client_id, msg))
445 return
446
447 parent = msg['parent_header']
448 if not parent:
449 # print msg
450 # logger.warn("")
451 return
452 msg_id = parent['msg_id']
453 self.results[msg_id] = msg
454 if msg_id in self.pending:
455 self.pending.pop(msg_id)
456 if msg_id in self.mia:
457 self.mia.remove(msg_id)
458 else:
459 logger.debug("task:: unknown task %s finished"%msg_id)
460
461 def save_task_destination(self, idents, msg):
462 try:
463 msg = self.session.unpack_message(msg, content=True)
464 except:
465 logger.error("task::invalid task tracking message")
466 return
467 content = msg['content']
468 print content
469 msg_id = content['msg_id']
470 engine_uuid = content['engine_id']
471 for eid,queue_id in self.keytable.iteritems():
472 if queue_id == engine_uuid:
473 break
474
475 logger.info("task:: task %s arrived on %s"%(msg_id, eid))
476 if msg_id in self.mia:
477 self.mia.remove(msg_id)
478 else:
479 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
480 self.tasks[engine_uuid].append(msg_id)
481
482 def mia_task_request(self, idents, msg):
483 client_id = idents[0]
484 content = dict(mia=self.mia,status='ok')
485 self.session.send('mia_reply', content=content, idents=client_id)
486
487
488
489 #-------------------- Registration -----------------------------
490
491 def connection_request(self, client_id, msg):
492 """reply with connection addresses for clients"""
493 logger.info("client::client %s connected"%client_id)
494 content = dict(status='ok')
495 content.update(self.client_addrs)
496 jsonable = {}
497 for k,v in self.keytable.iteritems():
498 jsonable[str(k)] = v
499 content['engines'] = jsonable
500 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
501
502 def register_engine(self, reg, msg):
503 """register an engine"""
504 content = msg['content']
505 try:
506 queue = content['queue']
507 except KeyError:
508 logger.error("registration::queue not specified")
509 return
510 heart = content.get('heartbeat', None)
511 """register a new engine, and create the socket(s) necessary"""
512 eid = self._new_id()
513 # print (eid, queue, reg, heart)
514
515 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
516
517 content = dict(id=eid,status='ok')
518 content.update(self.engine_addrs)
519 # check if requesting available IDs:
520 if queue in self.by_ident:
521 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
522 elif heart in self.hearts: # need to check unique hearts?
523 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
524 else:
525 for h, pack in self.incoming_registrations.iteritems():
526 if heart == h:
527 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
528 break
529 elif queue == pack[1]:
530 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
531 break
532
533 msg = self.session.send(self.registrar, "registration_reply",
534 content=content,
535 ident=reg)
536
537 if content['status'] == 'ok':
538 if heart in self.heartbeat.hearts:
539 # already beating
540 self.incoming_registrations[heart] = (eid,queue,reg,None)
541 self.finish_registration(heart)
542 else:
543 purge = lambda : self._purge_stalled_registration(heart)
544 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
545 dc.start()
546 self.incoming_registrations[heart] = (eid,queue,reg,dc)
547 else:
548 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
549 return eid
550
551 def unregister_engine(self, ident, msg):
552 try:
553 eid = msg['content']['id']
554 except:
555 logger.error("registration::bad engine id for unregistration: %s"%ident)
556 return
557 logger.info("registration::unregister_engine(%s)"%eid)
558 content=dict(id=eid, queue=self.engines[eid].queue)
559 self.ids.remove(eid)
560 self.keytable.pop(eid)
561 ec = self.engines.pop(eid)
562 self.hearts.pop(ec.heartbeat)
563 self.by_ident.pop(ec.queue)
564 self.completed.pop(eid)
565 for msg_id in self.queues.pop(eid)[0]:
566 msg = self.pending.pop(msg_id)
567 ############## TODO: HANDLE IT ################
568
569 if self.notifier:
570 self.session.send(self.notifier, "unregistration_notification", content=content)
571
572 def finish_registration(self, heart):
573 try:
574 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
575 except KeyError:
576 logger.error("registration::tried to finish nonexistant registration")
577 return
578 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
579 if purge is not None:
580 purge.stop()
581 control = queue
582 self.ids.add(eid)
583 self.keytable[eid] = queue
584 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
585 self.by_ident[queue] = eid
586 self.queues[eid] = ([],[])
587 self.completed[eid] = list()
588 self.hearts[heart] = eid
589 content = dict(id=eid, queue=self.engines[eid].queue)
590 if self.notifier:
591 self.session.send(self.notifier, "registration_notification", content=content)
592
593 def _purge_stalled_registration(self, heart):
594 if heart in self.incoming_registrations:
595 eid = self.incoming_registrations.pop(heart)[0]
596 logger.info("registration::purging stalled registration: %i"%eid)
597 else:
598 pass
599
600 #------------------- Client Requests -------------------------------
601
602 def check_load(self, client_id, msg):
603 content = msg['content']
604 try:
605 targets = content['targets']
606 targets = self._validate_targets(targets)
607 except:
608 content = wrap_exception()
609 self.session.send(self.clientele, "controller_error",
610 content=content, ident=client_id)
611 return
612
613 content = dict(status='ok')
614 # loads = {}
615 for t in targets:
616 content[str(t)] = len(self.queues[t])
617 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
618
619
620 def queue_status(self, client_id, msg):
621 """handle queue_status request"""
622 content = msg['content']
623 targets = content['targets']
624 try:
625 targets = self._validate_targets(targets)
626 except:
627 content = wrap_exception()
628 self.session.send(self.clientele, "controller_error",
629 content=content, ident=client_id)
630 return
631 verbose = msg.get('verbose', False)
632 content = dict()
633 for t in targets:
634 queue = self.queues[t]
635 completed = self.completed[t]
636 if not verbose:
637 queue = len(queue)
638 completed = len(completed)
639 content[str(t)] = {'queue': queue, 'completed': completed }
640 # pending
641 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
642
643 def job_status(self, client_id, msg):
644 """handle queue_status request"""
645 content = msg['content']
646 msg_ids = content['msg_ids']
647 try:
648 targets = self._validate_targets(targets)
649 except:
650 content = wrap_exception()
651 self.session.send(self.clientele, "controller_error",
652 content=content, ident=client_id)
653 return
654 verbose = msg.get('verbose', False)
655 content = dict()
656 for t in targets:
657 queue = self.queues[t]
658 completed = self.completed[t]
659 if not verbose:
660 queue = len(queue)
661 completed = len(completed)
662 content[str(t)] = {'queue': queue, 'completed': completed }
663 # pending
664 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
665
666 def purge_results(self, client_id, msg):
667 content = msg['content']
668 msg_ids = content.get('msg_ids', [])
669 reply = dict(status='ok')
670 if msg_ids == 'all':
671 self.results = {}
672 else:
673 for msg_id in msg_ids:
674 if msg_id in self.results:
675 self.results.pop(msg_id)
676 else:
677 if msg_id in self.pending:
678 reply = dict(status='error', reason="msg pending: %r"%msg_id)
679 else:
680 reply = dict(status='error', reason="No such msg: %r"%msg_id)
681 break
682 eids = content.get('engine_ids', [])
683 for eid in eids:
684 if eid not in self.engines:
685 reply = dict(status='error', reason="No such engine: %i"%eid)
686 break
687 msg_ids = self.completed.pop(eid)
688 for msg_id in msg_ids:
689 self.results.pop(msg_id)
690
691 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
692
693 def resubmit_task(self, client_id, msg, buffers):
694 content = msg['content']
695 header = msg['header']
696
697
698 msg_ids = content.get('msg_ids', [])
699 reply = dict(status='ok')
700 if msg_ids == 'all':
701 self.results = {}
702 else:
703 for msg_id in msg_ids:
704 if msg_id in self.results:
705 self.results.pop(msg_id)
706 else:
707 if msg_id in self.pending:
708 reply = dict(status='error', reason="msg pending: %r"%msg_id)
709 else:
710 reply = dict(status='error', reason="No such msg: %r"%msg_id)
711 break
712 eids = content.get('engine_ids', [])
713 for eid in eids:
714 if eid not in self.engines:
715 reply = dict(status='error', reason="No such engine: %i"%eid)
716 break
717 msg_ids = self.completed.pop(eid)
718 for msg_id in msg_ids:
719 self.results.pop(msg_id)
720
721 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
722
723 def get_results(self, client_id, msg):
724 """get the result of 1 or more messages"""
725 content = msg['content']
726 msg_ids = set(content['msg_ids'])
727 statusonly = content.get('status_only', False)
728 pending = []
729 completed = []
730 content = dict(status='ok')
731 content['pending'] = pending
732 content['completed'] = completed
733 for msg_id in msg_ids:
734 if msg_id in self.pending:
735 pending.append(msg_id)
736 elif msg_id in self.results:
737 completed.append(msg_id)
738 if not statusonly:
739 content[msg_id] = self.results[msg_id]['content']
740 else:
741 content = dict(status='error')
742 content['reason'] = 'no such message: '+msg_id
743 break
744 self.session.send(self.clientele, "result_reply", content=content,
745 parent=msg, ident=client_id)
746
747
748
749 ############ OLD METHODS for Python Relay Controller ###################
750 def _validate_engine_msg(self, msg):
751 """validates and unpacks headers of a message. Returns False if invalid,
752 (ident, message)"""
753 ident = msg[0]
754 try:
755 msg = self.session.unpack_message(msg[1:], content=False)
756 except:
757 logger.error("engine.%s::Invalid Message %s"%(ident, msg))
758 return False
759
760 try:
761 eid = msg.header.username
762 assert self.engines.has_key(eid)
763 except:
764 logger.error("engine::Invalid Engine ID %s"%(ident))
765 return False
766
767 return eid, msg
768
769
770 No newline at end of file
@@ -0,0 +1,143 b''
1 #!/usr/bin/env python
2 """A simple engine that talks to a controller over 0MQ.
3 it handles registration, etc. and launches a kernel
4 connected to the Controller's queue(s).
5 """
6 import sys
7 import time
8 import traceback
9 import uuid
10
11 import zmq
12 from zmq.eventloop import ioloop, zmqstream
13
14 from streamsession import Message, StreamSession
15 from client import Client
16 import streamkernel as kernel
17 import heartmonitor
18 # import taskthread
19 # from log import logger
20
21
22 def printer(*msg):
23 print msg
24
25 class Engine(object):
26 """IPython engine"""
27
28 id=None
29 context=None
30 loop=None
31 session=None
32 queue_id=None
33 control_id=None
34 heart_id=None
35 registrar=None
36 heart=None
37 kernel=None
38
39 def __init__(self, context, loop, session, registrar, client, queue_id=None, heart_id=None):
40 self.context = context
41 self.loop = loop
42 self.session = session
43 self.registrar = registrar
44 self.client = client
45 self.queue_id = queue_id or str(uuid.uuid4())
46 self.heart_id = heart_id or self.queue_id
47 self.registrar.on_send(printer)
48
49 def register(self):
50
51 content = dict(queue=self.queue_id, heartbeat=self.heart_id)
52 self.registrar.on_recv(self.complete_registration)
53 self.session.send(self.registrar, "registration_request",content=content)
54
55 def complete_registration(self, msg):
56 # print msg
57 idents,msg = self.session.feed_identities(msg)
58 msg = Message(self.session.unpack_message(msg))
59 if msg.content.status == 'ok':
60 self.session.username = str(msg.content.id)
61 queue_addr = msg.content.queue
62 if queue_addr:
63 queue = self.context.socket(zmq.PAIR)
64 queue.setsockopt(zmq.IDENTITY, self.queue_id)
65 queue.connect(str(queue_addr))
66 self.queue = zmqstream.ZMQStream(queue, self.loop)
67
68 control_addr = msg.content.control
69 if control_addr:
70 control = self.context.socket(zmq.PAIR)
71 control.setsockopt(zmq.IDENTITY, self.queue_id)
72 control.connect(str(control_addr))
73 self.control = zmqstream.ZMQStream(control, self.loop)
74
75 task_addr = msg.content.task
76 print task_addr
77 if task_addr:
78 # task as stream:
79 task = self.context.socket(zmq.PAIR)
80 task.connect(str(task_addr))
81 self.task_stream = zmqstream.ZMQStream(task, self.loop)
82 # TaskThread:
83 # mon_addr = msg.content.monitor
84 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.queue_id)
85 # task.connect_in(str(task_addr))
86 # task.connect_out(str(mon_addr))
87 # self.task_stream = taskthread.QueueStream(*task.queues)
88 # task.start()
89
90 hbs = msg.content.heartbeat
91 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.heart_id)
92 self.heart.start()
93 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
94 # placeholder for now:
95 pub = self.context.socket(zmq.PUB)
96 pub = zmqstream.ZMQStream(pub, self.loop)
97 # create and start the kernel
98 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
99 self.kernel.start()
100 else:
101 # logger.error("Registration Failed: %s"%msg)
102 raise Exception("Registration Failed: %s"%msg)
103
104 # logger.info("engine::completed registration with id %s"%self.session.username)
105
106 print msg
107
108 def unregister(self):
109 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
110 time.sleep(1)
111 sys.exit(0)
112
113 def start(self):
114 print "registering"
115 self.register()
116
117
118 if __name__ == '__main__':
119
120 loop = ioloop.IOLoop.instance()
121 session = StreamSession()
122 ctx = zmq.Context()
123
124 ip = '127.0.0.1'
125 reg_port = 10101
126 connection = ('tcp://%s' % ip) + ':%i'
127 reg_conn = connection % reg_port
128 print reg_conn
129 print >>sys.__stdout__, "Starting the engine..."
130
131 reg = ctx.socket(zmq.PAIR)
132 reg.connect(reg_conn)
133 reg = zmqstream.ZMQStream(reg, loop)
134 client = Client(reg_conn)
135 if len(sys.argv) > 1:
136 queue_id=sys.argv[1]
137 else:
138 queue_id = None
139
140 e = Engine(ctx, loop, session, reg, client, queue_id)
141 dc = ioloop.DelayedCallback(e.start, 500, loop)
142 dc.start()
143 loop.start() No newline at end of file
@@ -0,0 +1,169 b''
1 #!/usr/bin/env python
2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their XREQ identities.
5 """
6
7 import time
8 import uuid
9
10 import zmq
11 from zmq.devices import ProcessDevice
12 from zmq.eventloop import ioloop, zmqstream
13
14 #internal
15 from IPython.zmq.log import logger
16
17 class Heart(object):
18 """A basic heart object for responding to a HeartMonitor.
19 This is a simple wrapper with defaults for the most common
20 Device model for responding to heartbeats.
21
22 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
23 SUB/XREQ for in/out.
24
25 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
26 device=None
27 id=None
28 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
29 self.device = ProcessDevice(zmq.FORWARDER, in_type, out_type)
30 self.device.connect_in(in_addr)
31 self.device.connect_out(out_addr)
32 if in_type == zmq.SUB:
33 self.device.setsockopt_in(zmq.SUBSCRIBE, "")
34 if heart_id is None:
35 heart_id = str(uuid.uuid4())
36 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
37 self.id = heart_id
38
39 def start(self):
40 return self.device.start()
41
42 class HeartMonitor(object):
43 """A basic HeartMonitor class
44 pingstream: a PUB stream
45 pongstream: an XREP stream
46 period: the period of the heartbeat in milliseconds"""
47 loop=None
48 pingstream=None
49 pongstream=None
50 period=None
51 hearts=None
52 on_probation=None
53 last_ping=None
54
55 def __init__(self, loop, pingstream, pongstream, period=1000):
56 self.loop = loop
57 self.period = period
58
59 self.pingstream = pingstream
60 self.pongstream = pongstream
61 self.pongstream.on_recv(self.handle_pong)
62
63 self.hearts = set()
64 self.responses = set()
65 self.on_probation = set()
66 self.lifetime = 0
67 self.tic = time.time()
68
69 self._new_handlers = set()
70 self._failure_handlers = set()
71
72 def start(self):
73 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
74 self.caller.start()
75
76 def add_new_heart_handler(self, handler):
77 """add a new handler for new hearts"""
78 logger.debug("heartbeat::new_heart_handler: %s"%handler)
79 self._new_handlers.add(handler)
80
81 def add_heart_failure_handler(self, handler):
82 """add a new handler for heart failure"""
83 logger.debug("heartbeat::new heart failure handler: %s"%handler)
84 self._failure_handlers.add(handler)
85
86 def _flush(self):
87 """override IOLoop triggers"""
88 while True:
89 try:
90 msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK)
91 logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled")
92 except zmq.ZMQError:
93 return
94 else:
95 self.handle_pong(msg)
96 # print '.'
97
98
99 def beat(self):
100 self._flush()
101 self.last_ping = self.lifetime
102
103 toc = time.time()
104 self.lifetime += toc-self.tic
105 self.tic = toc
106 logger.debug("heartbeat::%s"%self.lifetime)
107 goodhearts = self.hearts.intersection(self.responses)
108 missed_beats = self.hearts.difference(goodhearts)
109 heartfailures = self.on_probation.intersection(missed_beats)
110 newhearts = self.responses.difference(goodhearts)
111 map(self.handle_new_heart, newhearts)
112 map(self.handle_heart_failure, heartfailures)
113 self.on_probation = missed_beats.intersection(self.hearts)
114 self.responses = set()
115 # print self.on_probation, self.hearts
116 # logger.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
117 self.pingstream.send(str(self.lifetime))
118
119 def handle_new_heart(self, heart):
120 if self._new_handlers:
121 for handler in self._new_handlers:
122 handler(heart)
123 else:
124 logger.info("heartbeat::yay, got new heart %s!"%heart)
125 self.hearts.add(heart)
126
127 def handle_heart_failure(self, heart):
128 if self._failure_handlers:
129 for handler in self._failure_handlers:
130 try:
131 handler(heart)
132 except Exception, e:
133 print e
134 logger.error("heartbeat::Bad Handler! %s"%handler)
135 pass
136 else:
137 logger.info("heartbeat::Heart %s failed :("%heart)
138 self.hearts.remove(heart)
139
140
141 def handle_pong(self, msg):
142 "a heart just beat"
143 if msg[1] == str(self.lifetime):
144 delta = time.time()-self.tic
145 logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
146 self.responses.add(msg[0])
147 elif msg[1] == str(self.last_ping):
148 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
149 logger.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
150 self.responses.add(msg[0])
151 else:
152 logger.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
153 (msg[1],self.lifetime))
154
155
156 if __name__ == '__main__':
157 loop = ioloop.IOLoop.instance()
158 context = zmq.Context()
159 pub = context.socket(zmq.PUB)
160 pub.bind('tcp://127.0.0.1:5555')
161 xrep = context.socket(zmq.XREP)
162 xrep.bind('tcp://127.0.0.1:5556')
163
164 outstream = zmqstream.ZMQStream(pub, loop)
165 instream = zmqstream.ZMQStream(xrep, loop)
166
167 hb = HeartMonitor(loop, outstream, instream)
168
169 loop.start()
@@ -0,0 +1,95 b''
1 """RemoteNamespace object, for dict style interaction with a remote
2 execution kernel."""
3
4 from functools import wraps
5 from IPython.external.decorator import decorator
6
7 def _clear():
8 globals().clear()
9
10 @decorator
11 def spinfirst(f):
12 @wraps(f)
13 def spun_method(self, *args, **kwargs):
14 self.spin()
15 return f(self, *args, **kwargs)
16 return spun_method
17
18 @decorator
19 def myblock(f, self, *args, **kwargs):
20 block = self.client.block
21 self.client.block = self.block
22 ret = f(self, *args, **kwargs)
23 self.client.block = block
24 return ret
25
26 class RemoteNamespace(object):
27 """A RemoteNamespace object, providing dictionary
28 access to an engine via an IPython.zmq.client object.
29
30
31 """
32 client = None
33 queue = None
34 id = None
35 block = False
36
37 def __init__(self, client, id):
38 self.client = client
39 self.id = id
40 self.block = client.block # initial state is same as client
41
42 def __repr__(self):
43 return "<RemoteNamespace[%i]>"%self.id
44
45 @myblock
46 def apply(self, f, *args, **kwargs):
47 """call f(*args, **kwargs) in remote namespace
48
49 This method has no access to the user namespace"""
50 return self.client.apply_to(self.id, f, *args, **kwargs)
51
52 @myblock
53 def apply_bound(self, f, *args, **kwargs):
54 """call `f(*args, **kwargs)` in remote namespace.
55
56 `f` will have access to the user namespace as globals()."""
57 return self.client.apply_bound_to(self.id, f, *args, **kwargs)
58
59 @myblock
60 def update(self, ns):
61 """update remote namespace with dict `ns`"""
62 return self.client.push(self.id, ns, self.block)
63
64 def get(self, key_s):
65 """get object(s) by `key_s` from remote namespace
66 will return one object if it is a key.
67 It also takes a list of keys, and will return a list of objects."""
68 return self.client.pull(self.id, key_s, self.block)
69
70 push = update
71 pull = get
72
73 def __getitem__(self, key):
74 return self.get(key)
75
76 def __setitem__(self,key,value):
77 self.update({key:value})
78
79 def clear(self):
80 """clear the remote namespace"""
81 return self.client.apply_bound_to(self.id, _clear)
82
83 @decorator
84 def withme(self, toapply):
85 """for use as a decorator, this turns a function into
86 one that executes remotely."""
87 @wraps(toapply)
88 def applied(self, *args, **kwargs):
89 return self.apply_bound(self, toapply, *args, **kwargs)
90 return applied
91
92
93
94
95
@@ -0,0 +1,482 b''
1 #!/usr/bin/env python
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
5
6 import __builtin__
7 import sys
8 import time
9 import traceback
10 from signal import SIGTERM, SIGKILL
11
12 from code import CommandCompiler
13
14 import zmq
15 from zmq.eventloop import ioloop, zmqstream
16
17 from streamsession import StreamSession, Message, extract_header, serialize_object,\
18 unpack_apply_message
19 from IPython.zmq.completer import KernelCompleter
20
21 class OutStream(object):
22 """A file like object that publishes the stream to a 0MQ PUB socket."""
23
24 def __init__(self, session, pub_socket, name, max_buffer=200):
25 self.session = session
26 self.pub_socket = pub_socket
27 self.name = name
28 self._buffer = []
29 self._buffer_len = 0
30 self.max_buffer = max_buffer
31 self.parent_header = {}
32
33 def set_parent(self, parent):
34 self.parent_header = extract_header(parent)
35
36 def close(self):
37 self.pub_socket = None
38
39 def flush(self):
40 if self.pub_socket is None:
41 raise ValueError(u'I/O operation on closed file')
42 else:
43 if self._buffer:
44 data = ''.join(self._buffer)
45 content = {u'name':self.name, u'data':data}
46 # msg = self.session.msg(u'stream', content=content,
47 # parent=self.parent_header)
48 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
49 # print>>sys.__stdout__, Message(msg)
50 # self.pub_socket.send_json(msg)
51 self._buffer_len = 0
52 self._buffer = []
53
54 def isattr(self):
55 return False
56
57 def next(self):
58 raise IOError('Read not supported on a write only stream.')
59
60 def read(self, size=None):
61 raise IOError('Read not supported on a write only stream.')
62
63 readline=read
64
65 def write(self, s):
66 if self.pub_socket is None:
67 raise ValueError('I/O operation on closed file')
68 else:
69 self._buffer.append(s)
70 self._buffer_len += len(s)
71 self._maybe_send()
72
73 def _maybe_send(self):
74 if '\n' in self._buffer[-1]:
75 self.flush()
76 if self._buffer_len > self.max_buffer:
77 self.flush()
78
79 def writelines(self, sequence):
80 if self.pub_socket is None:
81 raise ValueError('I/O operation on closed file')
82 else:
83 for s in sequence:
84 self.write(s)
85
86
87 class DisplayHook(object):
88
89 def __init__(self, session, pub_socket):
90 self.session = session
91 self.pub_socket = pub_socket
92 self.parent_header = {}
93
94 def __call__(self, obj):
95 if obj is None:
96 return
97
98 __builtin__._ = obj
99 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
100 # parent=self.parent_header)
101 # self.pub_socket.send_json(msg)
102 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
103
104 def set_parent(self, parent):
105 self.parent_header = extract_header(parent)
106
107
108 class RawInput(object):
109
110 def __init__(self, session, socket):
111 self.session = session
112 self.socket = socket
113
114 def __call__(self, prompt=None):
115 msg = self.session.msg(u'raw_input')
116 self.socket.send_json(msg)
117 while True:
118 try:
119 reply = self.socket.recv_json(zmq.NOBLOCK)
120 except zmq.ZMQError, e:
121 if e.errno == zmq.EAGAIN:
122 pass
123 else:
124 raise
125 else:
126 break
127 return reply[u'content'][u'data']
128
129
130 class Kernel(object):
131
132 def __init__(self, session, control_stream, reply_stream, pub_stream,
133 task_stream=None, client=None):
134 self.session = session
135 self.control_stream = control_stream
136 self.reply_stream = reply_stream
137 self.task_stream = task_stream
138 self.pub_stream = pub_stream
139 self.client = client
140 self.user_ns = {}
141 self.history = []
142 self.compiler = CommandCompiler()
143 self.completer = KernelCompleter(self.user_ns)
144 self.aborted = set()
145
146 # Build dict of handlers for message types
147 self.queue_handlers = {}
148 self.control_handlers = {}
149 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
150 self.queue_handlers[msg_type] = getattr(self, msg_type)
151
152 for msg_type in ['kill_request', 'abort_request']:
153 self.control_handlers[msg_type] = getattr(self, msg_type)
154
155 #-------------------- control handlers -----------------------------
156
157 def abort_queue(self, stream):
158 while True:
159 try:
160 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
161 except zmq.ZMQError, e:
162 if e.errno == zmq.EAGAIN:
163 break
164 else:
165 return
166 else:
167 if msg is None:
168 return
169 else:
170 idents,msg = msg
171
172 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
173 # msg = self.reply_socket.recv_json()
174 print>>sys.__stdout__, "Aborting:"
175 print>>sys.__stdout__, Message(msg)
176 msg_type = msg['msg_type']
177 reply_type = msg_type.split('_')[0] + '_reply'
178 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
179 # self.reply_socket.send(ident,zmq.SNDMORE)
180 # self.reply_socket.send_json(reply_msg)
181 reply_msg = self.session.send(stream, reply_type,
182 content={'status' : 'aborted'}, parent=msg, ident=idents)
183 print>>sys.__stdout__, Message(reply_msg)
184 # We need to wait a bit for requests to come in. This can probably
185 # be set shorter for true asynchronous clients.
186 time.sleep(0.05)
187
188 def abort_request(self, stream, ident, parent):
189 msg_ids = parent['content'].get('msg_ids', None)
190 if not msg_ids:
191 self.abort_queue(self.task_stream)
192 self.abort_queue(self.reply_stream)
193 for mid in msg_ids:
194 self.aborted.add(mid)
195
196 content = dict(status='ok')
197 self.session.send(stream, 'abort_reply', content=content, parent=parent,
198 ident=ident)
199
200 def kill_request(self, stream, idents, parent):
201 self.abort_queue(self.reply_stream)
202 if self.task_stream:
203 self.abort_queue(self.task_stream)
204 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
205 content = dict(status='ok'))
206 # we can know that a message is done if we *don't* use streams, but
207 # use a socket directly with MessageTracker
208 time.sleep(1)
209 os.kill(os.getpid(), SIGTERM)
210 time.sleep(.25)
211 os.kill(os.getpid(), SIGKILL)
212
213 def dispatch_control(self, msg):
214 idents,msg = self.session.feed_identities(msg, copy=False)
215 msg = self.session.unpack_message(msg, content=True, copy=False)
216
217 header = msg['header']
218 msg_id = header['msg_id']
219
220 handler = self.control_handlers.get(msg['msg_type'], None)
221 if handler is None:
222 print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg
223 else:
224 handler(stream, idents, msg)
225
226 def flush_control(self):
227 while any(zmq.select([self.control_socket],[],[],1e-4)):
228 try:
229 msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False)
230 except zmq.ZMQError, e:
231 if e.errno != zmq.EAGAIN:
232 raise e
233 return
234 else:
235 self.dispatch_control(msg)
236
237
238 #-------------------- queue helpers ------------------------------
239
240 def check_dependencies(self, dependencies):
241 if not dependencies:
242 return True
243 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
244 anyorall = dependencies[0]
245 dependencies = dependencies[1]
246 else:
247 anyorall = 'all'
248 results = self.client.get_results(dependencies,status_only=True)
249 if results['status'] != 'ok':
250 return False
251
252 if anyorall == 'any':
253 if not results['completed']:
254 return False
255 else:
256 if results['pending']:
257 return False
258
259 return True
260
261 #-------------------- queue handlers -----------------------------
262
263 def execute_request(self, stream, ident, parent):
264 try:
265 code = parent[u'content'][u'code']
266 except:
267 print>>sys.__stderr__, "Got bad msg: "
268 print>>sys.__stderr__, Message(parent)
269 return
270 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
271 # self.pub_stream.send(pyin_msg)
272 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
273 try:
274 comp_code = self.compiler(code, '<zmq-kernel>')
275 # allow for not overriding displayhook
276 if hasattr(sys.displayhook, 'set_parent'):
277 sys.displayhook.set_parent(parent)
278 exec comp_code in self.user_ns, self.user_ns
279 except:
280 # result = u'error'
281 etype, evalue, tb = sys.exc_info()
282 tb = traceback.format_exception(etype, evalue, tb)
283 exc_content = {
284 u'status' : u'error',
285 u'traceback' : tb,
286 u'etype' : unicode(etype),
287 u'evalue' : unicode(evalue)
288 }
289 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
290 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
291 reply_content = exc_content
292 else:
293 reply_content = {'status' : 'ok'}
294 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
295 # self.reply_socket.send(ident, zmq.SNDMORE)
296 # self.reply_socket.send_json(reply_msg)
297 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
298 # print>>sys.__stdout__, Message(reply_msg)
299 if reply_msg['content']['status'] == u'error':
300 self.abort_queue()
301
302 def complete_request(self, stream, ident, parent):
303 matches = {'matches' : self.complete(parent),
304 'status' : 'ok'}
305 completion_msg = self.session.send(stream, 'complete_reply',
306 matches, parent, ident)
307 # print >> sys.__stdout__, completion_msg
308
309 def complete(self, msg):
310 return self.completer.complete(msg.content.line, msg.content.text)
311
312 def apply_request(self, stream, ident, parent):
313 try:
314 content = parent[u'content']
315 bufs = parent[u'buffers']
316 msg_id = parent['header']['msg_id']
317 bound = content.get('bound', False)
318 except:
319 print>>sys.__stderr__, "Got bad msg: "
320 print>>sys.__stderr__, Message(parent)
321 return
322 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
323 # self.pub_stream.send(pyin_msg)
324 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
325 try:
326 # allow for not overriding displayhook
327 if hasattr(sys.displayhook, 'set_parent'):
328 sys.displayhook.set_parent(parent)
329 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
330 if bound:
331 working = self.user_ns
332 suffix = str(msg_id).replace("-","")
333 prefix = "_"
334
335 else:
336 working = dict()
337 suffix = prefix = ""
338 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
339 # if f.fun
340 fname = prefix+f.func_name.strip('<>')+suffix
341 argname = prefix+"args"+suffix
342 kwargname = prefix+"kwargs"+suffix
343 resultname = prefix+"result"+suffix
344
345 ns = { fname : f, argname : args, kwargname : kwargs }
346 # print ns
347 working.update(ns)
348 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
349 exec code in working, working
350 result = working.get(resultname)
351 # clear the namespace
352 if bound:
353 for key in ns.iterkeys():
354 self.user_ns.pop(key)
355 else:
356 del working
357
358 packed_result,buf = serialize_object(result)
359 result_buf = [packed_result]+buf
360 except:
361 result = u'error'
362 etype, evalue, tb = sys.exc_info()
363 tb = traceback.format_exception(etype, evalue, tb)
364 exc_content = {
365 u'status' : u'error',
366 u'traceback' : tb,
367 u'etype' : unicode(etype),
368 u'evalue' : unicode(evalue)
369 }
370 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
371 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
372 reply_content = exc_content
373 result_buf = []
374 else:
375 reply_content = {'status' : 'ok'}
376 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
377 # self.reply_socket.send(ident, zmq.SNDMORE)
378 # self.reply_socket.send_json(reply_msg)
379 reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf)
380 # print>>sys.__stdout__, Message(reply_msg)
381 if reply_msg['content']['status'] == u'error':
382 self.abort_queue()
383
384 def dispatch_queue(self, stream, msg):
385 self.flush_control()
386 idents,msg = self.session.feed_identities(msg, copy=False)
387 msg = self.session.unpack_message(msg, content=True, copy=False)
388
389 header = msg['header']
390 msg_id = header['msg_id']
391 dependencies = header.get('dependencies', [])
392
393 if self.check_aborted(msg_id):
394 return self.abort_reply(stream, msg)
395 if not self.check_dependencies(dependencies):
396 return self.unmet_dependencies(stream, msg)
397
398 handler = self.queue_handlers.get(msg['msg_type'], None)
399 if handler is None:
400 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
401 else:
402 handler(stream, idents, msg)
403
404 def start(self):
405 #### stream mode:
406 if self.control_stream:
407 self.control_stream.on_recv(self.dispatch_control, copy=False)
408 if self.reply_stream:
409 self.reply_stream.on_recv(lambda msg:
410 self.dispatch_queue(self.reply_stream, msg), copy=False)
411 if self.task_stream:
412 self.task_stream.on_recv(lambda msg:
413 self.dispatch_queue(self.task_stream, msg), copy=False)
414
415 #### while True mode:
416 # while True:
417 # idle = True
418 # try:
419 # msg = self.reply_stream.socket.recv_multipart(
420 # zmq.NOBLOCK, copy=False)
421 # except zmq.ZMQError, e:
422 # if e.errno != zmq.EAGAIN:
423 # raise e
424 # else:
425 # idle=False
426 # self.dispatch_queue(self.reply_stream, msg)
427 #
428 # if not self.task_stream.empty():
429 # idle=False
430 # msg = self.task_stream.recv_multipart()
431 # self.dispatch_queue(self.task_stream, msg)
432 # if idle:
433 # # don't busywait
434 # time.sleep(1e-3)
435
436
437 def main():
438 raise Exception("Don't run me anymore")
439 loop = ioloop.IOLoop.instance()
440 c = zmq.Context()
441
442 ip = '127.0.0.1'
443 port_base = 5575
444 connection = ('tcp://%s' % ip) + ':%i'
445 rep_conn = connection % port_base
446 pub_conn = connection % (port_base+1)
447
448 print >>sys.__stdout__, "Starting the kernel..."
449 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
450 # print >>sys.__stdout__, "PUB Channel:", pub_conn
451
452 session = StreamSession(username=u'kernel')
453
454 reply_socket = c.socket(zmq.XREQ)
455 reply_socket.connect(rep_conn)
456
457 pub_socket = c.socket(zmq.PUB)
458 pub_socket.connect(pub_conn)
459
460 stdout = OutStream(session, pub_socket, u'stdout')
461 stderr = OutStream(session, pub_socket, u'stderr')
462 sys.stdout = stdout
463 sys.stderr = stderr
464
465 display_hook = DisplayHook(session, pub_socket)
466 sys.displayhook = display_hook
467 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
468 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
469 kernel = Kernel(session, reply_stream, pub_stream)
470
471 # For debugging convenience, put sleep and a string in the namespace, so we
472 # have them every time we start.
473 kernel.user_ns['sleep'] = time.sleep
474 kernel.user_ns['s'] = 'Test string'
475
476 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
477 kernel.start()
478 loop.start()
479
480
481 if __name__ == '__main__':
482 main()
@@ -0,0 +1,443 b''
1 #!/usr/bin/env python
2 """edited session.py to work with streams, and move msg_type to the header
3 """
4
5
6 import os
7 import sys
8 import traceback
9 import pprint
10 import uuid
11
12 import zmq
13 from zmq.utils import jsonapi
14 from zmq.eventloop.zmqstream import ZMQStream
15
16 from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence
17 from IPython.zmq.newserialized import serialize, unserialize
18
19 try:
20 import cPickle
21 pickle = cPickle
22 except:
23 cPickle = None
24 import pickle
25
26 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
27 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
28 if json_name in ('jsonlib', 'jsonlib2'):
29 use_json = True
30 elif json_name:
31 if cPickle is None:
32 use_json = True
33 else:
34 use_json = False
35 else:
36 use_json = False
37
38 if use_json:
39 default_packer = jsonapi.dumps
40 default_unpacker = jsonapi.loads
41 else:
42 default_packer = lambda o: pickle.dumps(o,-1)
43 default_unpacker = pickle.loads
44
45
46 DELIM="<IDS|MSG>"
47
48 def wrap_exception():
49 etype, evalue, tb = sys.exc_info()
50 tb = traceback.format_exception(etype, evalue, tb)
51 exc_content = {
52 u'status' : u'error',
53 u'traceback' : tb,
54 u'etype' : unicode(etype),
55 u'evalue' : unicode(evalue)
56 }
57 return exc_content
58
59 class KernelError(Exception):
60 pass
61
62 def unwrap_exception(content):
63 err = KernelError(content['etype'], content['evalue'])
64 err.evalue = content['evalue']
65 err.etype = content['etype']
66 err.traceback = ''.join(content['traceback'])
67 return err
68
69
70 class Message(object):
71 """A simple message object that maps dict keys to attributes.
72
73 A Message can be created from a dict and a dict from a Message instance
74 simply by calling dict(msg_obj)."""
75
76 def __init__(self, msg_dict):
77 dct = self.__dict__
78 for k, v in dict(msg_dict).iteritems():
79 if isinstance(v, dict):
80 v = Message(v)
81 dct[k] = v
82
83 # Having this iterator lets dict(msg_obj) work out of the box.
84 def __iter__(self):
85 return iter(self.__dict__.iteritems())
86
87 def __repr__(self):
88 return repr(self.__dict__)
89
90 def __str__(self):
91 return pprint.pformat(self.__dict__)
92
93 def __contains__(self, k):
94 return k in self.__dict__
95
96 def __getitem__(self, k):
97 return self.__dict__[k]
98
99
100 def msg_header(msg_id, msg_type, username, session):
101 return locals()
102 # return {
103 # 'msg_id' : msg_id,
104 # 'msg_type': msg_type,
105 # 'username' : username,
106 # 'session' : session
107 # }
108
109
110 def extract_header(msg_or_header):
111 """Given a message or header, return the header."""
112 if not msg_or_header:
113 return {}
114 try:
115 # See if msg_or_header is the entire message.
116 h = msg_or_header['header']
117 except KeyError:
118 try:
119 # See if msg_or_header is just the header
120 h = msg_or_header['msg_id']
121 except KeyError:
122 raise
123 else:
124 h = msg_or_header
125 if not isinstance(h, dict):
126 h = dict(h)
127 return h
128
129 def rekey(dikt):
130 """rekey a dict that has been forced to use str keys where there should be
131 ints by json. This belongs in the jsonutil added by fperez."""
132 for k in dikt.iterkeys():
133 if isinstance(k, str):
134 ik=fk=None
135 try:
136 ik = int(k)
137 except ValueError:
138 try:
139 fk = float(k)
140 except ValueError:
141 continue
142 if ik is not None:
143 nk = ik
144 else:
145 nk = fk
146 if nk in dikt:
147 raise KeyError("already have key %r"%nk)
148 dikt[nk] = dikt.pop(k)
149 return dikt
150
151 def serialize_object(obj, threshold=64e-6):
152 """serialize an object into a list of sendable buffers.
153
154 Returns: (pmd, bufs)
155 where pmd is the pickled metadata wrapper, and bufs
156 is a list of data buffers"""
157 # threshold is 100 B
158 databuffers = []
159 if isinstance(obj, (list, tuple)):
160 clist = canSequence(obj)
161 slist = map(serialize, clist)
162 for s in slist:
163 if s.getDataSize() > threshold:
164 databuffers.append(s.getData())
165 s.data = None
166 return pickle.dumps(slist,-1), databuffers
167 elif isinstance(obj, dict):
168 sobj = {}
169 for k in sorted(obj.iterkeys()):
170 s = serialize(can(obj[k]))
171 if s.getDataSize() > threshold:
172 databuffers.append(s.getData())
173 s.data = None
174 sobj[k] = s
175 return pickle.dumps(sobj,-1),databuffers
176 else:
177 s = serialize(can(obj))
178 if s.getDataSize() > threshold:
179 databuffers.append(s.getData())
180 s.data = None
181 return pickle.dumps(s,-1),databuffers
182
183
184 def unserialize_object(bufs):
185 """reconstruct an object serialized by serialize_object from data buffers"""
186 bufs = list(bufs)
187 sobj = pickle.loads(bufs.pop(0))
188 if isinstance(sobj, (list, tuple)):
189 for s in sobj:
190 if s.data is None:
191 s.data = bufs.pop(0)
192 return uncanSequence(map(unserialize, sobj))
193 elif isinstance(sobj, dict):
194 newobj = {}
195 for k in sorted(sobj.iterkeys()):
196 s = sobj[k]
197 if s.data is None:
198 s.data = bufs.pop(0)
199 newobj[k] = uncan(unserialize(s))
200 return newobj
201 else:
202 if sobj.data is None:
203 sobj.data = bufs.pop(0)
204 return uncan(unserialize(sobj))
205
206 def pack_apply_message(f, args, kwargs, threshold=64e-6):
207 """pack up a function, args, and kwargs to be sent over the wire
208 as a series of buffers. Any object whose data is larger than `threshold`
209 will not have their data copied (currently only numpy arrays support zero-copy)"""
210 msg = [pickle.dumps(can(f),-1)]
211 databuffers = [] # for large objects
212 sargs, bufs = serialize_object(args,threshold)
213 msg.append(sargs)
214 databuffers.extend(bufs)
215 skwargs, bufs = serialize_object(kwargs,threshold)
216 msg.append(skwargs)
217 databuffers.extend(bufs)
218 msg.extend(databuffers)
219 return msg
220
221 def unpack_apply_message(bufs, g=None, copy=True):
222 """unpack f,args,kwargs from buffers packed by pack_apply_message()
223 Returns: original f,args,kwargs"""
224 bufs = list(bufs) # allow us to pop
225 assert len(bufs) >= 3, "not enough buffers!"
226 if not copy:
227 for i in range(3):
228 bufs[i] = bufs[i].bytes
229 cf = pickle.loads(bufs.pop(0))
230 sargs = list(pickle.loads(bufs.pop(0)))
231 skwargs = dict(pickle.loads(bufs.pop(0)))
232 # print sargs, skwargs
233 f = cf.getFunction(g)
234 for sa in sargs:
235 if sa.data is None:
236 m = bufs.pop(0)
237 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
238 if copy:
239 sa.data = buffer(m)
240 else:
241 sa.data = m.buffer
242 else:
243 if copy:
244 sa.data = m
245 else:
246 sa.data = m.bytes
247
248 args = uncanSequence(map(unserialize, sargs), g)
249 kwargs = {}
250 for k in sorted(skwargs.iterkeys()):
251 sa = skwargs[k]
252 if sa.data is None:
253 sa.data = bufs.pop(0)
254 kwargs[k] = uncan(unserialize(sa), g)
255
256 return f,args,kwargs
257
258 class StreamSession(object):
259 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
260
261 def __init__(self, username=None, session=None, packer=None, unpacker=None):
262 if username is None:
263 username = os.environ.get('USER','username')
264 self.username = username
265 if session is None:
266 self.session = str(uuid.uuid4())
267 else:
268 self.session = session
269 self.msg_id = str(uuid.uuid4())
270 if packer is None:
271 self.pack = default_packer
272 else:
273 if not callable(packer):
274 raise TypeError("packer must be callable, not %s"%type(packer))
275 self.pack = packer
276
277 if unpacker is None:
278 self.unpack = default_unpacker
279 else:
280 if not callable(unpacker):
281 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
282 self.unpack = unpacker
283
284 self.none = self.pack({})
285
286 def msg_header(self, msg_type):
287 h = msg_header(self.msg_id, msg_type, self.username, self.session)
288 self.msg_id = str(uuid.uuid4())
289 return h
290
291 def msg(self, msg_type, content=None, parent=None, subheader=None):
292 msg = {}
293 msg['header'] = self.msg_header(msg_type)
294 msg['msg_id'] = msg['header']['msg_id']
295 msg['parent_header'] = {} if parent is None else extract_header(parent)
296 msg['msg_type'] = msg_type
297 msg['content'] = {} if content is None else content
298 sub = {} if subheader is None else subheader
299 msg['header'].update(sub)
300 return msg
301
302 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
303 """send a message via stream"""
304 msg = self.msg(msg_type, content, parent, subheader)
305 buffers = [] if buffers is None else buffers
306 to_send = []
307 if isinstance(ident, list):
308 # accept list of idents
309 to_send.extend(ident)
310 elif ident is not None:
311 to_send.append(ident)
312 to_send.append(DELIM)
313 to_send.append(self.pack(msg['header']))
314 to_send.append(self.pack(msg['parent_header']))
315 # if parent is None:
316 # to_send.append(self.none)
317 # else:
318 # to_send.append(self.pack(dict(parent)))
319 if content is None:
320 content = self.none
321 elif isinstance(content, dict):
322 content = self.pack(content)
323 elif isinstance(content, str):
324 # content is already packed, as in a relayed message
325 pass
326 else:
327 raise TypeError("Content incorrect type: %s"%type(content))
328 to_send.append(content)
329 flag = 0
330 if buffers:
331 flag = zmq.SNDMORE
332 stream.send_multipart(to_send, flag, copy=False)
333 for b in buffers[:-1]:
334 stream.send(b, flag, copy=False)
335 if buffers:
336 stream.send(buffers[-1], copy=False)
337 omsg = Message(msg)
338 return omsg
339
340 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
341 """receives and unpacks a message
342 returns [idents], msg"""
343 if isinstance(socket, ZMQStream):
344 socket = socket.socket
345 try:
346 msg = socket.recv_multipart(mode)
347 except zmq.ZMQError, e:
348 if e.errno == zmq.EAGAIN:
349 # We can convert EAGAIN to None as we know in this case
350 # recv_json won't return None.
351 return None
352 else:
353 raise
354 # return an actual Message object
355 # determine the number of idents by trying to unpack them.
356 # this is terrible:
357 idents, msg = self.feed_identities(msg, copy)
358 try:
359 return idents, self.unpack_message(msg, content=content, copy=copy)
360 except Exception, e:
361 print idents, msg
362 # TODO: handle it
363 raise e
364
365 def feed_identities(self, msg, copy=True):
366 """This is a completely horrible thing, but it strips the zmq
367 ident prefixes off of a message. It will break if any identities
368 are unpackable by self.unpack."""
369 msg = list(msg)
370 idents = []
371 while len(msg) > 3:
372 if copy:
373 s = msg[0]
374 else:
375 s = msg[0].bytes
376 if s == DELIM:
377 msg.pop(0)
378 break
379 else:
380 idents.append(s)
381 msg.pop(0)
382
383 return idents, msg
384
385 def unpack_message(self, msg, content=True, copy=True):
386 """return a message object from the format
387 sent by self.send.
388
389 parameters:
390
391 content : bool (True)
392 whether to unpack the content dict (True),
393 or leave it serialized (False)
394
395 copy : bool (True)
396 whether to return the bytes (True),
397 or the non-copying Message object in each place (False)
398
399 """
400 if not len(msg) >= 3:
401 raise TypeError("malformed message, must have at least 3 elements")
402 message = {}
403 if not copy:
404 for i in range(3):
405 msg[i] = msg[i].bytes
406 message['header'] = self.unpack(msg[0])
407 message['msg_type'] = message['header']['msg_type']
408 message['parent_header'] = self.unpack(msg[1])
409 if content:
410 message['content'] = self.unpack(msg[2])
411 else:
412 message['content'] = msg[2]
413
414 # message['buffers'] = msg[3:]
415 # else:
416 # message['header'] = self.unpack(msg[0].bytes)
417 # message['msg_type'] = message['header']['msg_type']
418 # message['parent_header'] = self.unpack(msg[1].bytes)
419 # if content:
420 # message['content'] = self.unpack(msg[2].bytes)
421 # else:
422 # message['content'] = msg[2].bytes
423
424 message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ]
425 return message
426
427
428
429 def test_msg2obj():
430 am = dict(x=1)
431 ao = Message(am)
432 assert ao.x == am['x']
433
434 am['y'] = dict(z=1)
435 ao = Message(am)
436 assert ao.y.z == am['y']['z']
437
438 k1, k2 = 'y', 'z'
439 assert ao[k1][k2] == am[k1][k2]
440
441 am2 = dict(ao)
442 assert am['x'] == am2['x']
443 assert am['y']['z'] == am2['y']['z']
@@ -0,0 +1,141 b''
1 #!/usr/bin/env python
2 """Views"""
3
4 from IPython.external.decorator import decorator
5
6
7 @decorator
8 def myblock(f, self, *args, **kwargs):
9 block = self.client.block
10 self.client.block = self.block
11 ret = f(self, *args, **kwargs)
12 self.client.block = block
13 return ret
14
15 class View(object):
16 """Base View class"""
17 _targets = None
18 block=None
19
20 def __init__(self, client, targets):
21 self.client = client
22 self._targets = targets
23 self.block = client.block
24
25 def __repr__(self):
26 strtargets = str(self._targets)
27 if len(strtargets) > 16:
28 strtargets = strtargets[:12]+'...]'
29 return "<%s %s>"%(self.__class__.__name__, strtargets)
30
31 @property
32 def results(self):
33 return self.client.results
34
35 @property
36 def targets(self):
37 return self._targets
38
39 @targets.setter
40 def targets(self, value):
41 raise TypeError("Cannot set my targets argument after construction!")
42
43 def apply(self, f, *args, **kwargs):
44 """calls f(*args, **kwargs) on remote engines, returning the result.
45
46 This method does not involve the engine's namespace.
47
48 if self.block is False:
49 returns msg_id
50 else:
51 returns actual result of f(*args, **kwargs)
52 """
53 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
54
55 def apply_async(self, f, *args, **kwargs):
56 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
57
58 This method does not involve the engine's namespace.
59
60 returns msg_id
61 """
62 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
63
64 def apply_sync(self, f, *args, **kwargs):
65 """calls f(*args, **kwargs) on remote engines in a blocking manner,
66 returning the result.
67
68 This method does not involve the engine's namespace.
69
70 returns: actual result of f(*args, **kwargs)
71 """
72 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
73
74 def apply_bound(self, f, *args, **kwargs):
75 """calls f(*args, **kwargs) bound to engine namespace(s).
76
77 if self.block is False:
78 returns msg_id
79 else:
80 returns actual result of f(*args, **kwargs)
81
82 This method has access to the targets' globals
83
84 """
85 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
86
87 def apply_async_bound(self, f, *args, **kwargs):
88 """calls f(*args, **kwargs) bound to engine namespace(s)
89 in a nonblocking manner.
90
91 returns: msg_id
92
93 This method has access to the targets' globals
94
95 """
96 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
97
98 def apply_sync_bound(self, f, *args, **kwargs):
99 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
100
101 returns: actual result of f(*args, **kwargs)
102
103 This method has access to the targets' globals
104
105 """
106 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
107
108
109 class DirectView(View):
110 """Direct Multiplexer View"""
111
112 def update(self, ns):
113 """update remote namespace with dict `ns`"""
114 return self.client.push(ns, targets=self.targets, block=self.block)
115
116 def get(self, key_s):
117 """get object(s) by `key_s` from remote namespace
118 will return one object if it is a key.
119 It also takes a list of keys, and will return a list of objects."""
120 # block = block if block is not None else self.block
121 return self.client.pull(key_s, block=self.block, targets=self.targets)
122
123 push = update
124 pull = get
125
126 def __getitem__(self, key):
127 return self.get(key)
128
129 def __setitem__(self,key,value):
130 self.update({key:value})
131
132 def clear(self):
133 """clear the remote namespace"""
134 return self.client.clear(targets=self.targets,block=self.block)
135
136 def abort(self):
137 return self.client.abort(targets=self.targets,block=self.block)
138
139 class LoadBalancedView(View):
140 _targets=None
141 No newline at end of file
@@ -0,0 +1,95 b''
1 # encoding: utf-8
2
3 """Pickle related utilities. Perhaps this should be called 'can'."""
4
5 __docformat__ = "restructuredtext en"
6
7 #-------------------------------------------------------------------------------
8 # Copyright (C) 2008 The IPython Development Team
9 #
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
12 #-------------------------------------------------------------------------------
13
14 #-------------------------------------------------------------------------------
15 # Imports
16 #-------------------------------------------------------------------------------
17
18 from types import FunctionType
19
20 # contents of codeutil should either be in here, or codeutil belongs in IPython/util
21 from IPython.kernel import codeutil
22
23 class CannedObject(object):
24 pass
25
26 class CannedFunction(CannedObject):
27
28 def __init__(self, f):
29 self._checkType(f)
30 self.code = f.func_code
31
32 def _checkType(self, obj):
33 assert isinstance(obj, FunctionType), "Not a function type"
34
35 def getFunction(self, g=None):
36 if g is None:
37 g = globals()
38 newFunc = FunctionType(self.code, g)
39 return newFunc
40
41 def can(obj):
42 if isinstance(obj, FunctionType):
43 return CannedFunction(obj)
44 elif isinstance(obj,dict):
45 return canDict(obj)
46 elif isinstance(obj, (list,tuple)):
47 return canSequence(obj)
48 else:
49 return obj
50
51 def canDict(obj):
52 if isinstance(obj, dict):
53 newobj = {}
54 for k, v in obj.iteritems():
55 newobj[k] = can(v)
56 return newobj
57 else:
58 return obj
59
60 def canSequence(obj):
61 if isinstance(obj, (list, tuple)):
62 t = type(obj)
63 return t([can(i) for i in obj])
64 else:
65 return obj
66
67 def uncan(obj, g=None):
68 if isinstance(obj, CannedFunction):
69 return obj.getFunction(g)
70 elif isinstance(obj,dict):
71 return uncanDict(obj)
72 elif isinstance(obj, (list,tuple)):
73 return uncanSequence(obj)
74 else:
75 return obj
76
77 def uncanDict(obj, g=None):
78 if isinstance(obj, dict):
79 newobj = {}
80 for k, v in obj.iteritems():
81 newobj[k] = uncan(v,g)
82 return newobj
83 else:
84 return obj
85
86 def uncanSequence(obj, g=None):
87 if isinstance(obj, (list, tuple)):
88 t = type(obj)
89 return t([uncan(i,g) for i in obj])
90 else:
91 return obj
92
93
94 def rebindFunctionGlobals(f, glbls):
95 return FunctionType(f.func_code, glbls)
@@ -0,0 +1,100 b''
1 """Thread for popping Tasks from zmq to Python Queue"""
2
3
4 import time
5 from threading import Thread
6
7 try:
8 from queue import Queue
9 except:
10 from Queue import Queue
11
12 import zmq
13 from zmq.core.poll import _poll as poll
14 from zmq.devices import ThreadDevice
15 from IPython.zmq import streamsession as ss
16
17
18 class QueueStream(object):
19 def __init__(self, in_queue, out_queue):
20 self.in_queue = in_queue
21 self.out_queue = out_queue
22
23 def send_multipart(self, *args, **kwargs):
24 while self.out_queue.full():
25 time.sleep(1e-3)
26 self.out_queue.put(('send_multipart', args, kwargs))
27
28 def send(self, *args, **kwargs):
29 while self.out_queue.full():
30 time.sleep(1e-3)
31 self.out_queue.put(('send', args, kwargs))
32
33 def recv_multipart(self):
34 return self.in_queue.get()
35
36 def empty(self):
37 return self.in_queue.empty()
38
39 class TaskThread(ThreadDevice):
40 """Class for popping Tasks from C-ZMQ->Python Queue"""
41 max_qsize = 100
42 in_socket = None
43 out_socket = None
44 # queue = None
45
46 def __init__(self, queue_type, mon_type, engine_id, max_qsize=100):
47 ThreadDevice.__init__(self, 0, queue_type, mon_type)
48 self.session = ss.StreamSession(username='TaskNotifier[%s]'%engine_id)
49 self.engine_id = engine_id
50 self.in_queue = Queue(max_qsize)
51 self.out_queue = Queue(max_qsize)
52 self.max_qsize = max_qsize
53
54 @property
55 def queues(self):
56 return self.in_queue, self.out_queue
57
58 @property
59 def can_recv(self):
60 # print self.in_queue.full(), poll((self.queue_socket, zmq.POLLIN),1e-3)
61 return (not self.in_queue.full()) and poll([(self.queue_socket, zmq.POLLIN)], 1e-3 )
62
63 @property
64 def can_send(self):
65 return not self.out_queue.empty()
66
67 def run(self):
68 print 'running'
69 self.queue_socket,self.mon_socket = self._setup_sockets()
70 print 'setup'
71
72 while True:
73 while not self.can_send and not self.can_recv:
74 # print 'idle'
75 # nothing to do, wait
76 time.sleep(1e-3)
77 while self.can_send:
78 # flush out queue
79 print 'flushing...'
80 meth, args, kwargs = self.out_queue.get()
81 getattr(self.queue_socket, meth)(*args, **kwargs)
82 print 'flushed'
83
84 if self.can_recv:
85 print 'recving'
86 # get another job from zmq
87 msg = self.queue_socket.recv_multipart(0, copy=False)
88 # put it in the Queue
89 self.in_queue.put(msg)
90 idents,msg = self.session.feed_identities(msg, copy=False)
91 msg = self.session.unpack_message(msg, content=False, copy=False)
92 # notify the Controller that we got it
93 self.mon_socket.send('tracktask', zmq.SNDMORE)
94 header = msg['header']
95 msg_id = header['msg_id']
96 content = dict(engine_id=self.engine_id, msg_id = msg_id)
97 self.session.send(self.mon_socket, 'task_receipt', content=content)
98 print 'recvd'
99
100 No newline at end of file
1 NO CONTENT: new file 100644
@@ -0,0 +1,4 b''
1
2 from unittest import TestCase
3 from zmq.tests import BaseZMQTest
4
@@ -0,0 +1,82 b''
1
2 import os
3 import uuid
4 import zmq
5
6 from zmq.tests import BaseZMQTestCase
7
8 from IPython.zmq.tests import SessionTestCase
9 from IPython.zmq import streamsession as ss
10
11 class SessionTestCase(BaseZMQTestCase):
12
13 def setUp(self):
14 BaseZMQTestCase.setUp(self)
15 self.session = ss.StreamSession()
16
17 class TestSession(SessionTestCase):
18
19 def test_msg(self):
20 """message format"""
21 msg = self.session.msg('execute')
22 thekeys = set('header msg_id parent_header msg_type content'.split())
23 s = set(msg.keys())
24 self.assertEquals(s, thekeys)
25 self.assertTrue(isinstance(msg['content'],dict))
26 self.assertTrue(isinstance(msg['header'],dict))
27 self.assertTrue(isinstance(msg['parent_header'],dict))
28 self.assertEquals(msg['msg_type'], 'execute')
29
30
31
32 def test_args(self):
33 """initialization arguments for StreamSession"""
34 s = ss.StreamSession()
35 self.assertTrue(s.pack is ss.default_packer)
36 self.assertTrue(s.unpack is ss.default_unpacker)
37 self.assertEquals(s.username, os.environ.get('USER', 'username'))
38
39 s = ss.StreamSession(username=None)
40 self.assertEquals(s.username, os.environ.get('USER', 'username'))
41
42 self.assertRaises(TypeError, ss.StreamSession, packer='hi')
43 self.assertRaises(TypeError, ss.StreamSession, unpacker='hi')
44 u = str(uuid.uuid4())
45 s = ss.StreamSession(username='carrot', session=u)
46 self.assertEquals(s.session, u)
47 self.assertEquals(s.username, 'carrot')
48
49
50 def test_rekey(self):
51 """rekeying dict around json str keys"""
52 d = {'0': uuid.uuid4(), 0:uuid.uuid4()}
53 self.assertRaises(KeyError, ss.rekey, d)
54
55 d = {'0': uuid.uuid4(), 1:uuid.uuid4(), 'asdf':uuid.uuid4()}
56 d2 = {0:d['0'],1:d[1],'asdf':d['asdf']}
57 rd = ss.rekey(d)
58 self.assertEquals(d2,rd)
59
60 d = {'1.5':uuid.uuid4(),'1':uuid.uuid4()}
61 d2 = {1.5:d['1.5'],1:d['1']}
62 rd = ss.rekey(d)
63 self.assertEquals(d2,rd)
64
65 d = {'1.0':uuid.uuid4(),'1':uuid.uuid4()}
66 self.assertRaises(KeyError, ss.rekey, d)
67
68 def test_unique_msg_ids(self):
69 """test that messages receive unique ids"""
70 ids = set()
71 for i in range(2**12):
72 h = self.session.msg_header('test')
73 msg_id = h['msg_id']
74 self.assertTrue(msg_id not in ids)
75 ids.add(msg_id)
76
77 def test_feed_identities(self):
78 """scrub the front for zmq IDENTITIES"""
79 theids = "engine client other".split()
80 content = dict(code='whoda',stuff=object())
81 themsg = self.session.msg('execute',content=content)
82 pmsg = theids
@@ -0,0 +1,23 b''
1 """setup the ports"""
2 config = {
3 'interface': 'tcp://127.0.0.1',
4 'regport': 10101,
5 'heartport': 10102,
6
7 'cqueueport': 10211,
8 'equeueport': 10111,
9
10 'ctaskport': 10221,
11 'etaskport': 10121,
12
13 'ccontrolport': 10231,
14 'econtrolport': 10131,
15
16 'clientport': 10201,
17 'notifierport': 10202,
18
19 'logport': 20201
20 }
21
22
23
@@ -0,0 +1,139 b''
1 #!/usr/bin/env python
2 """A script to launch a controller with all its queues and connect it to a logger"""
3
4 import time
5 import logging
6
7 import zmq
8 from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue
9 from zmq.eventloop import ioloop
10 from zmq.eventloop.zmqstream import ZMQStream
11 from zmq.log import handlers
12
13 from IPython.zmq import log
14 from IPython.zmq.parallel import controller, heartmonitor, streamsession as session
15
16
17
18
19 def setup():
20 """setup a basic controller and open client,registrar, and logging ports. Start the Queue and the heartbeat"""
21 ctx = zmq.Context(1)
22 loop = ioloop.IOLoop.instance()
23
24 # port config
25 # config={}
26 execfile('config.py', globals())
27 iface = config['interface']
28 logport = config['logport']
29 rport = config['regport']
30 cport = config['clientport']
31 cqport = config['cqueueport']
32 eqport = config['equeueport']
33 ctport = config['ctaskport']
34 etport = config['etaskport']
35 ccport = config['ccontrolport']
36 ecport = config['econtrolport']
37 hport = config['heartport']
38 nport = config['notifierport']
39
40 # setup logging
41 lsock = ctx.socket(zmq.PUB)
42 lsock.connect('%s:%i'%(iface,logport))
43 # connected=False
44 # while not connected:
45 # try:
46 # except:
47 # logport = logport + 1
48 # else:
49 # connected=True
50 #
51 handler = handlers.PUBHandler(lsock)
52 handler.setLevel(logging.DEBUG)
53 handler.root_topic = "controller"
54 log.logger.addHandler(handler)
55 time.sleep(.5)
56
57 ### Engine connections ###
58
59 # Engine registrar socket
60 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
61 reg.bind("%s:%i"%(iface, rport))
62
63 # heartbeat
64 hpub = ctx.socket(zmq.PUB)
65 hpub.bind("%s:%i"%(iface, hport))
66 hrep = ctx.socket(zmq.XREP)
67 hrep.bind("%s:%i"%(iface, hport+1))
68
69 hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),2500)
70 hb.start()
71
72 ### Client connections ###
73 # Clientele socket
74 c = ZMQStream(ctx.socket(zmq.XREP), loop)
75 c.bind("%s:%i"%(iface, cport))
76
77 n = ZMQStream(ctx.socket(zmq.PUB), loop)
78 n.bind("%s:%i"%(iface, nport))
79
80 thesession = session.StreamSession(username="controller")
81
82
83
84 # build and launch the queue
85 sub = ctx.socket(zmq.SUB)
86 sub.setsockopt(zmq.SUBSCRIBE, "")
87 monport = sub.bind_to_random_port(iface)
88 sub = ZMQStream(sub, loop)
89
90 # Multiplexer Queue (in a Process)
91 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
92 q.bind_in("%s:%i"%(iface, cqport))
93 q.bind_out("%s:%i"%(iface, eqport))
94 q.connect_mon("%s:%i"%(iface, monport))
95 q.daemon=True
96 q.start()
97
98 # Control Queue (in a Process)
99 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
100 q.bind_in("%s:%i"%(iface, ccport))
101 q.bind_out("%s:%i"%(iface, ecport))
102 q.connect_mon("%s:%i"%(iface, monport))
103 q.daemon=True
104 q.start()
105
106 # Task Queue (in a Process)
107 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
108 q.bind_in("%s:%i"%(iface, ctport))
109 q.bind_out("%s:%i"%(iface, etport))
110 q.connect_mon("%s:%i"%(iface, monport))
111 q.daemon=True
112 q.start()
113
114 time.sleep(.25)
115
116 # build connection dicts
117 engine_addrs = {
118 'control' : "%s:%i"%(iface, ecport),
119 'queue': "%s:%i"%(iface, eqport),
120 'heartbeat': ("%s:%i"%(iface, hport), "%s:%i"%(iface, hport+1)),
121 'task' : "%s:%i"%(iface, etport),
122 'monitor' : "%s:%i"%(iface, monport),
123 }
124
125 client_addrs = {
126 'control' : "%s:%i"%(iface, ccport),
127 'controller': "%s:%i"%(iface, cport),
128 'queue': "%s:%i"%(iface, cqport),
129 'task' : "%s:%i"%(iface, ctport),
130 'notification': "%s:%i"%(iface, nport)
131 }
132 con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None, engine_addrs, client_addrs)
133
134 return loop
135
136
137 if __name__ == '__main__':
138 loop = setup()
139 loop.start() No newline at end of file
@@ -0,0 +1,85 b''
1 #!/usr/bin/env python
2 import time
3 import zmq
4 from zmq.eventloop import ioloop
5 from zmq.eventloop.zmqstream import ZMQStream
6 from IPython.zmq import streamsession as session
7 Message = session.Message
8 # from IPython.zmq.messages import send_message_pickle as send_message
9 import uuid
10
11 thesession = session.StreamSession()
12
13 max_messages=10000
14 printstep=1000
15
16 counter = dict(count=0, engines=1)
17
18 def poit(msg):
19 print "POIT"
20 print msg
21
22 def count(msg):
23 count = counter["count"] = counter["count"]+1
24 if not count % printstep:
25 print "#########################"
26 print count, time.time()-counter['tic']
27
28 def unpack_and_print(msg):
29 global msg_counter
30 msg_counter += 1
31 print msg
32 try:
33 msg = thesession.unpack_message(msg[-3:])
34 except Exception, e:
35 print e
36 # pass
37 print msg
38
39
40 ctx = zmq.Context()
41
42 loop = ioloop.IOLoop()
43 sock = ctx.socket(zmq.XREQ)
44 queue = ZMQStream(ctx.socket(zmq.XREQ), loop)
45 client = ZMQStream(sock, loop)
46 client.on_send(poit)
47 def check_engines(msg):
48 # client.on_recv(unpack_and_print)
49 queue.on_recv(count)
50 idents = msg[:-3]
51 msg = thesession.unpack_message(msg[-3:])
52 msg = Message(msg)
53 print msg
54 queue.connect(str(msg.content.queue))
55 engines = dict(msg.content.engines)
56 # global tic
57 N=max_messages
58 if engines:
59 tic = time.time()
60 counter['tic']= tic
61 for i in xrange(N/len(engines)):
62 for eid,key in engines.iteritems():
63 thesession.send(queue, "execute_request", dict(code='id=%i'%(int(eid)+i)),ident=str(key))
64 toc = time.time()
65 print "#####################################"
66 print N, toc-tic
67 print "#####################################"
68
69
70
71
72 client.on_recv(check_engines)
73
74 sock.connect('tcp://127.0.0.1:10102')
75 sock.setsockopt(zmq.IDENTITY, thesession.username)
76 # stream = ZMQStream()
77 # header = dict(msg_id = uuid.uuid4().bytes, msg_type='relay', id=0)
78 parent = dict(targets=2)
79 # content = "GARBAGE"
80 thesession.send(client, "connection_request")
81
82 # send_message(client, (header, content))
83 # print thesession.recv(client, 0)
84
85 loop.start()
@@ -0,0 +1,62 b''
1 #!/usr/bin/env python
2 """A simple log process that prints messages incoming from"""
3
4 #
5 # Copyright (c) 2010 Min Ragan-Kelley
6 #
7 # This file is part of pyzmq.
8 #
9 # pyzmq is free software; you can redistribute it and/or modify it under
10 # the terms of the Lesser GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # pyzmq is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 # Lesser GNU General Public License for more details.
18 #
19 # You should have received a copy of the Lesser GNU General Public License
20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21
22 import zmq
23 logport = 20201
24 def main(topics, addrs):
25
26 context = zmq.Context()
27 socket = context.socket(zmq.SUB)
28 for topic in topics:
29 socket.setsockopt(zmq.SUBSCRIBE, topic)
30 if addrs:
31 for addr in addrs:
32 print "Connecting to: ", addr
33 socket.connect(addr)
34 else:
35 socket.bind('tcp://127.0.0.1:%i'%logport)
36
37 while True:
38 # topic = socket.recv()
39 # print topic
40 topic, msg = socket.recv_multipart()
41 # msg = socket.recv_pyobj()
42 print "%s | %s " % (topic, msg),
43
44 if __name__ == '__main__':
45 import sys
46 topics = []
47 addrs = []
48 for arg in sys.argv[1:]:
49 if '://' in arg:
50 addrs.append(arg)
51 else:
52 topics.append(arg)
53 if not topics:
54 # default to everything
55 topics = ['']
56 if len(addrs) < 1:
57 print "binding instead of connecting"
58 # addrs = ['tcp://127.0.0.1:%i'%p for p in range(logport,logport+10)]
59 # print "usage: display.py <address> [ <topic> <address>...]"
60 # raise SystemExit
61
62 main(topics, addrs)
@@ -23,3 +23,4 b" IPython developer's guide"
23 23 ipgraph.txt
24 24 ipython_qt.txt
25 25 ipythonzmq.txt
26 parallelzmq.txt
General Comments 0
You need to be logged in to leave comments. Login now