##// END OF EJS Templates
control channel progress
MinRK -
Show More
@@ -4,6 +4,8 b''
4 import time
4 import time
5 import threading
5 import threading
6
6
7 from pprint import pprint
8
7 from functools import wraps
9 from functools import wraps
8
10
9 from IPython.external.decorator import decorator
11 from IPython.external.decorator import decorator
@@ -46,7 +48,9 b' def defaultblock(f, self, *args, **kwargs):'
46 self.block = saveblock
48 self.block = saveblock
47 return ret
49 return ret
48
50
49
51 class AbortedTask(object):
52 def __init__(self, msg_id):
53 self.msg_id = msg_id
50 # @decorator
54 # @decorator
51 # def checktargets(f):
55 # def checktargets(f):
52 # @wraps(f)
56 # @wraps(f)
@@ -101,7 +105,11 b' class Client(object):'
101 execution methods: apply/apply_bound/apply_to
105 execution methods: apply/apply_bound/apply_to
102 legacy: execute, run
106 legacy: execute, run
103
107
104 control methods: queue_status, get_result
108 query methods: queue_status, get_result
109
110 control methods: abort, kill
111
112
105
113
106 """
114 """
107
115
@@ -109,7 +117,8 b' class Client(object):'
109 _connected=False
117 _connected=False
110 _engines=None
118 _engines=None
111 registration_socket=None
119 registration_socket=None
112 controller_socket=None
120 query_socket=None
121 control_socket=None
113 notification_socket=None
122 notification_socket=None
114 queue_socket=None
123 queue_socket=None
115 task_socket=None
124 task_socket=None
@@ -117,8 +126,9 b' class Client(object):'
117 outstanding=None
126 outstanding=None
118 results = None
127 results = None
119 history = None
128 history = None
129 debug = False
120
130
121 def __init__(self, addr, context=None, username=None):
131 def __init__(self, addr, context=None, username=None, debug=False):
122 if context is None:
132 if context is None:
123 context = zmq.Context()
133 context = zmq.Context()
124 self.context = context
134 self.context = context
@@ -135,6 +145,8 b' class Client(object):'
135 self.outstanding=set()
145 self.outstanding=set()
136 self.results = {}
146 self.results = {}
137 self.history = []
147 self.history = []
148 self.debug = debug
149 self.session.debug = debug
138 self._connect()
150 self._connect()
139
151
140 self._notification_handlers = {'registration_notification' : self._register_engine,
152 self._notification_handlers = {'registration_notification' : self._register_engine,
@@ -152,7 +164,7 b' class Client(object):'
152 def _update_engines(self, engines):
164 def _update_engines(self, engines):
153 for k,v in engines.iteritems():
165 for k,v in engines.iteritems():
154 eid = int(k)
166 eid = int(k)
155 self._engines[eid] = v
167 self._engines[eid] = bytes(v) # force not unicode
156 self._ids.add(eid)
168 self._ids.add(eid)
157
169
158 def _build_targets(self, targets):
170 def _build_targets(self, targets):
@@ -173,7 +185,9 b' class Client(object):'
173 return
185 return
174 self._connected=True
186 self._connected=True
175 self.session.send(self.registration_socket, 'connection_request')
187 self.session.send(self.registration_socket, 'connection_request')
176 msg = self.session.recv(self.registration_socket,mode=0)[-1]
188 idents,msg = self.session.recv(self.registration_socket,mode=0)
189 if self.debug:
190 pprint(msg)
177 msg = ss.Message(msg)
191 msg = ss.Message(msg)
178 content = msg.content
192 content = msg.content
179 if content.status == 'ok':
193 if content.status == 'ok':
@@ -189,10 +203,14 b' class Client(object):'
189 self.notification_socket = self.context.socket(zmq.SUB)
203 self.notification_socket = self.context.socket(zmq.SUB)
190 self.notification_socket.connect(content.notification)
204 self.notification_socket.connect(content.notification)
191 self.notification_socket.setsockopt(zmq.SUBSCRIBE, "")
205 self.notification_socket.setsockopt(zmq.SUBSCRIBE, "")
192 if content.controller:
206 if content.query:
193 self.controller_socket = self.context.socket(zmq.PAIR)
207 self.query_socket = self.context.socket(zmq.PAIR)
194 self.controller_socket.setsockopt(zmq.IDENTITY, self.session.session)
208 self.query_socket.setsockopt(zmq.IDENTITY, self.session.session)
195 self.controller_socket.connect(content.controller)
209 self.query_socket.connect(content.query)
210 if content.control:
211 self.control_socket = self.context.socket(zmq.PAIR)
212 self.control_socket.setsockopt(zmq.IDENTITY, self.session.session)
213 self.control_socket.connect(content.control)
196 self._update_engines(dict(content.engines))
214 self._update_engines(dict(content.engines))
197
215
198 else:
216 else:
@@ -226,7 +244,7 b' class Client(object):'
226 self.results[msg_id] = ss.unwrap_exception(msg['content'])
244 self.results[msg_id] = ss.unwrap_exception(msg['content'])
227
245
228 def _handle_apply_reply(self, msg):
246 def _handle_apply_reply(self, msg):
229 # print msg
247 # pprint(msg)
230 # msg_id = msg['msg_id']
248 # msg_id = msg['msg_id']
231 parent = msg['parent_header']
249 parent = msg['parent_header']
232 msg_id = parent['msg_id']
250 msg_id = parent['msg_id']
@@ -237,14 +255,19 b' class Client(object):'
237 content = msg['content']
255 content = msg['content']
238 if content['status'] == 'ok':
256 if content['status'] == 'ok':
239 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
257 self.results[msg_id] = ss.unserialize_object(msg['buffers'])
258 elif content['status'] == 'aborted':
259 self.results[msg_id] = AbortedTask(msg_id)
260 elif content['status'] == 'resubmitted':
261 pass # handle resubmission
240 else:
262 else:
241
242 self.results[msg_id] = ss.unwrap_exception(content)
263 self.results[msg_id] = ss.unwrap_exception(content)
243
264
244 def _flush_notifications(self):
265 def _flush_notifications(self):
245 "flush incoming notifications of engine registrations"
266 "flush incoming notifications of engine registrations"
246 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
267 msg = self.session.recv(self.notification_socket, mode=zmq.NOBLOCK)
247 while msg is not None:
268 while msg is not None:
269 if self.debug:
270 pprint(msg)
248 msg = msg[-1]
271 msg = msg[-1]
249 msg_type = msg['msg_type']
272 msg_type = msg['msg_type']
250 handler = self._notification_handlers.get(msg_type, None)
273 handler = self._notification_handlers.get(msg_type, None)
@@ -258,6 +281,8 b' class Client(object):'
258 "flush incoming task or queue results"
281 "flush incoming task or queue results"
259 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
282 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
260 while msg is not None:
283 while msg is not None:
284 if self.debug:
285 pprint(msg)
261 msg = msg[-1]
286 msg = msg[-1]
262 msg_type = msg['msg_type']
287 msg_type = msg['msg_type']
263 handler = self._queue_handlers.get(msg_type, None)
288 handler = self._queue_handlers.get(msg_type, None)
@@ -267,6 +292,14 b' class Client(object):'
267 handler(msg)
292 handler(msg)
268 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
293 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
269
294
295 def _flush_control(self, sock):
296 "flush incoming control replies"
297 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
298 while msg is not None:
299 if self.debug:
300 pprint(msg)
301 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
302
270 ###### get/setitem ########
303 ###### get/setitem ########
271
304
272 def __getitem__(self, key):
305 def __getitem__(self, key):
@@ -297,6 +330,8 b' class Client(object):'
297 self._flush_results(self.queue_socket)
330 self._flush_results(self.queue_socket)
298 if self.task_socket:
331 if self.task_socket:
299 self._flush_results(self.task_socket)
332 self._flush_results(self.task_socket)
333 if self.control_socket:
334 self._flush_control(self.control_socket)
300
335
301 @spinfirst
336 @spinfirst
302 def queue_status(self, targets=None, verbose=False):
337 def queue_status(self, targets=None, verbose=False):
@@ -308,24 +343,78 b' class Client(object):'
308 the engines on which to execute
343 the engines on which to execute
309 default : all
344 default : all
310 verbose : bool
345 verbose : bool
311 whether to return
346 whether to return lengths only, or lists of ids for each element
312
347
313 """
348 """
314 targets = self._build_targets(targets)[1]
349 targets = self._build_targets(targets)[1]
315 content = dict(targets=targets)
350 content = dict(targets=targets)
316 self.session.send(self.controller_socket, "queue_request", content=content)
351 self.session.send(self.query_socket, "queue_request", content=content)
317 idents,msg = self.session.recv(self.controller_socket, 0)
352 idents,msg = self.session.recv(self.query_socket, 0)
353 if self.debug:
354 pprint(msg)
318 return msg['content']
355 return msg['content']
319
356
320 @spinfirst
357 @spinfirst
321 def clear(self, targets=None):
358 @defaultblock
359 def clear(self, targets=None, block=None):
322 """clear the namespace in target(s)"""
360 """clear the namespace in target(s)"""
323 pass
361 targets = self._build_targets(targets)[0]
362 print targets
363 for t in targets:
364 self.session.send(self.control_socket, 'clear_request', content={},ident=t)
365 error = False
366 if self.block:
367 for i in range(len(targets)):
368 idents,msg = self.session.recv(self.control_socket,0)
369 if self.debug:
370 pprint(msg)
371 if msg['content']['status'] != 'ok':
372 error = msg['content']
373 if error:
374 return error
375
324
376
325 @spinfirst
377 @spinfirst
326 def abort(self, targets=None):
378 @defaultblock
379 def abort(self, msg_ids = None, targets=None, block=None):
327 """abort the Queues of target(s)"""
380 """abort the Queues of target(s)"""
328 pass
381 targets = self._build_targets(targets)[0]
382 print targets
383 if isinstance(msg_ids, basestring):
384 msg_ids = [msg_ids]
385 content = dict(msg_ids=msg_ids)
386 for t in targets:
387 self.session.send(self.control_socket, 'abort_request',
388 content=content, ident=t)
389 error = False
390 if self.block:
391 for i in range(len(targets)):
392 idents,msg = self.session.recv(self.control_socket,0)
393 if self.debug:
394 pprint(msg)
395 if msg['content']['status'] != 'ok':
396 error = msg['content']
397 if error:
398 return error
399
400 @spinfirst
401 @defaultblock
402 def kill(self, targets=None, block=None):
403 """Terminates one or more engine processes."""
404 targets = self._build_targets(targets)[0]
405 print targets
406 for t in targets:
407 self.session.send(self.control_socket, 'kill_request', content={},ident=t)
408 error = False
409 if self.block:
410 for i in range(len(targets)):
411 idents,msg = self.session.recv(self.control_socket,0)
412 if self.debug:
413 pprint(msg)
414 if msg['content']['status'] != 'ok':
415 error = msg['content']
416 if error:
417 return error
329
418
330 @defaultblock
419 @defaultblock
331 def execute(self, code, targets='all', block=None):
420 def execute(self, code, targets='all', block=None):
@@ -364,22 +453,6 b' class Client(object):'
364 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
453 result = self.apply(execute, (code,), targets=None, block=block, bound=False)
365 return result
454 return result
366
455
367 # a = time.time()
368 # content = dict(code=code)
369 # b = time.time()
370 # msg = self.session.send(self.task_socket, 'execute_request',
371 # content=content)
372 # c = time.time()
373 # msg_id = msg['msg_id']
374 # self.outstanding.add(msg_id)
375 # self.history.append(msg_id)
376 # d = time.time()
377 # if block:
378 # self.barrier(msg_id)
379 # return self.results[msg_id]
380 # else:
381 # return msg_id
382
383 def _apply_balanced(self, f, args, kwargs, bound=True, block=None):
456 def _apply_balanced(self, f, args, kwargs, bound=True, block=None):
384 """the underlying method for applying functions in a load balanced
457 """the underlying method for applying functions in a load balanced
385 manner."""
458 manner."""
@@ -402,7 +475,7 b' class Client(object):'
402 """Then underlying method for applying functions to specific engines."""
475 """Then underlying method for applying functions to specific engines."""
403 block = block if block is not None else self.block
476 block = block if block is not None else self.block
404 queues,targets = self._build_targets(targets)
477 queues,targets = self._build_targets(targets)
405
478 print queues
406 bufs = ss.pack_apply_message(f,args,kwargs)
479 bufs = ss.pack_apply_message(f,args,kwargs)
407 content = dict(bound=bound)
480 content = dict(bound=bound)
408 msg_ids = []
481 msg_ids = []
@@ -438,51 +511,16 b' class Client(object):'
438 """
511 """
439 args = args if args is not None else []
512 args = args if args is not None else []
440 kwargs = kwargs if kwargs is not None else {}
513 kwargs = kwargs if kwargs is not None else {}
514 if not isinstance(args, (tuple, list)):
515 raise TypeError("args must be tuple or list, not %s"%type(args))
516 if not isinstance(kwargs, dict):
517 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
441 if targets is None:
518 if targets is None:
442 return self._apply_balanced(f,args,kwargs,bound=bound, block=block)
519 return self._apply_balanced(f,args,kwargs,bound=bound, block=block)
443 else:
520 else:
444 return self._apply_direct(f, args, kwargs,
521 return self._apply_direct(f, args, kwargs,
445 bound=bound,block=block, targets=targets)
522 bound=bound,block=block, targets=targets)
446
523
447 # def apply_bound(self, f, *args, **kwargs):
448 # """calls f(*args, **kwargs) on a remote engine. This does get
449 # executed in an engine's namespace. The controller selects the
450 # target engine via 0MQ XREQ load balancing.
451 #
452 # if self.block is False:
453 # returns msg_id
454 # else:
455 # returns actual result of f(*args, **kwargs)
456 # """
457 # return self._apply(f, args, kwargs, bound=True)
458 #
459 #
460 # def apply_to(self, targets, f, *args, **kwargs):
461 # """calls f(*args, **kwargs) on a specific engine.
462 #
463 # if self.block is False:
464 # returns msg_id
465 # else:
466 # returns actual result of f(*args, **kwargs)
467 #
468 # The target's namespace is not used here.
469 # Use apply_bound_to() to access target's globals.
470 # """
471 # return self._apply_to(False, targets, f, args, kwargs)
472 #
473 # def apply_bound_to(self, targets, f, *args, **kwargs):
474 # """calls f(*args, **kwargs) on a specific engine.
475 #
476 # if self.block is False:
477 # returns msg_id
478 # else:
479 # returns actual result of f(*args, **kwargs)
480 #
481 # This method has access to the target's globals
482 #
483 # """
484 # return self._apply_to(f, args, kwargs)
485 #
486 def push(self, ns, targets=None, block=None):
524 def push(self, ns, targets=None, block=None):
487 """push the contents of `ns` into the namespace on `target`"""
525 """push the contents of `ns` into the namespace on `target`"""
488 if not isinstance(ns, dict):
526 if not isinstance(ns, dict):
@@ -546,9 +584,11 b' class Client(object):'
546 theids.append(msg_id)
584 theids.append(msg_id)
547
585
548 content = dict(msg_ids=theids, status_only=status_only)
586 content = dict(msg_ids=theids, status_only=status_only)
549 msg = self.session.send(self.controller_socket, "result_request", content=content)
587 msg = self.session.send(self.query_socket, "result_request", content=content)
550 zmq.select([self.controller_socket], [], [])
588 zmq.select([self.query_socket], [], [])
551 idents,msg = self.session.recv(self.controller_socket, zmq.NOBLOCK)
589 idents,msg = self.session.recv(self.query_socket, zmq.NOBLOCK)
590 if self.debug:
591 pprint(msg)
552
592
553 # while True:
593 # while True:
554 # try:
594 # try:
@@ -297,6 +297,8 b' class Controller(object):'
297 self.save_task_result(idents, msg)
297 self.save_task_result(idents, msg)
298 elif switch == 'tracktask':
298 elif switch == 'tracktask':
299 self.save_task_destination(idents, msg)
299 self.save_task_destination(idents, msg)
300 elif switch in ('incontrol', 'outcontrol'):
301 pass
300 else:
302 else:
301 logger.error("Invalid message topic: %s"%switch)
303 logger.error("Invalid message topic: %s"%switch)
302
304
@@ -7,6 +7,7 b' import sys'
7 import time
7 import time
8 import traceback
8 import traceback
9 import uuid
9 import uuid
10 from pprint import pprint
10
11
11 import zmq
12 import zmq
12 from zmq.eventloop import ioloop, zmqstream
13 from zmq.eventloop import ioloop, zmqstream
@@ -20,7 +21,7 b' import heartmonitor'
20
21
21
22
22 def printer(*msg):
23 def printer(*msg):
23 print msg
24 pprint(msg)
24
25
25 class Engine(object):
26 class Engine(object):
26 """IPython engine"""
27 """IPython engine"""
@@ -29,26 +30,23 b' class Engine(object):'
29 context=None
30 context=None
30 loop=None
31 loop=None
31 session=None
32 session=None
32 queue_id=None
33 ident=None
33 control_id=None
34 heart_id=None
35 registrar=None
34 registrar=None
36 heart=None
35 heart=None
37 kernel=None
36 kernel=None
38
37
39 def __init__(self, context, loop, session, registrar, client, queue_id=None, heart_id=None):
38 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
40 self.context = context
39 self.context = context
41 self.loop = loop
40 self.loop = loop
42 self.session = session
41 self.session = session
43 self.registrar = registrar
42 self.registrar = registrar
44 self.client = client
43 self.client = client
45 self.queue_id = queue_id or str(uuid.uuid4())
44 self.ident = ident if ident else str(uuid.uuid4())
46 self.heart_id = heart_id or self.queue_id
47 self.registrar.on_send(printer)
45 self.registrar.on_send(printer)
48
46
49 def register(self):
47 def register(self):
50
48
51 content = dict(queue=self.queue_id, heartbeat=self.heart_id)
49 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
52 self.registrar.on_recv(self.complete_registration)
50 self.registrar.on_recv(self.complete_registration)
53 self.session.send(self.registrar, "registration_request",content=content)
51 self.session.send(self.registrar, "registration_request",content=content)
54
52
@@ -61,14 +59,14 b' class Engine(object):'
61 queue_addr = msg.content.queue
59 queue_addr = msg.content.queue
62 if queue_addr:
60 if queue_addr:
63 queue = self.context.socket(zmq.PAIR)
61 queue = self.context.socket(zmq.PAIR)
64 queue.setsockopt(zmq.IDENTITY, self.queue_id)
62 queue.setsockopt(zmq.IDENTITY, self.ident)
65 queue.connect(str(queue_addr))
63 queue.connect(str(queue_addr))
66 self.queue = zmqstream.ZMQStream(queue, self.loop)
64 self.queue = zmqstream.ZMQStream(queue, self.loop)
67
65
68 control_addr = msg.content.control
66 control_addr = msg.content.control
69 if control_addr:
67 if control_addr:
70 control = self.context.socket(zmq.PAIR)
68 control = self.context.socket(zmq.PAIR)
71 control.setsockopt(zmq.IDENTITY, self.queue_id)
69 control.setsockopt(zmq.IDENTITY, self.ident)
72 control.connect(str(control_addr))
70 control.connect(str(control_addr))
73 self.control = zmqstream.ZMQStream(control, self.loop)
71 self.control = zmqstream.ZMQStream(control, self.loop)
74
72
@@ -81,14 +79,14 b' class Engine(object):'
81 self.task_stream = zmqstream.ZMQStream(task, self.loop)
79 self.task_stream = zmqstream.ZMQStream(task, self.loop)
82 # TaskThread:
80 # TaskThread:
83 # mon_addr = msg.content.monitor
81 # mon_addr = msg.content.monitor
84 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.queue_id)
82 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
85 # task.connect_in(str(task_addr))
83 # task.connect_in(str(task_addr))
86 # task.connect_out(str(mon_addr))
84 # task.connect_out(str(mon_addr))
87 # self.task_stream = taskthread.QueueStream(*task.queues)
85 # self.task_stream = taskthread.QueueStream(*task.queues)
88 # task.start()
86 # task.start()
89
87
90 hbs = msg.content.heartbeat
88 hbs = msg.content.heartbeat
91 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.heart_id)
89 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
92 self.heart.start()
90 self.heart.start()
93 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
91 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
94 # placeholder for now:
92 # placeholder for now:
@@ -4,10 +4,12 b' Kernel adapted from kernel.py to use ZMQ Streams'
4 """
4 """
5
5
6 import __builtin__
6 import __builtin__
7 import os
7 import sys
8 import sys
8 import time
9 import time
9 import traceback
10 import traceback
10 from signal import SIGTERM, SIGKILL
11 from signal import SIGTERM, SIGKILL
12 from pprint import pprint
11
13
12 from code import CommandCompiler
14 from code import CommandCompiler
13
15
@@ -18,6 +20,9 b' from streamsession import StreamSession, Message, extract_header, serialize_obje'
18 unpack_apply_message
20 unpack_apply_message
19 from IPython.zmq.completer import KernelCompleter
21 from IPython.zmq.completer import KernelCompleter
20
22
23 def printer(*args):
24 pprint(args)
25
21 class OutStream(object):
26 class OutStream(object):
22 """A file like object that publishes the stream to a 0MQ PUB socket."""
27 """A file like object that publishes the stream to a 0MQ PUB socket."""
23
28
@@ -133,6 +138,7 b' class Kernel(object):'
133 task_stream=None, client=None):
138 task_stream=None, client=None):
134 self.session = session
139 self.session = session
135 self.control_stream = control_stream
140 self.control_stream = control_stream
141 self.control_socket = control_stream.socket
136 self.reply_stream = reply_stream
142 self.reply_stream = reply_stream
137 self.task_stream = task_stream
143 self.task_stream = task_stream
138 self.pub_stream = pub_stream
144 self.pub_stream = pub_stream
@@ -153,6 +159,10 b' class Kernel(object):'
153 self.control_handlers[msg_type] = getattr(self, msg_type)
159 self.control_handlers[msg_type] = getattr(self, msg_type)
154
160
155 #-------------------- control handlers -----------------------------
161 #-------------------- control handlers -----------------------------
162 def abort_queues(self):
163 for stream in (self.task_stream, self.reply_stream):
164 if stream:
165 self.abort_queue(stream)
156
166
157 def abort_queue(self, stream):
167 def abort_queue(self, stream):
158 while True:
168 while True:
@@ -186,28 +196,30 b' class Kernel(object):'
186 time.sleep(0.05)
196 time.sleep(0.05)
187
197
188 def abort_request(self, stream, ident, parent):
198 def abort_request(self, stream, ident, parent):
199 """abort a specifig msg by id"""
189 msg_ids = parent['content'].get('msg_ids', None)
200 msg_ids = parent['content'].get('msg_ids', None)
201 if isinstance(msg_ids, basestring):
202 msg_ids = [msg_ids]
190 if not msg_ids:
203 if not msg_ids:
191 self.abort_queue(self.task_stream)
204 self.abort_queues()
192 self.abort_queue(self.reply_stream)
193 for mid in msg_ids:
205 for mid in msg_ids:
194 self.aborted.add(mid)
206 self.aborted.add(str(mid))
195
207
196 content = dict(status='ok')
208 content = dict(status='ok')
197 self.session.send(stream, 'abort_reply', content=content, parent=parent,
209 reply_msg = self.session.send(stream, 'abort_reply', content=content, parent=parent,
198 ident=ident)
210 ident=ident)
211 print>>sys.__stdout__, Message(reply_msg)
199
212
200 def kill_request(self, stream, idents, parent):
213 def kill_request(self, stream, idents, parent):
201 self.abort_queue(self.reply_stream)
214 """kill ourselves. This should really be handled in an external process"""
202 if self.task_stream:
215 self.abort_queues()
203 self.abort_queue(self.task_stream)
204 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
216 msg = self.session.send(stream, 'kill_reply', ident=idents, parent=parent,
205 content = dict(status='ok'))
217 content = dict(status='ok'))
206 # we can know that a message is done if we *don't* use streams, but
218 # we can know that a message is done if we *don't* use streams, but
207 # use a socket directly with MessageTracker
219 # use a socket directly with MessageTracker
208 time.sleep(1)
220 time.sleep(.5)
209 os.kill(os.getpid(), SIGTERM)
221 os.kill(os.getpid(), SIGTERM)
210 time.sleep(.25)
222 time.sleep(1)
211 os.kill(os.getpid(), SIGKILL)
223 os.kill(os.getpid(), SIGKILL)
212
224
213 def dispatch_control(self, msg):
225 def dispatch_control(self, msg):
@@ -221,7 +233,7 b' class Kernel(object):'
221 if handler is None:
233 if handler is None:
222 print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg
234 print >> sys.__stderr__, "UNKNOWN CONTROL MESSAGE TYPE:", msg
223 else:
235 else:
224 handler(stream, idents, msg)
236 handler(self.control_stream, idents, msg)
225
237
226 def flush_control(self):
238 def flush_control(self):
227 while any(zmq.select([self.control_socket],[],[],1e-4)):
239 while any(zmq.select([self.control_socket],[],[],1e-4)):
@@ -258,6 +270,16 b' class Kernel(object):'
258
270
259 return True
271 return True
260
272
273 def check_aborted(self, msg_id):
274 return msg_id in self.aborted
275
276 def unmet_dependencies(self, stream, idents, msg):
277 reply_type = msg['msg_type'].split('_')[0] + '_reply'
278 content = dict(status='resubmitted', reason='unmet dependencies')
279 reply_msg = self.session.send(stream, reply_type,
280 content=content, parent=msg, ident=idents)
281 ### TODO: actually resubmit it ###
282
261 #-------------------- queue handlers -----------------------------
283 #-------------------- queue handlers -----------------------------
262
284
263 def execute_request(self, stream, ident, parent):
285 def execute_request(self, stream, ident, parent):
@@ -297,7 +319,7 b' class Kernel(object):'
297 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
319 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent, ident=ident)
298 # print>>sys.__stdout__, Message(reply_msg)
320 # print>>sys.__stdout__, Message(reply_msg)
299 if reply_msg['content']['status'] == u'error':
321 if reply_msg['content']['status'] == u'error':
300 self.abort_queue()
322 self.abort_queues()
301
323
302 def complete_request(self, stream, ident, parent):
324 def complete_request(self, stream, ident, parent):
303 matches = {'matches' : self.complete(parent),
325 matches = {'matches' : self.complete(parent),
@@ -334,7 +356,7 b' class Kernel(object):'
334
356
335 else:
357 else:
336 working = dict()
358 working = dict()
337 suffix = prefix = ""
359 suffix = prefix = "_" # prevent keyword collisions with lambda
338 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
360 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
339 # if f.fun
361 # if f.fun
340 fname = prefix+f.func_name.strip('<>')+suffix
362 fname = prefix+f.func_name.strip('<>')+suffix
@@ -379,7 +401,7 b' class Kernel(object):'
379 reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf)
401 reply_msg = self.session.send(stream, u'apply_reply', reply_content, parent=parent, ident=ident,buffers=result_buf)
380 # print>>sys.__stdout__, Message(reply_msg)
402 # print>>sys.__stdout__, Message(reply_msg)
381 if reply_msg['content']['status'] == u'error':
403 if reply_msg['content']['status'] == u'error':
382 self.abort_queue()
404 self.abort_queues()
383
405
384 def dispatch_queue(self, stream, msg):
406 def dispatch_queue(self, stream, msg):
385 self.flush_control()
407 self.flush_control()
@@ -389,12 +411,15 b' class Kernel(object):'
389 header = msg['header']
411 header = msg['header']
390 msg_id = header['msg_id']
412 msg_id = header['msg_id']
391 dependencies = header.get('dependencies', [])
413 dependencies = header.get('dependencies', [])
392
393 if self.check_aborted(msg_id):
414 if self.check_aborted(msg_id):
394 return self.abort_reply(stream, msg)
415 self.aborted.remove(msg_id)
416 # is it safe to assume a msg_id will not be resubmitted?
417 reply_type = msg['msg_type'].split('_')[0] + '_reply'
418 reply_msg = self.session.send(stream, reply_type,
419 content={'status' : 'aborted'}, parent=msg, ident=idents)
420 return
395 if not self.check_dependencies(dependencies):
421 if not self.check_dependencies(dependencies):
396 return self.unmet_dependencies(stream, msg)
422 return self.unmet_dependencies(stream, idents, msg)
397
398 handler = self.queue_handlers.get(msg['msg_type'], None)
423 handler = self.queue_handlers.get(msg['msg_type'], None)
399 if handler is None:
424 if handler is None:
400 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
425 print >> sys.__stderr__, "UNKNOWN MESSAGE TYPE:", msg
@@ -405,12 +430,15 b' class Kernel(object):'
405 #### stream mode:
430 #### stream mode:
406 if self.control_stream:
431 if self.control_stream:
407 self.control_stream.on_recv(self.dispatch_control, copy=False)
432 self.control_stream.on_recv(self.dispatch_control, copy=False)
433 self.control_stream.on_err(printer)
408 if self.reply_stream:
434 if self.reply_stream:
409 self.reply_stream.on_recv(lambda msg:
435 self.reply_stream.on_recv(lambda msg:
410 self.dispatch_queue(self.reply_stream, msg), copy=False)
436 self.dispatch_queue(self.reply_stream, msg), copy=False)
437 self.reply_stream.on_err(printer)
411 if self.task_stream:
438 if self.task_stream:
412 self.task_stream.on_recv(lambda msg:
439 self.task_stream.on_recv(lambda msg:
413 self.dispatch_queue(self.task_stream, msg), copy=False)
440 self.dispatch_queue(self.task_stream, msg), copy=False)
441 self.task_stream.on_err(printer)
414
442
415 #### while True mode:
443 #### while True mode:
416 # while True:
444 # while True:
@@ -257,7 +257,7 b' def unpack_apply_message(bufs, g=None, copy=True):'
257
257
258 class StreamSession(object):
258 class StreamSession(object):
259 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
259 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
260
260 debug=False
261 def __init__(self, username=None, session=None, packer=None, unpacker=None):
261 def __init__(self, username=None, session=None, packer=None, unpacker=None):
262 if username is None:
262 if username is None:
263 username = os.environ.get('USER','username')
263 username = os.environ.get('USER','username')
@@ -335,6 +335,10 b' class StreamSession(object):'
335 if buffers:
335 if buffers:
336 stream.send(buffers[-1], copy=False)
336 stream.send(buffers[-1], copy=False)
337 omsg = Message(msg)
337 omsg = Message(msg)
338 if self.debug:
339 pprint.pprint(omsg)
340 pprint.pprint(to_send)
341 pprint.pprint(buffers)
338 return omsg
342 return omsg
339
343
340 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
344 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
@@ -103,7 +103,7 b' class View(object):'
103 This method has access to the targets' globals
103 This method has access to the targets' globals
104
104
105 """
105 """
106 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
106 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
107
107
108
108
109 class DirectView(View):
109 class DirectView(View):
@@ -129,12 +129,28 b' class DirectView(View):'
129 def __setitem__(self,key,value):
129 def __setitem__(self,key,value):
130 self.update({key:value})
130 self.update({key:value})
131
131
132 def clear(self):
132 def clear(self, block=False):
133 """clear the remote namespace"""
133 """Clear the remote namespaces on my engines."""
134 return self.client.clear(targets=self.targets,block=self.block)
134 block = block if block is not None else self.block
135 return self.client.clear(targets=self.targets,block=block)
136
137 def kill(self, block=True):
138 """Kill my engines."""
139 block = block if block is not None else self.block
140 return self.client.kill(targets=self.targets,block=block)
141
142 def abort(self, msg_ids=None, block=None):
143 """Abort jobs on my engines.
135
144
136 def abort(self):
145 Parameters
137 return self.client.abort(targets=self.targets,block=self.block)
146 ----------
147
148 msg_ids : None, str, list of strs, optional
149 if None: abort all jobs.
150 else: abort specific msg_id(s).
151 """
152 block = block if block is not None else self.block
153 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
138
154
139 class LoadBalancedView(View):
155 class LoadBalancedView(View):
140 _targets=None
156 _targets=None
@@ -124,7 +124,7 b' def setup():'
124
124
125 client_addrs = {
125 client_addrs = {
126 'control' : "%s:%i"%(iface, ccport),
126 'control' : "%s:%i"%(iface, ccport),
127 'controller': "%s:%i"%(iface, cport),
127 'query': "%s:%i"%(iface, cport),
128 'queue': "%s:%i"%(iface, cqport),
128 'queue': "%s:%i"%(iface, cqport),
129 'task' : "%s:%i"%(iface, ctport),
129 'task' : "%s:%i"%(iface, ctport),
130 'notification': "%s:%i"%(iface, nport)
130 'notification': "%s:%i"%(iface, nport)
General Comments 0
You need to be logged in to leave comments. Login now