add log_errors decorator for on_recv callbacks... (MinRK)
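The log_errors decorator referenced below lives in IPython.parallel.util, which is not part of this diff. As a rough sketch (not the project's exact implementation), a decorator of this shape only needs to assume the wrapped method is bound to an object with a self.log logger, and it logs rather than raises, so a failing on_recv callback cannot propagate an exception into the ioloop:

    # sketch of a log_errors-style decorator; the real one in
    # IPython.parallel.util may differ in detail
    from functools import wraps

    def log_errors(method):
        """Wrap a method so uncaught exceptions are logged instead of raised.

        Intended for zmqstream on_recv callbacks, where an unhandled
        exception would otherwise propagate into the event loop.
        """
        @wraps(method)
        def wrapper(self, *args, **kwargs):
            try:
                return method(self, *args, **kwargs)
            except Exception:
                self.log.error("Uncaught exception in %r" % method, exc_info=True)
        return wrapper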
@@ -1,180 +1,181 @@
#!/usr/bin/env python
"""
A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
and hearts are tracked based on their XREQ identities.

Authors:

* Min RK
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

from __future__ import print_function
import time
import uuid

import zmq
from zmq.devices import ThreadDevice
from zmq.eventloop import ioloop, zmqstream

from IPython.config.configurable import LoggingConfigurable
from IPython.utils.traitlets import Set, Instance, CFloat, Integer

from IPython.parallel.util import asbytes, log_errors

class Heart(object):
    """A basic heart object for responding to a HeartMonitor.
    This is a simple wrapper with defaults for the most common
    Device model for responding to heartbeats.

    It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
    SUB/XREQ for in/out.

    You can specify the XREQ's IDENTITY via the optional heart_id argument."""
    device=None
    id=None
    def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
        self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        # do not allow the device to share global Context.instance,
        # which is the default behavior in pyzmq > 2.1.10
        self.device.context_factory = zmq.Context

        self.device.daemon=True
        self.device.connect_in(in_addr)
        self.device.connect_out(out_addr)
        if in_type == zmq.SUB:
            self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
        if heart_id is None:
            heart_id = uuid.uuid4().bytes
        self.device.setsockopt_out(zmq.IDENTITY, heart_id)
        self.id = heart_id

    def start(self):
        return self.device.start()


class HeartMonitor(LoggingConfigurable):
    """A basic HeartMonitor class
    pingstream: a PUB stream
    pongstream: an XREP stream
    period: the period of the heartbeat in milliseconds"""

    period = Integer(3000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        '(in ms)',
    )

    pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
    pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        return ioloop.IOLoop.instance()

    # not settable:
    hearts=Set()
    responses=Set()
    on_probation=Set()
    last_ping=CFloat(0)
    _new_handlers = Set()
    _failure_handlers = Set()
    lifetime = CFloat(0)
    tic = CFloat(0)

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)

        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        self.tic = time.time()
        self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s", handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s", handler)
        self._failure_handlers.add(handler)

    def beat(self):
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc-self.tic
        self.tic = toc
        self.log.debug("heartbeat::sending %s", self.lifetime)
        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        heartfailures = self.on_probation.intersection(missed_beats)
        newhearts = self.responses.difference(goodhearts)
        map(self.handle_new_heart, newhearts)
        map(self.handle_heart_failure, heartfailures)
        self.on_probation = missed_beats.intersection(self.hearts)
        self.responses = set()
        # print self.on_probation, self.hearts
        # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
        self.pingstream.send(asbytes(str(self.lifetime)))
        # flush stream to force immediate socket send
        self.pingstream.flush()

    def handle_new_heart(self, heart):
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            self.log.info("heartbeat::yay, got new heart %s!", heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception as e:
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
                    pass
        else:
            self.log.info("heartbeat::Heart %s failed :(", heart)
        self.hearts.remove(heart)


    @log_errors
    def handle_pong(self, msg):
        "a heart just beat"
        current = asbytes(str(self.lifetime))
        last = asbytes(str(self.last_ping))
        if msg[1] == current:
            delta = time.time()-self.tic
            # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
            self.responses.add(msg[0])
        elif msg[1] == last:
            delta = time.time()-self.tic + (self.lifetime-self.last_ping)
            self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        else:
            self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)


if __name__ == '__main__':
    loop = ioloop.IOLoop.instance()
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    pub.bind('tcp://127.0.0.1:5555')
    xrep = context.socket(zmq.ROUTER)
    xrep.bind('tcp://127.0.0.1:5556')

    outstream = zmqstream.ZMQStream(pub, loop)
    instream = zmqstream.ZMQStream(xrep, loop)

    # HeartMonitor is a Configurable and only accepts keyword arguments
    hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)

    loop.start()
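The demo above binds the monitor's PUB/ROUTER pair but starts no hearts, so nothing ever registers. A hypothetical companion script (assuming this module is importable as heartmonitor, and reusing the ports from the demo) could run a heart in a second process:

    # hypothetical companion to the demo above; run in a second process
    import time
    from heartmonitor import Heart

    # SUB connects to the monitor's PUB (pings); DEALER to its ROUTER (pongs)
    heart = Heart('tcp://127.0.0.1:5555', 'tcp://127.0.0.1:5556')
    heart.start()

    # the FORWARDER device runs in a daemon thread; keep the process alive
    while True:
        time.sleep(1)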
@@ -1,1293 +1,1295 @@
"""The IPython Controller Hub with 0MQ
This is the master object that handles connections from engines and clients,
and monitors traffic through the various queues.

Authors:

* Min RK
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from __future__ import print_function

import sys
import time
from datetime import datetime

import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream

# internal:
from IPython.utils.importstring import import_item
from IPython.utils.traitlets import (
    HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
)

from IPython.parallel import error, util
from IPython.parallel.factory import RegistrationFactory

from IPython.zmq.session import SessionFactory

from .heartmonitor import HeartMonitor

#-----------------------------------------------------------------------------
# Code
#-----------------------------------------------------------------------------

def _passer(*args, **kwargs):
    return

def _printer(*args, **kwargs):
    print (args)
    print (kwargs)

def empty_record():
    """Return an empty dict with all record keys."""
    return {
        'msg_id' : None,
        'header' : None,
        'content': None,
        'buffers': None,
        'submitted': None,
        'client_uuid' : None,
        'engine_uuid' : None,
        'started': None,
        'completed': None,
        'resubmitted': None,
        'result_header' : None,
        'result_content' : None,
        'result_buffers' : None,
        'queue' : None,
        'pyin' : None,
        'pyout': None,
        'pyerr': None,
        'stdout': '',
        'stderr': '',
    }

def init_record(msg):
    """Initialize a TaskRecord based on a request."""
    header = msg['header']
    return {
        'msg_id' : header['msg_id'],
        'header' : header,
        'content': msg['content'],
        'buffers': msg['buffers'],
        'submitted': header['date'],
        'client_uuid' : None,
        'engine_uuid' : None,
        'started': None,
        'completed': None,
        'resubmitted': None,
        'result_header' : None,
        'result_content' : None,
        'result_buffers' : None,
        'queue' : None,
        'pyin' : None,
        'pyout': None,
        'pyerr': None,
        'stdout': '',
        'stderr': '',
    }


class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (str): uuid (unused?)
    queue (str): identity of queue's XREQ socket
    registration (str): identity of registration XREQ socket
    heartbeat (str): identity of heartbeat XREQ socket
    """
    id=Integer(0)
    queue=CBytes()
    control=CBytes()
    registration=CBytes()
    heartbeat=CBytes()
    pending=Set()

class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub."""

    # port-pairs for monitoredqueues:
    hb = Tuple(Integer,Integer,config=True,
        help="""XREQ/SUB Port pair for Engine heartbeats""")
    def _hb_default(self):
        return tuple(util.select_random_ports(2))

    mux = Tuple(Integer,Integer,config=True,
        help="""Engine/Client Port pair for MUX queue""")

    def _mux_default(self):
        return tuple(util.select_random_ports(2))

    task = Tuple(Integer,Integer,config=True,
        help="""Engine/Client Port pair for Task queue""")
    def _task_default(self):
        return tuple(util.select_random_ports(2))

    control = Tuple(Integer,Integer,config=True,
        help="""Engine/Client Port pair for Control queue""")

    def _control_default(self):
        return tuple(util.select_random_ports(2))

    iopub = Tuple(Integer,Integer,config=True,
        help="""Engine/Client Port pair for IOPub relay""")

    def _iopub_default(self):
        return tuple(util.select_random_ports(2))

    # single ports:
    mon_port = Integer(config=True,
        help="""Monitor (SUB) port for queue traffic""")

    def _mon_port_default(self):
        return util.select_random_ports(1)[0]

    notifier_port = Integer(config=True,
        help="""PUB port for sending engine status notifications""")

    def _notifier_port_default(self):
        return util.select_random_ports(1)[0]

    engine_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for engine connections. [default: loopback]")
    engine_transport = Unicode('tcp', config=True,
        help="0MQ transport for engine connections. [default: tcp]")

    client_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for client connections. [default: loopback]")
    client_transport = Unicode('tcp', config=True,
        help="0MQ transport for client connections. [default : tcp]")

    monitor_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for monitor messages. [default: loopback]")
    monitor_transport = Unicode('tcp', config=True,
        help="0MQ transport for monitor messages. [default : tcp]")

    monitor_url = Unicode('')

    db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
        config=True, help="""The class to use for the DB backend""")

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')

    def _ip_changed(self, name, old, new):
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()


    def construct(self):
        self.init_hub()

    def start(self):
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def init_hub(self):
        """construct"""
        client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
        engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"

        ctx = self.context
        loop = self.loop

        # Registrar socket
        q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
        q.bind(client_iface % self.regport)
        self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
        if self.client_ip != self.engine_ip:
            q.bind(engine_iface % self.regport)
            self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)

        ### Engine connections ###

        # heartbeat
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(engine_iface % self.hb[0])
        hrep = ctx.socket(zmq.ROUTER)
        hrep.bind(engine_iface % self.hb[1])
        self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
                                pingstream=ZMQStream(hpub,loop),
                                pongstream=ZMQStream(hrep,loop)
                            )

        ### Client connections ###
        # Notifier socket
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(client_iface%self.notifier_port)

        ### build and launch the queues ###

        # monitor socket
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
        # cdir = self.config.Global.cluster_dir
        self.db = import_item(str(self.db_class))(session=self.session.session,
                                            config=self.config, log=self.log)
        time.sleep(.25)
        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            from .scheduler import TaskScheduler
            scheme = TaskScheduler.scheme_name.get_default_value()
        # build connection dicts
        self.engine_info = {
            'control' : engine_iface%self.control[1],
            'mux': engine_iface%self.mux[1],
            'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
            'task' : engine_iface%self.task[1],
            'iopub' : engine_iface%self.iopub[1],
            # 'monitor' : engine_iface%self.mon_port,
            }

        self.client_info = {
            'control' : client_iface%self.control[0],
            'mux': client_iface%self.mux[0],
            'task' : (scheme, client_iface%self.task[0]),
            'iopub' : client_iface%self.iopub[0],
            'notification': client_iface%self.notifier_port
            }
        self.log.debug("Hub engine addrs: %s", self.engine_info)
        self.log.debug("Hub client addrs: %s", self.client_info)

        # resubmit stream
        r = ZMQStream(ctx.socket(zmq.DEALER), loop)
        url = util.disambiguate_url(self.client_info['task'][-1])
        r.setsockopt(zmq.IDENTITY, self.session.bsession)
        r.connect(url)

        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, resubmit=r, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                log=self.log)


class Hub(SessionFactory):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: Session object
    <removed> context: zmq context for creating new connections (?)
    queue: ZMQStream for monitoring the command queue (SUB)
    query: ZMQStream for engine registration and client query requests (XREP)
    heartbeat: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
        NotImplemented
    engine_info: dict of zmq connection information for engines to connect
        to the queues.
    client_info: dict of zmq connection information for clients to connect
        to the queues.
    """
    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict()
    by_ident=Dict()
    engines=Dict()
    clients=Dict()
    hearts=Dict()
    pending=Set()
    queues=Dict() # pending msg_ids keyed by engine_id
    tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # set of all completed msg_ids
    dead_engines=Set() # uuids of engines that have been unregistered
    unassigned=Set() # set of task msg_ids not yet assigned a destination
    incoming_registrations=Dict()
    registration_timeout=Integer()
    _idcounter=Integer(0)

    # objects from constructor:
    query=Instance(ZMQStream)
    monitor=Instance(ZMQStream)
    notifier=Instance(ZMQStream)
    resubmit=Instance(ZMQStream)
    heartmonitor=Instance(HeartMonitor)
    db=Instance(object)
    client_info=Dict()
    engine_info=Dict()


    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        self.registration_timeout = max(5000, 2*self.heartmonitor.period)

        # validate connection dicts:
        for k,v in self.client_info.iteritems():
            if k == 'task':
                util.validate_url_container(v[1])
            else:
                util.validate_url_container(v)
        # util.validate_url_container(self.client_info)
        util.validate_url_container(self.engine_info)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        self.monitor_handlers = {b'in' : self.save_queue_request,
                                 b'out': self.save_queue_result,
                                 b'intask': self.save_task_request,
                                 b'outtask': self.save_task_result,
                                 b'tracktask': self.save_task_destination,
                                 b'incontrol': _passer,
                                 b'outcontrol': _passer,
                                 b'iopub': self.save_iopub_message,
                                 }

        self.query_handlers = {'queue_request': self.queue_status,
                               'result_request': self.get_results,
                               'history_request': self.get_history,
                               'db_request': self.db_query,
                               'purge_request': self.purge_results,
                               'load_request': self.check_load,
                               'resubmit_request': self.resubmit_task,
                               'shutdown_request': self.shutdown_request,
                               'registration_request' : self.register_engine,
                               'unregistration_request' : self.unregister_engine,
                               'connection_request': self.connection_request,
                               }

        # ignore resubmit replies
        self.resubmit.on_recv(lambda msg: None, copy=False)

        self.log.info("hub::created hub")

    @property
    def _next_id(self):
        """generate a new ID.

        No longer reuse old ids, just count from 0."""
        newid = self._idcounter
        self._idcounter += 1
        return newid
        # newid = 0
        # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
        # # print newid, self.ids, self.incoming_registrations
        # while newid in self.ids or newid in incoming:
        #     newid += 1
        # return newid

    #-----------------------------------------------------------------------------
    # message validation
    #-----------------------------------------------------------------------------

    def _validate_targets(self, targets):
        """turn any valid targets argument into a list of integer ids"""
        if targets is None:
            # default to all
            return self.ids

        if isinstance(targets, (int,str,unicode)):
            # only one target specified
            targets = [targets]
        _targets = []
        for t in targets:
            # map raw identities to ids
            if isinstance(t, (str,unicode)):
                t = self.by_ident.get(t, t)
            _targets.append(t)
        targets = _targets
        bad_targets = [ t for t in targets if t not in self.ids ]
        if bad_targets:
            raise IndexError("No Such Engine: %r" % bad_targets)
        if not targets:
            raise IndexError("No Engines Registered")
        return targets

    #-----------------------------------------------------------------------------
    # dispatch methods (1 per stream)
    #-----------------------------------------------------------------------------


    @util.log_errors
    def dispatch_monitor_traffic(self, msg):
        """all ME and Task queue messages come through here, as well as
        IOPub traffic."""
        self.log.debug("monitor traffic: %r", msg[0])
        switch = msg[0]
        try:
            idents, msg = self.session.feed_identities(msg[1:])
        except ValueError:
            idents=[]
        if not idents:
            self.log.error("Bad Monitor Message: %r", msg)
            return
        handler = self.monitor_handlers.get(switch, None)
        if handler is not None:
            handler(idents, msg)
        else:
            self.log.error("Invalid monitor topic: %r", switch)


    @util.log_errors
    def dispatch_query(self, msg):
        """Route registration requests and queries from clients."""
        try:
            idents, msg = self.session.feed_identities(msg)
        except ValueError:
            idents = []
        if not idents:
            self.log.error("Bad Query Message: %r", msg)
            return
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            content = error.wrap_exception()
            self.log.error("Bad Query Message: %r", msg, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return
        # print client_id, header, parent, content
        #switch on message type:
        msg_type = msg['header']['msg_type']
        self.log.info("client::client %r requested %r", client_id, msg_type)
        handler = self.query_handlers.get(msg_type, None)
        try:
            assert handler is not None, "Bad Message Type: %r" % msg_type
        except:
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return

        else:
            handler(idents, msg)

    def dispatch_db(self, msg):
        """"""
        raise NotImplementedError

    #---------------------------------------------------------------------------
    # handler methods (1 per event)
    #---------------------------------------------------------------------------

    #----------------------- Heartbeat --------------------------------------

    def handle_new_heart(self, heart):
        """handler to attach to heartbeater.
        Called when a new heart starts to beat.
        Triggers completion of registration."""
        self.log.debug("heartbeat::handle_new_heart(%r)", heart)
        if heart not in self.incoming_registrations:
            self.log.info("heartbeat::ignoring new heart: %r", heart)
        else:
            self.finish_registration(heart)


    def handle_heart_failure(self, heart):
        """handler to attach to heartbeater.
        called when a previously registered heart fails to respond to beat request.
        triggers unregistration"""
        self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
        eid = self.hearts.get(heart, None)
        if eid is None or self.keytable[eid] in self.dead_engines:
            self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
        else:
            # look up the queue identity only for a live, registered engine
            queue = self.engines[eid].queue
            self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))

    #----------------------- MUX Queue Traffic ------------------------------

    def save_queue_request(self, idents, msg):
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return
        queue_id, client_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::target %r not registered", queue_id)
            self.log.debug("queue:: valid are: %r", self.by_ident.keys())
            return
        record = init_record(msg)
        msg_id = record['msg_id']
        self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
        # Unicode in records
        record['engine_uuid'] = queue_id.decode('ascii')
        record['client_uuid'] = client_id.decode('ascii')
        record['queue'] = 'mux'

        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            for key,evalue in existing.iteritems():
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)


        self.pending.add(msg_id)
        self.queues[eid].append(msg_id)

    def save_queue_result(self, idents, msg):
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return

        client_id, queue_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::engine %r sent invalid message to %r: %r",
                    queue_id, client_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
            return

        parent = msg['parent_header']
        if not parent:
            return
        msg_id = parent['msg_id']
        if msg_id in self.pending:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            self.queues[eid].remove(msg_id)
            self.completed[eid].append(msg_id)
            self.log.info("queue::request %r completed on %s", msg_id, eid)
        elif msg_id not in self.all_completed:
            # it could be a result from a dead engine that died before delivering the
            # result
            self.log.warn("queue:: unknown msg finished %r", msg_id)
            return
        # update record anyway, because the unregistration could have been premature
        rheader = msg['header']
        completed = rheader['date']
        started = rheader.get('started', None)
        result = {
            'result_header' : rheader,
            'result_content': msg['content'],
            'started' : started,
            'completed' : completed
        }

        result['result_buffers'] = msg['buffers']
        try:
            self.db.update_record(msg_id, result)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)


    #--------------------- Task Queue Traffic ------------------------------

    def save_task_request(self, idents, msg):
        """Save the submission of a task."""
        client_id = idents[0]

        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::client %r sent invalid task message: %r",
                            client_id, msg, exc_info=True)
            return
        record = init_record(msg)

        record['client_uuid'] = client_id.decode('ascii')
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        self.pending.add(msg_id)
        self.unassigned.add(msg_id)
        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            if existing['resubmitted']:
                for key in ('submitted', 'client_uuid', 'buffers'):
                    # don't clobber these keys on resubmit
                    # submitted and client_uuid should be different
                    # and buffers might be big, and shouldn't have changed
                    record.pop(key)
                # still check content and header, which should not change
                # and are not as expensive to compare as buffers

            for key,evalue in existing.iteritems():
                if key.endswith('buffers'):
                    # don't compare buffers
                    continue
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
        except Exception:
            self.log.error("DB Error saving task request %r", msg_id, exc_info=True)

    def save_task_result(self, idents, msg):
        """save the result of a completed task."""
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::invalid task result message sent to %r: %r",
                    client_id, msg, exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            # print msg
            self.log.warn("Task %r had no parent!", msg)
            return
        msg_id = parent['msg_id']
        if msg_id in self.unassigned:
            self.unassigned.remove(msg_id)

        header = msg['header']
        engine_uuid = header.get('engine', None)
        eid = self.by_ident.get(engine_uuid, None)

        if msg_id in self.pending:
            self.log.info("task::task %r finished on %s", msg_id, eid)
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            if eid is not None:
                self.completed[eid].append(msg_id)
                if msg_id in self.tasks[eid]:
                    self.tasks[eid].remove(msg_id)
            completed = header['date']
            started = header.get('started', None)
            result = {
                'result_header' : header,
                'result_content': msg['content'],
                'started' : started,
                'completed' : completed,
                'engine_uuid': engine_uuid
            }

            result['result_buffers'] = msg['buffers']
            try:
                self.db.update_record(msg_id, result)
            except Exception:
                self.log.error("DB Error saving task result %r", msg_id, exc_info=True)

        else:
            self.log.debug("task::unknown task %r finished", msg_id)

    def save_task_destination(self, idents, msg):
        """Save the engine destination of a dispatched task."""
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            self.log.error("task::invalid task tracking message", exc_info=True)
            return
        content = msg['content']
        # print (content)
        msg_id = content['msg_id']
        engine_uuid = content['engine_id']
        eid = self.by_ident[util.asbytes(engine_uuid)]

        self.log.info("task::task %r arrived on %r", msg_id, eid)
        if msg_id in self.unassigned:
            self.unassigned.remove(msg_id)
        # else:
        #     self.log.debug("task::task %r not listed as MIA?!"%(msg_id))

        self.tasks[eid].append(msg_id)
        # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
        try:
            self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
        except Exception:
            self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)


    def mia_task_request(self, idents, msg):
        raise NotImplementedError
        client_id = idents[0]
        # content = dict(mia=self.mia,status='ok')
        # self.session.send('mia_reply', content=content, idents=client_id)


    #--------------------- IOPub Traffic ------------------------------

    def save_iopub_message(self, topics, msg):
        """save an iopub message into the db"""
        # print (topics)
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            self.log.error("iopub::invalid IOPub message", exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            self.log.error("iopub::invalid IOPub message: %r", msg)
            return
        msg_id = parent['msg_id']
        msg_type = msg['header']['msg_type']
        content = msg['content']

        # ensure msg_id is in db
        try:
            rec = self.db.get_record(msg_id)
        except KeyError:
            rec = empty_record()
            rec['msg_id'] = msg_id
            self.db.add_record(msg_id, rec)
        # stream
        d = {}
        if msg_type == 'stream':
            name = content['name']
            s = rec[name] or ''
            d[name] = s + content['data']

        elif msg_type == 'pyerr':
            d['pyerr'] = content
        elif msg_type == 'pyin':
            d['pyin'] = content['code']
        else:
            d[msg_type] = content.get('data', '')

        try:
            self.db.update_record(msg_id, d)
        except Exception:
            self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
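    # For example (an illustrative trace of the 'stream' branch above, not
    # original code): two successive stdout messages with data 'a' then 'b'
    # leave the record with rec['stdout'] == 'ab', because each update
    # concatenates the new data onto the stored stream text.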



    #-------------------------------------------------------------------------
    # Registration requests
    #-------------------------------------------------------------------------

    def connection_request(self, client_id, msg):
        """Reply with connection addresses for clients."""
        self.log.info("client::client %r connected", client_id)
        content = dict(status='ok')
        content.update(self.client_info)
        jsonable = {}
        for k,v in self.keytable.iteritems():
            if v not in self.dead_engines:
                jsonable[str(k)] = v.decode('ascii')
        content['engines'] = jsonable
        self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)

    def register_engine(self, reg, msg):
        """Register a new engine, and create the socket(s) necessary."""
        content = msg['content']
        try:
            queue = util.asbytes(content['queue'])
        except KeyError:
            self.log.error("registration::queue not specified", exc_info=True)
            return
        heart = content.get('heartbeat', None)
        if heart:
            heart = util.asbytes(heart)
        eid = self._next_id
        # print (eid, queue, reg, heart)

        self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)

        content = dict(id=eid,status='ok')
        content.update(self.engine_info)
        # check for conflicting registrations:
        if queue in self.by_ident:
            try:
                raise KeyError("queue_id %r in use" % queue)
            except:
                content = error.wrap_exception()
                self.log.error("queue_id %r in use", queue, exc_info=True)
        elif heart in self.hearts: # need to check unique hearts?
            try:
                raise KeyError("heart_id %r in use" % heart)
            except:
                self.log.error("heart_id %r in use", heart, exc_info=True)
                content = error.wrap_exception()
        else:
            for h, pack in self.incoming_registrations.iteritems():
                if heart == h:
                    try:
                        raise KeyError("heart_id %r in use" % heart)
                    except:
                        self.log.error("heart_id %r in use", heart, exc_info=True)
                        content = error.wrap_exception()
                    break
                elif queue == pack[1]:
                    try:
                        raise KeyError("queue_id %r in use" % queue)
                    except:
                        self.log.error("queue_id %r in use", queue, exc_info=True)
                        content = error.wrap_exception()
                    break

        msg = self.session.send(self.query, "registration_reply",
                content=content,
                ident=reg)

        if content['status'] == 'ok':
            if heart in self.heartmonitor.hearts:
                # already beating
                self.incoming_registrations[heart] = (eid,queue,reg[0],None)
                self.finish_registration(heart)
            else:
                purge = lambda : self._purge_stalled_registration(heart)
                dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
                dc.start()
                self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
        else:
            self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
        return eid

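    # Illustrative message shapes (the keys come from the handler above; the
    # uuid values are placeholders): a registration request content looks like
    #
    #     {'queue': '<engine-uuid>', 'heartbeat': '<engine-uuid>'}
    #
    # and the registration_reply content is either dict(id=eid, status='ok')
    # plus the hub's engine_info, or a wrapped exception when the queue or
    # heart id is already in use.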
    def unregister_engine(self, ident, msg):
        """Unregister an engine that explicitly requested to leave."""
        try:
            eid = msg['content']['id']
        except:
            self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
            return
        self.log.info("registration::unregister_engine(%r)", eid)
        # print (eid)
        uuid = self.keytable[eid]
        content=dict(id=eid, queue=uuid.decode('ascii'))
        self.dead_engines.add(uuid)
        # self.ids.remove(eid)
        # uuid = self.keytable.pop(eid)
        #
        # ec = self.engines.pop(eid)
        # self.hearts.pop(ec.heartbeat)
        # self.by_ident.pop(ec.queue)
        # self.completed.pop(eid)
        handleit = lambda : self._handle_stranded_msgs(eid, uuid)
        dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
        dc.start()
        ############## TODO: HANDLE IT ################

        if self.notifier:
            self.session.send(self.notifier, "unregistration_notification", content=content)

    def _handle_stranded_msgs(self, eid, uuid):
        """Handle messages known to be on an engine when the engine unregisters.

        It is possible that this will fire prematurely - that is, an engine will
        go down after completing a result, and the client will be notified
        that the result failed and later receive the actual result.
        """

        outstanding = self.queues[eid]

        for msg_id in outstanding:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            try:
                raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
            except:
                content = error.wrap_exception()
            # build a fake header:
            header = {}
            header['engine'] = uuid
            header['date'] = datetime.now()
            rec = dict(result_content=content, result_header=header, result_buffers=[])
            rec['completed'] = header['date']
            rec['engine_uuid'] = uuid
            try:
                self.db.update_record(msg_id, rec)
            except Exception:
                self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)


    def finish_registration(self, heart):
        """Second half of engine registration, called after our HeartMonitor
        has received a beat from the Engine's Heart."""
        try:
            (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
        except KeyError:
            self.log.error("registration::tried to finish nonexistent registration", exc_info=True)
            return
        self.log.info("registration::finished registering engine %i:%r", eid, queue)
        if purge is not None:
            purge.stop()
        control = queue
        self.ids.add(eid)
        self.keytable[eid] = queue
        self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
                control=control, heartbeat=heart)
        self.by_ident[queue] = eid
        self.queues[eid] = list()
        self.tasks[eid] = list()
        self.completed[eid] = list()
        self.hearts[heart] = eid
        content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
        if self.notifier:
            self.session.send(self.notifier, "registration_notification", content=content)
        self.log.info("engine::Engine Connected: %i", eid)

    def _purge_stalled_registration(self, heart):
        if heart in self.incoming_registrations:
            eid = self.incoming_registrations.pop(heart)[0]
            self.log.info("registration::purging stalled registration: %i", eid)
        else:
            pass

    #-------------------------------------------------------------------------
    # Client Requests
    #-------------------------------------------------------------------------

    def shutdown_request(self, client_id, msg):
        """handle shutdown request."""
        self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
        # also notify other clients of shutdown
        self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
        dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
        dc.start()

    def _shutdown(self):
        self.log.info("hub::hub shutting down.")
        time.sleep(0.1)
        sys.exit(0)


    def check_load(self, client_id, msg):
        """Check the load (pending queue + task counts) of one or more targets."""
        content = msg['content']
        try:
            targets = content['targets']
            targets = self._validate_targets(targets)
        except:
            content = error.wrap_exception()
            self.session.send(self.query, "hub_error",
                    content=content, ident=client_id)
            return

        content = dict(status='ok')
        # loads = {}
        for t in targets:
            content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
        self.session.send(self.query, "load_reply", content=content, ident=client_id)


    def queue_status(self, client_id, msg):
        """Return the Queue status of one or more targets.

        If verbose, return lists of msg_ids; otherwise return the count of
        each type. Keys:

        * queue (pending MUX jobs)
        * tasks (pending Task jobs)
        * completed (finished jobs from both queues)
        """
        content = msg['content']
        targets = content['targets']
        try:
            targets = self._validate_targets(targets)
        except:
            content = error.wrap_exception()
            self.session.send(self.query, "hub_error",
                    content=content, ident=client_id)
            return
        verbose = content.get('verbose', False)
        content = dict(status='ok')
        for t in targets:
            queue = self.queues[t]
            completed = self.completed[t]
            tasks = self.tasks[t]
            if not verbose:
                queue = len(queue)
                completed = len(completed)
                tasks = len(tasks)
            content[str(t)] = {'queue': queue, 'completed': completed, 'tasks': tasks}
        content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
        # print (content)
        self.session.send(self.query, "queue_reply", content=content, ident=client_id)

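    # Example reply shapes (field names from the code above; the ids and
    # counts are invented for illustration):
    #
    #     verbose=False: {'status': 'ok', 'unassigned': 0,
    #                     '0': {'queue': 2, 'completed': 5, 'tasks': 1}}
    #     verbose=True:  {'status': 'ok', 'unassigned': [],
    #                     '0': {'queue': ['msg-a', 'msg-b'],
    #                           'completed': ['msg-c'],
    #                           'tasks': ['msg-d']}}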
    def purge_results(self, client_id, msg):
        """Purge results from memory. This method is more valuable before we move
        to a DB-based message storage mechanism."""
        content = msg['content']
        self.log.info("Dropping records with %s", content)
        msg_ids = content.get('msg_ids', [])
        reply = dict(status='ok')
        if msg_ids == 'all':
            try:
                self.db.drop_matching_records(dict(completed={'$ne':None}))
            except Exception:
                reply = error.wrap_exception()
        else:
            pending = filter(lambda m: m in self.pending, msg_ids)
            if pending:
                try:
                    raise IndexError("msg pending: %r" % pending[0])
                except:
                    reply = error.wrap_exception()
            else:
                try:
                    self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
                except Exception:
                    reply = error.wrap_exception()

        if reply['status'] == 'ok':
            eids = content.get('engine_ids', [])
            for eid in eids:
                if eid not in self.engines:
                    try:
                        raise IndexError("No such engine: %i" % eid)
                    except:
                        reply = error.wrap_exception()
                    break
                uid = self.engines[eid].queue
                try:
                    self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
                except Exception:
                    reply = error.wrap_exception()
                    break

        self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)

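    # Example request contents this handler accepts (shapes follow the code
    # above; the ids are placeholders):
    #
    #     {'msg_ids': 'all'}                     # drop every completed record
    #     {'msg_ids': ['msg-a', 'msg-b']}        # drop specific records
    #     {'msg_ids': [], 'engine_ids': [0, 1]}  # drop completed records per engine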
    def resubmit_task(self, client_id, msg):
        """Resubmit one or more tasks."""
        def finish(reply):
            self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)

        content = msg['content']
        msg_ids = content['msg_ids']
        reply = dict(status='ok')
        try:
            records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
                'header', 'content', 'buffers'])
        except Exception:
            self.log.error('db::db error finding tasks to resubmit', exc_info=True)
            return finish(error.wrap_exception())

        # validate msg_ids
        found_ids = [ rec['msg_id'] for rec in records ]
        invalid_ids = filter(lambda m: m in self.pending, found_ids)
        if len(records) > len(msg_ids):
            try:
                raise RuntimeError("DB appears to be in an inconsistent state. "
                        "More matching records were found than should exist")
            except Exception:
                return finish(error.wrap_exception())
        elif len(records) < len(msg_ids):
            missing = [ m for m in msg_ids if m not in found_ids ]
            try:
                raise KeyError("No such msg(s): %r" % missing)
            except KeyError:
                return finish(error.wrap_exception())
        elif invalid_ids:
            msg_id = invalid_ids[0]
            try:
                raise ValueError("Task %r appears to be inflight" % msg_id)
            except Exception:
                return finish(error.wrap_exception())

        # clear the existing records
        now = datetime.now()
        rec = empty_record()
        map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
        rec['resubmitted'] = now
        rec['queue'] = 'task'
        rec['client_uuid'] = client_id[0]
        try:
            for msg_id in msg_ids:
                self.all_completed.discard(msg_id)
                self.db.update_record(msg_id, rec)
        except Exception:
            self.log.error('db::db error updating record', exc_info=True)
            reply = error.wrap_exception()
        else:
            # send the messages
            for rec in records:
                header = rec['header']
                # include resubmitted in header to prevent digest collision
                header['resubmitted'] = now
                msg = self.session.msg(header['msg_type'])
                msg['content'] = rec['content']
                msg['header'] = header
                msg['header']['msg_id'] = rec['msg_id']
                self.session.send(self.resubmit, msg, buffers=rec['buffers'])

            finish(dict(status='ok'))


    def _extract_record(self, rec):
        """decompose a TaskRecord dict into subsection of reply for get_results"""
        io_dict = {}
        for key in 'pyin pyout pyerr stdout stderr'.split():
            io_dict[key] = rec[key]
        content = { 'result_content': rec['result_content'],
                    'header': rec['header'],
                    'result_header' : rec['result_header'],
                    'io' : io_dict,
                  }
        if rec['result_buffers']:
            buffers = map(bytes, rec['result_buffers'])
        else:
            buffers = []

        return content, buffers

    def get_results(self, client_id, msg):
        """Get the result of 1 or more messages."""
        content = msg['content']
        msg_ids = sorted(set(content['msg_ids']))
        statusonly = content.get('status_only', False)
        pending = []
        completed = []
        content = dict(status='ok')
        content['pending'] = pending
        content['completed'] = completed
        buffers = []
        if not statusonly:
            try:
                matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
                # turn match list into dict, for faster lookup
                records = {}
                for rec in matches:
                    records[rec['msg_id']] = rec
            except Exception:
                content = error.wrap_exception()
                self.session.send(self.query, "result_reply", content=content,
                                    parent=msg, ident=client_id)
                return
        else:
            records = {}
        for msg_id in msg_ids:
            if msg_id in self.pending:
                pending.append(msg_id)
            elif msg_id in self.all_completed:
                completed.append(msg_id)
                if not statusonly:
                    c,bufs = self._extract_record(records[msg_id])
                    content[msg_id] = c
                    buffers.extend(bufs)
            elif msg_id in records:
                if records[msg_id]['completed']:
                    completed.append(msg_id)
                    c,bufs = self._extract_record(records[msg_id])
                    content[msg_id] = c
                    buffers.extend(bufs)
                else:
                    pending.append(msg_id)
            else:
                try:
                    raise KeyError('No such message: '+msg_id)
                except:
                    content = error.wrap_exception()
                break
        self.session.send(self.query, "result_reply", content=content,
                            parent=msg, ident=client_id,
                            buffers=buffers)

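    # Example result_reply content (shape from the code above; the ids are
    # placeholders and elided sub-dicts are marked '...'):
    #
    #     {'status': 'ok',
    #      'pending': ['msg-b'],
    #      'completed': ['msg-a'],
    #      'msg-a': {'result_content': ..., 'header': ...,
    #                'result_header': ..., 'io': ...}}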
    def get_history(self, client_id, msg):
        """Get a list of all msg_ids in our DB records"""
        try:
            msg_ids = self.db.get_history()
        except Exception:
            content = error.wrap_exception()
        else:
            content = dict(status='ok', history=msg_ids)

        self.session.send(self.query, "history_reply", content=content,
                            parent=msg, ident=client_id)

    def db_query(self, client_id, msg):
        """Perform a raw query on the task record database."""
        content = msg['content']
        query = content.get('query', {})
        keys = content.get('keys', None)
        buffers = []
        empty = list()
        try:
            records = self.db.find_records(query, keys)
        except Exception:
            content = error.wrap_exception()
        else:
            # extract buffers from reply content:
            if keys is not None:
                buffer_lens = [] if 'buffers' in keys else None
                result_buffer_lens = [] if 'result_buffers' in keys else None
            else:
                buffer_lens = None
                result_buffer_lens = None

            for rec in records:
                # buffers may be None, so double check
                b = rec.pop('buffers', empty) or empty
                if buffer_lens is not None:
                    buffer_lens.append(len(b))
                    buffers.extend(b)
                rb = rec.pop('result_buffers', empty) or empty
                if result_buffer_lens is not None:
                    result_buffer_lens.append(len(rb))
                    buffers.extend(rb)
            content = dict(status='ok', records=records, buffer_lens=buffer_lens,
                            result_buffer_lens=result_buffer_lens)
        # self.log.debug (content)
        self.session.send(self.query, "db_reply", content=content,
                            parent=msg, ident=client_id,
                            buffers=buffers)

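    # Example db_query request content (a sketch: the Mongo-style operators
    # such as '$in' and '$ne' mirror those used elsewhere in this file, and
    # the key names are placeholders):
    #
    #     {'query': {'completed': {'$ne': None}},
    #      'keys': ['msg_id', 'completed', 'engine_uuid']}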
@@ -1,759 +1,767 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 from __future__ import print_function
22 from __future__ import print_function
23
23
24 import logging
24 import logging
25 import sys
25 import sys
26 import time
26 import time
27
27
28 from datetime import datetime, timedelta
28 from datetime import datetime, timedelta
29 from random import randint, random
29 from random import randint, random
30 from types import FunctionType
30 from types import FunctionType
31
31
32 try:
32 try:
33 import numpy
33 import numpy
34 except ImportError:
34 except ImportError:
35 numpy = None
35 numpy = None
36
36
37 import zmq
37 import zmq
38 from zmq.eventloop import ioloop, zmqstream
38 from zmq.eventloop import ioloop, zmqstream
39
39
40 # local imports
40 # local imports
41 from IPython.external.decorator import decorator
41 from IPython.external.decorator import decorator
42 from IPython.config.application import Application
42 from IPython.config.application import Application
43 from IPython.config.loader import Config
43 from IPython.config.loader import Config
44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
45
45
46 from IPython.parallel import error
46 from IPython.parallel import error, util
47 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.factory import SessionFactory
48 from IPython.parallel.util import connect_logger, local_logger, asbytes
48 from IPython.parallel.util import connect_logger, local_logger, asbytes
49
49
50 from .dependency import Dependency
50 from .dependency import Dependency
51
51
@decorator
def logged(f,self,*args,**kwargs):
    # print ("#--------------------")
    self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
    # print ("#--")
    return f(self,*args, **kwargs)

#----------------------------------------------------------------------
# Chooser functions
#----------------------------------------------------------------------

def plainrandom(loads):
    """Plain random pick."""
    n = len(loads)
    return randint(0,n-1)

def lru(loads):
    """Always pick the front of the line.

    The content of `loads` is ignored.

    Assumes LRU ordering of loads, with oldest first.
    """
    return 0

def twobin(loads):
    """Pick two at random, use the LRU of the two.

    The content of loads is ignored.

    Assumes LRU ordering of loads, with oldest first.
    """
    n = len(loads)
    a = randint(0,n-1)
    b = randint(0,n-1)
    return min(a,b)

def weighted(loads):
    """Pick two at random using inverse load as weight.

    Return the less loaded of the two.
    """
    # weight 0 a million times more than 1:
    weights = 1./(1e-6+numpy.array(loads))
    sums = weights.cumsum()
    t = sums[-1]
    x = random()*t
    y = random()*t
    idx = 0
    idy = 0
    while sums[idx] < x:
        idx += 1
    while sums[idy] < y:
        idy += 1
    if weights[idy] > weights[idx]:
        return idy
    else:
        return idx

def leastload(loads):
    """Always choose the lowest load.

    If the lowest load occurs more than once, the first
    occurrence will be used. If loads has LRU ordering, this means
    the LRU of those with the lowest load is chosen.
    """
    return loads.index(min(loads))
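
# A quick, illustrative comparison of the choosers on one load list (this
# demo is not part of the original module; `weighted` additionally requires
# numpy):
#
#     >>> loads = [2, 0, 1, 0]
#     >>> lru(loads)        # always the head of the line
#     0
#     >>> leastload(loads)  # first occurrence of the minimum load
#     1
#     >>> twobin(loads) in range(4) and plainrandom(loads) in range(4)
#     True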

#---------------------------------------------------------------------
# Classes
#---------------------------------------------------------------------


# store empty default dependency:
MET = Dependency([])


class Job(object):
    """Simple container for a job"""
    def __init__(self, msg_id, raw_msg, idents, msg, header, targets, after, follow, timeout):
        self.msg_id = msg_id
        self.raw_msg = raw_msg
        self.idents = idents
        self.msg = msg
        self.header = header
        self.targets = targets
        self.after = after
        self.follow = follow
        self.timeout = timeout

        self.timestamp = time.time()
        self.blacklist = set()

    @property
    def dependents(self):
        return self.follow.union(self.after)

class TaskScheduler(SessionFactory):
    """Python TaskScheduler object.

    This is the simplest object that supports msg_id based
    DAG dependencies. *Only* task msg_ids are checked, not
    msg_ids of jobs submitted via the MUX queue.

    """

    hwm = Integer(1, config=True,
        help="""specify the High Water Mark (HWM) for the downstream
        socket in the Task scheduler. This is the maximum number
        of allowed outstanding tasks on each engine.

        The default (1) means that only one task can be outstanding on each
        engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
        engines continue to be assigned tasks while they are working,
        effectively hiding network latency behind computation, but can result
        in an imbalance of work when submitting many heterogeneous tasks all at
        once. Any value greater than one is a compromise between the two.
        """
    )
    scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
        'leastload', config=True, allow_none=False,
        help="""select the task scheduler scheme [default: leastload]
        Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'"""
    )
    def _scheme_name_changed(self, old, new):
        self.log.debug("Using scheme %r"%new)
        self.scheme = globals()[new]
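
    # Both traits above are configurable; e.g. in an ipcontroller config
    # file (a sketch, assuming the standard IPython config mechanism):
    #
    #     c = get_config()
    #     c.TaskScheduler.hwm = 0                 # no limit on outstanding tasks
    #     c.TaskScheduler.scheme_name = 'twobin'  # pick a different chooser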

    # input arguments:
    scheme = Instance(FunctionType) # function for determining the destination
    def _scheme_default(self):
        return leastload
    client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
    engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
    notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
    mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream

    # internals:
    graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
    retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
    # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
    depending = Dict() # dict by msg_id of Jobs
    pending = Dict() # dict by engine_uuid of submitted tasks
    completed = Dict() # dict by engine_uuid of completed tasks
    failed = Dict() # dict by engine_uuid of failed tasks
    destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
    clients = Dict() # dict by msg_id for who submitted the task
    targets = List() # list of target IDENTs
    loads = List() # list of engine loads
    # full = Set() # set of IDENTs that have HWM outstanding tasks
    all_completed = Set() # set of all completed tasks
    all_failed = Set() # set of all failed tasks
    all_done = Set() # set of all finished tasks=union(completed,failed)
    all_ids = Set() # set of all submitted task IDs

    auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')

    ident = CBytes() # ZMQ identity. This should just be self.session.session
                     # but ensure Bytes
    def _ident_default(self):
        return self.session.bsession

217 def start(self):
217 def start(self):
218 self.engine_stream.on_recv(self.dispatch_result, copy=False)
218 self.engine_stream.on_recv(self.dispatch_result, copy=False)
219 self.client_stream.on_recv(self.dispatch_submission, copy=False)
219 self.client_stream.on_recv(self.dispatch_submission, copy=False)
220
220
221 self._notification_handlers = dict(
221 self._notification_handlers = dict(
222 registration_notification = self._register_engine,
222 registration_notification = self._register_engine,
223 unregistration_notification = self._unregister_engine
223 unregistration_notification = self._unregister_engine
224 )
224 )
225 self.notifier_stream.on_recv(self.dispatch_notification)
225 self.notifier_stream.on_recv(self.dispatch_notification)
226 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s (0.5 Hz)
226 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s (0.5 Hz)
227 self.auditor.start()
227 self.auditor.start()
228 self.log.info("Scheduler started [%s]"%self.scheme_name)
228 self.log.info("Scheduler started [%s]"%self.scheme_name)
229
229
230 def resume_receiving(self):
230 def resume_receiving(self):
231 """Resume accepting jobs."""
231 """Resume accepting jobs."""
232 self.client_stream.on_recv(self.dispatch_submission, copy=False)
232 self.client_stream.on_recv(self.dispatch_submission, copy=False)
233
233
234 def stop_receiving(self):
234 def stop_receiving(self):
235 """Stop accepting jobs while there are no engines.
235 """Stop accepting jobs while there are no engines.
236 Leave them in the ZMQ queue."""
236 Leave them in the ZMQ queue."""
237 self.client_stream.on_recv(None)
237 self.client_stream.on_recv(None)
238
238
239 #-----------------------------------------------------------------------
239 #-----------------------------------------------------------------------
240 # [Un]Registration Handling
240 # [Un]Registration Handling
241 #-----------------------------------------------------------------------
241 #-----------------------------------------------------------------------
242
242
243
244 @util.log_errors
243 def dispatch_notification(self, msg):
245 def dispatch_notification(self, msg):
244 """dispatch register/unregister events."""
246 """dispatch register/unregister events."""
245 try:
247 try:
246 idents,msg = self.session.feed_identities(msg)
248 idents,msg = self.session.feed_identities(msg)
247 except ValueError:
249 except ValueError:
248 self.log.warn("task::Invalid Message: %r",msg)
250 self.log.warn("task::Invalid Message: %r",msg)
249 return
251 return
250 try:
252 try:
251 msg = self.session.unserialize(msg)
253 msg = self.session.unserialize(msg)
252 except ValueError:
254 except ValueError:
253 self.log.warn("task::Unauthorized message from: %r"%idents)
255 self.log.warn("task::Unauthorized message from: %r"%idents)
254 return
256 return
255
257
256 msg_type = msg['header']['msg_type']
258 msg_type = msg['header']['msg_type']
257
259
258 handler = self._notification_handlers.get(msg_type, None)
260 handler = self._notification_handlers.get(msg_type, None)
259 if handler is None:
261 if handler is None:
260 self.log.error("Unhandled message type: %r"%msg_type)
262 self.log.error("Unhandled message type: %r"%msg_type)
261 else:
263 else:
262 try:
264 try:
263 handler(asbytes(msg['content']['queue']))
265 handler(asbytes(msg['content']['queue']))
264 except Exception:
266 except Exception:
265 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
267 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
266
268
267 def _register_engine(self, uid):
269 def _register_engine(self, uid):
268 """New engine with ident `uid` became available."""
270 """New engine with ident `uid` became available."""
269 # head of the line:
271 # head of the line:
270 self.targets.insert(0,uid)
272 self.targets.insert(0,uid)
271 self.loads.insert(0,0)
273 self.loads.insert(0,0)
272
274
273 # initialize sets
275 # initialize sets
274 self.completed[uid] = set()
276 self.completed[uid] = set()
275 self.failed[uid] = set()
277 self.failed[uid] = set()
276 self.pending[uid] = {}
278 self.pending[uid] = {}
277
279
278 # rescan the graph:
280 # rescan the graph:
279 self.update_graph(None)
281 self.update_graph(None)
280
282
281 def _unregister_engine(self, uid):
283 def _unregister_engine(self, uid):
282 """Existing engine with ident `uid` became unavailable."""
284 """Existing engine with ident `uid` became unavailable."""
283 if len(self.targets) == 1:
285 if len(self.targets) == 1:
284 # this was our only engine
286 # this was our only engine
285 pass
287 pass
286
288
287 # handle any potentially finished tasks:
289 # handle any potentially finished tasks:
288 self.engine_stream.flush()
290 self.engine_stream.flush()
289
291
290 # don't pop destinations, because they might be used later
292 # don't pop destinations, because they might be used later
291 # map(self.destinations.pop, self.completed.pop(uid))
293 # map(self.destinations.pop, self.completed.pop(uid))
292 # map(self.destinations.pop, self.failed.pop(uid))
294 # map(self.destinations.pop, self.failed.pop(uid))
293
295
294 # prevent this engine from receiving work
296 # prevent this engine from receiving work
295 idx = self.targets.index(uid)
297 idx = self.targets.index(uid)
296 self.targets.pop(idx)
298 self.targets.pop(idx)
297 self.loads.pop(idx)
299 self.loads.pop(idx)
298
300
299 # wait 5 seconds before cleaning up pending jobs, since the results might
301 # wait 5 seconds before cleaning up pending jobs, since the results might
300 # still be incoming
302 # still be incoming
301 if self.pending[uid]:
303 if self.pending[uid]:
302 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
304 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
303 dc.start()
305 dc.start()
304 else:
306 else:
305 self.completed.pop(uid)
307 self.completed.pop(uid)
306 self.failed.pop(uid)
308 self.failed.pop(uid)
307
309
308
310
309 def handle_stranded_tasks(self, engine):
311 def handle_stranded_tasks(self, engine):
310 """Deal with jobs resident in an engine that died."""
312 """Deal with jobs resident in an engine that died."""
311 lost = self.pending[engine]
313 lost = self.pending[engine]
312 for msg_id in lost.keys():
314 for msg_id in lost.keys():
313 if msg_id not in self.pending[engine]:
315 if msg_id not in self.pending[engine]:
314 # prevent double-handling of messages
316 # prevent double-handling of messages
315 continue
317 continue
316
318
317 raw_msg = lost[msg_id][0]
319 raw_msg = lost[msg_id][0]
318 idents,msg = self.session.feed_identities(raw_msg, copy=False)
320 idents,msg = self.session.feed_identities(raw_msg, copy=False)
319 parent = self.session.unpack(msg[1].bytes)
321 parent = self.session.unpack(msg[1].bytes)
320 idents = [engine, idents[0]]
322 idents = [engine, idents[0]]
321
323
322 # build fake error reply
324 # build fake error reply
323 try:
325 try:
324 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
326 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
325 except:
327 except:
326 content = error.wrap_exception()
328 content = error.wrap_exception()
327 # build fake header
329 # build fake header
328 header = dict(
330 header = dict(
329 status='error',
331 status='error',
330 engine=engine,
332 engine=engine,
331 date=datetime.now(),
333 date=datetime.now(),
332 )
334 )
333 msg = self.session.msg('apply_reply', content, parent=parent, subheader=header)
335 msg = self.session.msg('apply_reply', content, parent=parent, subheader=header)
334 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
336 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
335 # and dispatch it
337 # and dispatch it
336 self.dispatch_result(raw_reply)
338 self.dispatch_result(raw_reply)
337
339
338 # finally scrub completed/failed lists
340 # finally scrub completed/failed lists
339 self.completed.pop(engine)
341 self.completed.pop(engine)
340 self.failed.pop(engine)
342 self.failed.pop(engine)
341
343
342
344
343 #-----------------------------------------------------------------------
345 #-----------------------------------------------------------------------
344 # Job Submission
346 # Job Submission
345 #-----------------------------------------------------------------------
347 #-----------------------------------------------------------------------
348
349
350 @util.log_errors
346 def dispatch_submission(self, raw_msg):
351 def dispatch_submission(self, raw_msg):
347 """Dispatch job submission to appropriate handlers."""
352 """Dispatch job submission to appropriate handlers."""
348 # ensure targets up to date:
353 # ensure targets up to date:
349 self.notifier_stream.flush()
354 self.notifier_stream.flush()
350 try:
355 try:
351 idents, msg = self.session.feed_identities(raw_msg, copy=False)
356 idents, msg = self.session.feed_identities(raw_msg, copy=False)
352 msg = self.session.unserialize(msg, content=False, copy=False)
357 msg = self.session.unserialize(msg, content=False, copy=False)
353 except Exception:
358 except Exception:
354 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
359 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
355 return
360 return
356
361
357
362
358 # send to monitor
363 # send to monitor
359 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
364 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
360
365
361 header = msg['header']
366 header = msg['header']
362 msg_id = header['msg_id']
367 msg_id = header['msg_id']
363 self.all_ids.add(msg_id)
368 self.all_ids.add(msg_id)
364
369
365 # get targets as a set of bytes objects
370 # get targets as a set of bytes objects
366 # from a list of unicode objects
371 # from a list of unicode objects
367 targets = header.get('targets', [])
372 targets = header.get('targets', [])
368 targets = map(asbytes, targets)
373 targets = map(asbytes, targets)
369 targets = set(targets)
374 targets = set(targets)
370
375
371 retries = header.get('retries', 0)
376 retries = header.get('retries', 0)
372 self.retries[msg_id] = retries
377 self.retries[msg_id] = retries
373
378
374 # time dependencies
379 # time dependencies
375 after = header.get('after', None)
380 after = header.get('after', None)
376 if after:
381 if after:
377 after = Dependency(after)
382 after = Dependency(after)
378 if after.all:
383 if after.all:
379 if after.success:
384 if after.success:
380 after = Dependency(after.difference(self.all_completed),
385 after = Dependency(after.difference(self.all_completed),
381 success=after.success,
386 success=after.success,
382 failure=after.failure,
387 failure=after.failure,
383 all=after.all,
388 all=after.all,
384 )
389 )
385 if after.failure:
390 if after.failure:
386 after = Dependency(after.difference(self.all_failed),
391 after = Dependency(after.difference(self.all_failed),
387 success=after.success,
392 success=after.success,
388 failure=after.failure,
393 failure=after.failure,
389 all=after.all,
394 all=after.all,
390 )
395 )
391 if after.check(self.all_completed, self.all_failed):
396 if after.check(self.all_completed, self.all_failed):
392 # recast as empty set, if `after` already met,
397 # recast as empty set, if `after` already met,
393 # to prevent unnecessary set comparisons
398 # to prevent unnecessary set comparisons
394 after = MET
399 after = MET
395 else:
400 else:
396 after = MET
401 after = MET
397
402
398 # location dependencies
403 # location dependencies
399 follow = Dependency(header.get('follow', []))
404 follow = Dependency(header.get('follow', []))
400
405
401 # turn timeouts into datetime objects:
406 # turn timeouts into datetime objects:
402 timeout = header.get('timeout', None)
407 timeout = header.get('timeout', None)
403 if timeout:
408 if timeout:
404 # cast to float, because jsonlib returns floats as decimal.Decimal,
409 # cast to float, because jsonlib returns floats as decimal.Decimal,
405 # which timedelta does not accept
410 # which timedelta does not accept
406 timeout = datetime.now() + timedelta(0,float(timeout),0)
411 timeout = datetime.now() + timedelta(0,float(timeout),0)
407
412
408 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
413 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
409 header=header, targets=targets, after=after, follow=follow,
414 header=header, targets=targets, after=after, follow=follow,
410 timeout=timeout,
415 timeout=timeout,
411 )
416 )
412
417
413 # validate and reduce dependencies:
418 # validate and reduce dependencies:
414 for dep in after,follow:
419 for dep in after,follow:
415 if not dep: # empty dependency
420 if not dep: # empty dependency
416 continue
421 continue
417 # check valid:
422 # check valid:
418 if msg_id in dep or dep.difference(self.all_ids):
423 if msg_id in dep or dep.difference(self.all_ids):
419 self.depending[msg_id] = job
424 self.depending[msg_id] = job
420 return self.fail_unreachable(msg_id, error.InvalidDependency)
425 return self.fail_unreachable(msg_id, error.InvalidDependency)
421 # check if unreachable:
426 # check if unreachable:
422 if dep.unreachable(self.all_completed, self.all_failed):
427 if dep.unreachable(self.all_completed, self.all_failed):
423 self.depending[msg_id] = job
428 self.depending[msg_id] = job
424 return self.fail_unreachable(msg_id)
429 return self.fail_unreachable(msg_id)
425
430
426 if after.check(self.all_completed, self.all_failed):
431 if after.check(self.all_completed, self.all_failed):
427 # time deps already met, try to run
432 # time deps already met, try to run
428 if not self.maybe_run(job):
433 if not self.maybe_run(job):
429 # can't run yet
434 # can't run yet
430 if msg_id not in self.all_failed:
435 if msg_id not in self.all_failed:
431 # could have failed as unreachable
436 # could have failed as unreachable
432 self.save_unmet(job)
437 self.save_unmet(job)
433 else:
438 else:
434 self.save_unmet(job)
439 self.save_unmet(job)
435
440
436 def audit_timeouts(self):
441 def audit_timeouts(self):
437 """Audit all waiting tasks for expired timeouts."""
442 """Audit all waiting tasks for expired timeouts."""
438 now = datetime.now()
443 now = datetime.now()
439 for msg_id in self.depending.keys():
444 for msg_id in self.depending.keys():
440 # must recheck, in case one failure cascaded to another:
445 # must recheck, in case one failure cascaded to another:
441 if msg_id in self.depending:
446 if msg_id in self.depending:
442 job = self.depending[msg_id]
447 job = self.depending[msg_id]
443 if job.timeout and job.timeout < now:
448 if job.timeout and job.timeout < now:
444 self.fail_unreachable(msg_id, error.TaskTimeout)
449 self.fail_unreachable(msg_id, error.TaskTimeout)
445
450
446 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
451 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
447 """a task has become unreachable, send a reply with an ImpossibleDependency
452 """a task has become unreachable, send a reply with an ImpossibleDependency
448 error."""
453 error."""
449 if msg_id not in self.depending:
454 if msg_id not in self.depending:
450 self.log.error("msg %r already failed!", msg_id)
455 self.log.error("msg %r already failed!", msg_id)
451 return
456 return
452 job = self.depending.pop(msg_id)
457 job = self.depending.pop(msg_id)
453 for mid in job.dependents:
458 for mid in job.dependents:
454 if mid in self.graph:
459 if mid in self.graph:
455 self.graph[mid].remove(msg_id)
460 self.graph[mid].remove(msg_id)
456
461
457 try:
462 try:
458 raise why()
463 raise why()
459 except:
464 except:
460 content = error.wrap_exception()
465 content = error.wrap_exception()
461
466
462 self.all_done.add(msg_id)
467 self.all_done.add(msg_id)
463 self.all_failed.add(msg_id)
468 self.all_failed.add(msg_id)
464
469
465 msg = self.session.send(self.client_stream, 'apply_reply', content,
470 msg = self.session.send(self.client_stream, 'apply_reply', content,
466 parent=job.header, ident=job.idents)
471 parent=job.header, ident=job.idents)
467 self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)
472 self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)
468
473
469 self.update_graph(msg_id, success=False)
474 self.update_graph(msg_id, success=False)
470
475
471 def maybe_run(self, job):
476 def maybe_run(self, job):
472 """check location dependencies, and run if they are met."""
477 """check location dependencies, and run if they are met."""
473 msg_id = job.msg_id
478 msg_id = job.msg_id
474 self.log.debug("Attempting to assign task %s", msg_id)
479 self.log.debug("Attempting to assign task %s", msg_id)
475 if not self.targets:
480 if not self.targets:
476 # no engines, definitely can't run
481 # no engines, definitely can't run
477 return False
482 return False
478
483
479 if job.follow or job.targets or job.blacklist or self.hwm:
484 if job.follow or job.targets or job.blacklist or self.hwm:
480 # we need a can_run filter
485 # we need a can_run filter
481 def can_run(idx):
486 def can_run(idx):
482 # check hwm
487 # check hwm
483 if self.hwm and self.loads[idx] == self.hwm:
488 if self.hwm and self.loads[idx] == self.hwm:
484 return False
489 return False
485 target = self.targets[idx]
490 target = self.targets[idx]
486 # check blacklist
491 # check blacklist
487 if target in job.blacklist:
492 if target in job.blacklist:
488 return False
493 return False
489 # check targets
494 # check targets
490 if job.targets and target not in job.targets:
495 if job.targets and target not in job.targets:
491 return False
496 return False
492 # check follow
497 # check follow
493 return job.follow.check(self.completed[target], self.failed[target])
498 return job.follow.check(self.completed[target], self.failed[target])
494
499
495 indices = filter(can_run, range(len(self.targets)))
500 indices = filter(can_run, range(len(self.targets)))
496
501
497 if not indices:
502 if not indices:
498 # couldn't run
503 # couldn't run
499 if job.follow.all:
504 if job.follow.all:
500 # check follow for impossibility
505 # check follow for impossibility
501 dests = set()
506 dests = set()
502 relevant = set()
507 relevant = set()
503 if job.follow.success:
508 if job.follow.success:
504 relevant = self.all_completed
509 relevant = self.all_completed
505 if job.follow.failure:
510 if job.follow.failure:
506 relevant = relevant.union(self.all_failed)
511 relevant = relevant.union(self.all_failed)
507 for m in job.follow.intersection(relevant):
512 for m in job.follow.intersection(relevant):
508 dests.add(self.destinations[m])
513 dests.add(self.destinations[m])
509 if len(dests) > 1:
514 if len(dests) > 1:
510 self.depending[msg_id] = job
515 self.depending[msg_id] = job
511 self.fail_unreachable(msg_id)
516 self.fail_unreachable(msg_id)
512 return False
517 return False
513 if job.targets:
518 if job.targets:
514 # check blacklist+targets for impossibility
519 # check blacklist+targets for impossibility
515 job.targets.difference_update(job.blacklist)
520 job.targets.difference_update(job.blacklist)
516 if not job.targets or not job.targets.intersection(self.targets):
521 if not job.targets or not job.targets.intersection(self.targets):
517 self.depending[msg_id] = job
522 self.depending[msg_id] = job
518 self.fail_unreachable(msg_id)
523 self.fail_unreachable(msg_id)
519 return False
524 return False
520 return False
525 return False
521 else:
526 else:
522 indices = None
527 indices = None
523
528
524 self.submit_task(job, indices)
529 self.submit_task(job, indices)
525 return True
530 return True
526
531
527 def save_unmet(self, job):
532 def save_unmet(self, job):
528 """Save a message for later submission when its dependencies are met."""
533 """Save a message for later submission when its dependencies are met."""
529 msg_id = job.msg_id
534 msg_id = job.msg_id
530 self.depending[msg_id] = job
535 self.depending[msg_id] = job
531 # track the ids in follow or after, but not those already finished
536 # track the ids in follow or after, but not those already finished
532 for dep_id in job.after.union(job.follow).difference(self.all_done):
537 for dep_id in job.after.union(job.follow).difference(self.all_done):
533 if dep_id not in self.graph:
538 if dep_id not in self.graph:
534 self.graph[dep_id] = set()
539 self.graph[dep_id] = set()
535 self.graph[dep_id].add(msg_id)
540 self.graph[dep_id].add(msg_id)
536
541
537 def submit_task(self, job, indices=None):
542 def submit_task(self, job, indices=None):
538 """Submit a task to any of a subset of our targets."""
543 """Submit a task to any of a subset of our targets."""
539 if indices:
544 if indices:
540 loads = [self.loads[i] for i in indices]
545 loads = [self.loads[i] for i in indices]
541 else:
546 else:
542 loads = self.loads
547 loads = self.loads
543 idx = self.scheme(loads)
548 idx = self.scheme(loads)
544 if indices:
549 if indices:
545 idx = indices[idx]
550 idx = indices[idx]
546 target = self.targets[idx]
551 target = self.targets[idx]
547 # print (target, map(str, msg[:3]))
552 # print (target, map(str, msg[:3]))
548 # send job to the engine
553 # send job to the engine
549 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
554 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
550 self.engine_stream.send_multipart(job.raw_msg, copy=False)
555 self.engine_stream.send_multipart(job.raw_msg, copy=False)
551 # update load
556 # update load
552 self.add_job(idx)
557 self.add_job(idx)
553 self.pending[target][job.msg_id] = job
558 self.pending[target][job.msg_id] = job
554 # notify Hub
559 # notify Hub
555 content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
560 content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
556 self.session.send(self.mon_stream, 'task_destination', content=content,
561 self.session.send(self.mon_stream, 'task_destination', content=content,
557 ident=[b'tracktask',self.ident])
562 ident=[b'tracktask',self.ident])
558
563
559
564
560 #-----------------------------------------------------------------------
565 #-----------------------------------------------------------------------
561 # Result Handling
566 # Result Handling
562 #-----------------------------------------------------------------------
567 #-----------------------------------------------------------------------
568
569
570 @util.log_errors
563 def dispatch_result(self, raw_msg):
571 def dispatch_result(self, raw_msg):
564 """dispatch method for result replies"""
572 """dispatch method for result replies"""
565 try:
573 try:
566 idents,msg = self.session.feed_identities(raw_msg, copy=False)
574 idents,msg = self.session.feed_identities(raw_msg, copy=False)
567 msg = self.session.unserialize(msg, content=False, copy=False)
575 msg = self.session.unserialize(msg, content=False, copy=False)
568 engine = idents[0]
576 engine = idents[0]
569 try:
577 try:
570 idx = self.targets.index(engine)
578 idx = self.targets.index(engine)
571 except ValueError:
579 except ValueError:
572 pass # skip load-update for dead engines
580 pass # skip load-update for dead engines
573 else:
581 else:
574 self.finish_job(idx)
582 self.finish_job(idx)
575 except Exception:
583 except Exception:
576 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
584 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
577 return
585 return
578
586
579 header = msg['header']
587 header = msg['header']
580 parent = msg['parent_header']
588 parent = msg['parent_header']
581 if header.get('dependencies_met', True):
589 if header.get('dependencies_met', True):
582 success = (header['status'] == 'ok')
590 success = (header['status'] == 'ok')
583 msg_id = parent['msg_id']
591 msg_id = parent['msg_id']
584 retries = self.retries[msg_id]
592 retries = self.retries[msg_id]
585 if not success and retries > 0:
593 if not success and retries > 0:
586 # failed
594 # failed
587 self.retries[msg_id] = retries - 1
595 self.retries[msg_id] = retries - 1
588 self.handle_unmet_dependency(idents, parent)
596 self.handle_unmet_dependency(idents, parent)
589 else:
597 else:
590 del self.retries[msg_id]
598 del self.retries[msg_id]
591 # relay to client and update graph
599 # relay to client and update graph
592 self.handle_result(idents, parent, raw_msg, success)
600 self.handle_result(idents, parent, raw_msg, success)
593 # send to Hub monitor
601 # send to Hub monitor
594 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
602 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
595 else:
603 else:
596 self.handle_unmet_dependency(idents, parent)
604 self.handle_unmet_dependency(idents, parent)
597
605
598 def handle_result(self, idents, parent, raw_msg, success=True):
606 def handle_result(self, idents, parent, raw_msg, success=True):
599 """handle a real task result, either success or failure"""
607 """handle a real task result, either success or failure"""
600 # first, relay result to client
608 # first, relay result to client
601 engine = idents[0]
609 engine = idents[0]
602 client = idents[1]
610 client = idents[1]
603 # swap_ids for XREP-XREP mirror
611 # swap_ids for XREP-XREP mirror
604 raw_msg[:2] = [client,engine]
612 raw_msg[:2] = [client,engine]
605 # print (map(str, raw_msg[:4]))
613 # print (map(str, raw_msg[:4]))
606 self.client_stream.send_multipart(raw_msg, copy=False)
614 self.client_stream.send_multipart(raw_msg, copy=False)
607 # now, update our data structures
615 # now, update our data structures
608 msg_id = parent['msg_id']
616 msg_id = parent['msg_id']
609 self.pending[engine].pop(msg_id)
617 self.pending[engine].pop(msg_id)
610 if success:
618 if success:
611 self.completed[engine].add(msg_id)
619 self.completed[engine].add(msg_id)
612 self.all_completed.add(msg_id)
620 self.all_completed.add(msg_id)
613 else:
621 else:
614 self.failed[engine].add(msg_id)
622 self.failed[engine].add(msg_id)
615 self.all_failed.add(msg_id)
623 self.all_failed.add(msg_id)
616 self.all_done.add(msg_id)
624 self.all_done.add(msg_id)
617 self.destinations[msg_id] = engine
625 self.destinations[msg_id] = engine
618
626
619 self.update_graph(msg_id, success)
627 self.update_graph(msg_id, success)
620
628
621 def handle_unmet_dependency(self, idents, parent):
629 def handle_unmet_dependency(self, idents, parent):
622 """handle an unmet dependency"""
630 """handle an unmet dependency"""
623 engine = idents[0]
631 engine = idents[0]
624 msg_id = parent['msg_id']
632 msg_id = parent['msg_id']
625
633
626 job = self.pending[engine].pop(msg_id)
634 job = self.pending[engine].pop(msg_id)
627 job.blacklist.add(engine)
635 job.blacklist.add(engine)
628
636
629 if job.blacklist == job.targets:
637 if job.blacklist == job.targets:
630 self.depending[msg_id] = job
638 self.depending[msg_id] = job
631 self.fail_unreachable(msg_id)
639 self.fail_unreachable(msg_id)
632 elif not self.maybe_run(job):
640 elif not self.maybe_run(job):
633 # resubmit failed
641 # resubmit failed
634 if msg_id not in self.all_failed:
642 if msg_id not in self.all_failed:
635 # put it back in our dependency tree
643 # put it back in our dependency tree
636 self.save_unmet(job)
644 self.save_unmet(job)
637
645
638 if self.hwm:
646 if self.hwm:
639 try:
647 try:
640 idx = self.targets.index(engine)
648 idx = self.targets.index(engine)
641 except ValueError:
649 except ValueError:
642 pass # skip load-update for dead engines
650 pass # skip load-update for dead engines
643 else:
651 else:
644 if self.loads[idx] == self.hwm-1:
652 if self.loads[idx] == self.hwm-1:
645 self.update_graph(None)
653 self.update_graph(None)
646
654
647
655
648
656
649 def update_graph(self, dep_id=None, success=True):
657 def update_graph(self, dep_id=None, success=True):
650 """dep_id just finished. Update our dependency
658 """dep_id just finished. Update our dependency
651 graph and submit any jobs that just became runnable.
659 graph and submit any jobs that just became runnable.
652
660
653 Called with dep_id=None to update entire graph for hwm, but without finishing
661 Called with dep_id=None to update entire graph for hwm, but without finishing
654 a task.
662 a task.
655 """
663 """
656 # print ("\n\n***********")
664 # print ("\n\n***********")
657 # pprint (dep_id)
665 # pprint (dep_id)
658 # pprint (self.graph)
666 # pprint (self.graph)
659 # pprint (self.depending)
667 # pprint (self.depending)
660 # pprint (self.all_completed)
668 # pprint (self.all_completed)
661 # pprint (self.all_failed)
669 # pprint (self.all_failed)
662 # print ("\n\n***********\n\n")
670 # print ("\n\n***********\n\n")
663 # update any jobs that depended on the dependency
671 # update any jobs that depended on the dependency
664 jobs = self.graph.pop(dep_id, [])
672 jobs = self.graph.pop(dep_id, [])
665
673
666 # recheck *all* jobs if
674 # recheck *all* jobs if
667 # a) we have HWM and an engine just became no longer full
675 # a) we have HWM and an engine just became no longer full
668 # or b) dep_id was given as None
676 # or b) dep_id was given as None
669
677
670 if dep_id is None or (self.hwm and any([load == self.hwm-1 for load in self.loads])):
678 if dep_id is None or (self.hwm and any([load == self.hwm-1 for load in self.loads])):
671 jobs = self.depending.keys()
679 jobs = self.depending.keys()
672
680
673 for msg_id in sorted(jobs, key=lambda msg_id: self.depending[msg_id].timestamp):
681 for msg_id in sorted(jobs, key=lambda msg_id: self.depending[msg_id].timestamp):
674 job = self.depending[msg_id]
682 job = self.depending[msg_id]
675
683
676 if job.after.unreachable(self.all_completed, self.all_failed)\
684 if job.after.unreachable(self.all_completed, self.all_failed)\
677 or job.follow.unreachable(self.all_completed, self.all_failed):
685 or job.follow.unreachable(self.all_completed, self.all_failed):
678 self.fail_unreachable(msg_id)
686 self.fail_unreachable(msg_id)
679
687
680 elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
688 elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
681 if self.maybe_run(job):
689 if self.maybe_run(job):
682
690
683 self.depending.pop(msg_id)
691 self.depending.pop(msg_id)
684 for mid in job.dependents:
692 for mid in job.dependents:
685 if mid in self.graph:
693 if mid in self.graph:
686 self.graph[mid].remove(msg_id)
694 self.graph[mid].remove(msg_id)
687
695
688 #----------------------------------------------------------------------
696 #----------------------------------------------------------------------
689 # methods to be overridden by subclasses
697 # methods to be overridden by subclasses
690 #----------------------------------------------------------------------
698 #----------------------------------------------------------------------
691
699
692 def add_job(self, idx):
700 def add_job(self, idx):
693 """Called after self.targets[idx] just got the job with header.
701 """Called after self.targets[idx] just got the job with header.
694 Override with subclasses. The default ordering is simple LRU.
702 Override with subclasses. The default ordering is simple LRU.
695 The default loads are the number of outstanding jobs."""
703 The default loads are the number of outstanding jobs."""
696 self.loads[idx] += 1
704 self.loads[idx] += 1
697 for lis in (self.targets, self.loads):
705 for lis in (self.targets, self.loads):
698 lis.append(lis.pop(idx))
706 lis.append(lis.pop(idx))
699
707
700
708
701 def finish_job(self, idx):
709 def finish_job(self, idx):
702 """Called after self.targets[idx] just finished a job.
710 """Called after self.targets[idx] just finished a job.
703 Override with subclasses."""
711 Override with subclasses."""
704 self.loads[idx] -= 1
712 self.loads[idx] -= 1
705
713
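As a sketch of this override point (the subclass is illustrative, not part of this changeset): a scheduler that keeps engine order fixed instead of rotating LRU-style only needs to skip the rotation in add_job, since finish_job already does nothing but decrement the load:

class PlainScheduler(TaskScheduler):
    """Illustrative subclass: track load, but keep engine order fixed."""
    def add_job(self, idx):
        # only bump the load; no LRU rotation of targets/loads
        self.loads[idx] += 1
    # finish_job is inherited unchanged: it just decrements loads[idx]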
706
714
707
715
708 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
716 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
709 logname='root', log_url=None, loglevel=logging.DEBUG,
717 logname='root', log_url=None, loglevel=logging.DEBUG,
710 identity=b'task', in_thread=False):
718 identity=b'task', in_thread=False):
711
719
712 ZMQStream = zmqstream.ZMQStream
720 ZMQStream = zmqstream.ZMQStream
713
721
714 if config:
722 if config:
715 # unwrap dict back into Config
723 # unwrap dict back into Config
716 config = Config(config)
724 config = Config(config)
717
725
718 if in_thread:
726 if in_thread:
719 # use instance() to get the same Context/Loop as our parent
727 # use instance() to get the same Context/Loop as our parent
720 ctx = zmq.Context.instance()
728 ctx = zmq.Context.instance()
721 loop = ioloop.IOLoop.instance()
729 loop = ioloop.IOLoop.instance()
722 else:
730 else:
723 # in a process, don't use instance()
731 # in a process, don't use instance()
724 # for safety with multiprocessing
732 # for safety with multiprocessing
725 ctx = zmq.Context()
733 ctx = zmq.Context()
726 loop = ioloop.IOLoop()
734 loop = ioloop.IOLoop()
727 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
735 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
728 ins.setsockopt(zmq.IDENTITY, identity)
736 ins.setsockopt(zmq.IDENTITY, identity)
729 ins.bind(in_addr)
737 ins.bind(in_addr)
730
738
731 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
739 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
732 outs.setsockopt(zmq.IDENTITY, identity)
740 outs.setsockopt(zmq.IDENTITY, identity)
733 outs.bind(out_addr)
741 outs.bind(out_addr)
734 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
742 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
735 mons.connect(mon_addr)
743 mons.connect(mon_addr)
736 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
744 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
737 nots.setsockopt(zmq.SUBSCRIBE, b'')
745 nots.setsockopt(zmq.SUBSCRIBE, b'')
738 nots.connect(not_addr)
746 nots.connect(not_addr)
739
747
740 # setup logging.
748 # setup logging.
741 if in_thread:
749 if in_thread:
742 log = Application.instance().log
750 log = Application.instance().log
743 else:
751 else:
744 if log_url:
752 if log_url:
745 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
753 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
746 else:
754 else:
747 log = local_logger(logname, loglevel)
755 log = local_logger(logname, loglevel)
748
756
749 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
757 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
750 mon_stream=mons, notifier_stream=nots,
758 mon_stream=mons, notifier_stream=nots,
751 loop=loop, log=log,
759 loop=loop, log=log,
752 config=config)
760 config=config)
753 scheduler.start()
761 scheduler.start()
754 if not in_thread:
762 if not in_thread:
755 try:
763 try:
756 loop.start()
764 loop.start()
757 except KeyboardInterrupt:
765 except KeyboardInterrupt:
758 scheduler.log.critical("Interrupted, exiting...")
766 scheduler.log.critical("Interrupted, exiting...")
759
767
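A minimal invocation sketch (the addresses are illustrative; when in_thread is False this call blocks in the IOLoop until interrupted):

launch_scheduler('tcp://127.0.0.1:5551',  # in_addr: client-facing ROUTER (bind)
                 'tcp://127.0.0.1:5552',  # out_addr: engine-facing ROUTER (bind)
                 'tcp://127.0.0.1:5553',  # mon_addr: PUB to the Hub monitor (connect)
                 'tcp://127.0.0.1:5554',  # not_addr: SUB for Hub notifications (connect)
                 identity=b'task')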
@@ -1,480 +1,495 b''
1 """some generic utilities for dealing with classes, urls, and serialization
1 """some generic utilities for dealing with classes, urls, and serialization
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23 import socket
23 import socket
24 import sys
24 import sys
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
26 try:
26 try:
27 from signal import SIGKILL
27 from signal import SIGKILL
28 except ImportError:
28 except ImportError:
29 SIGKILL=None
29 SIGKILL=None
30
30
31 try:
31 try:
32 import cPickle
32 import cPickle
33 pickle = cPickle
33 pickle = cPickle
34 except:
34 except:
35 cPickle = None
35 cPickle = None
36 import pickle
36 import pickle
37
37
38 # System library imports
38 # System library imports
39 import zmq
39 import zmq
40 from zmq.log import handlers
40 from zmq.log import handlers
41
41
42 from IPython.external.decorator import decorator
43
42 # IPython imports
44 # IPython imports
43 from IPython.config.application import Application
45 from IPython.config.application import Application
44 from IPython.utils import py3compat
46 from IPython.utils import py3compat
45 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
47 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
46 from IPython.utils.newserialized import serialize, unserialize
48 from IPython.utils.newserialized import serialize, unserialize
47 from IPython.zmq.log import EnginePUBHandler
49 from IPython.zmq.log import EnginePUBHandler
48
50
49 if py3compat.PY3:
51 if py3compat.PY3:
50 buffer = memoryview
52 buffer = memoryview
51
53
52 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
53 # Classes
55 # Classes
54 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
55
57
56 class Namespace(dict):
58 class Namespace(dict):
57 """Subclass of dict for attribute access to keys."""
59 """Subclass of dict for attribute access to keys."""
58
60
59 def __getattr__(self, key):
61 def __getattr__(self, key):
60 """getattr aliased to getitem"""
62 """getattr aliased to getitem"""
61 if key in self.iterkeys():
63 if key in self.iterkeys():
62 return self[key]
64 return self[key]
63 else:
65 else:
64 raise NameError(key)
66 raise NameError(key)
65
67
66 def __setattr__(self, key, value):
68 def __setattr__(self, key, value):
67 """setattr aliased to setitem, with strict"""
69 """setattr aliased to setitem, with strict"""
68 if hasattr(dict, key):
70 if hasattr(dict, key):
69 raise KeyError("Cannot override dict keys %r"%key)
71 raise KeyError("Cannot override dict keys %r"%key)
70 self[key] = value
72 self[key] = value
71
73
72
74
73 class ReverseDict(dict):
75 class ReverseDict(dict):
74 """simple double-keyed subset of dict methods."""
76 """simple double-keyed subset of dict methods."""
75
77
76 def __init__(self, *args, **kwargs):
78 def __init__(self, *args, **kwargs):
77 dict.__init__(self, *args, **kwargs)
79 dict.__init__(self, *args, **kwargs)
78 self._reverse = dict()
80 self._reverse = dict()
79 for key, value in self.iteritems():
81 for key, value in self.iteritems():
80 self._reverse[value] = key
82 self._reverse[value] = key
81
83
82 def __getitem__(self, key):
84 def __getitem__(self, key):
83 try:
85 try:
84 return dict.__getitem__(self, key)
86 return dict.__getitem__(self, key)
85 except KeyError:
87 except KeyError:
86 return self._reverse[key]
88 return self._reverse[key]
87
89
88 def __setitem__(self, key, value):
90 def __setitem__(self, key, value):
89 if key in self._reverse:
91 if key in self._reverse:
90 raise KeyError("Can't have key %r on both sides!"%key)
92 raise KeyError("Can't have key %r on both sides!"%key)
91 dict.__setitem__(self, key, value)
93 dict.__setitem__(self, key, value)
92 self._reverse[value] = key
94 self._reverse[value] = key
93
95
94 def pop(self, key):
96 def pop(self, key):
95 value = dict.pop(self, key)
97 value = dict.pop(self, key)
96 self._reverse.pop(value)
98 self._reverse.pop(value)
97 return value
99 return value
98
100
99 def get(self, key, default=None):
101 def get(self, key, default=None):
100 try:
102 try:
101 return self[key]
103 return self[key]
102 except KeyError:
104 except KeyError:
103 return default
105 return default
104
106
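A quick usage example of the double-keyed lookup (values must be unique and hashable, or __setitem__ raises KeyError):

rd = ReverseDict()
rd['engine-0'] = 12
rd['engine-0']          # -> 12, normal forward lookup
rd[12]                  # -> 'engine-0', falls back to the reverse map
rd.get('missing', -1)   # -> -1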
105 #-----------------------------------------------------------------------------
107 #-----------------------------------------------------------------------------
106 # Functions
108 # Functions
107 #-----------------------------------------------------------------------------
109 #-----------------------------------------------------------------------------
108
110
111 @decorator
112 def log_errors(f, self, *args, **kwargs):
113 """decorator to log unhandled exceptions raised in a method.
114
115 For use wrapping on_recv callbacks, so that exceptions
116 do not cause the stream to be closed.
117 """
118 try:
119 return f(self, *args, **kwargs)
120 except Exception:
121 self.log.error("Uncaught exception in %r" % f, exc_info=True)
122
123
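Since log_errors is the decorator this changeset introduces for on_recv callbacks, a minimal usage sketch may help; the Dispatcher class and its handle call are hypothetical, but the pattern mirrors the schedulers above:

class Dispatcher(object):
    """Hypothetical illustration: any object with a `log` attribute
    can protect its on_recv callbacks this way."""
    def __init__(self, stream, log):
        self.log = log
        stream.on_recv(self.dispatch)

    @log_errors
    def dispatch(self, msg):
        # exceptions raised here are logged via self.log with a
        # traceback, instead of propagating into the event loop
        # and silencing the stream's callback
        handle(msg)  # hypothetical handler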
109 def asbytes(s):
124 def asbytes(s):
110 """ensure that an object is ascii bytes"""
125 """ensure that an object is ascii bytes"""
111 if isinstance(s, unicode):
126 if isinstance(s, unicode):
112 s = s.encode('ascii')
127 s = s.encode('ascii')
113 return s
128 return s
114
129
115 def is_url(url):
130 def is_url(url):
116 """boolean check for whether a string is a zmq url"""
131 """boolean check for whether a string is a zmq url"""
117 if '://' not in url:
132 if '://' not in url:
118 return False
133 return False
119 proto, addr = url.split('://', 1)
134 proto, addr = url.split('://', 1)
120 if proto.lower() not in ['tcp','pgm','epgm','ipc','inproc']:
135 if proto.lower() not in ['tcp','pgm','epgm','ipc','inproc']:
121 return False
136 return False
122 return True
137 return True
123
138
124 def validate_url(url):
139 def validate_url(url):
125 """validate a url for zeromq"""
140 """validate a url for zeromq"""
126 if not isinstance(url, basestring):
141 if not isinstance(url, basestring):
127 raise TypeError("url must be a string, not %r"%type(url))
142 raise TypeError("url must be a string, not %r"%type(url))
128 url = url.lower()
143 url = url.lower()
129
144
130 proto_addr = url.split('://')
145 proto_addr = url.split('://')
131 assert len(proto_addr) == 2, 'Invalid url: %r'%url
146 assert len(proto_addr) == 2, 'Invalid url: %r'%url
132 proto, addr = proto_addr
147 proto, addr = proto_addr
133 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
148 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
134
149
135 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
150 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
136 # author: Remi Sabourin
151 # author: Remi Sabourin
137 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
152 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
138
153
139 if proto == 'tcp':
154 if proto == 'tcp':
140 lis = addr.split(':')
155 lis = addr.split(':')
141 assert len(lis) == 2, 'Invalid url: %r'%url
156 assert len(lis) == 2, 'Invalid url: %r'%url
142 addr,s_port = lis
157 addr,s_port = lis
143 try:
158 try:
144 port = int(s_port)
159 port = int(s_port)
145 except ValueError:
160 except ValueError:
146 raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
161 raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
147
162
148 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
163 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
149
164
150 else:
165 else:
151 # only validate tcp urls currently
166 # only validate tcp urls currently
152 pass
167 pass
153
168
154 return True
169 return True
155
170
156
171
157 def validate_url_container(container):
172 def validate_url_container(container):
158 """validate a potentially nested collection of urls."""
173 """validate a potentially nested collection of urls."""
159 if isinstance(container, basestring):
174 if isinstance(container, basestring):
160 url = container
175 url = container
161 return validate_url(url)
176 return validate_url(url)
162 elif isinstance(container, dict):
177 elif isinstance(container, dict):
163 container = container.itervalues()
178 container = container.itervalues()
164
179
165 for element in container:
180 for element in container:
166 validate_url_container(element)
181 validate_url_container(element)
167
182
168
183
169 def split_url(url):
184 def split_url(url):
170 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
185 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
171 proto_addr = url.split('://')
186 proto_addr = url.split('://')
172 assert len(proto_addr) == 2, 'Invalid url: %r'%url
187 assert len(proto_addr) == 2, 'Invalid url: %r'%url
173 proto, addr = proto_addr
188 proto, addr = proto_addr
174 lis = addr.split(':')
189 lis = addr.split(':')
175 assert len(lis) == 2, 'Invalid url: %r'%url
190 assert len(lis) == 2, 'Invalid url: %r'%url
176 addr,s_port = lis
191 addr,s_port = lis
177 return proto,addr,s_port
192 return proto,addr,s_port
178
193
179 def disambiguate_ip_address(ip, location=None):
194 def disambiguate_ip_address(ip, location=None):
180 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
195 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
181 ones, based on the location (default interpretation of location is localhost)."""
196 ones, based on the location (default interpretation of location is localhost)."""
182 if ip in ('0.0.0.0', '*'):
197 if ip in ('0.0.0.0', '*'):
183 try:
198 try:
184 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
199 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
185 except (socket.gaierror, IndexError):
200 except (socket.gaierror, IndexError):
186 # couldn't identify this machine, assume localhost
201 # couldn't identify this machine, assume localhost
187 external_ips = []
202 external_ips = []
188 if location is None or location in external_ips or not external_ips:
203 if location is None or location in external_ips or not external_ips:
189 # If location is unspecified or cannot be determined, assume local
204 # If location is unspecified or cannot be determined, assume local
190 ip='127.0.0.1'
205 ip='127.0.0.1'
191 elif location:
206 elif location:
192 return location
207 return location
193 return ip
208 return ip
194
209
195 def disambiguate_url(url, location=None):
210 def disambiguate_url(url, location=None):
196 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
211 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
197 ones, based on the location (default interpretation is localhost).
212 ones, based on the location (default interpretation is localhost).
198
213
199 This is for zeromq urls, such as tcp://*:10101."""
214 This is for zeromq urls, such as tcp://*:10101."""
200 try:
215 try:
201 proto,ip,port = split_url(url)
216 proto,ip,port = split_url(url)
202 except AssertionError:
217 except AssertionError:
203 # probably not tcp url; could be ipc, etc.
218 # probably not tcp url; could be ipc, etc.
204 return url
219 return url
205
220
206 ip = disambiguate_ip_address(ip,location)
221 ip = disambiguate_ip_address(ip,location)
207
222
208 return "%s://%s:%s"%(proto,ip,port)
223 return "%s://%s:%s"%(proto,ip,port)
209
224
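For example, under the default localhost interpretation (actual substitutions depend on the local interfaces and the location argument):

disambiguate_url('tcp://*:10101')         # -> 'tcp://127.0.0.1:10101'
disambiguate_url('tcp://0.0.0.0:5555')    # -> 'tcp://127.0.0.1:5555'
disambiguate_url('ipc:///tmp/scheduler')  # not tcp: returned unchanged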
210 def serialize_object(obj, threshold=64e-6):
225 def serialize_object(obj, threshold=64e-6):
211 """Serialize an object into a list of sendable buffers.
226 """Serialize an object into a list of sendable buffers.
212
227
213 Parameters
228 Parameters
214 ----------
229 ----------
215
230
216 obj : object
231 obj : object
217 The object to be serialized
232 The object to be serialized
218 threshold : float
233 threshold : float
219 The threshold for not double-pickling the content.
234 The threshold for not double-pickling the content.
220
235
221
236
222 Returns
237 Returns
223 -------
238 -------
224 ('pmd', [bufs]) :
239 ('pmd', [bufs]) :
225 where pmd is the pickled metadata wrapper,
240 where pmd is the pickled metadata wrapper,
226 bufs is a list of data buffers
241 bufs is a list of data buffers
227 """
242 """
228 databuffers = []
243 databuffers = []
229 if isinstance(obj, (list, tuple)):
244 if isinstance(obj, (list, tuple)):
230 clist = canSequence(obj)
245 clist = canSequence(obj)
231 slist = map(serialize, clist)
246 slist = map(serialize, clist)
232 for s in slist:
247 for s in slist:
233 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
248 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
234 databuffers.append(s.getData())
249 databuffers.append(s.getData())
235 s.data = None
250 s.data = None
236 return pickle.dumps(slist,-1), databuffers
251 return pickle.dumps(slist,-1), databuffers
237 elif isinstance(obj, dict):
252 elif isinstance(obj, dict):
238 sobj = {}
253 sobj = {}
239 for k in sorted(obj.iterkeys()):
254 for k in sorted(obj.iterkeys()):
240 s = serialize(can(obj[k]))
255 s = serialize(can(obj[k]))
241 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
256 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
242 databuffers.append(s.getData())
257 databuffers.append(s.getData())
243 s.data = None
258 s.data = None
244 sobj[k] = s
259 sobj[k] = s
245 return pickle.dumps(sobj,-1),databuffers
260 return pickle.dumps(sobj,-1),databuffers
246 else:
261 else:
247 s = serialize(can(obj))
262 s = serialize(can(obj))
248 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
263 if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
249 databuffers.append(s.getData())
264 databuffers.append(s.getData())
250 s.data = None
265 s.data = None
251 return pickle.dumps(s,-1),databuffers
266 return pickle.dumps(s,-1),databuffers
252
267
253
268
254 def unserialize_object(bufs):
269 def unserialize_object(bufs):
255 """reconstruct an object serialized by serialize_object from data buffers."""
270 """reconstruct an object serialized by serialize_object from data buffers."""
256 bufs = list(bufs)
271 bufs = list(bufs)
257 sobj = pickle.loads(bufs.pop(0))
272 sobj = pickle.loads(bufs.pop(0))
258 if isinstance(sobj, (list, tuple)):
273 if isinstance(sobj, (list, tuple)):
259 for s in sobj:
274 for s in sobj:
260 if s.data is None:
275 if s.data is None:
261 s.data = bufs.pop(0)
276 s.data = bufs.pop(0)
262 return uncanSequence(map(unserialize, sobj)), bufs
277 return uncanSequence(map(unserialize, sobj)), bufs
263 elif isinstance(sobj, dict):
278 elif isinstance(sobj, dict):
264 newobj = {}
279 newobj = {}
265 for k in sorted(sobj.iterkeys()):
280 for k in sorted(sobj.iterkeys()):
266 s = sobj[k]
281 s = sobj[k]
267 if s.data is None:
282 if s.data is None:
268 s.data = bufs.pop(0)
283 s.data = bufs.pop(0)
269 newobj[k] = uncan(unserialize(s))
284 newobj[k] = uncan(unserialize(s))
270 return newobj, bufs
285 return newobj, bufs
271 else:
286 else:
272 if sobj.data is None:
287 if sobj.data is None:
273 sobj.data = bufs.pop(0)
288 sobj.data = bufs.pop(0)
274 return uncan(unserialize(sobj)), bufs
289 return uncan(unserialize(sobj)), bufs
275
290
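A minimal round-trip sketch of the two functions together; the pickled metadata wrapper travels as the first buffer, followed by any extracted data buffers:

obj = {'x': range(10), 'y': 'hello'}
pmd, bufs = serialize_object(obj)
restored, leftover = unserialize_object([pmd] + bufs)
assert restored == obj and leftover == []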
276 def pack_apply_message(f, args, kwargs, threshold=64e-6):
291 def pack_apply_message(f, args, kwargs, threshold=64e-6):
277 """pack up a function, args, and kwargs to be sent over the wire
292 """pack up a function, args, and kwargs to be sent over the wire
278 as a series of buffers. Any object whose data is larger than `threshold`
293 as a series of buffers. Any object whose data is larger than `threshold`
279 will not have their data copied (currently only numpy arrays support zero-copy)"""
294 will not have their data copied (currently only numpy arrays support zero-copy)"""
280 msg = [pickle.dumps(can(f),-1)]
295 msg = [pickle.dumps(can(f),-1)]
281 databuffers = [] # for large objects
296 databuffers = [] # for large objects
282 sargs, bufs = serialize_object(args,threshold)
297 sargs, bufs = serialize_object(args,threshold)
283 msg.append(sargs)
298 msg.append(sargs)
284 databuffers.extend(bufs)
299 databuffers.extend(bufs)
285 skwargs, bufs = serialize_object(kwargs,threshold)
300 skwargs, bufs = serialize_object(kwargs,threshold)
286 msg.append(skwargs)
301 msg.append(skwargs)
287 databuffers.extend(bufs)
302 databuffers.extend(bufs)
288 msg.extend(databuffers)
303 msg.extend(databuffers)
289 return msg
304 return msg
290
305
291 def unpack_apply_message(bufs, g=None, copy=True):
306 def unpack_apply_message(bufs, g=None, copy=True):
292 """unpack f,args,kwargs from buffers packed by pack_apply_message()
307 """unpack f,args,kwargs from buffers packed by pack_apply_message()
293 Returns: original f,args,kwargs"""
308 Returns: original f,args,kwargs"""
294 bufs = list(bufs) # allow us to pop
309 bufs = list(bufs) # allow us to pop
295 assert len(bufs) >= 3, "not enough buffers!"
310 assert len(bufs) >= 3, "not enough buffers!"
296 if not copy:
311 if not copy:
297 for i in range(3):
312 for i in range(3):
298 bufs[i] = bufs[i].bytes
313 bufs[i] = bufs[i].bytes
299 cf = pickle.loads(bufs.pop(0))
314 cf = pickle.loads(bufs.pop(0))
300 sargs = list(pickle.loads(bufs.pop(0)))
315 sargs = list(pickle.loads(bufs.pop(0)))
301 skwargs = dict(pickle.loads(bufs.pop(0)))
316 skwargs = dict(pickle.loads(bufs.pop(0)))
302 # print sargs, skwargs
317 # print sargs, skwargs
303 f = uncan(cf, g)
318 f = uncan(cf, g)
304 for sa in sargs:
319 for sa in sargs:
305 if sa.data is None:
320 if sa.data is None:
306 m = bufs.pop(0)
321 m = bufs.pop(0)
307 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
322 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
308 # always use a buffer, until memoryviews get sorted out
323 # always use a buffer, until memoryviews get sorted out
309 sa.data = buffer(m)
324 sa.data = buffer(m)
310 # disable memoryview support
325 # disable memoryview support
311 # if copy:
326 # if copy:
312 # sa.data = buffer(m)
327 # sa.data = buffer(m)
313 # else:
328 # else:
314 # sa.data = m.buffer
329 # sa.data = m.buffer
315 else:
330 else:
316 if copy:
331 if copy:
317 sa.data = m
332 sa.data = m
318 else:
333 else:
319 sa.data = m.bytes
334 sa.data = m.bytes
320
335
321 args = uncanSequence(map(unserialize, sargs), g)
336 args = uncanSequence(map(unserialize, sargs), g)
322 kwargs = {}
337 kwargs = {}
323 for k in sorted(skwargs.iterkeys()):
338 for k in sorted(skwargs.iterkeys()):
324 sa = skwargs[k]
339 sa = skwargs[k]
325 if sa.data is None:
340 if sa.data is None:
326 m = bufs.pop(0)
341 m = bufs.pop(0)
327 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
342 if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
328 # always use a buffer, until memoryviews get sorted out
343 # always use a buffer, until memoryviews get sorted out
329 sa.data = buffer(m)
344 sa.data = buffer(m)
330 # disable memoryview support
345 # disable memoryview support
331 # if copy:
346 # if copy:
332 # sa.data = buffer(m)
347 # sa.data = buffer(m)
333 # else:
348 # else:
334 # sa.data = m.buffer
349 # sa.data = m.buffer
335 else:
350 else:
336 if copy:
351 if copy:
337 sa.data = m
352 sa.data = m
338 else:
353 else:
339 sa.data = m.bytes
354 sa.data = m.bytes
340
355
341 kwargs[k] = uncan(unserialize(sa), g)
356 kwargs[k] = uncan(unserialize(sa), g)
342
357
343 return f,args,kwargs
358 return f,args,kwargs
344
359
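A round-trip sketch of the apply-message helpers, using a trivial function for illustration:

def add(a, b=1):
    return a + b

bufs = pack_apply_message(add, (5,), dict(b=2))
f, args, kwargs = unpack_apply_message(bufs)
f(*args, **kwargs)  # -> 7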
345 #--------------------------------------------------------------------------
360 #--------------------------------------------------------------------------
346 # helpers for implementing old MEC API via view.apply
361 # helpers for implementing old MEC API via view.apply
347 #--------------------------------------------------------------------------
362 #--------------------------------------------------------------------------
348
363
349 def interactive(f):
364 def interactive(f):
350 """decorator for making functions appear as interactively defined.
365 """decorator for making functions appear as interactively defined.
351 This results in the function being linked to the user_ns as globals()
366 This results in the function being linked to the user_ns as globals()
352 instead of the module globals().
367 instead of the module globals().
353 """
368 """
354 f.__module__ = '__main__'
369 f.__module__ = '__main__'
355 return f
370 return f
356
371
@interactive
def _push(**ns):
    """helper method for implementing `client.push` via `client.apply`"""
    globals().update(ns)

@interactive
def _pull(keys):
    """helper method for implementing `client.pull` via `client.apply`"""
    user_ns = globals()
    if isinstance(keys, (list, tuple, set)):
        for key in keys:
            if key not in user_ns:
                raise NameError("name '%s' is not defined" % key)
        return map(user_ns.get, keys)
    else:
        if keys not in user_ns:
            raise NameError("name '%s' is not defined" % keys)
        return user_ns.get(keys)

@interactive
def _execute(code):
    """helper method for implementing `client.execute` via `client.apply`"""
    exec code in globals()

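# Hedged usage sketch (assumes a running cluster and the standard
# IPython.parallel Client/View API); these helpers are shipped to engines
# with view.apply, so push/pull/execute reduce to remote function calls
# against each engine's namespace:
#
#     from IPython.parallel import Client
#     view = Client()[:]
#     view.apply_sync(_push, x=5)
#     view.apply_sync(_execute, 'y = x + 1')
#     print(view.apply_sync(_pull, 'y'))   # one result per engine, e.g. [6, 6]
#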
#--------------------------------------------------------------------------
# extra process management utilities
#--------------------------------------------------------------------------

_random_ports = set()

def select_random_ports(n):
    """Select and return n random ports that are available."""
    ports = []
    for i in xrange(n):
        sock = socket.socket()
        sock.bind(('', 0))
        while sock.getsockname()[1] in _random_ports:
            sock.close()
            sock = socket.socket()
            sock.bind(('', 0))
        ports.append(sock)
    for i, sock in enumerate(ports):
        port = sock.getsockname()[1]
        sock.close()
        ports[i] = port
        _random_ports.add(port)
    return ports

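# Design note + hedged sketch: all n sockets stay open until every port is
# chosen, so the OS cannot hand out the same ephemeral port twice within one
# call, and _random_ports avoids reuse across calls in this process. A race
# remains: another process may grab a port between close() and actual use.
def _example_select_ports():
    ports = select_random_ports(3)
    assert len(set(ports)) == 3   # three distinct, currently-free ports
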
def signal_children(children):
    """Relay interrupt/term signals to children, for more reliable process cleanup."""
    def terminate_children(sig, frame):
        log = Application.instance().log
        log.critical("Got signal %i, terminating children..." % sig)
        for child in children:
            child.terminate()

        sys.exit(sig != SIGINT)
        # sys.exit(sig)
    for sig in (SIGINT, SIGABRT, SIGTERM):
        signal(sig, terminate_children)

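# Hedged usage sketch (assumes children expose terminate(), as
# subprocess.Popen objects do); install the relay before blocking on the
# children so a Ctrl-C tears down the whole process tree:
#
#     import subprocess
#     procs = [subprocess.Popen(['sleep', '60']) for _ in range(2)]
#     signal_children(procs)
#     for p in procs:
#         p.wait()
#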
def generate_exec_key(keyfile):
    import uuid
    newkey = str(uuid.uuid4())
    with open(keyfile, 'w') as f:
        # f.write('ipython-key ')
        f.write(newkey + '\n')
    # set user-only RW permissions (0600)
    # this will have no effect on Windows
    os.chmod(keyfile, stat.S_IRUSR | stat.S_IWUSR)

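# Hedged sketch: the exec key is a shared secret (presumably used to sign
# messages between client and engines); readers load the same file back.
# The path below is purely illustrative.
def _example_exec_key(path='/tmp/ipython_exec.key'):
    generate_exec_key(path)
    with open(path) as f:
        key = f.read().strip()
    assert len(key) == 36   # str(uuid.uuid4()) is 36 characters
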
def integer_loglevel(loglevel):
    """Coerce a loglevel to an int: accepts ints, numeric strings ('10'),
    or upper-case level names ('DEBUG'), e.g.
    integer_loglevel('DEBUG') == logging.DEBUG.
    """
    try:
        loglevel = int(loglevel)
    except ValueError:
        if isinstance(loglevel, str):
            loglevel = getattr(logging, loglevel)
    return loglevel

def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
    logger = logging.getLogger(logname)
    if any(isinstance(h, handlers.PUBHandler) for h in logger.handlers):
        # don't add a second PUBHandler
        return
    loglevel = integer_loglevel(loglevel)
    lsock = context.socket(zmq.PUB)
    lsock.connect(iface)
    handler = handlers.PUBHandler(lsock)
    handler.setLevel(loglevel)
    handler.root_topic = root
    logger.addHandler(handler)
    logger.setLevel(loglevel)

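# Hedged sketch of the receiving side (standard zmq PUB/SUB; pyzmq's
# PUBHandler publishes multipart [topic, message] with topic
# '<root_topic>.<levelname>'). connect_engine_logger below works the same
# way, except EnginePUBHandler also embeds the engine id in the topic.
# The port is purely illustrative.
def _example_log_subscriber(iface='tcp://127.0.0.1:20202'):
    ctx = zmq.Context.instance()
    sub = ctx.socket(zmq.SUB)
    sub.bind(iface)                         # loggers connect() to this iface
    sub.setsockopt(zmq.SUBSCRIBE, b'ip')    # match the default root="ip"
    topic, msg = sub.recv_multipart()
    print(topic, msg)
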
def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
    logger = logging.getLogger()
    if any(isinstance(h, handlers.PUBHandler) for h in logger.handlers):
        # don't add a second PUBHandler
        return logger
    loglevel = integer_loglevel(loglevel)
    lsock = context.socket(zmq.PUB)
    lsock.connect(iface)
    handler = EnginePUBHandler(engine, lsock)
    handler.setLevel(loglevel)
    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger

def local_logger(logname, loglevel=logging.DEBUG):
    loglevel = integer_loglevel(loglevel)
    logger = logging.getLogger(logname)
    if any(isinstance(h, logging.StreamHandler) for h in logger.handlers):
        # don't add a second StreamHandler
        return logger
    handler = logging.StreamHandler()
    handler.setLevel(loglevel)
    formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
                                  datefmt="%Y-%m-%d %H:%M:%S")
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger

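# Hedged usage sketch: loglevel may be an int, a numeric string, or an
# upper-case level name, courtesy of integer_loglevel above.
def _example_local_logger():
    log = local_logger('ip.example', 'INFO')   # hypothetical logger name
    log.info("visible, with the timestamped format above")
    log.debug("suppressed: DEBUG is below the INFO threshold")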