##// END OF EJS Templates
control channel progress
MinRK -
Show More
@@ -1,562 +1,602 b''
1 1 #!/usr/bin/env python
2 2 """A semi-synchronous Client for the ZMQ controller"""
3 3
4 4 import time
5 5 import threading
6 6
7 from pprint import pprint
8
7 9 from functools import wraps
8 10
9 11 from IPython.external.decorator import decorator
10 12
11 13 import streamsession as ss
12 14 import zmq
13 15
14 16 from remotenamespace import RemoteNamespace
15 17 from view import DirectView
16 18
17 19 def _push(ns):
18 20 globals().update(ns)
19 21
20 22 def _pull(keys):
21 23 g = globals()
22 24 if isinstance(keys, (list,tuple)):
23 25 return map(g.get, keys)
24 26 else:
25 27 return g.get(keys)
26 28
27 29 def _clear():
28 30 globals().clear()
29 31
30 32 def execute(code):
31 33 exec code in globals()
32 34
33 35 # decorators for methods:
34 36 @decorator
35 37 def spinfirst(f,self,*args,**kwargs):
36 38 self.spin()
37 39 return f(self, *args, **kwargs)
38 40
39 41 @decorator
40 42 def defaultblock(f, self, *args, **kwargs):
41 43 block = kwargs.get('block',None)
42 44 block = self.block if block is None else block
43 45 saveblock = self.block
44 46 self.block = block
45 47 ret = f(self, *args, **kwargs)
46 48 self.block = saveblock
47 49 return ret
48 50
49
51 class AbortedTask(object):
52 def __init__(self, msg_id):
53 self.msg_id = msg_id
50 54 # @decorator
51 55 # def checktargets(f):
52 56 # @wraps(f)
53 57 # def checked_method(self, *args, **kwargs):
54 58 # self._build_targets(kwargs['targets'])
55 59 # return f(self, *args, **kwargs)
56 60 # return checked_method
57 61
58 62
59 63 # class _ZMQEventLoopThread(threading.Thread):
60 64 #
61 65 # def __init__(self, loop):
62 66 # self.loop = loop
63 67 # threading.Thread.__init__(self)
64 68 #
65 69 # def run(self):
66 70 # self.loop.start()
67 71 #
68 72 class Client(object):
69 73 """A semi-synchronous client to the IPython ZMQ controller
70 74
71 75 Attributes
72 76 ----------
73 77 ids : set
74 78 a set of engine IDs
75 79 requesting the ids attribute always synchronizes
76 80 the registration state. To request ids without synchronization,
77 81 use _ids
78 82
79 83 history : list of msg_ids
80 84 a list of msg_ids, keeping track of all the execution
81 85 messages you have submitted
82 86
83 87 outstanding : set of msg_ids
84 88 a set of msg_ids that have been submitted, but whose
85 89 results have not been received
86 90
87 91 results : dict
88 92 a dict of all our results, keyed by msg_id
89 93
90 94 block : bool
91 95 determines default behavior when block not specified
92 96 in execution methods
93 97
94 98 Methods
95 99 -------
96 100 spin : flushes incoming results and registration state changes
97 101 control methods spin, and requesting `ids` also ensures up to date
98 102
99 103 barrier : wait on one or more msg_ids
100 104
101 105 execution methods: apply/apply_bound/apply_to
102 106 legacy: execute, run
103 107
104 control methods: queue_status, get_result
108 query methods: queue_status, get_result
109
110 control methods: abort, kill
111
112
105 113
106 114 """
107 115
108 116
109 117 _connected=False
110 118 _engines=None
111 119 registration_socket=None
112 controller_socket=None
120 query_socket=None
121 control_socket=None
113 122 notification_socket=None
114 123 queue_socket=None
115 124 task_socket=None
116 125 block = False
117 126 outstanding=None
118 127 results = None
119 128 history = None
129 debug = False
120 130
121 def __init__(self, addr, context=None, username=None):
131 def __init__(self, addr, context=None, username=None, debug=False):
122 132 if context is None:
123 133 context = zmq.Context()
124 134 self.context = context
125 135 self.addr = addr
126 136 if username is None:
127 137 self.session = ss.StreamSession()
128 138 else:
129 139 self.session = ss.StreamSession(username)
130 140 self.registration_socket = self.context.socket(zmq.PAIR)
131 141 self.registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
132 142 self.registration_socket.connect(addr)
133 143 self._engines = {}
134 144 self._ids = set()
135 145 self.outstanding=set()
136 146 self.results = {}
137 147 self.history = []
148 self.debug = debug
149 self.session.debug = debug
138 150 self._connect()
139 151
140 152 self._notification_handlers = {'registration_notification' : self._register_engine,
141 153 'unregistration_notification' : self._unregister_engine,
142 154 }
143 155 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
144 156 'apply_reply' : self._handle_apply_reply}
145 157
146 158
147 159 @property
148 160 def ids(self):
149 161 self._flush_notifications()
150 162 return self._ids
151 163
152 164 def _update_engines(self, engines):
153 165 for k,v in engines.iteritems():
154 166 eid = int(k)
155 self._engines[eid] = v
167 self._engines[eid] = bytes(v) # force not unicode
156 168 self._ids.add(eid)
157 169
158 170 def _build_targets(self, targets):
159 171 if targets is None:
160 172 targets = self._ids
161 173 elif isinstance(targets, str):
162 174 if targets.lower() == 'all':
163 175 targets = self._ids
164 176 else:
165 177 raise TypeError("%r not valid str target, must be 'all'"%(targets))
166 178 elif isinstance(targets, int):
167 179 targets = [targets]
168 180 return [self._engines[t] for t in targets], list(targets)
169 181
170 182 def _connect(self):
171 183 """setup all our socket connections to the controller"""
172 184 if self._connected:
173 185 return
174 186 self._connected=True
175 187 self.session.send(self.registration_socket, 'connection_request')
176 msg = self.session.recv(self.registration_socket,mode=0)[-1]
188 idents,msg = self.session.recv(self.registration_socket,mode=0)
189 if self.debug:
190 pprint(msg)
177 191 msg = ss.Message(msg)
178 192 content = msg.content
179 193 if content.status == 'ok':
180 194 if content.queue:
181 195 self.queue_socket = self.context.socket(zmq.PAIR)
182 196 self.queue_socket.setsockopt(zmq.IDENTITY, self.session.session)
183 197 self.queue_socket.connect(content.queue)
184 198 if content.task:
185 199 self.task_socket = self.context.socket(zmq.PAIR)
186 200 self.task_socket.setsockopt(zmq.IDENTITY, self.session.session)
187 201 self.task_socket.connect(content.task)
188 202 if content.notification:
189 203 self.notification_socket = self.context.socket(zmq.SUB)
190 204 self.notification_socket.connect(content.notification)
191 205 self.notification_socket.setsockopt(zmq.SUBSCRIBE, "")
192 if content.controller:
193 self.controller_socket = self.context.socket(zmq.PAIR)
194 self.controller_socket.setsockopt(zmq.IDENTITY, self.session.session)
195 self.controller_socket.connect(content.controller)
206 if content.query:
207 self.query_socket = self.context.socket(zmq.PAIR)
208 self.query_socket.setsockopt(zmq.IDENTITY, self.session.session)
209 self.query_socket.connect(content.query)
210 if content.control:
211 self.control_socket = self.context.socket(zmq.PAIR)
212 self.control_socket.setsockopt(zmq.IDENTITY, self.session.session)
213 self.control_socket.connect(content.control)
196 214 self._update_engines(dict(content.engines))
197 215
198 216 else:
199 217 self._connected = False
200 218 raise Exception("Failed to connect!")
201 219
202 220 #### handlers and callbacks for incoming messages #######
203 221 def _register_engine(self, msg):
204 222 content = msg['content']
205 223 eid = content['id']
206 224 d = {eid : content['queue']}
207 225 self._update_engines(d)
208 226 self._ids.add(int(eid))
209 227
210 228 def _unregister_engine(self, msg):
211 229 # print 'unregister',msg
212 230 content = msg['content']
213 231 eid = int(content['id'])
214 232 if eid in self._ids:
215 233 self._ids.remove(eid)
216 234 self._engines.pop(eid)
217 235
218 236 def _handle_execute_reply(self, msg):
219 237 # msg_id = msg['msg_id']
220 238 parent = msg['parent_header']
221 239 msg_id = parent['msg_id']
222 240 if msg_id not in self.outstanding:
223 241 print "got unknown result: %s"%msg_id
224 242 else:
225 243 self.outstanding.remove(msg_id)
226 244 self.results[msg_id] = ss.unwrap_exception(msg['content'])
227 245
228 246 def _handle_apply_reply(self, msg):
229 # print msg
247 # pprint(msg)
230 248 # msg_id = msg['msg_id']
231 249 parent = msg['parent_header']
232 250 msg_id = parent['msg_id']
233 251 if msg_id not in self.outstanding:
234 252 print "got unknown result: %s"%msg_id
235 253 else:
236 254 self.outstanding.remove(msg_id)
237 255 content = msg['content']
238 256 if content['status'] == 'ok':
239 257 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
258 elif content['status'] == 'aborted':
259 self.results[msg_id] = AbortedTask(msg_id)
260 elif content['status'] == 'resubmitted':
261 pass # handle resubmission
240 262 else:
241
242 263 self.results[msg_id] = ss.unwrap_exception(content)
243 264
244 265 def _flush_notifications(self):
245 266 "flush incoming notifications of engine registrations"
246 267 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
247 268 while msg is not None:
269 if self.debug:
270 pprint(msg)
248 271 msg = msg[-1]
249 272 msg_type = msg['msg_type']
250 273 handler = self._notification_handlers.get(msg_type, None)
251 274 if handler is None:
252 275 raise Exception("Unhandled message type: %s"%msg.msg_type)
253 276 else:
254 277 handler(msg)
255 278 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
256 279
257 280 def _flush_results(self, sock):
258 281 "flush incoming task or queue results"
259 282 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
260 283 while msg is not None:
284 if self.debug:
285 pprint(msg)
261 286 msg = msg[-1]
262 287 msg_type = msg['msg_type']
263 288 handler = self._queue_handlers.get(msg_type, None)
264 289 if handler is None:
265 290 raise Exception("Unhandled message type: %s"%msg.msg_type)
266 291 else:
267 292 handler(msg)
268 293 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
269 294
295 def _flush_control(self, sock):
296 "flush incoming control replies"
297 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
298 while msg is not None:
299 if self.debug:
300 pprint(msg)
301 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
302
270 303 ###### get/setitem ########
271 304
272 305 def __getitem__(self, key):
273 306 if isinstance(key, int):
274 307 if key not in self.ids:
275 308 raise IndexError("No such engine: %i"%key)
276 309 return DirectView(self, key)
277 310
278 311 if isinstance(key, slice):
279 312 indices = range(len(self.ids))[key]
280 313 ids = sorted(self._ids)
281 314 key = [ ids[i] for i in indices ]
282 315 # newkeys = sorted(self._ids)[thekeys[k]]
283 316
284 317 if isinstance(key, (tuple, list, xrange)):
285 318 _,targets = self._build_targets(list(key))
286 319 return DirectView(self, targets)
287 320 else:
288 321 raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
289 322
290 323 ############ begin real methods #############
291 324
292 325 def spin(self):
293 326 """flush incoming notifications and execution results."""
294 327 if self.notification_socket:
295 328 self._flush_notifications()
296 329 if self.queue_socket:
297 330 self._flush_results(self.queue_socket)
298 331 if self.task_socket:
299 332 self._flush_results(self.task_socket)
333 if self.control_socket:
334 self._flush_control(self.control_socket)
300 335
301 336 @spinfirst
302 337 def queue_status(self, targets=None, verbose=False):
303 338 """fetch the status of engine queues
304 339
305 340 Parameters
306 341 ----------
307 342 targets : int/str/list of ints/strs
308 343 the engines on which to execute
309 344 default : all
310 345 verbose : bool
311 whether to return
346 whether to return lengths only, or lists of ids for each element
312 347
313 348 """
314 349 targets = self._build_targets(targets)[1]
315 350 content = dict(targets=targets)
316 self.session.send(self.controller_socket, "queue_request", content=content)
317 idents,msg = self.session.recv(self.controller_socket, 0)
351 self.session.send(self.query_socket, "queue_request", content=content)
352 idents,msg = self.session.recv(self.query_socket, 0)
353 if self.debug:
354 pprint(msg)
318 355 return msg['content']
319 356
320 357 @spinfirst
321 def clear(self, targets=None):
358 @defaultblock
359 def clear(self, targets=None, block=None):
322 360 """clear the namespace in target(s)"""
323 pass
361 targets = self._build_targets(targets)[0]
362 print targets
363 for t in targets:
364 self.session.send(self.control_socket, 'clear_request', content={},ident=t)
365 error = False
366 if self.block:
367 for i in range(len(targets)):
368 idents,msg = self.session.recv(self.control_socket,0)
369 if self.debug:
370 pprint(msg)
371 if msg['content']['status'] != 'ok':
372 error = msg['content']
373 if error:
374 return error
375
324 376
325 377 @spinfirst
326 def abort(self, targets=None):
378 @defaultblock
379 def abort(self, msg_ids = None, targets=None, block=None):
327 380 """abort the Queues of target(s)"""
328 pass
381 targets = self._build_targets(targets)[0]
382 print targets
383 if isinstance(msg_ids, basestring):
384 msg_ids = [msg_ids]
385 content = dict(msg_ids=msg_ids)
386 for t in targets:
387 self.session.send(self.control_socket, 'abort_request',
388 content=content, ident=t)
389 error = False
390 if self.block:
391 for i in range(len(targets)):
392 idents,msg = self.session.recv(self.control_socket,0)
393 if self.debug:
394 pprint(msg)
395 if msg['content']['status'] != 'ok':
396 error = msg['content']
397 if error:
398 return error
329 399
400 @spinfirst
401 @defaultblock
402 def kill(self, targets=None, block=None):
403 """Terminates one or more engine processes."""
404 targets = self._build_targets(targets)[0]
405 print targets
406 for t in targets:
407 self.session.send(self.control_socket, 'kill_request', content={},ident=t)
408 error = False
409 if self.block:
410 for i in range(len(targets)):
411 idents,msg = self.session.recv(self.control_socket,0)
412 if self.debug:
413 pprint(msg)
414 if msg['content']['status'] != 'ok':
415 error = msg['content']
416 if error:
417 return error
418
330 419 @defaultblock
331 420 def execute(self, code, targets='all', block=None):
332 421 """executes `code` on `targets` in blocking or nonblocking manner.
333 422
334 423 Parameters
335 424 ----------
336 425 code : str
337 426 the code string to be executed
338 427 targets : int/str/list of ints/strs
339 428 the engines on which to execute
340 429 default : all
341 430 block : bool
342 431 whether or not to wait until done
343 432 """
344 433 # block = self.block if block is None else block
345 434 # saveblock = self.block
346 435 # self.block = block
347 436 result = self.apply(execute, (code,), targets=targets, block=block, bound=True)
348 437 # self.block = saveblock
349 438 return result
350 439
351 440 def run(self, code, block=None):
352 441 """runs `code` on an engine.
353 442
354 443 Calls to this are load-balanced.
355 444
356 445 Parameters
357 446 ----------
358 447 code : str
359 448 the code string to be executed
360 449 block : bool
361 450 whether or not to wait until done
362 451
363 452 """
364 453 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
365 454 return result
366
367 # a = time.time()
368 # content = dict(code=code)
369 # b = time.time()
370 # msg = self.session.send(self.task_socket, 'execute_request',
371 # content=content)
372 # c = time.time()
373 # msg_id = msg['msg_id']
374 # self.outstanding.add(msg_id)
375 # self.history.append(msg_id)
376 # d = time.time()
377 # if block:
378 # self.barrier(msg_id)
379 # return self.results[msg_id]
380 # else:
381 # return msg_id
382 455
383 456 def _apply_balanced(self, f, args, kwargs, bound=True, block=None):
384 457 """the underlying method for applying functions in a load balanced
385 458 manner."""
386 459 block = block if block is not None else self.block
387 460
388 461 bufs = ss.pack_apply_message(f,args,kwargs)
389 462 content = dict(bound=bound)
390 463 msg = self.session.send(self.task_socket, "apply_request",
391 464 content=content, buffers=bufs)
392 465 msg_id = msg['msg_id']
393 466 self.outstanding.add(msg_id)
394 467 self.history.append(msg_id)
395 468 if block:
396 469 self.barrier(msg_id)
397 470 return self.results[msg_id]
398 471 else:
399 472 return msg_id
400 473
401 474 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
402 475 """Then underlying method for applying functions to specific engines."""
403 476 block = block if block is not None else self.block
404 477 queues,targets = self._build_targets(targets)
405
478 print queues
406 479 bufs = ss.pack_apply_message(f,args,kwargs)
407 480 content = dict(bound=bound)
408 481 msg_ids = []
409 482 for queue in queues:
410 483 msg = self.session.send(self.queue_socket, "apply_request",
411 484 content=content, buffers=bufs,ident=queue)
412 485 msg_id = msg['msg_id']
413 486 self.outstanding.add(msg_id)
414 487 self.history.append(msg_id)
415 488 msg_ids.append(msg_id)
416 489 if block:
417 490 self.barrier(msg_ids)
418 491 else:
419 492 if len(msg_ids) == 1:
420 493 return msg_ids[0]
421 494 else:
422 495 return msg_ids
423 496 if len(msg_ids) == 1:
424 497 return self.results[msg_ids[0]]
425 498 else:
426 499 result = {}
427 500 for target,mid in zip(targets, msg_ids):
428 501 result[target] = self.results[mid]
429 502 return result
430 503
431 504 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None):
432 505 """calls f(*args, **kwargs) on a remote engine(s), returning the result.
433 506
434 507 if self.block is False:
435 508 returns msg_id or list of msg_ids
436 509 else:
437 510 returns actual result of f(*args, **kwargs)
438 511 """
439 512 args = args if args is not None else []
440 513 kwargs = kwargs if kwargs is not None else {}
514 if not isinstance(args, (tuple, list)):
515 raise TypeError("args must be tuple or list, not %s"%type(args))
516 if not isinstance(kwargs, dict):
517 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
441 518 if targets is None:
442 519 return self._apply_balanced(f,args,kwargs,bound=bound, block=block)
443 520 else:
444 521 return self._apply_direct(f, args, kwargs,
445 522 bound=bound,block=block, targets=targets)
446 523
447 # def apply_bound(self, f, *args, **kwargs):
448 # """calls f(*args, **kwargs) on a remote engine. This does get
449 # executed in an engine's namespace. The controller selects the
450 # target engine via 0MQ XREQ load balancing.
451 #
452 # if self.block is False:
453 # returns msg_id
454 # else:
455 # returns actual result of f(*args, **kwargs)
456 # """
457 # return self._apply(f, args, kwargs, bound=True)
458 #
459 #
460 # def apply_to(self, targets, f, *args, **kwargs):
461 # """calls f(*args, **kwargs) on a specific engine.
462 #
463 # if self.block is False:
464 # returns msg_id
465 # else:
466 # returns actual result of f(*args, **kwargs)
467 #
468 # The target's namespace is not used here.
469 # Use apply_bound_to() to access target's globals.
470 # """
471 # return self._apply_to(False, targets, f, args, kwargs)
472 #
473 # def apply_bound_to(self, targets, f, *args, **kwargs):
474 # """calls f(*args, **kwargs) on a specific engine.
475 #
476 # if self.block is False:
477 # returns msg_id
478 # else:
479 # returns actual result of f(*args, **kwargs)
480 #
481 # This method has access to the target's globals
482 #
483 # """
484 # return self._apply_to(f, args, kwargs)
485 #
486 524 def push(self, ns, targets=None, block=None):
487 525 """push the contents of `ns` into the namespace on `target`"""
488 526 if not isinstance(ns, dict):
489 527 raise TypeError("Must be a dict, not %s"%type(ns))
490 528 result = self.apply(_push, (ns,), targets=targets, block=block,bound=True)
491 529 return result
492 530
493 531 @spinfirst
494 532 def pull(self, keys, targets=None, block=True):
495 533 """pull objects from `target`'s namespace by `keys`"""
496 534
497 535 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
498 536 return result
499 537
500 538 def barrier(self, msg_ids=None, timeout=-1):
501 539 """waits on one or more `msg_ids`, for up to `timeout` seconds.
502 540
503 541 Parameters
504 542 ----------
505 543 msg_ids : int, str, or list of ints and/or strs
506 544 ints are indices to self.history
507 545 strs are msg_ids
508 546 default: wait on all outstanding messages
509 547 timeout : float
510 548 a time in seconds, after which to give up.
511 549 default is -1, which means no timeout
512 550
513 551 Returns
514 552 -------
515 553 True : when all msg_ids are done
516 554 False : timeout reached, msg_ids still outstanding
517 555 """
518 556 tic = time.time()
519 557 if msg_ids is None:
520 558 theids = self.outstanding
521 559 else:
522 560 if isinstance(msg_ids, (int, str)):
523 561 msg_ids = [msg_ids]
524 562 theids = set()
525 563 for msg_id in msg_ids:
526 564 if isinstance(msg_id, int):
527 565 msg_id = self.history[msg_id]
528 566 theids.add(msg_id)
529 567 self.spin()
530 568 while theids.intersection(self.outstanding):
531 569 if timeout >= 0 and ( time.time()-tic ) > timeout:
532 570 break
533 571 time.sleep(1e-3)
534 572 self.spin()
535 573 return len(theids.intersection(self.outstanding)) == 0
536 574
537 575 @spinfirst
538 576 def get_results(self, msg_ids,status_only=False):
539 577 """returns the result of the execute or task request with `msg_id`"""
540 578 if not isinstance(msg_ids, (list,tuple)):
541 579 msg_ids = [msg_ids]
542 580 theids = []
543 581 for msg_id in msg_ids:
544 582 if isinstance(msg_id, int):
545 583 msg_id = self.history[msg_id]
546 584 theids.append(msg_id)
547 585
548 586 content = dict(msg_ids=theids, status_only=status_only)
549 msg = self.session.send(self.controller_socket, "result_request", content=content)
550 zmq.select([self.controller_socket], [], [])
551 idents,msg = self.session.recv(self.controller_socket, zmq.NOBLOCK)
587 msg = self.session.send(self.query_socket, "result_request", content=content)
588 zmq.select([self.query_socket], [], [])
589 idents,msg = self.session.recv(self.query_socket, zmq.NOBLOCK)
590 if self.debug:
591 pprint(msg)
552 592
553 593 # while True:
554 594 # try:
555 595 # except zmq.ZMQError:
556 596 # time.sleep(1e-3)
557 597 # continue
558 598 # else:
559 599 # break
560 600 return msg['content']
561 601
562 602 No newline at end of file
@@ -1,770 +1,772 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """The IPython Controller with 0MQ
5 5 This is the master object that handles connections from engines, clients, and
6 6 """
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17 from datetime import datetime
18 18
19 19 import zmq
20 20 from zmq.eventloop import zmqstream, ioloop
21 21 import uuid
22 22
23 23 # internal:
24 24 from streamsession import Message, wrap_exception # default_unpacker as unpack, default_packer as pack
25 25 from IPython.zmq.log import logger # a Logger object
26 26
27 27 # from messages import json # use the same import switches
28 28
29 29 #-----------------------------------------------------------------------------
30 30 # Code
31 31 #-----------------------------------------------------------------------------
32 32
33 33 class ReverseDict(dict):
34 34 """simple double-keyed subset of dict methods."""
35 35
36 36 def __init__(self, *args, **kwargs):
37 37 dict.__init__(self, *args, **kwargs)
38 38 self.reverse = dict()
39 39 for key, value in self.iteritems():
40 40 self.reverse[value] = key
41 41
42 42 def __getitem__(self, key):
43 43 try:
44 44 return dict.__getitem__(self, key)
45 45 except KeyError:
46 46 return self.reverse[key]
47 47
48 48 def __setitem__(self, key, value):
49 49 if key in self.reverse:
50 50 raise KeyError("Can't have key %r on both sides!"%key)
51 51 dict.__setitem__(self, key, value)
52 52 self.reverse[value] = key
53 53
54 54 def pop(self, key):
55 55 value = dict.pop(self, key)
56 56 self.d1.pop(value)
57 57 return value
58 58
59 59
60 60 class EngineConnector(object):
61 61 """A simple object for accessing the various zmq connections of an object.
62 62 Attributes are:
63 63 id (int): engine ID
64 64 uuid (str): uuid (unused?)
65 65 queue (str): identity of queue's XREQ socket
66 66 registration (str): identity of registration XREQ socket
67 67 heartbeat (str): identity of heartbeat XREQ socket
68 68 """
69 69 id=0
70 70 queue=None
71 71 control=None
72 72 registration=None
73 73 heartbeat=None
74 74 pending=None
75 75
76 76 def __init__(self, id, queue, registration, control, heartbeat=None):
77 77 logger.info("engine::Engine Connected: %i"%id)
78 78 self.id = id
79 79 self.queue = queue
80 80 self.registration = registration
81 81 self.control = control
82 82 self.heartbeat = heartbeat
83 83
84 84 class Controller(object):
85 85 """The IPython Controller with 0MQ connections
86 86
87 87 Parameters
88 88 ==========
89 89 loop: zmq IOLoop instance
90 90 session: StreamSession object
91 91 <removed> context: zmq context for creating new connections (?)
92 92 registrar: ZMQStream for engine registration requests (XREP)
93 93 clientele: ZMQStream for client connections (XREP)
94 94 not used for jobs, only query/control commands
95 95 queue: ZMQStream for monitoring the command queue (SUB)
96 96 heartbeat: HeartMonitor object checking the pulse of the engines
97 97 db_stream: connection to db for out of memory logging of commands
98 98 NotImplemented
99 99 queue_addr: zmq connection address of the XREP socket for the queue
100 100 hb_addr: zmq connection address of the PUB socket for heartbeats
101 101 task_addr: zmq connection address of the XREQ socket for task queue
102 102 """
103 103 # internal data structures:
104 104 ids=None # engine IDs
105 105 keytable=None
106 106 engines=None
107 107 clients=None
108 108 hearts=None
109 109 pending=None
110 110 results=None
111 111 tasks=None
112 112 completed=None
113 113 mia=None
114 114 incoming_registrations=None
115 115 registration_timeout=None
116 116
117 117 #objects from constructor:
118 118 loop=None
119 119 registrar=None
120 120 clientelle=None
121 121 queue=None
122 122 heartbeat=None
123 123 notifier=None
124 124 db=None
125 125 client_addr=None
126 126 engine_addrs=None
127 127
128 128
129 129 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
130 130 """
131 131 # universal:
132 132 loop: IOLoop for creating future connections
133 133 session: streamsession for sending serialized data
134 134 # engine:
135 135 queue: ZMQStream for monitoring queue messages
136 136 registrar: ZMQStream for engine registration
137 137 heartbeat: HeartMonitor object for tracking engines
138 138 # client:
139 139 clientele: ZMQStream for client connections
140 140 # extra:
141 141 db: ZMQStream for db connection (NotImplemented)
142 142 engine_addrs: zmq address/protocol dict for engine connections
143 143 client_addrs: zmq address/protocol dict for client connections
144 144 """
145 145 self.ids = set()
146 146 self.keytable={}
147 147 self.incoming_registrations={}
148 148 self.engines = {}
149 149 self.by_ident = {}
150 150 self.clients = {}
151 151 self.hearts = {}
152 152 self.mia = set()
153 153
154 154 # self.sockets = {}
155 155 self.loop = loop
156 156 self.session = session
157 157 self.registrar = registrar
158 158 self.clientele = clientele
159 159 self.queue = queue
160 160 self.heartbeat = heartbeat
161 161 self.notifier = notifier
162 162 self.db = db
163 163
164 164 self.client_addrs = client_addrs
165 165 assert isinstance(client_addrs['queue'], str)
166 166 # self.hb_addrs = hb_addrs
167 167 self.engine_addrs = engine_addrs
168 168 assert isinstance(engine_addrs['queue'], str)
169 169 assert len(engine_addrs['heartbeat']) == 2
170 170
171 171
172 172 # register our callbacks
173 173 self.registrar.on_recv(self.dispatch_register_request)
174 174 self.clientele.on_recv(self.dispatch_client_msg)
175 175 self.queue.on_recv(self.dispatch_queue_traffic)
176 176
177 177 if heartbeat is not None:
178 178 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
179 179 heartbeat.add_new_heart_handler(self.handle_new_heart)
180 180
181 181 if self.db is not None:
182 182 self.db.on_recv(self.dispatch_db)
183 183
184 184 self.client_handlers = {'queue_request': self.queue_status,
185 185 'result_request': self.get_results,
186 186 'purge_request': self.purge_results,
187 187 'resubmit_request': self.resubmit_task,
188 188 }
189 189
190 190 self.registrar_handlers = {'registration_request' : self.register_engine,
191 191 'unregistration_request' : self.unregister_engine,
192 192 'connection_request': self.connection_request,
193 193
194 194 }
195 195 #
196 196 # this is the stuff that will move to DB:
197 197 self.results = {} # completed results
198 198 self.pending = {} # pending messages, keyed by msg_id
199 199 self.queues = {} # pending msg_ids keyed by engine_id
200 200 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
201 201 self.completed = {} # completed msg_ids keyed by engine_id
202 202 self.registration_timeout = max(5000, 2*self.heartbeat.period)
203 203
204 204 logger.info("controller::created controller")
205 205
206 206 def _new_id(self):
207 207 """gemerate a new ID"""
208 208 newid = 0
209 209 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
210 210 # print newid, self.ids, self.incoming_registrations
211 211 while newid in self.ids or newid in incoming:
212 212 newid += 1
213 213 return newid
214 214
215 215
216 216 #-----------------------------------------------------------------------------
217 217 # message validation
218 218 #-----------------------------------------------------------------------------
219 219 def _validate_targets(self, targets):
220 220 """turn any valid targets argument into a list of integer ids"""
221 221 if targets is None:
222 222 # default to all
223 223 targets = self.ids
224 224
225 225 if isinstance(targets, (int,str,unicode)):
226 226 # only one target specified
227 227 targets = [targets]
228 228 _targets = []
229 229 for t in targets:
230 230 # map raw identities to ids
231 231 if isinstance(t, (str,unicode)):
232 232 t = self.by_ident.get(t, t)
233 233 _targets.append(t)
234 234 targets = _targets
235 235 bad_targets = [ t for t in targets if t not in self.ids ]
236 236 if bad_targets:
237 237 raise IndexError("No Such Engine: %r"%bad_targets)
238 238 if not targets:
239 239 raise IndexError("No Engines Registered")
240 240 return targets
241 241
242 242 def _validate_client_msg(self, msg):
243 243 """validates and unpacks headers of a message. Returns False if invalid,
244 244 (ident, header, parent, content)"""
245 245 client_id = msg[0]
246 246 try:
247 247 msg = self.session.unpack_message(msg[1:], content=True)
248 248 except:
249 249 logger.error("client::Invalid Message %s"%msg)
250 250 return False
251 251
252 252 msg_type = msg.get('msg_type', None)
253 253 if msg_type is None:
254 254 return False
255 255 header = msg.get('header')
256 256 # session doesn't handle split content for now:
257 257 return client_id, msg
258 258
259 259
260 260 #-----------------------------------------------------------------------------
261 261 # dispatch methods (1 per socket)
262 262 #-----------------------------------------------------------------------------
263 263
264 264 def dispatch_register_request(self, msg):
265 265 """"""
266 266 logger.debug("registration::dispatch_register_request(%s)"%msg)
267 267 idents,msg = self.session.feed_identities(msg)
268 268 print idents,msg, len(msg)
269 269 try:
270 270 msg = self.session.unpack_message(msg,content=True)
271 271 except Exception, e:
272 272 logger.error("registration::got bad registration message: %s"%msg)
273 273 raise e
274 274 return
275 275
276 276 msg_type = msg['msg_type']
277 277 content = msg['content']
278 278
279 279 handler = self.registrar_handlers.get(msg_type, None)
280 280 if handler is None:
281 281 logger.error("registration::got bad registration message: %s"%msg)
282 282 else:
283 283 handler(idents, msg)
284 284
285 285 def dispatch_queue_traffic(self, msg):
286 286 """all ME and Task queue messages come through here"""
287 287 logger.debug("queue traffic: %s"%msg[:2])
288 288 switch = msg[0]
289 289 idents, msg = self.session.feed_identities(msg[1:])
290 290 if switch == 'in':
291 291 self.save_queue_request(idents, msg)
292 292 elif switch == 'out':
293 293 self.save_queue_result(idents, msg)
294 294 elif switch == 'intask':
295 295 self.save_task_request(idents, msg)
296 296 elif switch == 'outtask':
297 297 self.save_task_result(idents, msg)
298 298 elif switch == 'tracktask':
299 299 self.save_task_destination(idents, msg)
300 elif switch in ('incontrol', 'outcontrol'):
301 pass
300 302 else:
301 303 logger.error("Invalid message topic: %s"%switch)
302 304
303 305
304 306 def dispatch_client_msg(self, msg):
305 307 """Route messages from clients"""
306 308 idents, msg = self.session.feed_identities(msg)
307 309 client_id = idents[0]
308 310 try:
309 311 msg = self.session.unpack_message(msg, content=True)
310 312 except:
311 313 content = wrap_exception()
312 314 logger.error("Bad Client Message: %s"%msg)
313 315 self.session.send(self.clientele, "controller_error", ident=client_id,
314 316 content=content)
315 317 return
316 318
317 319 # print client_id, header, parent, content
318 320 #switch on message type:
319 321 msg_type = msg['msg_type']
320 322 logger.info("client:: client %s requested %s"%(client_id, msg_type))
321 323 handler = self.client_handlers.get(msg_type, None)
322 324 try:
323 325 assert handler is not None, "Bad Message Type: %s"%msg_type
324 326 except:
325 327 content = wrap_exception()
326 328 logger.error("Bad Message Type: %s"%msg_type)
327 329 self.session.send(self.clientele, "controller_error", ident=client_id,
328 330 content=content)
329 331 return
330 332 else:
331 333 handler(client_id, msg)
332 334
333 335 def dispatch_db(self, msg):
334 336 """"""
335 337 raise NotImplementedError
336 338
337 339 #---------------------------------------------------------------------------
338 340 # handler methods (1 per event)
339 341 #---------------------------------------------------------------------------
340 342
341 343 #----------------------- Heartbeat --------------------------------------
342 344
343 345 def handle_new_heart(self, heart):
344 346 """handler to attach to heartbeater.
345 347 Called when a new heart starts to beat.
346 348 Triggers completion of registration."""
347 349 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
348 350 if heart not in self.incoming_registrations:
349 351 logger.info("heartbeat::ignoring new heart: %r"%heart)
350 352 else:
351 353 self.finish_registration(heart)
352 354
353 355
354 356 def handle_heart_failure(self, heart):
355 357 """handler to attach to heartbeater.
356 358 called when a previously registered heart fails to respond to beat request.
357 359 triggers unregistration"""
358 360 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
359 361 eid = self.hearts.get(heart, None)
360 362 if eid is None:
361 363 logger.info("heartbeat::ignoring heart failure %r"%heart)
362 364 else:
363 365 self.unregister_engine(heart, dict(content=dict(id=eid)))
364 366
365 367 #----------------------- MUX Queue Traffic ------------------------------
366 368
367 369 def save_queue_request(self, idents, msg):
368 370 queue_id, client_id = idents[:2]
369 371
370 372 try:
371 373 msg = self.session.unpack_message(msg, content=False)
372 374 except:
373 375 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
374 376 return
375 377
376 378 eid = self.by_ident.get(queue_id, None)
377 379 if eid is None:
378 380 logger.error("queue::target %r not registered"%queue_id)
379 381 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
380 382 return
381 383
382 384 header = msg['header']
383 385 msg_id = header['msg_id']
384 386 info = dict(submit=datetime.now(),
385 387 received=None,
386 388 engine=(eid, queue_id))
387 389 self.pending[msg_id] = ( msg, info )
388 390 self.queues[eid][0].append(msg_id)
389 391
390 392 def save_queue_result(self, idents, msg):
391 393 client_id, queue_id = idents[:2]
392 394
393 395 try:
394 396 msg = self.session.unpack_message(msg, content=False)
395 397 except:
396 398 logger.error("queue::engine %r sent invalid message to %r: %s"%(
397 399 queue_id,client_id, msg))
398 400 return
399 401
400 402 eid = self.by_ident.get(queue_id, None)
401 403 if eid is None:
402 404 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
403 405 logger.debug("queue:: %s"%msg[2:])
404 406 return
405 407
406 408 parent = msg['parent_header']
407 409 if not parent:
408 410 return
409 411 msg_id = parent['msg_id']
410 412 self.results[msg_id] = msg
411 413 if msg_id in self.pending:
412 414 self.pending.pop(msg_id)
413 415 self.queues[eid][0].remove(msg_id)
414 416 self.completed[eid].append(msg_id)
415 417 else:
416 418 logger.debug("queue:: unknown msg finished %s"%msg_id)
417 419
418 420 #--------------------- Task Queue Traffic ------------------------------
419 421
420 422 def save_task_request(self, idents, msg):
421 423 client_id = idents[0]
422 424
423 425 try:
424 426 msg = self.session.unpack_message(msg, content=False)
425 427 except:
426 428 logger.error("task::client %r sent invalid task message: %s"%(
427 429 client_id, msg))
428 430 return
429 431
430 432 header = msg['header']
431 433 msg_id = header['msg_id']
432 434 self.mia.add(msg_id)
433 435 self.pending[msg_id] = msg
434 436 if not self.tasks.has_key(client_id):
435 437 self.tasks[client_id] = []
436 438 self.tasks[client_id].append(msg_id)
437 439
438 440 def save_task_result(self, idents, msg):
439 441 client_id = idents[0]
440 442 try:
441 443 msg = self.session.unpack_message(msg, content=False)
442 444 except:
443 445 logger.error("task::invalid task result message send to %r: %s"%(
444 446 client_id, msg))
445 447 return
446 448
447 449 parent = msg['parent_header']
448 450 if not parent:
449 451 # print msg
450 452 # logger.warn("")
451 453 return
452 454 msg_id = parent['msg_id']
453 455 self.results[msg_id] = msg
454 456 if msg_id in self.pending:
455 457 self.pending.pop(msg_id)
456 458 if msg_id in self.mia:
457 459 self.mia.remove(msg_id)
458 460 else:
459 461 logger.debug("task:: unknown task %s finished"%msg_id)
460 462
461 463 def save_task_destination(self, idents, msg):
462 464 try:
463 465 msg = self.session.unpack_message(msg, content=True)
464 466 except:
465 467 logger.error("task::invalid task tracking message")
466 468 return
467 469 content = msg['content']
468 470 print content
469 471 msg_id = content['msg_id']
470 472 engine_uuid = content['engine_id']
471 473 for eid,queue_id in self.keytable.iteritems():
472 474 if queue_id == engine_uuid:
473 475 break
474 476
475 477 logger.info("task:: task %s arrived on %s"%(msg_id, eid))
476 478 if msg_id in self.mia:
477 479 self.mia.remove(msg_id)
478 480 else:
479 481 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
480 482 self.tasks[engine_uuid].append(msg_id)
481 483
482 484 def mia_task_request(self, idents, msg):
483 485 client_id = idents[0]
484 486 content = dict(mia=self.mia,status='ok')
485 487 self.session.send('mia_reply', content=content, idents=client_id)
486 488
487 489
488 490
489 491 #-------------------- Registration -----------------------------
490 492
491 493 def connection_request(self, client_id, msg):
492 494 """reply with connection addresses for clients"""
493 495 logger.info("client::client %s connected"%client_id)
494 496 content = dict(status='ok')
495 497 content.update(self.client_addrs)
496 498 jsonable = {}
497 499 for k,v in self.keytable.iteritems():
498 500 jsonable[str(k)] = v
499 501 content['engines'] = jsonable
500 502 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
501 503
502 504 def register_engine(self, reg, msg):
503 505 """register an engine"""
504 506 content = msg['content']
505 507 try:
506 508 queue = content['queue']
507 509 except KeyError:
508 510 logger.error("registration::queue not specified")
509 511 return
510 512 heart = content.get('heartbeat', None)
511 513 """register a new engine, and create the socket(s) necessary"""
512 514 eid = self._new_id()
513 515 # print (eid, queue, reg, heart)
514 516
515 517 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
516 518
517 519 content = dict(id=eid,status='ok')
518 520 content.update(self.engine_addrs)
519 521 # check if requesting available IDs:
520 522 if queue in self.by_ident:
521 523 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
522 524 elif heart in self.hearts: # need to check unique hearts?
523 525 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
524 526 else:
525 527 for h, pack in self.incoming_registrations.iteritems():
526 528 if heart == h:
527 529 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
528 530 break
529 531 elif queue == pack[1]:
530 532 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
531 533 break
532 534
533 535 msg = self.session.send(self.registrar, "registration_reply",
534 536 content=content,
535 537 ident=reg)
536 538
537 539 if content['status'] == 'ok':
538 540 if heart in self.heartbeat.hearts:
539 541 # already beating
540 542 self.incoming_registrations[heart] = (eid,queue,reg,None)
541 543 self.finish_registration(heart)
542 544 else:
543 545 purge = lambda : self._purge_stalled_registration(heart)
544 546 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
545 547 dc.start()
546 548 self.incoming_registrations[heart] = (eid,queue,reg,dc)
547 549 else:
548 550 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
549 551 return eid
550 552
551 553 def unregister_engine(self, ident, msg):
552 554 try:
553 555 eid = msg['content']['id']
554 556 except:
555 557 logger.error("registration::bad engine id for unregistration: %s"%ident)
556 558 return
557 559 logger.info("registration::unregister_engine(%s)"%eid)
558 560 content=dict(id=eid, queue=self.engines[eid].queue)
559 561 self.ids.remove(eid)
560 562 self.keytable.pop(eid)
561 563 ec = self.engines.pop(eid)
562 564 self.hearts.pop(ec.heartbeat)
563 565 self.by_ident.pop(ec.queue)
564 566 self.completed.pop(eid)
565 567 for msg_id in self.queues.pop(eid)[0]:
566 568 msg = self.pending.pop(msg_id)
567 569 ############## TODO: HANDLE IT ################
568 570
569 571 if self.notifier:
570 572 self.session.send(self.notifier, "unregistration_notification", content=content)
571 573
572 574 def finish_registration(self, heart):
573 575 try:
574 576 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
575 577 except KeyError:
576 578 logger.error("registration::tried to finish nonexistant registration")
577 579 return
578 580 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
579 581 if purge is not None:
580 582 purge.stop()
581 583 control = queue
582 584 self.ids.add(eid)
583 585 self.keytable[eid] = queue
584 586 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
585 587 self.by_ident[queue] = eid
586 588 self.queues[eid] = ([],[])
587 589 self.completed[eid] = list()
588 590 self.hearts[heart] = eid
589 591 content = dict(id=eid, queue=self.engines[eid].queue)
590 592 if self.notifier:
591 593 self.session.send(self.notifier, "registration_notification", content=content)
592 594
593 595 def _purge_stalled_registration(self, heart):
594 596 if heart in self.incoming_registrations:
595 597 eid = self.incoming_registrations.pop(heart)[0]
596 598 logger.info("registration::purging stalled registration: %i"%eid)
597 599 else:
598 600 pass
599 601
600 602 #------------------- Client Requests -------------------------------
601 603
602 604 def check_load(self, client_id, msg):
603 605 content = msg['content']
604 606 try:
605 607 targets = content['targets']
606 608 targets = self._validate_targets(targets)
607 609 except:
608 610 content = wrap_exception()
609 611 self.session.send(self.clientele, "controller_error",
610 612 content=content, ident=client_id)
611 613 return
612 614
613 615 content = dict(status='ok')
614 616 # loads = {}
615 617 for t in targets:
616 618 content[str(t)] = len(self.queues[t])
617 619 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
618 620
619 621
620 622 def queue_status(self, client_id, msg):
621 623 """handle queue_status request"""
622 624 content = msg['content']
623 625 targets = content['targets']
624 626 try:
625 627 targets = self._validate_targets(targets)
626 628 except:
627 629 content = wrap_exception()
628 630 self.session.send(self.clientele, "controller_error",
629 631 content=content, ident=client_id)
630 632 return
631 633 verbose = msg.get('verbose', False)
632 634 content = dict()
633 635 for t in targets:
634 636 queue = self.queues[t]
635 637 completed = self.completed[t]
636 638 if not verbose:
637 639 queue = len(queue)
638 640 completed = len(completed)
639 641 content[str(t)] = {'queue': queue, 'completed': completed }
640 642 # pending
641 643 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
642 644
643 645 def job_status(self, client_id, msg):
644 646 """handle queue_status request"""
645 647 content = msg['content']
646 648 msg_ids = content['msg_ids']
647 649 try:
648 650 targets = self._validate_targets(targets)
649 651 except:
650 652 content = wrap_exception()
651 653 self.session.send(self.clientele, "controller_error",
652 654 content=content, ident=client_id)
653 655 return
654 656 verbose = msg.get('verbose', False)
655 657 content = dict()
656 658 for t in targets:
657 659 queue = self.queues[t]
658 660 completed = self.completed[t]
659 661 if not verbose:
660 662 queue = len(queue)
661 663 completed = len(completed)
662 664 content[str(t)] = {'queue': queue, 'completed': completed }
663 665 # pending
664 666 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
665 667
666 668 def purge_results(self, client_id, msg):
667 669 content = msg['content']
668 670 msg_ids = content.get('msg_ids', [])
669 671 reply = dict(status='ok')
670 672 if msg_ids == 'all':
671 673 self.results = {}
672 674 else:
673 675 for msg_id in msg_ids:
674 676 if msg_id in self.results:
675 677 self.results.pop(msg_id)
676 678 else:
677 679 if msg_id in self.pending:
678 680 reply = dict(status='error', reason="msg pending: %r"%msg_id)
679 681 else:
680 682 reply = dict(status='error', reason="No such msg: %r"%msg_id)
681 683 break
682 684 eids = content.get('engine_ids', [])
683 685 for eid in eids:
684 686 if eid not in self.engines:
685 687 reply = dict(status='error', reason="No such engine: %i"%eid)
686 688 break
687 689 msg_ids = self.completed.pop(eid)
688 690 for msg_id in msg_ids:
689 691 self.results.pop(msg_id)
690 692
691 693 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
692 694
693 695 def resubmit_task(self, client_id, msg, buffers):
694 696 content = msg['content']
695 697 header = msg['header']
696 698
697 699
698 700 msg_ids = content.get('msg_ids', [])
699 701 reply = dict(status='ok')
700 702 if msg_ids == 'all':
701 703 self.results = {}
702 704 else:
703 705 for msg_id in msg_ids:
704 706 if msg_id in self.results:
705 707 self.results.pop(msg_id)
706 708 else:
707 709 if msg_id in self.pending:
708 710 reply = dict(status='error', reason="msg pending: %r"%msg_id)
709 711 else:
710 712 reply = dict(status='error', reason="No such msg: %r"%msg_id)
711 713 break
712 714 eids = content.get('engine_ids', [])
713 715 for eid in eids:
714 716 if eid not in self.engines:
715 717 reply = dict(status='error', reason="No such engine: %i"%eid)
716 718 break
717 719 msg_ids = self.completed.pop(eid)
718 720 for msg_id in msg_ids:
719 721 self.results.pop(msg_id)
720 722
721 723 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
722 724
723 725 def get_results(self, client_id, msg):
724 726 """get the result of 1 or more messages"""
725 727 content = msg['content']
726 728 msg_ids = set(content['msg_ids'])
727 729 statusonly = content.get('status_only', False)
728 730 pending = []
729 731 completed = []
730 732 content = dict(status='ok')
731 733 content['pending'] = pending
732 734 content['completed'] = completed
733 735 for msg_id in msg_ids:
734 736 if msg_id in self.pending:
735 737 pending.append(msg_id)
736 738 elif msg_id in self.results:
737 739 completed.append(msg_id)
738 740 if not statusonly:
739 741 content[msg_id] = self.results[msg_id]['content']
740 742 else:
741 743 content = dict(status='error')
742 744 content['reason'] = 'no such message: '+msg_id
743 745 break
744 746 self.session.send(self.clientele, "result_reply", content=content,
745 747 parent=msg, ident=client_id)
746 748
747 749
748 750
749 751 ############ OLD METHODS for Python Relay Controller ###################
750 752 def _validate_engine_msg(self, msg):
751 753 """validates and unpacks headers of a message. Returns False if invalid,
752 754 (ident, message)"""
753 755 ident = msg[0]
754 756 try:
755 757 msg = self.session.unpack_message(msg[1:], content=False)
756 758 except:
757 759 logger.error("engine.%s::Invalid Message %s"%(ident, msg))
758 760 return False
759 761
760 762 try:
761 763 eid = msg.header.username
762 764 assert self.engines.has_key(eid)
763 765 except:
764 766 logger.error("engine::Invalid Engine ID %s"%(ident))
765 767 return False
766 768
767 769 return eid, msg
768 770
769 771
770 772 No newline at end of file
@@ -1,143 +1,141 b''
1 1 #!/usr/bin/env python
2 2 """A simple engine that talks to a controller over 0MQ.
3 3 it handles registration, etc. and launches a kernel
4 4 connected to the Controller's queue(s).
5 5 """
6 6 import sys
7 7 import time
8 8 import traceback
9 9 import uuid
10 from pprint import pprint
10 11
11 12 import zmq
12 13 from zmq.eventloop import ioloop, zmqstream
13 14
14 15 from streamsession import Message, StreamSession
15 16 from client import Client
16 17 import streamkernel as kernel
17 18 import heartmonitor
18 19 # import taskthread
19 20 # from log import logger
20 21
21 22
22 23 def printer(*msg):
23 print msg
24 pprint(msg)
24 25
25 26 class Engine(object):
26 27 """IPython engine"""
27 28
28 29 id=None
29 30 context=None
30 31 loop=None
31 32 session=None
32 queue_id=None
33 control_id=None
34 heart_id=None
33 ident=None
35 34 registrar=None
36 35 heart=None
37 36 kernel=None
38 37
39 def __init__(self, context, loop, session, registrar, client, queue_id=None, heart_id=None):
38 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
40 39 self.context = context
41 40 self.loop = loop
42 41 self.session = session
43 42 self.registrar = registrar
44 43 self.client = client
45 self.queue_id = queue_id or str(uuid.uuid4())
46 self.heart_id = heart_id or self.queue_id
44 self.ident = ident if ident else str(uuid.uuid4())
47 45 self.registrar.on_send(printer)
48 46
49 47 def register(self):
50 48
51 content = dict(queue=self.queue_id, heartbeat=self.heart_id)
49 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
52 50 self.registrar.on_recv(self.complete_registration)
53 51 self.session.send(self.registrar, "registration_request",content=content)
54 52
55 53 def complete_registration(self, msg):
56 54 # print msg
57 55 idents,msg = self.session.feed_identities(msg)
58 56 msg = Message(self.session.unpack_message(msg))
59 57 if msg.content.status == 'ok':
60 58 self.session.username = str(msg.content.id)
61 59 queue_addr = msg.content.queue
62 60 if queue_addr:
63 61 queue = self.context.socket(zmq.PAIR)
64 queue.setsockopt(zmq.IDENTITY, self.queue_id)
62 queue.setsockopt(zmq.IDENTITY, self.ident)
65 63 queue.connect(str(queue_addr))
66 64 self.queue = zmqstream.ZMQStream(queue, self.loop)
67 65
68 66 control_addr = msg.content.control
69 67 if control_addr:
70 68 control = self.context.socket(zmq.PAIR)
71 control.setsockopt(zmq.IDENTITY, self.queue_id)
69 control.setsockopt(zmq.IDENTITY, self.ident)
72 70 control.connect(str(control_addr))
73 71 self.control = zmqstream.ZMQStream(control, self.loop)
74 72
75 73 task_addr = msg.content.task
76 74 print task_addr
77 75 if task_addr:
78 76 # task as stream:
79 77 task = self.context.socket(zmq.PAIR)
80 78 task.connect(str(task_addr))
81 79 self.task_stream = zmqstream.ZMQStream(task, self.loop)
82 80 # TaskThread:
83 81 # mon_addr = msg.content.monitor
84 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.queue_id)
82 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
85 83 # task.connect_in(str(task_addr))
86 84 # task.connect_out(str(mon_addr))
87 85 # self.task_stream = taskthread.QueueStream(*task.queues)
88 86 # task.start()
89 87
90 88 hbs = msg.content.heartbeat
91 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.heart_id)
89 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
92 90 self.heart.start()
93 91 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
94 92 # placeholder for now:
95 93 pub = self.context.socket(zmq.PUB)
96 94 pub = zmqstream.ZMQStream(pub, self.loop)
97 95 # create and start the kernel
98 96 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
99 97 self.kernel.start()
100 98 else:
101 99 # logger.error("Registration Failed: %s"%msg)
102 100 raise Exception("Registration Failed: %s"%msg)
103 101
104 102 # logger.info("engine::completed registration with id %s"%self.session.username)
105 103
106 104 print msg
107 105
108 106 def unregister(self):
109 107 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
110 108 time.sleep(1)
111 109 sys.exit(0)
112 110
113 111 def start(self):
114 112 print "registering"
115 113 self.register()
116 114
117 115
118 116 if __name__ == '__main__':
119 117
120 118 loop = ioloop.IOLoop.instance()
121 119 session = StreamSession()
122 120 ctx = zmq.Context()
123 121
124 122 ip = '127.0.0.1'
125 123 reg_port = 10101
126 124 connection = ('tcp://%s' % ip) + ':%i'
127 125 reg_conn = connection % reg_port
128 126 print reg_conn
129 127 print >>sys.__stdout__, "Starting the engine..."
130 128
131 129 reg = ctx.socket(zmq.PAIR)
132 130 reg.connect(reg_conn)
133 131 reg = zmqstream.ZMQStream(reg, loop)
134 132 client = Client(reg_conn)
135 133 if len(sys.argv) > 1:
136 134 queue_id=sys.argv[1]
137 135 else:
138 136 queue_id = None
139 137
140 138 e = Engine(ctx, loop, session, reg, client, queue_id)
141 139 dc = ioloop.DelayedCallback(e.start, 500, loop)
142 140 dc.start()
143 141 loop.start() No newline at end of file
@@ -1,482 +1,510 b''
1 1 #!/usr/bin/env python
2 2 """
3 3 Kernel adapted from kernel.py to use ZMQ Streams
4 4 """
5 5
6 6 import __builtin__
7 import os
7 8 import sys
8 9 import time
9 10 import traceback
10 11 from signal import SIGTERM, SIGKILL
12 from pprint import pprint
11 13
12 14 from code import CommandCompiler
13 15
14 16 import zmq
15 17 from zmq.eventloop import ioloop, zmqstream
16 18
17 19 from streamsession import StreamSession, Message, extract_header, serialize_object,\
18 20 unpack_apply_message
19 21 from IPython.zmq.completer import KernelCompleter
20 22
23 def printer(*args):
24 pprint(args)
25
21 26 class OutStream(object):
22 27 """A file like object that publishes the stream to a 0MQ PUB socket."""
23 28
24 29 def __init__(self, session, pub_socket, name, max_buffer=200):
25 30 self.session = session
26 31 self.pub_socket = pub_socket
27 32 self.name = name
28 33 self._buffer = []
29 34 self._buffer_len = 0
30 35 self.max_buffer = max_buffer
31 36 self.parent_header = {}
32 37
33 38 def set_parent(self, parent):
34 39 self.parent_header = extract_header(parent)
35 40
36 41 def close(self):
37 42 self.pub_socket = None
38 43
39 44 def flush(self):
40 45 if self.pub_socket is None:
41 46 raise ValueError(u'I/O operation on closed file')
42 47 else:
43 48 if self._buffer:
44 49 data = ''.join(self._buffer)
45 50 content = {u'name':self.name, u'data':data}
46 51 # msg = self.session.msg(u'stream', content=content,
47 52 # parent=self.parent_header)
48 53 msg = self.session.send(self.pub_socket, u'stream', content=content, parent=self.parent_header)
49 54 # print>>sys.__stdout__, Message(msg)
50 55 # self.pub_socket.send_json(msg)
51 56 self._buffer_len = 0
52 57 self._buffer = []
53 58
54 59 def isattr(self):
55 60 return False
56 61
57 62 def next(self):
58 63 raise IOError('Read not supported on a write only stream.')
59 64
60 65 def read(self, size=None):
61 66 raise IOError('Read not supported on a write only stream.')
62 67
63 68 readline=read
64 69
65 70 def write(self, s):
66 71 if self.pub_socket is None:
67 72 raise ValueError('I/O operation on closed file')
68 73 else:
69 74 self._buffer.append(s)
70 75 self._buffer_len += len(s)
71 76 self._maybe_send()
72 77
73 78 def _maybe_send(self):
74 79 if '\n' in self._buffer[-1]:
75 80 self.flush()
76 81 if self._buffer_len > self.max_buffer:
77 82 self.flush()
78 83
79 84 def writelines(self, sequence):
80 85 if self.pub_socket is None:
81 86 raise ValueError('I/O operation on closed file')
82 87 else:
83 88 for s in sequence:
84 89 self.write(s)
85 90
86 91
87 92 class DisplayHook(object):
88 93
89 94 def __init__(self, session, pub_socket):
90 95 self.session = session
91 96 self.pub_socket = pub_socket
92 97 self.parent_header = {}
93 98
94 99 def __call__(self, obj):
95 100 if obj is None:
96 101 return
97 102
98 103 __builtin__._ = obj
99 104 # msg = self.session.msg(u'pyout', {u'data':repr(obj)},
100 105 # parent=self.parent_header)
101 106 # self.pub_socket.send_json(msg)
102 107 self.session.send(self.pub_socket, u'pyout', content={u'data':repr(obj)}, parent=self.parent_header)
103 108
104 109 def set_parent(self, parent):
105 110 self.parent_header = extract_header(parent)
106 111
107 112
108 113 class RawInput(object):
109 114
110 115 def __init__(self, session, socket):
111 116 self.session = session
112 117 self.socket = socket
113 118
114 119 def __call__(self, prompt=None):
115 120 msg = self.session.msg(u'raw_input')
116 121 self.socket.send_json(msg)
117 122 while True:
118 123 try:
119 124 reply = self.socket.recv_json(zmq.NOBLOCK)
120 125 except zmq.ZMQError, e:
121 126 if e.errno == zmq.EAGAIN:
122 127 pass
123 128 else:
124 129 raise
125 130 else:
126 131 break
127 132 return reply[u'content'][u'data']
128 133
129 134
130 135 class Kernel(object):
131 136
132 137 def __init__(self, session, control_stream, reply_stream, pub_stream,
133 138 task_stream=None, client=None):
134 139 self.session = session
135 140 self.control_stream = control_stream
141 self.control_socket = control_stream.socket
136 142 self.reply_stream = reply_stream
137 143 self.task_stream = task_stream
138 144 self.pub_stream = pub_stream
139 145 self.client = client
140 146 self.user_ns = {}
141 147 self.history = []
142 148 self.compiler = CommandCompiler()
143 149 self.completer = KernelCompleter(self.user_ns)
144 150 self.aborted = set()
145 151
146 152 # Build dict of handlers for message types
147 153 self.queue_handlers = {}
148 154 self.control_handlers = {}
149 155 for msg_type in ['execute_request', 'complete_request', 'apply_request']:
150 156 self.queue_handlers[msg_type] = getattr(self, msg_type)
151 157
152 158 for msg_type in ['kill_request', 'abort_request']:
153 159 self.control_handlers[msg_type] = getattr(self, msg_type)
154 160
155 161 #-------------------- control handlers -----------------------------
162 def abort_queues(self):
163 for stream in (self.task_stream, self.reply_stream):
164 if stream:
165 self.abort_queue(stream)
156 166
157 167 def abort_queue(self, stream):
158 168 while True:
159 169 try:
160 170 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
161 171 except zmq.ZMQError, e:
162 172 if e.errno == zmq.EAGAIN:
163 173 break
164 174 else:
165 175 return
166 176 else:
167 177 if msg is None:
168 178 return
169 179 else:
170 180 idents,msg = msg
171 181
172 182 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
173 183 # msg = self.reply_socket.recv_json()
174 184 print>>sys.__stdout__, "Aborting:"
175 185 print>>sys.__stdout__, Message(msg)
176 186 msg_type = msg['msg_type']
177 187 reply_type = msg_type.split('_')[0] + '_reply'
178 188 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
179 189 # self.reply_socket.send(ident,zmq.SNDMORE)
180 190 # self.reply_socket.send_json(reply_msg)
181 191 reply_msg = self.session.send(stream, reply_type,
182 192 content={'status' : 'aborted'}, parent=msg, ident=idents)
183 193 print>>sys.__stdout__, Message(reply_msg)
184 194 # We need to wait a bit for requests to come in. This can probably
185 195 # be set shorter for true asynchronous clients.
186 196 time.sleep(0.05)
187 197
188 198 def abort_request(self, stream, ident, parent):
199 """abort a specifig msg by id"""
189 200 msg_ids = parent['content'].get('msg_ids', None)
201 if isinstance(msg_ids, basestring):
202 msg_ids = [msg_ids]
190 203 if not msg_ids:
191 self.abort_queue(self.task_stream)
192 self.abort_queue(self.reply_stream)
204 self.abort_queues()
193 205 for mid in msg_ids:
194 self.aborted.add(mid)
206 self.aborted.add(str(mid))
195 207
196 208 content = dict(status='ok')
197 self.session.send(stream, 'abort_reply', content=content, parent=parent,
209 reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent,
198 210 ident=ident)
211 print>>sys.__stdout__, Message(reply_msg)
199 212
200 213 def kill_request(self, stream, idents, parent):
201 self.abort_queue(self.reply_stream)
202 if self.task_stream:
203 self.abort_queue(self.task_stream)
214 """kill ourselves. This should really be handled in an external process"""
215 self.abort_queues()
204 216 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
205 217 content = dict(status='ok'))
206 218 # we can know that a message is done if we *don't* use streams, but
207 219 # use a socket directly with MessageTracker
208 time.sleep(1)
220 time.sleep(.5)
209 221 os.kill(os.getpid(), SIGTERM)
210 time.sleep(.25)
222 time.sleep(1)
211 223 os.kill(os.getpid(), SIGKILL)
212 224
213 225 def dispatch_control(self, msg):
214 226 idents,msg = self.session.feed_identities(msg, copy=False)
215 227 msg = self.session.unpack_message(msg, content=True, copy=False)
216 228
217 229 header = msg['header']
218 230 msg_id = header['msg_id']
219 231
220 232 handler = self.control_handlers.get(msg['msg_type'], None)
221 233 if handler is None:
222 234 print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg
223 235 else:
224 handler(stream, idents, msg)
236 handler(self.control_stream, idents, msg)
225 237
226 238 def flush_control(self):
227 239 while any(zmq.select([self.control_socket],[],[],1e-4)):
228 240 try:
229 241 msg = self.control_socket.recv_multipart(zmq.NOBLOCK, copy=False)
230 242 except zmq.ZMQError, e:
231 243 if e.errno != zmq.EAGAIN:
232 244 raise e
233 245 return
234 246 else:
235 247 self.dispatch_control(msg)
236 248
237 249
238 250 #-------------------- queue helpers ------------------------------
239 251
240 252 def check_dependencies(self, dependencies):
241 253 if not dependencies:
242 254 return True
243 255 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
244 256 anyorall = dependencies[0]
245 257 dependencies = dependencies[1]
246 258 else:
247 259 anyorall = 'all'
248 260 results = self.client.get_results(dependencies,status_only=True)
249 261 if results['status'] != 'ok':
250 262 return False
251 263
252 264 if anyorall == 'any':
253 265 if not results['completed']:
254 266 return False
255 267 else:
256 268 if results['pending']:
257 269 return False
258 270
259 271 return True
260 272
273 def check_aborted(self, msg_id):
274 return msg_id in self.aborted
275
276 def unmet_dependencies(self, stream, idents, msg):
277 reply_type = msg['msg_type'].split('_')[0] + '_reply'
278 content = dict(status='resubmitted', reason='unmet dependencies')
279 reply_msg = self.session.send(stream, reply_type,
280 content=content, parent=msg, ident=idents)
281 ### TODO: actually resubmit it ###
282
261 283 #-------------------- queue handlers -----------------------------
262 284
263 285 def execute_request(self, stream, ident, parent):
264 286 try:
265 287 code = parent[u'content'][u'code']
266 288 except:
267 289 print>>sys.__stderr__, "Got bad msg: "
268 290 print>>sys.__stderr__, Message(parent)
269 291 return
270 292 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
271 293 # self.pub_stream.send(pyin_msg)
272 294 self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
273 295 try:
274 296 comp_code = self.compiler(code, '<zmq-kernel>')
275 297 # allow for not overriding displayhook
276 298 if hasattr(sys.displayhook, 'set_parent'):
277 299 sys.displayhook.set_parent(parent)
278 300 exec comp_code in self.user_ns, self.user_ns
279 301 except:
280 302 # result = u'error'
281 303 etype, evalue, tb = sys.exc_info()
282 304 tb = traceback.format_exception(etype, evalue, tb)
283 305 exc_content = {
284 306 u'status' : u'error',
285 307 u'traceback' : tb,
286 308 u'etype' : unicode(etype),
287 309 u'evalue' : unicode(evalue)
288 310 }
289 311 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
290 312 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
291 313 reply_content = exc_content
292 314 else:
293 315 reply_content = {'status' : 'ok'}
294 316 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
295 317 # self.reply_socket.send(ident, zmq.SNDMORE)
296 318 # self.reply_socket.send_json(reply_msg)
297 319 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
298 320 # print>>sys.__stdout__, Message(reply_msg)
299 321 if reply_msg['content']['status'] == u'error':
300 self.abort_queue()
322 self.abort_queues()
301 323
302 324 def complete_request(self, stream, ident, parent):
303 325 matches = {'matches' : self.complete(parent),
304 326 'status' : 'ok'}
305 327 completion_msg = self.session.send(stream, 'complete_reply',
306 328 matches, parent, ident)
307 329 # print >> sys.__stdout__, completion_msg
308 330
309 331 def complete(self, msg):
310 332 return self.completer.complete(msg.content.line, msg.content.text)
311 333
312 334 def apply_request(self, stream, ident, parent):
313 335 try:
314 336 content = parent[u'content']
315 337 bufs = parent[u'buffers']
316 338 msg_id = parent['header']['msg_id']
317 339 bound = content.get('bound', False)
318 340 except:
319 341 print>>sys.__stderr__, "Got bad msg: "
320 342 print>>sys.__stderr__, Message(parent)
321 343 return
322 344 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
323 345 # self.pub_stream.send(pyin_msg)
324 346 # self.session.send(self.pub_stream, u'pyin', {u'code':code},parent=parent)
325 347 try:
326 348 # allow for not overriding displayhook
327 349 if hasattr(sys.displayhook, 'set_parent'):
328 350 sys.displayhook.set_parent(parent)
329 351 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
330 352 if bound:
331 353 working = self.user_ns
332 354 suffix = str(msg_id).replace("-","")
333 355 prefix = "_"
334 356
335 357 else:
336 358 working = dict()
337 suffix = prefix = ""
359 suffix = prefix = "_" # prevent keyword collisions with lambda
338 360 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
339 361 # if f.fun
340 362 fname = prefix+f.func_name.strip('<>')+suffix
341 363 argname = prefix+"args"+suffix
342 364 kwargname = prefix+"kwargs"+suffix
343 365 resultname = prefix+"result"+suffix
344 366
345 367 ns = { fname : f, argname : args, kwargname : kwargs }
346 368 # print ns
347 369 working.update(ns)
348 370 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
349 371 exec code in working, working
350 372 result = working.get(resultname)
351 373 # clear the namespace
352 374 if bound:
353 375 for key in ns.iterkeys():
354 376 self.user_ns.pop(key)
355 377 else:
356 378 del working
357 379
358 380 packed_result,buf = serialize_object(result)
359 381 result_buf = [packed_result]+buf
360 382 except:
361 383 result = u'error'
362 384 etype, evalue, tb = sys.exc_info()
363 385 tb = traceback.format_exception(etype, evalue, tb)
364 386 exc_content = {
365 387 u'status' : u'error',
366 388 u'traceback' : tb,
367 389 u'etype' : unicode(etype),
368 390 u'evalue' : unicode(evalue)
369 391 }
370 392 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
371 393 self.session.send(self.pub_stream, u'pyerr', exc_content, parent=parent)
372 394 reply_content = exc_content
373 395 result_buf = []
374 396 else:
375 397 reply_content = {'status' : 'ok'}
376 398 # reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
377 399 # self.reply_socket.send(ident, zmq.SNDMORE)
378 400 # self.reply_socket.send_json(reply_msg)
379 401 reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf)
380 402 # print>>sys.__stdout__, Message(reply_msg)
381 403 if reply_msg['content']['status'] == u'error':
382 self.abort_queue()
404 self.abort_queues()
383 405
384 406 def dispatch_queue(self, stream, msg):
385 407 self.flush_control()
386 408 idents,msg = self.session.feed_identities(msg, copy=False)
387 409 msg = self.session.unpack_message(msg, content=True, copy=False)
388 410
389 411 header = msg['header']
390 412 msg_id = header['msg_id']
391 413 dependencies = header.get('dependencies', [])
392
393 414 if self.check_aborted(msg_id):
394 return self.abort_reply(stream, msg)
415 self.aborted.remove(msg_id)
416 # is it safe to assume a msg_id will not be resubmitted?
417 reply_type = msg['msg_type'].split('_')[0] + '_reply'
418 reply_msg = self.session.send(stream, reply_type,
419 content={'status' : 'aborted'}, parent=msg, ident=idents)
420 return
395 421 if not self.check_dependencies(dependencies):
396 return self.unmet_dependencies(stream, msg)
397
422 return self.unmet_dependencies(stream, idents, msg)
398 423 handler = self.queue_handlers.get(msg['msg_type'], None)
399 424 if handler is None:
400 425 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
401 426 else:
402 427 handler(stream, idents, msg)
403 428
404 429 def start(self):
405 430 #### stream mode:
406 431 if self.control_stream:
407 432 self.control_stream.on_recv(self.dispatch_control, copy=False)
433 self.control_stream.on_err(printer)
408 434 if self.reply_stream:
409 435 self.reply_stream.on_recv(lambda msg:
410 436 self.dispatch_queue(self.reply_stream, msg), copy=False)
437 self.reply_stream.on_err(printer)
411 438 if self.task_stream:
412 439 self.task_stream.on_recv(lambda msg:
413 440 self.dispatch_queue(self.task_stream, msg), copy=False)
441 self.task_stream.on_err(printer)
414 442
415 443 #### while True mode:
416 444 # while True:
417 445 # idle = True
418 446 # try:
419 447 # msg = self.reply_stream.socket.recv_multipart(
420 448 # zmq.NOBLOCK, copy=False)
421 449 # except zmq.ZMQError, e:
422 450 # if e.errno != zmq.EAGAIN:
423 451 # raise e
424 452 # else:
425 453 # idle=False
426 454 # self.dispatch_queue(self.reply_stream, msg)
427 455 #
428 456 # if not self.task_stream.empty():
429 457 # idle=False
430 458 # msg = self.task_stream.recv_multipart()
431 459 # self.dispatch_queue(self.task_stream, msg)
432 460 # if idle:
433 461 # # don't busywait
434 462 # time.sleep(1e-3)
435 463
436 464
437 465 def main():
438 466 raise Exception("Don't run me anymore")
439 467 loop = ioloop.IOLoop.instance()
440 468 c = zmq.Context()
441 469
442 470 ip = '127.0.0.1'
443 471 port_base = 5575
444 472 connection = ('tcp://%s' % ip) + ':%i'
445 473 rep_conn = connection % port_base
446 474 pub_conn = connection % (port_base+1)
447 475
448 476 print >>sys.__stdout__, "Starting the kernel..."
449 477 # print >>sys.__stdout__, "XREQ Channel:", rep_conn
450 478 # print >>sys.__stdout__, "PUB Channel:", pub_conn
451 479
452 480 session = StreamSession(username=u'kernel')
453 481
454 482 reply_socket = c.socket(zmq.XREQ)
455 483 reply_socket.connect(rep_conn)
456 484
457 485 pub_socket = c.socket(zmq.PUB)
458 486 pub_socket.connect(pub_conn)
459 487
460 488 stdout = OutStream(session, pub_socket, u'stdout')
461 489 stderr = OutStream(session, pub_socket, u'stderr')
462 490 sys.stdout = stdout
463 491 sys.stderr = stderr
464 492
465 493 display_hook = DisplayHook(session, pub_socket)
466 494 sys.displayhook = display_hook
467 495 reply_stream = zmqstream.ZMQStream(reply_socket,loop)
468 496 pub_stream = zmqstream.ZMQStream(pub_socket,loop)
469 497 kernel = Kernel(session, reply_stream, pub_stream)
470 498
471 499 # For debugging convenience, put sleep and a string in the namespace, so we
472 500 # have them every time we start.
473 501 kernel.user_ns['sleep'] = time.sleep
474 502 kernel.user_ns['s'] = 'Test string'
475 503
476 504 print >>sys.__stdout__, "Use Ctrl-\\ (NOT Ctrl-C!) to terminate."
477 505 kernel.start()
478 506 loop.start()
479 507
480 508
481 509 if __name__ == '__main__':
482 510 main()
@@ -1,443 +1,447 b''
1 1 #!/usr/bin/env python
2 2 """edited session.py to work with streams, and move msg_type to the header
3 3 """
4 4
5 5
6 6 import os
7 7 import sys
8 8 import traceback
9 9 import pprint
10 10 import uuid
11 11
12 12 import zmq
13 13 from zmq.utils import jsonapi
14 14 from zmq.eventloop.zmqstream import ZMQStream
15 15
16 16 from IPython.zmq.pickleutil import can, uncan, canSequence, uncanSequence
17 17 from IPython.zmq.newserialized import serialize, unserialize
18 18
19 19 try:
20 20 import cPickle
21 21 pickle = cPickle
22 22 except:
23 23 cPickle = None
24 24 import pickle
25 25
26 26 # packer priority: jsonlib[2], cPickle, simplejson/json, pickle
27 27 json_name = '' if not jsonapi.jsonmod else jsonapi.jsonmod.__name__
28 28 if json_name in ('jsonlib', 'jsonlib2'):
29 29 use_json = True
30 30 elif json_name:
31 31 if cPickle is None:
32 32 use_json = True
33 33 else:
34 34 use_json = False
35 35 else:
36 36 use_json = False
37 37
38 38 if use_json:
39 39 default_packer = jsonapi.dumps
40 40 default_unpacker = jsonapi.loads
41 41 else:
42 42 default_packer = lambda o: pickle.dumps(o,-1)
43 43 default_unpacker = pickle.loads
44 44
45 45
46 46 DELIM="<IDS|MSG>"
47 47
48 48 def wrap_exception():
49 49 etype, evalue, tb = sys.exc_info()
50 50 tb = traceback.format_exception(etype, evalue, tb)
51 51 exc_content = {
52 52 u'status' : u'error',
53 53 u'traceback' : tb,
54 54 u'etype' : unicode(etype),
55 55 u'evalue' : unicode(evalue)
56 56 }
57 57 return exc_content
58 58
59 59 class KernelError(Exception):
60 60 pass
61 61
62 62 def unwrap_exception(content):
63 63 err = KernelError(content['etype'], content['evalue'])
64 64 err.evalue = content['evalue']
65 65 err.etype = content['etype']
66 66 err.traceback = ''.join(content['traceback'])
67 67 return err
68 68
69 69
70 70 class Message(object):
71 71 """A simple message object that maps dict keys to attributes.
72 72
73 73 A Message can be created from a dict and a dict from a Message instance
74 74 simply by calling dict(msg_obj)."""
75 75
76 76 def __init__(self, msg_dict):
77 77 dct = self.__dict__
78 78 for k, v in dict(msg_dict).iteritems():
79 79 if isinstance(v, dict):
80 80 v = Message(v)
81 81 dct[k] = v
82 82
83 83 # Having this iterator lets dict(msg_obj) work out of the box.
84 84 def __iter__(self):
85 85 return iter(self.__dict__.iteritems())
86 86
87 87 def __repr__(self):
88 88 return repr(self.__dict__)
89 89
90 90 def __str__(self):
91 91 return pprint.pformat(self.__dict__)
92 92
93 93 def __contains__(self, k):
94 94 return k in self.__dict__
95 95
96 96 def __getitem__(self, k):
97 97 return self.__dict__[k]
98 98
99 99
100 100 def msg_header(msg_id, msg_type, username, session):
101 101 return locals()
102 102 # return {
103 103 # 'msg_id' : msg_id,
104 104 # 'msg_type': msg_type,
105 105 # 'username' : username,
106 106 # 'session' : session
107 107 # }
108 108
109 109
110 110 def extract_header(msg_or_header):
111 111 """Given a message or header, return the header."""
112 112 if not msg_or_header:
113 113 return {}
114 114 try:
115 115 # See if msg_or_header is the entire message.
116 116 h = msg_or_header['header']
117 117 except KeyError:
118 118 try:
119 119 # See if msg_or_header is just the header
120 120 h = msg_or_header['msg_id']
121 121 except KeyError:
122 122 raise
123 123 else:
124 124 h = msg_or_header
125 125 if not isinstance(h, dict):
126 126 h = dict(h)
127 127 return h
128 128
129 129 def rekey(dikt):
130 130 """rekey a dict that has been forced to use str keys where there should be
131 131 ints by json. This belongs in the jsonutil added by fperez."""
132 132 for k in dikt.iterkeys():
133 133 if isinstance(k, str):
134 134 ik=fk=None
135 135 try:
136 136 ik = int(k)
137 137 except ValueError:
138 138 try:
139 139 fk = float(k)
140 140 except ValueError:
141 141 continue
142 142 if ik is not None:
143 143 nk = ik
144 144 else:
145 145 nk = fk
146 146 if nk in dikt:
147 147 raise KeyError("already have key %r"%nk)
148 148 dikt[nk] = dikt.pop(k)
149 149 return dikt
150 150
151 151 def serialize_object(obj, threshold=64e-6):
152 152 """serialize an object into a list of sendable buffers.
153 153
154 154 Returns: (pmd, bufs)
155 155 where pmd is the pickled metadata wrapper, and bufs
156 156 is a list of data buffers"""
157 157 # threshold is 100 B
158 158 databuffers = []
159 159 if isinstance(obj, (list, tuple)):
160 160 clist = canSequence(obj)
161 161 slist = map(serialize, clist)
162 162 for s in slist:
163 163 if s.getDataSize() > threshold:
164 164 databuffers.append(s.getData())
165 165 s.data = None
166 166 return pickle.dumps(slist,-1), databuffers
167 167 elif isinstance(obj, dict):
168 168 sobj = {}
169 169 for k in sorted(obj.iterkeys()):
170 170 s = serialize(can(obj[k]))
171 171 if s.getDataSize() > threshold:
172 172 databuffers.append(s.getData())
173 173 s.data = None
174 174 sobj[k] = s
175 175 return pickle.dumps(sobj,-1),databuffers
176 176 else:
177 177 s = serialize(can(obj))
178 178 if s.getDataSize() > threshold:
179 179 databuffers.append(s.getData())
180 180 s.data = None
181 181 return pickle.dumps(s,-1),databuffers
182 182
183 183
184 184 def unserialize_object(bufs):
185 185 """reconstruct an object serialized by serialize_object from data buffers"""
186 186 bufs = list(bufs)
187 187 sobj = pickle.loads(bufs.pop(0))
188 188 if isinstance(sobj, (list, tuple)):
189 189 for s in sobj:
190 190 if s.data is None:
191 191 s.data = bufs.pop(0)
192 192 return uncanSequence(map(unserialize, sobj))
193 193 elif isinstance(sobj, dict):
194 194 newobj = {}
195 195 for k in sorted(sobj.iterkeys()):
196 196 s = sobj[k]
197 197 if s.data is None:
198 198 s.data = bufs.pop(0)
199 199 newobj[k] = uncan(unserialize(s))
200 200 return newobj
201 201 else:
202 202 if sobj.data is None:
203 203 sobj.data = bufs.pop(0)
204 204 return uncan(unserialize(sobj))
205 205
206 206 def pack_apply_message(f, args, kwargs, threshold=64e-6):
207 207 """pack up a function, args, and kwargs to be sent over the wire
208 208 as a series of buffers. Any object whose data is larger than `threshold`
209 209 will not have their data copied (currently only numpy arrays support zero-copy)"""
210 210 msg = [pickle.dumps(can(f),-1)]
211 211 databuffers = [] # for large objects
212 212 sargs, bufs = serialize_object(args,threshold)
213 213 msg.append(sargs)
214 214 databuffers.extend(bufs)
215 215 skwargs, bufs = serialize_object(kwargs,threshold)
216 216 msg.append(skwargs)
217 217 databuffers.extend(bufs)
218 218 msg.extend(databuffers)
219 219 return msg
220 220
221 221 def unpack_apply_message(bufs, g=None, copy=True):
222 222 """unpack f,args,kwargs from buffers packed by pack_apply_message()
223 223 Returns: original f,args,kwargs"""
224 224 bufs = list(bufs) # allow us to pop
225 225 assert len(bufs) >= 3, "not enough buffers!"
226 226 if not copy:
227 227 for i in range(3):
228 228 bufs[i] = bufs[i].bytes
229 229 cf = pickle.loads(bufs.pop(0))
230 230 sargs = list(pickle.loads(bufs.pop(0)))
231 231 skwargs = dict(pickle.loads(bufs.pop(0)))
232 232 # print sargs, skwargs
233 233 f = cf.getFunction(g)
234 234 for sa in sargs:
235 235 if sa.data is None:
236 236 m = bufs.pop(0)
237 237 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
238 238 if copy:
239 239 sa.data = buffer(m)
240 240 else:
241 241 sa.data = m.buffer
242 242 else:
243 243 if copy:
244 244 sa.data = m
245 245 else:
246 246 sa.data = m.bytes
247 247
248 248 args = uncanSequence(map(unserialize, sargs), g)
249 249 kwargs = {}
250 250 for k in sorted(skwargs.iterkeys()):
251 251 sa = skwargs[k]
252 252 if sa.data is None:
253 253 sa.data = bufs.pop(0)
254 254 kwargs[k] = uncan(unserialize(sa), g)
255 255
256 256 return f,args,kwargs
257 257
258 258 class StreamSession(object):
259 259 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
260
260 debug=False
261 261 def __init__(self, username=None, session=None, packer=None, unpacker=None):
262 262 if username is None:
263 263 username = os.environ.get('USER','username')
264 264 self.username = username
265 265 if session is None:
266 266 self.session = str(uuid.uuid4())
267 267 else:
268 268 self.session = session
269 269 self.msg_id = str(uuid.uuid4())
270 270 if packer is None:
271 271 self.pack = default_packer
272 272 else:
273 273 if not callable(packer):
274 274 raise TypeError("packer must be callable, not %s"%type(packer))
275 275 self.pack = packer
276 276
277 277 if unpacker is None:
278 278 self.unpack = default_unpacker
279 279 else:
280 280 if not callable(unpacker):
281 281 raise TypeError("unpacker must be callable, not %s"%type(unpacker))
282 282 self.unpack = unpacker
283 283
284 284 self.none = self.pack({})
285 285
286 286 def msg_header(self, msg_type):
287 287 h = msg_header(self.msg_id, msg_type, self.username, self.session)
288 288 self.msg_id = str(uuid.uuid4())
289 289 return h
290 290
291 291 def msg(self, msg_type, content=None, parent=None, subheader=None):
292 292 msg = {}
293 293 msg['header'] = self.msg_header(msg_type)
294 294 msg['msg_id'] = msg['header']['msg_id']
295 295 msg['parent_header'] = {} if parent is None else extract_header(parent)
296 296 msg['msg_type'] = msg_type
297 297 msg['content'] = {} if content is None else content
298 298 sub = {} if subheader is None else subheader
299 299 msg['header'].update(sub)
300 300 return msg
301 301
302 302 def send(self, stream, msg_type, content=None, buffers=None, parent=None, subheader=None, ident=None):
303 303 """send a message via stream"""
304 304 msg = self.msg(msg_type, content, parent, subheader)
305 305 buffers = [] if buffers is None else buffers
306 306 to_send = []
307 307 if isinstance(ident, list):
308 308 # accept list of idents
309 309 to_send.extend(ident)
310 310 elif ident is not None:
311 311 to_send.append(ident)
312 312 to_send.append(DELIM)
313 313 to_send.append(self.pack(msg['header']))
314 314 to_send.append(self.pack(msg['parent_header']))
315 315 # if parent is None:
316 316 # to_send.append(self.none)
317 317 # else:
318 318 # to_send.append(self.pack(dict(parent)))
319 319 if content is None:
320 320 content = self.none
321 321 elif isinstance(content, dict):
322 322 content = self.pack(content)
323 323 elif isinstance(content, str):
324 324 # content is already packed, as in a relayed message
325 325 pass
326 326 else:
327 327 raise TypeError("Content incorrect type: %s"%type(content))
328 328 to_send.append(content)
329 329 flag = 0
330 330 if buffers:
331 331 flag = zmq.SNDMORE
332 332 stream.send_multipart(to_send, flag, copy=False)
333 333 for b in buffers[:-1]:
334 334 stream.send(b, flag, copy=False)
335 335 if buffers:
336 336 stream.send(buffers[-1], copy=False)
337 337 omsg = Message(msg)
338 if self.debug:
339 pprint.pprint(omsg)
340 pprint.pprint(to_send)
341 pprint.pprint(buffers)
338 342 return omsg
339 343
340 344 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
341 345 """receives and unpacks a message
342 346 returns [idents], msg"""
343 347 if isinstance(socket, ZMQStream):
344 348 socket = socket.socket
345 349 try:
346 350 msg = socket.recv_multipart(mode)
347 351 except zmq.ZMQError, e:
348 352 if e.errno == zmq.EAGAIN:
349 353 # We can convert EAGAIN to None as we know in this case
350 354 # recv_json won't return None.
351 355 return None
352 356 else:
353 357 raise
354 358 # return an actual Message object
355 359 # determine the number of idents by trying to unpack them.
356 360 # this is terrible:
357 361 idents, msg = self.feed_identities(msg, copy)
358 362 try:
359 363 return idents, self.unpack_message(msg, content=content, copy=copy)
360 364 except Exception, e:
361 365 print idents, msg
362 366 # TODO: handle it
363 367 raise e
364 368
365 369 def feed_identities(self, msg, copy=True):
366 370 """This is a completely horrible thing, but it strips the zmq
367 371 ident prefixes off of a message. It will break if any identities
368 372 are unpackable by self.unpack."""
369 373 msg = list(msg)
370 374 idents = []
371 375 while len(msg) > 3:
372 376 if copy:
373 377 s = msg[0]
374 378 else:
375 379 s = msg[0].bytes
376 380 if s == DELIM:
377 381 msg.pop(0)
378 382 break
379 383 else:
380 384 idents.append(s)
381 385 msg.pop(0)
382 386
383 387 return idents, msg
384 388
385 389 def unpack_message(self, msg, content=True, copy=True):
386 390 """return a message object from the format
387 391 sent by self.send.
388 392
389 393 parameters:
390 394
391 395 content : bool (True)
392 396 whether to unpack the content dict (True),
393 397 or leave it serialized (False)
394 398
395 399 copy : bool (True)
396 400 whether to return the bytes (True),
397 401 or the non-copying Message object in each place (False)
398 402
399 403 """
400 404 if not len(msg) >= 3:
401 405 raise TypeError("malformed message, must have at least 3 elements")
402 406 message = {}
403 407 if not copy:
404 408 for i in range(3):
405 409 msg[i] = msg[i].bytes
406 410 message['header'] = self.unpack(msg[0])
407 411 message['msg_type'] = message['header']['msg_type']
408 412 message['parent_header'] = self.unpack(msg[1])
409 413 if content:
410 414 message['content'] = self.unpack(msg[2])
411 415 else:
412 416 message['content'] = msg[2]
413 417
414 418 # message['buffers'] = msg[3:]
415 419 # else:
416 420 # message['header'] = self.unpack(msg[0].bytes)
417 421 # message['msg_type'] = message['header']['msg_type']
418 422 # message['parent_header'] = self.unpack(msg[1].bytes)
419 423 # if content:
420 424 # message['content'] = self.unpack(msg[2].bytes)
421 425 # else:
422 426 # message['content'] = msg[2].bytes
423 427
424 428 message['buffers'] = msg[3:]# [ m.buffer for m in msg[3:] ]
425 429 return message
426 430
427 431
428 432
429 433 def test_msg2obj():
430 434 am = dict(x=1)
431 435 ao = Message(am)
432 436 assert ao.x == am['x']
433 437
434 438 am['y'] = dict(z=1)
435 439 ao = Message(am)
436 440 assert ao.y.z == am['y']['z']
437 441
438 442 k1, k2 = 'y', 'z'
439 443 assert ao[k1][k2] == am[k1][k2]
440 444
441 445 am2 = dict(ao)
442 446 assert am['x'] == am2['x']
443 447 assert am['y']['z'] == am2['y']['z']
@@ -1,141 +1,157 b''
1 1 #!/usr/bin/env python
2 2 """Views"""
3 3
4 4 from IPython.external.decorator import decorator
5 5
6 6
7 7 @decorator
8 8 def myblock(f, self, *args, **kwargs):
9 9 block = self.client.block
10 10 self.client.block = self.block
11 11 ret = f(self, *args, **kwargs)
12 12 self.client.block = block
13 13 return ret
14 14
15 15 class View(object):
16 16 """Base View class"""
17 17 _targets = None
18 18 block=None
19 19
20 20 def __init__(self, client, targets):
21 21 self.client = client
22 22 self._targets = targets
23 23 self.block = client.block
24 24
25 25 def __repr__(self):
26 26 strtargets = str(self._targets)
27 27 if len(strtargets) > 16:
28 28 strtargets = strtargets[:12]+'...]'
29 29 return "<%s %s>"%(self.__class__.__name__, strtargets)
30 30
31 31 @property
32 32 def results(self):
33 33 return self.client.results
34 34
35 35 @property
36 36 def targets(self):
37 37 return self._targets
38 38
39 39 @targets.setter
40 40 def targets(self, value):
41 41 raise TypeError("Cannot set my targets argument after construction!")
42 42
43 43 def apply(self, f, *args, **kwargs):
44 44 """calls f(*args, **kwargs) on remote engines, returning the result.
45 45
46 46 This method does not involve the engine's namespace.
47 47
48 48 if self.block is False:
49 49 returns msg_id
50 50 else:
51 51 returns actual result of f(*args, **kwargs)
52 52 """
53 53 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=False)
54 54
55 55 def apply_async(self, f, *args, **kwargs):
56 56 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
57 57
58 58 This method does not involve the engine's namespace.
59 59
60 60 returns msg_id
61 61 """
62 62 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
63 63
64 64 def apply_sync(self, f, *args, **kwargs):
65 65 """calls f(*args, **kwargs) on remote engines in a blocking manner,
66 66 returning the result.
67 67
68 68 This method does not involve the engine's namespace.
69 69
70 70 returns: actual result of f(*args, **kwargs)
71 71 """
72 72 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
73 73
74 74 def apply_bound(self, f, *args, **kwargs):
75 75 """calls f(*args, **kwargs) bound to engine namespace(s).
76 76
77 77 if self.block is False:
78 78 returns msg_id
79 79 else:
80 80 returns actual result of f(*args, **kwargs)
81 81
82 82 This method has access to the targets' globals
83 83
84 84 """
85 85 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
86 86
87 87 def apply_async_bound(self, f, *args, **kwargs):
88 88 """calls f(*args, **kwargs) bound to engine namespace(s)
89 89 in a nonblocking manner.
90 90
91 91 returns: msg_id
92 92
93 93 This method has access to the targets' globals
94 94
95 95 """
96 96 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
97 97
98 98 def apply_sync_bound(self, f, *args, **kwargs):
99 99 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
100 100
101 101 returns: actual result of f(*args, **kwargs)
102 102
103 103 This method has access to the targets' globals
104 104
105 105 """
106 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
106 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
107 107
108 108
109 109 class DirectView(View):
110 110 """Direct Multiplexer View"""
111 111
112 112 def update(self, ns):
113 113 """update remote namespace with dict `ns`"""
114 114 return self.client.push(ns, targets=self.targets, block=self.block)
115 115
116 116 def get(self, key_s):
117 117 """get object(s) by `key_s` from remote namespace
118 118 will return one object if it is a key.
119 119 It also takes a list of keys, and will return a list of objects."""
120 120 # block = block if block is not None else self.block
121 121 return self.client.pull(key_s, block=self.block, targets=self.targets)
122 122
123 123 push = update
124 124 pull = get
125 125
126 126 def __getitem__(self, key):
127 127 return self.get(key)
128 128
129 129 def __setitem__(self,key,value):
130 130 self.update({key:value})
131 131
132 def clear(self):
133 """clear the remote namespace"""
134 return self.client.clear(targets=self.targets,block=self.block)
132 def clear(self, block=False):
133 """Clear the remote namespaces on my engines."""
134 block = block if block is not None else self.block
135 return self.client.clear(targets=self.targets,block=block)
136
137 def kill(self, block=True):
138 """Kill my engines."""
139 block = block if block is not None else self.block
140 return self.client.kill(targets=self.targets,block=block)
135 141
136 def abort(self):
137 return self.client.abort(targets=self.targets,block=self.block)
142 def abort(self, msg_ids=None, block=None):
143 """Abort jobs on my engines.
144
145 Parameters
146 ----------
147
148 msg_ids : None, str, list of strs, optional
149 if None: abort all jobs.
150 else: abort specific msg_id(s).
151 """
152 block = block if block is not None else self.block
153 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
138 154
139 155 class LoadBalancedView(View):
140 156 _targets=None
141 157 No newline at end of file
@@ -1,139 +1,139 b''
1 1 #!/usr/bin/env python
2 2 """A script to launch a controller with all its queues and connect it to a logger"""
3 3
4 4 import time
5 5 import logging
6 6
7 7 import zmq
8 8 from zmq.devices import ProcessMonitoredQueue, ThreadMonitoredQueue
9 9 from zmq.eventloop import ioloop
10 10 from zmq.eventloop.zmqstream import ZMQStream
11 11 from zmq.log import handlers
12 12
13 13 from IPython.zmq import log
14 14 from IPython.zmq.parallel import controller, heartmonitor, streamsession as session
15 15
16 16
17 17
18 18
def setup():
    """Set up a basic controller and open client, registrar, and logging
    ports.  Start the queues and the heartbeat, and return the IOLoop.

    Reads all port numbers from a ``config`` dict defined by a local
    ``config.py`` file (executed into this module's globals).

    Returns
    -------
    loop : ioloop.IOLoop
        the (not yet started) event loop driving the controller.
    """
    ctx = zmq.Context(1)
    loop = ioloop.IOLoop.instance()

    # port config -- config.py must define a dict named `config`
    execfile('config.py', globals())
    iface = config['interface']
    logport = config['logport']
    rport = config['regport']
    cport = config['clientport']
    cqport = config['cqueueport']
    eqport = config['equeueport']
    ctport = config['ctaskport']
    etport = config['etaskport']
    ccport = config['ccontrolport']
    ecport = config['econtrolport']
    hport = config['heartport']
    nport = config['notifierport']

    # setup logging: publish log records over a PUB socket
    lsock = ctx.socket(zmq.PUB)
    lsock.connect('%s:%i' % (iface, logport))
    handler = handlers.PUBHandler(lsock)
    handler.setLevel(logging.DEBUG)
    handler.root_topic = "controller"
    log.logger.addHandler(handler)
    time.sleep(.5)  # give the PUB socket time to connect before logging

    ### Engine connections ###

    # Engine registrar socket
    reg = ZMQStream(ctx.socket(zmq.XREP), loop)
    reg.bind("%s:%i" % (iface, rport))

    # heartbeat: pings go out on hport, pongs come back on hport+1
    hpub = ctx.socket(zmq.PUB)
    hpub.bind("%s:%i" % (iface, hport))
    hrep = ctx.socket(zmq.XREP)
    hrep.bind("%s:%i" % (iface, hport + 1))

    hb = heartmonitor.HeartMonitor(loop, ZMQStream(hpub, loop),
                                   ZMQStream(hrep, loop), 2500)
    hb.start()

    ### Client connections ###
    # Clientele socket
    c = ZMQStream(ctx.socket(zmq.XREP), loop)
    c.bind("%s:%i" % (iface, cport))

    # notification socket, for broadcasting engine (un)registration etc.
    n = ZMQStream(ctx.socket(zmq.PUB), loop)
    n.bind("%s:%i" % (iface, nport))

    thesession = session.StreamSession(username="controller")

    # monitor SUB socket: every monitored queue publishes its traffic here
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, "")
    monport = sub.bind_to_random_port(iface)
    sub = ZMQStream(sub, loop)

    # Multiplexer Queue (in a Process): client XREP <-> engine XREP
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
    q.bind_in("%s:%i" % (iface, cqport))
    q.bind_out("%s:%i" % (iface, eqport))
    q.connect_mon("%s:%i" % (iface, monport))
    q.daemon = True
    q.start()

    # Control Queue (in a Process)
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB,
                              'incontrol', 'outcontrol')
    q.bind_in("%s:%i" % (iface, ccport))
    q.bind_out("%s:%i" % (iface, ecport))
    q.connect_mon("%s:%i" % (iface, monport))
    q.daemon = True
    q.start()

    # Task Queue (in a Process): XREQ out side distributes tasks
    q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB,
                              'intask', 'outtask')
    q.bind_in("%s:%i" % (iface, ctport))
    q.bind_out("%s:%i" % (iface, etport))
    q.connect_mon("%s:%i" % (iface, monport))
    q.daemon = True
    q.start()

    time.sleep(.25)  # let the queue processes come up before connecting

    # build connection dicts handed to the Controller
    engine_addrs = {
        'control': "%s:%i" % (iface, ecport),
        'queue': "%s:%i" % (iface, eqport),
        'heartbeat': ("%s:%i" % (iface, hport), "%s:%i" % (iface, hport + 1)),
        'task': "%s:%i" % (iface, etport),
        'monitor': "%s:%i" % (iface, monport),
    }

    client_addrs = {
        'control': "%s:%i" % (iface, ccport),
        'query': "%s:%i" % (iface, cport),
        'queue': "%s:%i" % (iface, cqport),
        'task': "%s:%i" % (iface, ctport),
        'notification': "%s:%i" % (iface, nport),
    }
    con = controller.Controller(loop, thesession, sub, reg, hb, c, n, None,
                                engine_addrs, client_addrs)

    return loop
135 135
136 136
# Script entry point: build the controller and run its event loop forever.
if __name__ == '__main__':
    main_loop = setup()
    main_loop.start()
General Comments 0
You need to be logged in to leave comments. Login now