##// END OF EJS Templates
minor controller logging adjustments...
MinRK -
Show More
@@ -1,174 +1,174 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
3 A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
4 and hearts are tracked based on their XREQ identities.
4 and hearts are tracked based on their XREQ identities.
5
5
6 Authors:
6 Authors:
7
7
8 * Min RK
8 * Min RK
9 """
9 """
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Copyright (C) 2010-2011 The IPython Development Team
11 # Copyright (C) 2010-2011 The IPython Development Team
12 #
12 #
13 # Distributed under the terms of the BSD License. The full license is in
13 # Distributed under the terms of the BSD License. The full license is in
14 # the file COPYING, distributed as part of this software.
14 # the file COPYING, distributed as part of this software.
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import print_function
17 from __future__ import print_function
18 import time
18 import time
19 import uuid
19 import uuid
20
20
21 import zmq
21 import zmq
22 from zmq.devices import ThreadDevice
22 from zmq.devices import ThreadDevice
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.traitlets import Set, Instance, CFloat
26 from IPython.utils.traitlets import Set, Instance, CFloat
27
27
28 from IPython.parallel.util import asbytes
28 from IPython.parallel.util import asbytes
29
29
30 class Heart(object):
30 class Heart(object):
31 """A basic heart object for responding to a HeartMonitor.
31 """A basic heart object for responding to a HeartMonitor.
32 This is a simple wrapper with defaults for the most common
32 This is a simple wrapper with defaults for the most common
33 Device model for responding to heartbeats.
33 Device model for responding to heartbeats.
34
34
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
36 SUB/XREQ for in/out.
36 SUB/XREQ for in/out.
37
37
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 device=None
39 device=None
40 id=None
40 id=None
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 self.device.daemon=True
43 self.device.daemon=True
44 self.device.connect_in(in_addr)
44 self.device.connect_in(in_addr)
45 self.device.connect_out(out_addr)
45 self.device.connect_out(out_addr)
46 if in_type == zmq.SUB:
46 if in_type == zmq.SUB:
47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
48 if heart_id is None:
48 if heart_id is None:
49 heart_id = uuid.uuid4().bytes
49 heart_id = uuid.uuid4().bytes
50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
51 self.id = heart_id
51 self.id = heart_id
52
52
53 def start(self):
53 def start(self):
54 return self.device.start()
54 return self.device.start()
55
55
56
56 class HeartMonitor(LoggingConfigurable):
57 class HeartMonitor(LoggingConfigurable):
57 """A basic HeartMonitor class
58 """A basic HeartMonitor class
58 pingstream: a PUB stream
59 pingstream: a PUB stream
59 pongstream: an XREP stream
60 pongstream: an XREP stream
60 period: the period of the heartbeat in milliseconds"""
61 period: the period of the heartbeat in milliseconds"""
61
62
62 period=CFloat(1000, config=True,
63 period=CFloat(1000, config=True,
63 help='The frequency at which the Hub pings the engines for heartbeats '
64 help='The frequency at which the Hub pings the engines for heartbeats '
64 '(in ms)',
65 '(in ms)',
65 )
66 )
66
67
67 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
68 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
68 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
69 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
69 loop = Instance('zmq.eventloop.ioloop.IOLoop')
70 loop = Instance('zmq.eventloop.ioloop.IOLoop')
70 def _loop_default(self):
71 def _loop_default(self):
71 return ioloop.IOLoop.instance()
72 return ioloop.IOLoop.instance()
72
73
73 # not settable:
74 # not settable:
74 hearts=Set()
75 hearts=Set()
75 responses=Set()
76 responses=Set()
76 on_probation=Set()
77 on_probation=Set()
77 last_ping=CFloat(0)
78 last_ping=CFloat(0)
78 _new_handlers = Set()
79 _new_handlers = Set()
79 _failure_handlers = Set()
80 _failure_handlers = Set()
80 lifetime = CFloat(0)
81 lifetime = CFloat(0)
81 tic = CFloat(0)
82 tic = CFloat(0)
82
83
83 def __init__(self, **kwargs):
84 def __init__(self, **kwargs):
84 super(HeartMonitor, self).__init__(**kwargs)
85 super(HeartMonitor, self).__init__(**kwargs)
85
86
86 self.pongstream.on_recv(self.handle_pong)
87 self.pongstream.on_recv(self.handle_pong)
87
88
88 def start(self):
89 def start(self):
89 self.tic = time.time()
90 self.tic = time.time()
90 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
91 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
91 self.caller.start()
92 self.caller.start()
92
93
93 def add_new_heart_handler(self, handler):
94 def add_new_heart_handler(self, handler):
94 """add a new handler for new hearts"""
95 """add a new handler for new hearts"""
95 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
96 self.log.debug("heartbeat::new_heart_handler: %s", handler)
96 self._new_handlers.add(handler)
97 self._new_handlers.add(handler)
97
98
98 def add_heart_failure_handler(self, handler):
99 def add_heart_failure_handler(self, handler):
99 """add a new handler for heart failure"""
100 """add a new handler for heart failure"""
100 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
101 self.log.debug("heartbeat::new heart failure handler: %s", handler)
101 self._failure_handlers.add(handler)
102 self._failure_handlers.add(handler)
102
103
103 def beat(self):
104 def beat(self):
104 self.pongstream.flush()
105 self.pongstream.flush()
105 self.last_ping = self.lifetime
106 self.last_ping = self.lifetime
106
107
107 toc = time.time()
108 toc = time.time()
108 self.lifetime += toc-self.tic
109 self.lifetime += toc-self.tic
109 self.tic = toc
110 self.tic = toc
110 # self.log.debug("heartbeat::%s"%self.lifetime)
111 self.log.debug("heartbeat::sending %s", self.lifetime)
111 goodhearts = self.hearts.intersection(self.responses)
112 goodhearts = self.hearts.intersection(self.responses)
112 missed_beats = self.hearts.difference(goodhearts)
113 missed_beats = self.hearts.difference(goodhearts)
113 heartfailures = self.on_probation.intersection(missed_beats)
114 heartfailures = self.on_probation.intersection(missed_beats)
114 newhearts = self.responses.difference(goodhearts)
115 newhearts = self.responses.difference(goodhearts)
115 map(self.handle_new_heart, newhearts)
116 map(self.handle_new_heart, newhearts)
116 map(self.handle_heart_failure, heartfailures)
117 map(self.handle_heart_failure, heartfailures)
117 self.on_probation = missed_beats.intersection(self.hearts)
118 self.on_probation = missed_beats.intersection(self.hearts)
118 self.responses = set()
119 self.responses = set()
119 # print self.on_probation, self.hearts
120 # print self.on_probation, self.hearts
120 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
121 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
121 self.pingstream.send(asbytes(str(self.lifetime)))
122 self.pingstream.send(asbytes(str(self.lifetime)))
122
123
123 def handle_new_heart(self, heart):
124 def handle_new_heart(self, heart):
124 if self._new_handlers:
125 if self._new_handlers:
125 for handler in self._new_handlers:
126 for handler in self._new_handlers:
126 handler(heart)
127 handler(heart)
127 else:
128 else:
128 self.log.info("heartbeat::yay, got new heart %s!"%heart)
129 self.log.info("heartbeat::yay, got new heart %s!", heart)
129 self.hearts.add(heart)
130 self.hearts.add(heart)
130
131
131 def handle_heart_failure(self, heart):
132 def handle_heart_failure(self, heart):
132 if self._failure_handlers:
133 if self._failure_handlers:
133 for handler in self._failure_handlers:
134 for handler in self._failure_handlers:
134 try:
135 try:
135 handler(heart)
136 handler(heart)
136 except Exception as e:
137 except Exception as e:
137 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
138 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
138 pass
139 pass
139 else:
140 else:
140 self.log.info("heartbeat::Heart %s failed :("%heart)
141 self.log.info("heartbeat::Heart %s failed :(", heart)
141 self.hearts.remove(heart)
142 self.hearts.remove(heart)
142
143
143
144
144 def handle_pong(self, msg):
145 def handle_pong(self, msg):
145 "a heart just beat"
146 "a heart just beat"
146 current = asbytes(str(self.lifetime))
147 current = asbytes(str(self.lifetime))
147 last = asbytes(str(self.last_ping))
148 last = asbytes(str(self.last_ping))
148 if msg[1] == current:
149 if msg[1] == current:
149 delta = time.time()-self.tic
150 delta = time.time()-self.tic
150 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
151 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
151 self.responses.add(msg[0])
152 self.responses.add(msg[0])
152 elif msg[1] == last:
153 elif msg[1] == last:
153 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
154 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
154 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
155 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
155 self.responses.add(msg[0])
156 self.responses.add(msg[0])
156 else:
157 else:
157 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
158 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
158 (msg[1],self.lifetime))
159
159
160
160
161 if __name__ == '__main__':
161 if __name__ == '__main__':
162 loop = ioloop.IOLoop.instance()
162 loop = ioloop.IOLoop.instance()
163 context = zmq.Context()
163 context = zmq.Context()
164 pub = context.socket(zmq.PUB)
164 pub = context.socket(zmq.PUB)
165 pub.bind('tcp://127.0.0.1:5555')
165 pub.bind('tcp://127.0.0.1:5555')
166 xrep = context.socket(zmq.ROUTER)
166 xrep = context.socket(zmq.ROUTER)
167 xrep.bind('tcp://127.0.0.1:5556')
167 xrep.bind('tcp://127.0.0.1:5556')
168
168
169 outstream = zmqstream.ZMQStream(pub, loop)
169 outstream = zmqstream.ZMQStream(pub, loop)
170 instream = zmqstream.ZMQStream(xrep, loop)
170 instream = zmqstream.ZMQStream(xrep, loop)
171
171
172 hb = HeartMonitor(loop, outstream, instream)
172 hb = HeartMonitor(loop, outstream, instream)
173
173
174 loop.start()
174 loop.start()
@@ -1,1290 +1,1293 b''
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import sys
21 import sys
22 import time
22 import time
23 from datetime import datetime
23 from datetime import datetime
24
24
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27 from zmq.eventloop.zmqstream import ZMQStream
27 from zmq.eventloop.zmqstream import ZMQStream
28
28
29 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.traitlets import (
31 from IPython.utils.traitlets import (
32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 )
33 )
34
34
35 from IPython.parallel import error, util
35 from IPython.parallel import error, util
36 from IPython.parallel.factory import RegistrationFactory
36 from IPython.parallel.factory import RegistrationFactory
37
37
38 from IPython.zmq.session import SessionFactory
38 from IPython.zmq.session import SessionFactory
39
39
40 from .heartmonitor import HeartMonitor
40 from .heartmonitor import HeartMonitor
41
41
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # Code
43 # Code
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45
45
46 def _passer(*args, **kwargs):
46 def _passer(*args, **kwargs):
47 return
47 return
48
48
49 def _printer(*args, **kwargs):
49 def _printer(*args, **kwargs):
50 print (args)
50 print (args)
51 print (kwargs)
51 print (kwargs)
52
52
53 def empty_record():
53 def empty_record():
54 """Return an empty dict with all record keys."""
54 """Return an empty dict with all record keys."""
55 return {
55 return {
56 'msg_id' : None,
56 'msg_id' : None,
57 'header' : None,
57 'header' : None,
58 'content': None,
58 'content': None,
59 'buffers': None,
59 'buffers': None,
60 'submitted': None,
60 'submitted': None,
61 'client_uuid' : None,
61 'client_uuid' : None,
62 'engine_uuid' : None,
62 'engine_uuid' : None,
63 'started': None,
63 'started': None,
64 'completed': None,
64 'completed': None,
65 'resubmitted': None,
65 'resubmitted': None,
66 'result_header' : None,
66 'result_header' : None,
67 'result_content' : None,
67 'result_content' : None,
68 'result_buffers' : None,
68 'result_buffers' : None,
69 'queue' : None,
69 'queue' : None,
70 'pyin' : None,
70 'pyin' : None,
71 'pyout': None,
71 'pyout': None,
72 'pyerr': None,
72 'pyerr': None,
73 'stdout': '',
73 'stdout': '',
74 'stderr': '',
74 'stderr': '',
75 }
75 }
76
76
77 def init_record(msg):
77 def init_record(msg):
78 """Initialize a TaskRecord based on a request."""
78 """Initialize a TaskRecord based on a request."""
79 header = msg['header']
79 header = msg['header']
80 return {
80 return {
81 'msg_id' : header['msg_id'],
81 'msg_id' : header['msg_id'],
82 'header' : header,
82 'header' : header,
83 'content': msg['content'],
83 'content': msg['content'],
84 'buffers': msg['buffers'],
84 'buffers': msg['buffers'],
85 'submitted': header['date'],
85 'submitted': header['date'],
86 'client_uuid' : None,
86 'client_uuid' : None,
87 'engine_uuid' : None,
87 'engine_uuid' : None,
88 'started': None,
88 'started': None,
89 'completed': None,
89 'completed': None,
90 'resubmitted': None,
90 'resubmitted': None,
91 'result_header' : None,
91 'result_header' : None,
92 'result_content' : None,
92 'result_content' : None,
93 'result_buffers' : None,
93 'result_buffers' : None,
94 'queue' : None,
94 'queue' : None,
95 'pyin' : None,
95 'pyin' : None,
96 'pyout': None,
96 'pyout': None,
97 'pyerr': None,
97 'pyerr': None,
98 'stdout': '',
98 'stdout': '',
99 'stderr': '',
99 'stderr': '',
100 }
100 }
101
101
102
102
103 class EngineConnector(HasTraits):
103 class EngineConnector(HasTraits):
104 """A simple object for accessing the various zmq connections of an object.
104 """A simple object for accessing the various zmq connections of an object.
105 Attributes are:
105 Attributes are:
106 id (int): engine ID
106 id (int): engine ID
107 uuid (str): uuid (unused?)
107 uuid (str): uuid (unused?)
108 queue (str): identity of queue's XREQ socket
108 queue (str): identity of queue's XREQ socket
109 registration (str): identity of registration XREQ socket
109 registration (str): identity of registration XREQ socket
110 heartbeat (str): identity of heartbeat XREQ socket
110 heartbeat (str): identity of heartbeat XREQ socket
111 """
111 """
112 id=Integer(0)
112 id=Integer(0)
113 queue=CBytes()
113 queue=CBytes()
114 control=CBytes()
114 control=CBytes()
115 registration=CBytes()
115 registration=CBytes()
116 heartbeat=CBytes()
116 heartbeat=CBytes()
117 pending=Set()
117 pending=Set()
118
118
119 class HubFactory(RegistrationFactory):
119 class HubFactory(RegistrationFactory):
120 """The Configurable for setting up a Hub."""
120 """The Configurable for setting up a Hub."""
121
121
122 # port-pairs for monitoredqueues:
122 # port-pairs for monitoredqueues:
123 hb = Tuple(Integer,Integer,config=True,
123 hb = Tuple(Integer,Integer,config=True,
124 help="""XREQ/SUB Port pair for Engine heartbeats""")
124 help="""XREQ/SUB Port pair for Engine heartbeats""")
125 def _hb_default(self):
125 def _hb_default(self):
126 return tuple(util.select_random_ports(2))
126 return tuple(util.select_random_ports(2))
127
127
128 mux = Tuple(Integer,Integer,config=True,
128 mux = Tuple(Integer,Integer,config=True,
129 help="""Engine/Client Port pair for MUX queue""")
129 help="""Engine/Client Port pair for MUX queue""")
130
130
131 def _mux_default(self):
131 def _mux_default(self):
132 return tuple(util.select_random_ports(2))
132 return tuple(util.select_random_ports(2))
133
133
134 task = Tuple(Integer,Integer,config=True,
134 task = Tuple(Integer,Integer,config=True,
135 help="""Engine/Client Port pair for Task queue""")
135 help="""Engine/Client Port pair for Task queue""")
136 def _task_default(self):
136 def _task_default(self):
137 return tuple(util.select_random_ports(2))
137 return tuple(util.select_random_ports(2))
138
138
139 control = Tuple(Integer,Integer,config=True,
139 control = Tuple(Integer,Integer,config=True,
140 help="""Engine/Client Port pair for Control queue""")
140 help="""Engine/Client Port pair for Control queue""")
141
141
142 def _control_default(self):
142 def _control_default(self):
143 return tuple(util.select_random_ports(2))
143 return tuple(util.select_random_ports(2))
144
144
145 iopub = Tuple(Integer,Integer,config=True,
145 iopub = Tuple(Integer,Integer,config=True,
146 help="""Engine/Client Port pair for IOPub relay""")
146 help="""Engine/Client Port pair for IOPub relay""")
147
147
148 def _iopub_default(self):
148 def _iopub_default(self):
149 return tuple(util.select_random_ports(2))
149 return tuple(util.select_random_ports(2))
150
150
151 # single ports:
151 # single ports:
152 mon_port = Integer(config=True,
152 mon_port = Integer(config=True,
153 help="""Monitor (SUB) port for queue traffic""")
153 help="""Monitor (SUB) port for queue traffic""")
154
154
155 def _mon_port_default(self):
155 def _mon_port_default(self):
156 return util.select_random_ports(1)[0]
156 return util.select_random_ports(1)[0]
157
157
158 notifier_port = Integer(config=True,
158 notifier_port = Integer(config=True,
159 help="""PUB port for sending engine status notifications""")
159 help="""PUB port for sending engine status notifications""")
160
160
161 def _notifier_port_default(self):
161 def _notifier_port_default(self):
162 return util.select_random_ports(1)[0]
162 return util.select_random_ports(1)[0]
163
163
164 engine_ip = Unicode('127.0.0.1', config=True,
164 engine_ip = Unicode('127.0.0.1', config=True,
165 help="IP on which to listen for engine connections. [default: loopback]")
165 help="IP on which to listen for engine connections. [default: loopback]")
166 engine_transport = Unicode('tcp', config=True,
166 engine_transport = Unicode('tcp', config=True,
167 help="0MQ transport for engine connections. [default: tcp]")
167 help="0MQ transport for engine connections. [default: tcp]")
168
168
169 client_ip = Unicode('127.0.0.1', config=True,
169 client_ip = Unicode('127.0.0.1', config=True,
170 help="IP on which to listen for client connections. [default: loopback]")
170 help="IP on which to listen for client connections. [default: loopback]")
171 client_transport = Unicode('tcp', config=True,
171 client_transport = Unicode('tcp', config=True,
172 help="0MQ transport for client connections. [default : tcp]")
172 help="0MQ transport for client connections. [default : tcp]")
173
173
174 monitor_ip = Unicode('127.0.0.1', config=True,
174 monitor_ip = Unicode('127.0.0.1', config=True,
175 help="IP on which to listen for monitor messages. [default: loopback]")
175 help="IP on which to listen for monitor messages. [default: loopback]")
176 monitor_transport = Unicode('tcp', config=True,
176 monitor_transport = Unicode('tcp', config=True,
177 help="0MQ transport for monitor messages. [default : tcp]")
177 help="0MQ transport for monitor messages. [default : tcp]")
178
178
179 monitor_url = Unicode('')
179 monitor_url = Unicode('')
180
180
181 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
181 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
182 config=True, help="""The class to use for the DB backend""")
182 config=True, help="""The class to use for the DB backend""")
183
183
184 # not configurable
184 # not configurable
185 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
185 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
186 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
186 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
187
187
188 def _ip_changed(self, name, old, new):
188 def _ip_changed(self, name, old, new):
189 self.engine_ip = new
189 self.engine_ip = new
190 self.client_ip = new
190 self.client_ip = new
191 self.monitor_ip = new
191 self.monitor_ip = new
192 self._update_monitor_url()
192 self._update_monitor_url()
193
193
194 def _update_monitor_url(self):
194 def _update_monitor_url(self):
195 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
195 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
196
196
197 def _transport_changed(self, name, old, new):
197 def _transport_changed(self, name, old, new):
198 self.engine_transport = new
198 self.engine_transport = new
199 self.client_transport = new
199 self.client_transport = new
200 self.monitor_transport = new
200 self.monitor_transport = new
201 self._update_monitor_url()
201 self._update_monitor_url()
202
202
203 def __init__(self, **kwargs):
203 def __init__(self, **kwargs):
204 super(HubFactory, self).__init__(**kwargs)
204 super(HubFactory, self).__init__(**kwargs)
205 self._update_monitor_url()
205 self._update_monitor_url()
206
206
207
207
208 def construct(self):
208 def construct(self):
209 self.init_hub()
209 self.init_hub()
210
210
211 def start(self):
211 def start(self):
212 self.heartmonitor.start()
212 self.heartmonitor.start()
213 self.log.info("Heartmonitor started")
213 self.log.info("Heartmonitor started")
214
214
215 def init_hub(self):
215 def init_hub(self):
216 """construct"""
216 """construct"""
217 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
217 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
218 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
218 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
219
219
220 ctx = self.context
220 ctx = self.context
221 loop = self.loop
221 loop = self.loop
222
222
223 # Registrar socket
223 # Registrar socket
224 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
224 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
225 q.bind(client_iface % self.regport)
225 q.bind(client_iface % self.regport)
226 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
226 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
227 if self.client_ip != self.engine_ip:
227 if self.client_ip != self.engine_ip:
228 q.bind(engine_iface % self.regport)
228 q.bind(engine_iface % self.regport)
229 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
229 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
230
230
231 ### Engine connections ###
231 ### Engine connections ###
232
232
233 # heartbeat
233 # heartbeat
234 hpub = ctx.socket(zmq.PUB)
234 hpub = ctx.socket(zmq.PUB)
235 hpub.bind(engine_iface % self.hb[0])
235 hpub.bind(engine_iface % self.hb[0])
236 hrep = ctx.socket(zmq.ROUTER)
236 hrep = ctx.socket(zmq.ROUTER)
237 hrep.bind(engine_iface % self.hb[1])
237 hrep.bind(engine_iface % self.hb[1])
238 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
238 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
239 pingstream=ZMQStream(hpub,loop),
239 pingstream=ZMQStream(hpub,loop),
240 pongstream=ZMQStream(hrep,loop)
240 pongstream=ZMQStream(hrep,loop)
241 )
241 )
242
242
243 ### Client connections ###
243 ### Client connections ###
244 # Notifier socket
244 # Notifier socket
245 n = ZMQStream(ctx.socket(zmq.PUB), loop)
245 n = ZMQStream(ctx.socket(zmq.PUB), loop)
246 n.bind(client_iface%self.notifier_port)
246 n.bind(client_iface%self.notifier_port)
247
247
248 ### build and launch the queues ###
248 ### build and launch the queues ###
249
249
250 # monitor socket
250 # monitor socket
251 sub = ctx.socket(zmq.SUB)
251 sub = ctx.socket(zmq.SUB)
252 sub.setsockopt(zmq.SUBSCRIBE, b"")
252 sub.setsockopt(zmq.SUBSCRIBE, b"")
253 sub.bind(self.monitor_url)
253 sub.bind(self.monitor_url)
254 sub.bind('inproc://monitor')
254 sub.bind('inproc://monitor')
255 sub = ZMQStream(sub, loop)
255 sub = ZMQStream(sub, loop)
256
256
257 # connect the db
257 # connect the db
258 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
258 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
259 # cdir = self.config.Global.cluster_dir
259 # cdir = self.config.Global.cluster_dir
260 self.db = import_item(str(self.db_class))(session=self.session.session,
260 self.db = import_item(str(self.db_class))(session=self.session.session,
261 config=self.config, log=self.log)
261 config=self.config, log=self.log)
262 time.sleep(.25)
262 time.sleep(.25)
263 try:
263 try:
264 scheme = self.config.TaskScheduler.scheme_name
264 scheme = self.config.TaskScheduler.scheme_name
265 except AttributeError:
265 except AttributeError:
266 from .scheduler import TaskScheduler
266 from .scheduler import TaskScheduler
267 scheme = TaskScheduler.scheme_name.get_default_value()
267 scheme = TaskScheduler.scheme_name.get_default_value()
268 # build connection dicts
268 # build connection dicts
269 self.engine_info = {
269 self.engine_info = {
270 'control' : engine_iface%self.control[1],
270 'control' : engine_iface%self.control[1],
271 'mux': engine_iface%self.mux[1],
271 'mux': engine_iface%self.mux[1],
272 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
272 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
273 'task' : engine_iface%self.task[1],
273 'task' : engine_iface%self.task[1],
274 'iopub' : engine_iface%self.iopub[1],
274 'iopub' : engine_iface%self.iopub[1],
275 # 'monitor' : engine_iface%self.mon_port,
275 # 'monitor' : engine_iface%self.mon_port,
276 }
276 }
277
277
278 self.client_info = {
278 self.client_info = {
279 'control' : client_iface%self.control[0],
279 'control' : client_iface%self.control[0],
280 'mux': client_iface%self.mux[0],
280 'mux': client_iface%self.mux[0],
281 'task' : (scheme, client_iface%self.task[0]),
281 'task' : (scheme, client_iface%self.task[0]),
282 'iopub' : client_iface%self.iopub[0],
282 'iopub' : client_iface%self.iopub[0],
283 'notification': client_iface%self.notifier_port
283 'notification': client_iface%self.notifier_port
284 }
284 }
285 self.log.debug("Hub engine addrs: %s"%self.engine_info)
285 self.log.debug("Hub engine addrs: %s", self.engine_info)
286 self.log.debug("Hub client addrs: %s"%self.client_info)
286 self.log.debug("Hub client addrs: %s", self.client_info)
287
287
288 # resubmit stream
288 # resubmit stream
289 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
289 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
290 url = util.disambiguate_url(self.client_info['task'][-1])
290 url = util.disambiguate_url(self.client_info['task'][-1])
291 r.setsockopt(zmq.IDENTITY, self.session.bsession)
291 r.setsockopt(zmq.IDENTITY, self.session.bsession)
292 r.connect(url)
292 r.connect(url)
293
293
294 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
294 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
295 query=q, notifier=n, resubmit=r, db=self.db,
295 query=q, notifier=n, resubmit=r, db=self.db,
296 engine_info=self.engine_info, client_info=self.client_info,
296 engine_info=self.engine_info, client_info=self.client_info,
297 log=self.log)
297 log=self.log)
298
298
299
299
class Hub(SessionFactory):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: Session object
    <removed> context: zmq context for creating new connections (?)
    queue: ZMQStream for monitoring the command queue (SUB)
    query: ZMQStream for engine registration and client queries requests (XREP)
    heartbeat: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
        NotImplemented
    engine_info: dict of zmq connection information for engines to connect
        to the queues.
    client_info: dict of zmq connection information for engines to connect
        to the queues.
    """
    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict()   # eid -> engine uuid
    by_ident=Dict()   # engine uuid -> eid
    engines=Dict()    # eid -> EngineConnector
    clients=Dict()
    hearts=Dict()     # heart ident -> eid
    pending=Set()     # pending msg_ids
    queues=Dict()     # pending msg_ids keyed by engine_id
    tasks=Dict()      # pending msg_ids submitted as tasks, keyed by client_id
    completed=Dict()  # completed msg_ids keyed by engine_id
    all_completed=Set() # set of all completed msg_ids
    dead_engines=Set()  # registration identities of engines that have died
    unassigned=Set()    # set of task msg_ids not yet assigned a destination
    incoming_registrations=Dict()
    registration_timeout=Integer() # ms to wait for a heartbeat before dropping a registration
    _idcounter=Integer(0)          # monotonically increasing source of engine ids

    # objects from constructor:
    query=Instance(ZMQStream)
    monitor=Instance(ZMQStream)
    notifier=Instance(ZMQStream)
    resubmit=Instance(ZMQStream)
    heartmonitor=Instance(HeartMonitor)
    db=Instance(object)
    client_info=Dict()
    engine_info=Dict()
346
346
347
347
348 def __init__(self, **kwargs):
348 def __init__(self, **kwargs):
349 """
349 """
350 # universal:
350 # universal:
351 loop: IOLoop for creating future connections
351 loop: IOLoop for creating future connections
352 session: streamsession for sending serialized data
352 session: streamsession for sending serialized data
353 # engine:
353 # engine:
354 queue: ZMQStream for monitoring queue messages
354 queue: ZMQStream for monitoring queue messages
355 query: ZMQStream for engine+client registration and client requests
355 query: ZMQStream for engine+client registration and client requests
356 heartbeat: HeartMonitor object for tracking engines
356 heartbeat: HeartMonitor object for tracking engines
357 # extra:
357 # extra:
358 db: ZMQStream for db connection (NotImplemented)
358 db: ZMQStream for db connection (NotImplemented)
359 engine_info: zmq address/protocol dict for engine connections
359 engine_info: zmq address/protocol dict for engine connections
360 client_info: zmq address/protocol dict for client connections
360 client_info: zmq address/protocol dict for client connections
361 """
361 """
362
362
363 super(Hub, self).__init__(**kwargs)
363 super(Hub, self).__init__(**kwargs)
364 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
364 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
365
365
366 # validate connection dicts:
366 # validate connection dicts:
367 for k,v in self.client_info.iteritems():
367 for k,v in self.client_info.iteritems():
368 if k == 'task':
368 if k == 'task':
369 util.validate_url_container(v[1])
369 util.validate_url_container(v[1])
370 else:
370 else:
371 util.validate_url_container(v)
371 util.validate_url_container(v)
372 # util.validate_url_container(self.client_info)
372 # util.validate_url_container(self.client_info)
373 util.validate_url_container(self.engine_info)
373 util.validate_url_container(self.engine_info)
374
374
375 # register our callbacks
375 # register our callbacks
376 self.query.on_recv(self.dispatch_query)
376 self.query.on_recv(self.dispatch_query)
377 self.monitor.on_recv(self.dispatch_monitor_traffic)
377 self.monitor.on_recv(self.dispatch_monitor_traffic)
378
378
379 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
379 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
380 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
380 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
381
381
382 self.monitor_handlers = {b'in' : self.save_queue_request,
382 self.monitor_handlers = {b'in' : self.save_queue_request,
383 b'out': self.save_queue_result,
383 b'out': self.save_queue_result,
384 b'intask': self.save_task_request,
384 b'intask': self.save_task_request,
385 b'outtask': self.save_task_result,
385 b'outtask': self.save_task_result,
386 b'tracktask': self.save_task_destination,
386 b'tracktask': self.save_task_destination,
387 b'incontrol': _passer,
387 b'incontrol': _passer,
388 b'outcontrol': _passer,
388 b'outcontrol': _passer,
389 b'iopub': self.save_iopub_message,
389 b'iopub': self.save_iopub_message,
390 }
390 }
391
391
392 self.query_handlers = {'queue_request': self.queue_status,
392 self.query_handlers = {'queue_request': self.queue_status,
393 'result_request': self.get_results,
393 'result_request': self.get_results,
394 'history_request': self.get_history,
394 'history_request': self.get_history,
395 'db_request': self.db_query,
395 'db_request': self.db_query,
396 'purge_request': self.purge_results,
396 'purge_request': self.purge_results,
397 'load_request': self.check_load,
397 'load_request': self.check_load,
398 'resubmit_request': self.resubmit_task,
398 'resubmit_request': self.resubmit_task,
399 'shutdown_request': self.shutdown_request,
399 'shutdown_request': self.shutdown_request,
400 'registration_request' : self.register_engine,
400 'registration_request' : self.register_engine,
401 'unregistration_request' : self.unregister_engine,
401 'unregistration_request' : self.unregister_engine,
402 'connection_request': self.connection_request,
402 'connection_request': self.connection_request,
403 }
403 }
404
404
405 # ignore resubmit replies
405 # ignore resubmit replies
406 self.resubmit.on_recv(lambda msg: None, copy=False)
406 self.resubmit.on_recv(lambda msg: None, copy=False)
407
407
408 self.log.info("hub::created hub")
408 self.log.info("hub::created hub")
409
409
410 @property
410 @property
411 def _next_id(self):
411 def _next_id(self):
412 """gemerate a new ID.
412 """gemerate a new ID.
413
413
414 No longer reuse old ids, just count from 0."""
414 No longer reuse old ids, just count from 0."""
415 newid = self._idcounter
415 newid = self._idcounter
416 self._idcounter += 1
416 self._idcounter += 1
417 return newid
417 return newid
418 # newid = 0
418 # newid = 0
419 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
419 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
420 # # print newid, self.ids, self.incoming_registrations
420 # # print newid, self.ids, self.incoming_registrations
421 # while newid in self.ids or newid in incoming:
421 # while newid in self.ids or newid in incoming:
422 # newid += 1
422 # newid += 1
423 # return newid
423 # return newid
424
424
425 #-----------------------------------------------------------------------------
425 #-----------------------------------------------------------------------------
426 # message validation
426 # message validation
427 #-----------------------------------------------------------------------------
427 #-----------------------------------------------------------------------------
428
428
429 def _validate_targets(self, targets):
429 def _validate_targets(self, targets):
430 """turn any valid targets argument into a list of integer ids"""
430 """turn any valid targets argument into a list of integer ids"""
431 if targets is None:
431 if targets is None:
432 # default to all
432 # default to all
433 targets = self.ids
433 targets = self.ids
434
434
435 if isinstance(targets, (int,str,unicode)):
435 if isinstance(targets, (int,str,unicode)):
436 # only one target specified
436 # only one target specified
437 targets = [targets]
437 targets = [targets]
438 _targets = []
438 _targets = []
439 for t in targets:
439 for t in targets:
440 # map raw identities to ids
440 # map raw identities to ids
441 if isinstance(t, (str,unicode)):
441 if isinstance(t, (str,unicode)):
442 t = self.by_ident.get(t, t)
442 t = self.by_ident.get(t, t)
443 _targets.append(t)
443 _targets.append(t)
444 targets = _targets
444 targets = _targets
445 bad_targets = [ t for t in targets if t not in self.ids ]
445 bad_targets = [ t for t in targets if t not in self.ids ]
446 if bad_targets:
446 if bad_targets:
447 raise IndexError("No Such Engine: %r"%bad_targets)
447 raise IndexError("No Such Engine: %r" % bad_targets)
448 if not targets:
448 if not targets:
449 raise IndexError("No Engines Registered")
449 raise IndexError("No Engines Registered")
450 return targets
450 return targets
451
451
452 #-----------------------------------------------------------------------------
452 #-----------------------------------------------------------------------------
453 # dispatch methods (1 per stream)
453 # dispatch methods (1 per stream)
454 #-----------------------------------------------------------------------------
454 #-----------------------------------------------------------------------------
455
455
456
456
457 def dispatch_monitor_traffic(self, msg):
457 def dispatch_monitor_traffic(self, msg):
458 """all ME and Task queue messages come through here, as well as
458 """all ME and Task queue messages come through here, as well as
459 IOPub traffic."""
459 IOPub traffic."""
460 self.log.debug("monitor traffic: %r"%msg[:2])
460 self.log.debug("monitor traffic: %r", msg[:2])
461 switch = msg[0]
461 switch = msg[0]
462 try:
462 try:
463 idents, msg = self.session.feed_identities(msg[1:])
463 idents, msg = self.session.feed_identities(msg[1:])
464 except ValueError:
464 except ValueError:
465 idents=[]
465 idents=[]
466 if not idents:
466 if not idents:
467 self.log.error("Bad Monitor Message: %r"%msg)
467 self.log.error("Bad Monitor Message: %r", msg)
468 return
468 return
469 handler = self.monitor_handlers.get(switch, None)
469 handler = self.monitor_handlers.get(switch, None)
470 if handler is not None:
470 if handler is not None:
471 handler(idents, msg)
471 handler(idents, msg)
472 else:
472 else:
473 self.log.error("Invalid monitor topic: %r"%switch)
473 self.log.error("Invalid monitor topic: %r", switch)
474
474
475
475
476 def dispatch_query(self, msg):
476 def dispatch_query(self, msg):
477 """Route registration requests and queries from clients."""
477 """Route registration requests and queries from clients."""
478 try:
478 try:
479 idents, msg = self.session.feed_identities(msg)
479 idents, msg = self.session.feed_identities(msg)
480 except ValueError:
480 except ValueError:
481 idents = []
481 idents = []
482 if not idents:
482 if not idents:
483 self.log.error("Bad Query Message: %r"%msg)
483 self.log.error("Bad Query Message: %r", msg)
484 return
484 return
485 client_id = idents[0]
485 client_id = idents[0]
486 try:
486 try:
487 msg = self.session.unserialize(msg, content=True)
487 msg = self.session.unserialize(msg, content=True)
488 except Exception:
488 except Exception:
489 content = error.wrap_exception()
489 content = error.wrap_exception()
490 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
490 self.log.error("Bad Query Message: %r", msg, exc_info=True)
491 self.session.send(self.query, "hub_error", ident=client_id,
491 self.session.send(self.query, "hub_error", ident=client_id,
492 content=content)
492 content=content)
493 return
493 return
494 # print client_id, header, parent, content
494 # print client_id, header, parent, content
495 #switch on message type:
495 #switch on message type:
496 msg_type = msg['header']['msg_type']
496 msg_type = msg['header']['msg_type']
497 self.log.info("client::client %r requested %r"%(client_id, msg_type))
497 self.log.info("client::client %r requested %r", client_id, msg_type)
498 handler = self.query_handlers.get(msg_type, None)
498 handler = self.query_handlers.get(msg_type, None)
499 try:
499 try:
500 assert handler is not None, "Bad Message Type: %r"%msg_type
500 assert handler is not None, "Bad Message Type: %r" % msg_type
501 except:
501 except:
502 content = error.wrap_exception()
502 content = error.wrap_exception()
503 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
503 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
504 self.session.send(self.query, "hub_error", ident=client_id,
504 self.session.send(self.query, "hub_error", ident=client_id,
505 content=content)
505 content=content)
506 return
506 return
507
507
508 else:
508 else:
509 handler(idents, msg)
509 handler(idents, msg)
510
510
511 def dispatch_db(self, msg):
511 def dispatch_db(self, msg):
512 """"""
512 """"""
513 raise NotImplementedError
513 raise NotImplementedError
514
514
515 #---------------------------------------------------------------------------
515 #---------------------------------------------------------------------------
516 # handler methods (1 per event)
516 # handler methods (1 per event)
517 #---------------------------------------------------------------------------
517 #---------------------------------------------------------------------------
518
518
519 #----------------------- Heartbeat --------------------------------------
519 #----------------------- Heartbeat --------------------------------------
520
520
521 def handle_new_heart(self, heart):
521 def handle_new_heart(self, heart):
522 """handler to attach to heartbeater.
522 """handler to attach to heartbeater.
523 Called when a new heart starts to beat.
523 Called when a new heart starts to beat.
524 Triggers completion of registration."""
524 Triggers completion of registration."""
525 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
525 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
526 if heart not in self.incoming_registrations:
526 if heart not in self.incoming_registrations:
527 self.log.info("heartbeat::ignoring new heart: %r"%heart)
527 self.log.info("heartbeat::ignoring new heart: %r", heart)
528 else:
528 else:
529 self.finish_registration(heart)
529 self.finish_registration(heart)
530
530
531
531
532 def handle_heart_failure(self, heart):
532 def handle_heart_failure(self, heart):
533 """handler to attach to heartbeater.
533 """handler to attach to heartbeater.
534 called when a previously registered heart fails to respond to beat request.
534 called when a previously registered heart fails to respond to beat request.
535 triggers unregistration"""
535 triggers unregistration"""
536 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
536 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
537 eid = self.hearts.get(heart, None)
537 eid = self.hearts.get(heart, None)
538 queue = self.engines[eid].queue
538 queue = self.engines[eid].queue
539 if eid is None:
539 if eid is None or self.keytable[eid] in self.dead_engines:
540 self.log.info("heartbeat::ignoring heart failure %r"%heart)
540 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
541 else:
541 else:
542 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
542 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
543
543
544 #----------------------- MUX Queue Traffic ------------------------------
544 #----------------------- MUX Queue Traffic ------------------------------
545
545
546 def save_queue_request(self, idents, msg):
546 def save_queue_request(self, idents, msg):
547 if len(idents) < 2:
547 if len(idents) < 2:
548 self.log.error("invalid identity prefix: %r"%idents)
548 self.log.error("invalid identity prefix: %r", idents)
549 return
549 return
550 queue_id, client_id = idents[:2]
550 queue_id, client_id = idents[:2]
551 try:
551 try:
552 msg = self.session.unserialize(msg)
552 msg = self.session.unserialize(msg)
553 except Exception:
553 except Exception:
554 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
554 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
555 return
555 return
556
556
557 eid = self.by_ident.get(queue_id, None)
557 eid = self.by_ident.get(queue_id, None)
558 if eid is None:
558 if eid is None:
559 self.log.error("queue::target %r not registered"%queue_id)
559 self.log.error("queue::target %r not registered", queue_id)
560 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
560 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
561 return
561 return
562 record = init_record(msg)
562 record = init_record(msg)
563 msg_id = record['msg_id']
563 msg_id = record['msg_id']
564 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
564 # Unicode in records
565 # Unicode in records
565 record['engine_uuid'] = queue_id.decode('ascii')
566 record['engine_uuid'] = queue_id.decode('ascii')
566 record['client_uuid'] = client_id.decode('ascii')
567 record['client_uuid'] = client_id.decode('ascii')
567 record['queue'] = 'mux'
568 record['queue'] = 'mux'
568
569
569 try:
570 try:
570 # it's posible iopub arrived first:
571 # it's posible iopub arrived first:
571 existing = self.db.get_record(msg_id)
572 existing = self.db.get_record(msg_id)
572 for key,evalue in existing.iteritems():
573 for key,evalue in existing.iteritems():
573 rvalue = record.get(key, None)
574 rvalue = record.get(key, None)
574 if evalue and rvalue and evalue != rvalue:
575 if evalue and rvalue and evalue != rvalue:
575 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
576 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
576 elif evalue and not rvalue:
577 elif evalue and not rvalue:
577 record[key] = evalue
578 record[key] = evalue
578 try:
579 try:
579 self.db.update_record(msg_id, record)
580 self.db.update_record(msg_id, record)
580 except Exception:
581 except Exception:
581 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
582 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
582 except KeyError:
583 except KeyError:
583 try:
584 try:
584 self.db.add_record(msg_id, record)
585 self.db.add_record(msg_id, record)
585 except Exception:
586 except Exception:
586 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
587 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
587
588
588
589
589 self.pending.add(msg_id)
590 self.pending.add(msg_id)
590 self.queues[eid].append(msg_id)
591 self.queues[eid].append(msg_id)
591
592
592 def save_queue_result(self, idents, msg):
593 def save_queue_result(self, idents, msg):
593 if len(idents) < 2:
594 if len(idents) < 2:
594 self.log.error("invalid identity prefix: %r"%idents)
595 self.log.error("invalid identity prefix: %r", idents)
595 return
596 return
596
597
597 client_id, queue_id = idents[:2]
598 client_id, queue_id = idents[:2]
598 try:
599 try:
599 msg = self.session.unserialize(msg)
600 msg = self.session.unserialize(msg)
600 except Exception:
601 except Exception:
601 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
602 self.log.error("queue::engine %r sent invalid message to %r: %r",
602 queue_id,client_id, msg), exc_info=True)
603 queue_id, client_id, msg, exc_info=True)
603 return
604 return
604
605
605 eid = self.by_ident.get(queue_id, None)
606 eid = self.by_ident.get(queue_id, None)
606 if eid is None:
607 if eid is None:
607 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
608 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
608 return
609 return
609
610
610 parent = msg['parent_header']
611 parent = msg['parent_header']
611 if not parent:
612 if not parent:
612 return
613 return
613 msg_id = parent['msg_id']
614 msg_id = parent['msg_id']
614 if msg_id in self.pending:
615 if msg_id in self.pending:
615 self.pending.remove(msg_id)
616 self.pending.remove(msg_id)
616 self.all_completed.add(msg_id)
617 self.all_completed.add(msg_id)
617 self.queues[eid].remove(msg_id)
618 self.queues[eid].remove(msg_id)
618 self.completed[eid].append(msg_id)
619 self.completed[eid].append(msg_id)
620 self.log.info("queue::request %r completed on %s", msg_id, eid)
619 elif msg_id not in self.all_completed:
621 elif msg_id not in self.all_completed:
620 # it could be a result from a dead engine that died before delivering the
622 # it could be a result from a dead engine that died before delivering the
621 # result
623 # result
622 self.log.warn("queue:: unknown msg finished %r"%msg_id)
624 self.log.warn("queue:: unknown msg finished %r", msg_id)
623 return
625 return
624 # update record anyway, because the unregistration could have been premature
626 # update record anyway, because the unregistration could have been premature
625 rheader = msg['header']
627 rheader = msg['header']
626 completed = rheader['date']
628 completed = rheader['date']
627 started = rheader.get('started', None)
629 started = rheader.get('started', None)
628 result = {
630 result = {
629 'result_header' : rheader,
631 'result_header' : rheader,
630 'result_content': msg['content'],
632 'result_content': msg['content'],
631 'started' : started,
633 'started' : started,
632 'completed' : completed
634 'completed' : completed
633 }
635 }
634
636
635 result['result_buffers'] = msg['buffers']
637 result['result_buffers'] = msg['buffers']
636 try:
638 try:
637 self.db.update_record(msg_id, result)
639 self.db.update_record(msg_id, result)
638 except Exception:
640 except Exception:
639 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
641 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
640
642
641
643
642 #--------------------- Task Queue Traffic ------------------------------
644 #--------------------- Task Queue Traffic ------------------------------
643
645
644 def save_task_request(self, idents, msg):
646 def save_task_request(self, idents, msg):
645 """Save the submission of a task."""
647 """Save the submission of a task."""
646 client_id = idents[0]
648 client_id = idents[0]
647
649
648 try:
650 try:
649 msg = self.session.unserialize(msg)
651 msg = self.session.unserialize(msg)
650 except Exception:
652 except Exception:
651 self.log.error("task::client %r sent invalid task message: %r"%(
653 self.log.error("task::client %r sent invalid task message: %r",
652 client_id, msg), exc_info=True)
654 client_id, msg, exc_info=True)
653 return
655 return
654 record = init_record(msg)
656 record = init_record(msg)
655
657
656 record['client_uuid'] = client_id.decode('ascii')
658 record['client_uuid'] = client_id.decode('ascii')
657 record['queue'] = 'task'
659 record['queue'] = 'task'
658 header = msg['header']
660 header = msg['header']
659 msg_id = header['msg_id']
661 msg_id = header['msg_id']
660 self.pending.add(msg_id)
662 self.pending.add(msg_id)
661 self.unassigned.add(msg_id)
663 self.unassigned.add(msg_id)
662 try:
664 try:
663 # it's posible iopub arrived first:
665 # it's posible iopub arrived first:
664 existing = self.db.get_record(msg_id)
666 existing = self.db.get_record(msg_id)
665 if existing['resubmitted']:
667 if existing['resubmitted']:
666 for key in ('submitted', 'client_uuid', 'buffers'):
668 for key in ('submitted', 'client_uuid', 'buffers'):
667 # don't clobber these keys on resubmit
669 # don't clobber these keys on resubmit
668 # submitted and client_uuid should be different
670 # submitted and client_uuid should be different
669 # and buffers might be big, and shouldn't have changed
671 # and buffers might be big, and shouldn't have changed
670 record.pop(key)
672 record.pop(key)
671 # still check content,header which should not change
673 # still check content,header which should not change
672 # but are not expensive to compare as buffers
674 # but are not expensive to compare as buffers
673
675
674 for key,evalue in existing.iteritems():
676 for key,evalue in existing.iteritems():
675 if key.endswith('buffers'):
677 if key.endswith('buffers'):
676 # don't compare buffers
678 # don't compare buffers
677 continue
679 continue
678 rvalue = record.get(key, None)
680 rvalue = record.get(key, None)
679 if evalue and rvalue and evalue != rvalue:
681 if evalue and rvalue and evalue != rvalue:
680 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
682 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
681 elif evalue and not rvalue:
683 elif evalue and not rvalue:
682 record[key] = evalue
684 record[key] = evalue
683 try:
685 try:
684 self.db.update_record(msg_id, record)
686 self.db.update_record(msg_id, record)
685 except Exception:
687 except Exception:
686 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
688 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
687 except KeyError:
689 except KeyError:
688 try:
690 try:
689 self.db.add_record(msg_id, record)
691 self.db.add_record(msg_id, record)
690 except Exception:
692 except Exception:
691 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
693 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
692 except Exception:
694 except Exception:
693 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
695 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
694
696
695 def save_task_result(self, idents, msg):
697 def save_task_result(self, idents, msg):
696 """save the result of a completed task."""
698 """save the result of a completed task."""
697 client_id = idents[0]
699 client_id = idents[0]
698 try:
700 try:
699 msg = self.session.unserialize(msg)
701 msg = self.session.unserialize(msg)
700 except Exception:
702 except Exception:
701 self.log.error("task::invalid task result message send to %r: %r"%(
703 self.log.error("task::invalid task result message send to %r: %r",
702 client_id, msg), exc_info=True)
704 client_id, msg, exc_info=True)
703 return
705 return
704
706
705 parent = msg['parent_header']
707 parent = msg['parent_header']
706 if not parent:
708 if not parent:
707 # print msg
709 # print msg
708 self.log.warn("Task %r had no parent!"%msg)
710 self.log.warn("Task %r had no parent!", msg)
709 return
711 return
710 msg_id = parent['msg_id']
712 msg_id = parent['msg_id']
711 if msg_id in self.unassigned:
713 if msg_id in self.unassigned:
712 self.unassigned.remove(msg_id)
714 self.unassigned.remove(msg_id)
713
715
714 header = msg['header']
716 header = msg['header']
715 engine_uuid = header.get('engine', None)
717 engine_uuid = header.get('engine', None)
716 eid = self.by_ident.get(engine_uuid, None)
718 eid = self.by_ident.get(engine_uuid, None)
717
719
718 if msg_id in self.pending:
720 if msg_id in self.pending:
721 self.log.info("task::task %r finished on %s", msg_id, eid)
719 self.pending.remove(msg_id)
722 self.pending.remove(msg_id)
720 self.all_completed.add(msg_id)
723 self.all_completed.add(msg_id)
721 if eid is not None:
724 if eid is not None:
722 self.completed[eid].append(msg_id)
725 self.completed[eid].append(msg_id)
723 if msg_id in self.tasks[eid]:
726 if msg_id in self.tasks[eid]:
724 self.tasks[eid].remove(msg_id)
727 self.tasks[eid].remove(msg_id)
725 completed = header['date']
728 completed = header['date']
726 started = header.get('started', None)
729 started = header.get('started', None)
727 result = {
730 result = {
728 'result_header' : header,
731 'result_header' : header,
729 'result_content': msg['content'],
732 'result_content': msg['content'],
730 'started' : started,
733 'started' : started,
731 'completed' : completed,
734 'completed' : completed,
732 'engine_uuid': engine_uuid
735 'engine_uuid': engine_uuid
733 }
736 }
734
737
735 result['result_buffers'] = msg['buffers']
738 result['result_buffers'] = msg['buffers']
736 try:
739 try:
737 self.db.update_record(msg_id, result)
740 self.db.update_record(msg_id, result)
738 except Exception:
741 except Exception:
739 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
742 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
740
743
741 else:
744 else:
742 self.log.debug("task::unknown task %r finished"%msg_id)
745 self.log.debug("task::unknown task %r finished", msg_id)
743
746
744 def save_task_destination(self, idents, msg):
747 def save_task_destination(self, idents, msg):
745 try:
748 try:
746 msg = self.session.unserialize(msg, content=True)
749 msg = self.session.unserialize(msg, content=True)
747 except Exception:
750 except Exception:
748 self.log.error("task::invalid task tracking message", exc_info=True)
751 self.log.error("task::invalid task tracking message", exc_info=True)
749 return
752 return
750 content = msg['content']
753 content = msg['content']
751 # print (content)
754 # print (content)
752 msg_id = content['msg_id']
755 msg_id = content['msg_id']
753 engine_uuid = content['engine_id']
756 engine_uuid = content['engine_id']
754 eid = self.by_ident[util.asbytes(engine_uuid)]
757 eid = self.by_ident[util.asbytes(engine_uuid)]
755
758
756 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
759 self.log.info("task::task %r arrived on %r", msg_id, eid)
757 if msg_id in self.unassigned:
760 if msg_id in self.unassigned:
758 self.unassigned.remove(msg_id)
761 self.unassigned.remove(msg_id)
759 # else:
762 # else:
760 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
763 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
761
764
762 self.tasks[eid].append(msg_id)
765 self.tasks[eid].append(msg_id)
763 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
766 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
764 try:
767 try:
765 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
768 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
766 except Exception:
769 except Exception:
767 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
770 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
768
771
769
772
770 def mia_task_request(self, idents, msg):
773 def mia_task_request(self, idents, msg):
771 raise NotImplementedError
774 raise NotImplementedError
772 client_id = idents[0]
775 client_id = idents[0]
773 # content = dict(mia=self.mia,status='ok')
776 # content = dict(mia=self.mia,status='ok')
774 # self.session.send('mia_reply', content=content, idents=client_id)
777 # self.session.send('mia_reply', content=content, idents=client_id)
775
778
776
779
777 #--------------------- IOPub Traffic ------------------------------
780 #--------------------- IOPub Traffic ------------------------------
778
781
779 def save_iopub_message(self, topics, msg):
782 def save_iopub_message(self, topics, msg):
780 """save an iopub message into the db"""
783 """save an iopub message into the db"""
781 # print (topics)
784 # print (topics)
782 try:
785 try:
783 msg = self.session.unserialize(msg, content=True)
786 msg = self.session.unserialize(msg, content=True)
784 except Exception:
787 except Exception:
785 self.log.error("iopub::invalid IOPub message", exc_info=True)
788 self.log.error("iopub::invalid IOPub message", exc_info=True)
786 return
789 return
787
790
788 parent = msg['parent_header']
791 parent = msg['parent_header']
789 if not parent:
792 if not parent:
790 self.log.error("iopub::invalid IOPub message: %r"%msg)
793 self.log.error("iopub::invalid IOPub message: %r", msg)
791 return
794 return
792 msg_id = parent['msg_id']
795 msg_id = parent['msg_id']
793 msg_type = msg['header']['msg_type']
796 msg_type = msg['header']['msg_type']
794 content = msg['content']
797 content = msg['content']
795
798
796 # ensure msg_id is in db
799 # ensure msg_id is in db
797 try:
800 try:
798 rec = self.db.get_record(msg_id)
801 rec = self.db.get_record(msg_id)
799 except KeyError:
802 except KeyError:
800 rec = empty_record()
803 rec = empty_record()
801 rec['msg_id'] = msg_id
804 rec['msg_id'] = msg_id
802 self.db.add_record(msg_id, rec)
805 self.db.add_record(msg_id, rec)
803 # stream
806 # stream
804 d = {}
807 d = {}
805 if msg_type == 'stream':
808 if msg_type == 'stream':
806 name = content['name']
809 name = content['name']
807 s = rec[name] or ''
810 s = rec[name] or ''
808 d[name] = s + content['data']
811 d[name] = s + content['data']
809
812
810 elif msg_type == 'pyerr':
813 elif msg_type == 'pyerr':
811 d['pyerr'] = content
814 d['pyerr'] = content
812 elif msg_type == 'pyin':
815 elif msg_type == 'pyin':
813 d['pyin'] = content['code']
816 d['pyin'] = content['code']
814 else:
817 else:
815 d[msg_type] = content.get('data', '')
818 d[msg_type] = content.get('data', '')
816
819
817 try:
820 try:
818 self.db.update_record(msg_id, d)
821 self.db.update_record(msg_id, d)
819 except Exception:
822 except Exception:
820 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
823 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
821
824
822
825
823
826
824 #-------------------------------------------------------------------------
827 #-------------------------------------------------------------------------
825 # Registration requests
828 # Registration requests
826 #-------------------------------------------------------------------------
829 #-------------------------------------------------------------------------
827
830
828 def connection_request(self, client_id, msg):
831 def connection_request(self, client_id, msg):
829 """Reply with connection addresses for clients."""
832 """Reply with connection addresses for clients."""
830 self.log.info("client::client %r connected"%client_id)
833 self.log.info("client::client %r connected", client_id)
831 content = dict(status='ok')
834 content = dict(status='ok')
832 content.update(self.client_info)
835 content.update(self.client_info)
833 jsonable = {}
836 jsonable = {}
834 for k,v in self.keytable.iteritems():
837 for k,v in self.keytable.iteritems():
835 if v not in self.dead_engines:
838 if v not in self.dead_engines:
836 jsonable[str(k)] = v.decode('ascii')
839 jsonable[str(k)] = v.decode('ascii')
837 content['engines'] = jsonable
840 content['engines'] = jsonable
838 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
841 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
839
842
840 def register_engine(self, reg, msg):
843 def register_engine(self, reg, msg):
841 """Register a new engine."""
844 """Register a new engine."""
842 content = msg['content']
845 content = msg['content']
843 try:
846 try:
844 queue = util.asbytes(content['queue'])
847 queue = util.asbytes(content['queue'])
845 except KeyError:
848 except KeyError:
846 self.log.error("registration::queue not specified", exc_info=True)
849 self.log.error("registration::queue not specified", exc_info=True)
847 return
850 return
848 heart = content.get('heartbeat', None)
851 heart = content.get('heartbeat', None)
849 if heart:
852 if heart:
850 heart = util.asbytes(heart)
853 heart = util.asbytes(heart)
851 """register a new engine, and create the socket(s) necessary"""
854 """register a new engine, and create the socket(s) necessary"""
852 eid = self._next_id
855 eid = self._next_id
853 # print (eid, queue, reg, heart)
856 # print (eid, queue, reg, heart)
854
857
855 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
858 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
856
859
857 content = dict(id=eid,status='ok')
860 content = dict(id=eid,status='ok')
858 content.update(self.engine_info)
861 content.update(self.engine_info)
859 # check if requesting available IDs:
862 # check if requesting available IDs:
860 if queue in self.by_ident:
863 if queue in self.by_ident:
861 try:
864 try:
862 raise KeyError("queue_id %r in use"%queue)
865 raise KeyError("queue_id %r in use" % queue)
863 except:
866 except:
864 content = error.wrap_exception()
867 content = error.wrap_exception()
865 self.log.error("queue_id %r in use"%queue, exc_info=True)
868 self.log.error("queue_id %r in use", queue, exc_info=True)
866 elif heart in self.hearts: # need to check unique hearts?
869 elif heart in self.hearts: # need to check unique hearts?
867 try:
870 try:
868 raise KeyError("heart_id %r in use"%heart)
871 raise KeyError("heart_id %r in use" % heart)
869 except:
872 except:
870 self.log.error("heart_id %r in use"%heart, exc_info=True)
873 self.log.error("heart_id %r in use", heart, exc_info=True)
871 content = error.wrap_exception()
874 content = error.wrap_exception()
872 else:
875 else:
873 for h, pack in self.incoming_registrations.iteritems():
876 for h, pack in self.incoming_registrations.iteritems():
874 if heart == h:
877 if heart == h:
875 try:
878 try:
876 raise KeyError("heart_id %r in use"%heart)
879 raise KeyError("heart_id %r in use" % heart)
877 except:
880 except:
878 self.log.error("heart_id %r in use"%heart, exc_info=True)
881 self.log.error("heart_id %r in use", heart, exc_info=True)
879 content = error.wrap_exception()
882 content = error.wrap_exception()
880 break
883 break
881 elif queue == pack[1]:
884 elif queue == pack[1]:
882 try:
885 try:
883 raise KeyError("queue_id %r in use"%queue)
886 raise KeyError("queue_id %r in use" % queue)
884 except:
887 except:
885 self.log.error("queue_id %r in use"%queue, exc_info=True)
888 self.log.error("queue_id %r in use", queue, exc_info=True)
886 content = error.wrap_exception()
889 content = error.wrap_exception()
887 break
890 break
888
891
889 msg = self.session.send(self.query, "registration_reply",
892 msg = self.session.send(self.query, "registration_reply",
890 content=content,
893 content=content,
891 ident=reg)
894 ident=reg)
892
895
893 if content['status'] == 'ok':
896 if content['status'] == 'ok':
894 if heart in self.heartmonitor.hearts:
897 if heart in self.heartmonitor.hearts:
895 # already beating
898 # already beating
896 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
899 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
897 self.finish_registration(heart)
900 self.finish_registration(heart)
898 else:
901 else:
899 purge = lambda : self._purge_stalled_registration(heart)
902 purge = lambda : self._purge_stalled_registration(heart)
900 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
903 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
901 dc.start()
904 dc.start()
902 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
905 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
903 else:
906 else:
904 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
907 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
905 return eid
908 return eid
906
909
907 def unregister_engine(self, ident, msg):
910 def unregister_engine(self, ident, msg):
908 """Unregister an engine that explicitly requested to leave."""
911 """Unregister an engine that explicitly requested to leave."""
909 try:
912 try:
910 eid = msg['content']['id']
913 eid = msg['content']['id']
911 except:
914 except:
912 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
915 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
913 return
916 return
914 self.log.info("registration::unregister_engine(%r)"%eid)
917 self.log.info("registration::unregister_engine(%r)", eid)
915 # print (eid)
918 # print (eid)
916 uuid = self.keytable[eid]
919 uuid = self.keytable[eid]
917 content=dict(id=eid, queue=uuid.decode('ascii'))
920 content=dict(id=eid, queue=uuid.decode('ascii'))
918 self.dead_engines.add(uuid)
921 self.dead_engines.add(uuid)
919 # self.ids.remove(eid)
922 # self.ids.remove(eid)
920 # uuid = self.keytable.pop(eid)
923 # uuid = self.keytable.pop(eid)
921 #
924 #
922 # ec = self.engines.pop(eid)
925 # ec = self.engines.pop(eid)
923 # self.hearts.pop(ec.heartbeat)
926 # self.hearts.pop(ec.heartbeat)
924 # self.by_ident.pop(ec.queue)
927 # self.by_ident.pop(ec.queue)
925 # self.completed.pop(eid)
928 # self.completed.pop(eid)
926 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
929 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
927 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
930 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
928 dc.start()
931 dc.start()
929 ############## TODO: HANDLE IT ################
932 ############## TODO: HANDLE IT ################
930
933
931 if self.notifier:
934 if self.notifier:
932 self.session.send(self.notifier, "unregistration_notification", content=content)
935 self.session.send(self.notifier, "unregistration_notification", content=content)
933
936
934 def _handle_stranded_msgs(self, eid, uuid):
937 def _handle_stranded_msgs(self, eid, uuid):
935 """Handle messages known to be on an engine when the engine unregisters.
938 """Handle messages known to be on an engine when the engine unregisters.
936
939
937 It is possible that this will fire prematurely - that is, an engine will
940 It is possible that this will fire prematurely - that is, an engine will
938 go down after completing a result, and the client will be notified
941 go down after completing a result, and the client will be notified
939 that the result failed and later receive the actual result.
942 that the result failed and later receive the actual result.
940 """
943 """
941
944
942 outstanding = self.queues[eid]
945 outstanding = self.queues[eid]
943
946
944 for msg_id in outstanding:
947 for msg_id in outstanding:
945 self.pending.remove(msg_id)
948 self.pending.remove(msg_id)
946 self.all_completed.add(msg_id)
949 self.all_completed.add(msg_id)
947 try:
950 try:
948 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
951 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
949 except:
952 except:
950 content = error.wrap_exception()
953 content = error.wrap_exception()
951 # build a fake header:
954 # build a fake header:
952 header = {}
955 header = {}
953 header['engine'] = uuid
956 header['engine'] = uuid
954 header['date'] = datetime.now()
957 header['date'] = datetime.now()
955 rec = dict(result_content=content, result_header=header, result_buffers=[])
958 rec = dict(result_content=content, result_header=header, result_buffers=[])
956 rec['completed'] = header['date']
959 rec['completed'] = header['date']
957 rec['engine_uuid'] = uuid
960 rec['engine_uuid'] = uuid
958 try:
961 try:
959 self.db.update_record(msg_id, rec)
962 self.db.update_record(msg_id, rec)
960 except Exception:
963 except Exception:
961 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
964 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
962
965
963
966
964 def finish_registration(self, heart):
967 def finish_registration(self, heart):
965 """Second half of engine registration, called after our HeartMonitor
968 """Second half of engine registration, called after our HeartMonitor
966 has received a beat from the Engine's Heart."""
969 has received a beat from the Engine's Heart."""
967 try:
970 try:
968 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
971 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
969 except KeyError:
972 except KeyError:
970 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
973 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
971 return
974 return
972 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
975 self.log.info("registration::finished registering engine %i:%r", eid, queue)
973 if purge is not None:
976 if purge is not None:
974 purge.stop()
977 purge.stop()
975 control = queue
978 control = queue
976 self.ids.add(eid)
979 self.ids.add(eid)
977 self.keytable[eid] = queue
980 self.keytable[eid] = queue
978 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
981 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
979 control=control, heartbeat=heart)
982 control=control, heartbeat=heart)
980 self.by_ident[queue] = eid
983 self.by_ident[queue] = eid
981 self.queues[eid] = list()
984 self.queues[eid] = list()
982 self.tasks[eid] = list()
985 self.tasks[eid] = list()
983 self.completed[eid] = list()
986 self.completed[eid] = list()
984 self.hearts[heart] = eid
987 self.hearts[heart] = eid
985 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
988 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
986 if self.notifier:
989 if self.notifier:
987 self.session.send(self.notifier, "registration_notification", content=content)
990 self.session.send(self.notifier, "registration_notification", content=content)
988 self.log.info("engine::Engine Connected: %i"%eid)
991 self.log.info("engine::Engine Connected: %i", eid)
989
992
990 def _purge_stalled_registration(self, heart):
993 def _purge_stalled_registration(self, heart):
991 if heart in self.incoming_registrations:
994 if heart in self.incoming_registrations:
992 eid = self.incoming_registrations.pop(heart)[0]
995 eid = self.incoming_registrations.pop(heart)[0]
993 self.log.info("registration::purging stalled registration: %i"%eid)
996 self.log.info("registration::purging stalled registration: %i", eid)
994 else:
997 else:
995 pass
998 pass
996
999
997 #-------------------------------------------------------------------------
1000 #-------------------------------------------------------------------------
998 # Client Requests
1001 # Client Requests
999 #-------------------------------------------------------------------------
1002 #-------------------------------------------------------------------------
1000
1003
1001 def shutdown_request(self, client_id, msg):
1004 def shutdown_request(self, client_id, msg):
1002 """handle shutdown request."""
1005 """handle shutdown request."""
1003 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1006 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1004 # also notify other clients of shutdown
1007 # also notify other clients of shutdown
1005 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1008 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1006 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1009 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1007 dc.start()
1010 dc.start()
1008
1011
1009 def _shutdown(self):
1012 def _shutdown(self):
1010 self.log.info("hub::hub shutting down.")
1013 self.log.info("hub::hub shutting down.")
1011 time.sleep(0.1)
1014 time.sleep(0.1)
1012 sys.exit(0)
1015 sys.exit(0)
1013
1016
1014
1017
1015 def check_load(self, client_id, msg):
1018 def check_load(self, client_id, msg):
1016 content = msg['content']
1019 content = msg['content']
1017 try:
1020 try:
1018 targets = content['targets']
1021 targets = content['targets']
1019 targets = self._validate_targets(targets)
1022 targets = self._validate_targets(targets)
1020 except:
1023 except:
1021 content = error.wrap_exception()
1024 content = error.wrap_exception()
1022 self.session.send(self.query, "hub_error",
1025 self.session.send(self.query, "hub_error",
1023 content=content, ident=client_id)
1026 content=content, ident=client_id)
1024 return
1027 return
1025
1028
1026 content = dict(status='ok')
1029 content = dict(status='ok')
1027 # loads = {}
1030 # loads = {}
1028 for t in targets:
1031 for t in targets:
1029 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1032 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1030 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1033 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1031
1034
1032
1035
1033 def queue_status(self, client_id, msg):
1036 def queue_status(self, client_id, msg):
1034 """Return the Queue status of one or more targets.
1037 """Return the Queue status of one or more targets.
1035 if verbose: return the msg_ids
1038 if verbose: return the msg_ids
1036 else: return len of each type.
1039 else: return len of each type.
1037 keys: queue (pending MUX jobs)
1040 keys: queue (pending MUX jobs)
1038 tasks (pending Task jobs)
1041 tasks (pending Task jobs)
1039 completed (finished jobs from both queues)"""
1042 completed (finished jobs from both queues)"""
1040 content = msg['content']
1043 content = msg['content']
1041 targets = content['targets']
1044 targets = content['targets']
1042 try:
1045 try:
1043 targets = self._validate_targets(targets)
1046 targets = self._validate_targets(targets)
1044 except:
1047 except:
1045 content = error.wrap_exception()
1048 content = error.wrap_exception()
1046 self.session.send(self.query, "hub_error",
1049 self.session.send(self.query, "hub_error",
1047 content=content, ident=client_id)
1050 content=content, ident=client_id)
1048 return
1051 return
1049 verbose = content.get('verbose', False)
1052 verbose = content.get('verbose', False)
1050 content = dict(status='ok')
1053 content = dict(status='ok')
1051 for t in targets:
1054 for t in targets:
1052 queue = self.queues[t]
1055 queue = self.queues[t]
1053 completed = self.completed[t]
1056 completed = self.completed[t]
1054 tasks = self.tasks[t]
1057 tasks = self.tasks[t]
1055 if not verbose:
1058 if not verbose:
1056 queue = len(queue)
1059 queue = len(queue)
1057 completed = len(completed)
1060 completed = len(completed)
1058 tasks = len(tasks)
1061 tasks = len(tasks)
1059 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1062 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1060 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1063 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1061 # print (content)
1064 # print (content)
1062 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1065 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1063
1066
1064 def purge_results(self, client_id, msg):
1067 def purge_results(self, client_id, msg):
1065 """Purge results from memory. This method is more valuable before we move
1068 """Purge results from memory. This method is more valuable before we move
1066 to a DB based message storage mechanism."""
1069 to a DB based message storage mechanism."""
1067 content = msg['content']
1070 content = msg['content']
1068 self.log.info("Dropping records with %s", content)
1071 self.log.info("Dropping records with %s", content)
1069 msg_ids = content.get('msg_ids', [])
1072 msg_ids = content.get('msg_ids', [])
1070 reply = dict(status='ok')
1073 reply = dict(status='ok')
1071 if msg_ids == 'all':
1074 if msg_ids == 'all':
1072 try:
1075 try:
1073 self.db.drop_matching_records(dict(completed={'$ne':None}))
1076 self.db.drop_matching_records(dict(completed={'$ne':None}))
1074 except Exception:
1077 except Exception:
1075 reply = error.wrap_exception()
1078 reply = error.wrap_exception()
1076 else:
1079 else:
1077 pending = filter(lambda m: m in self.pending, msg_ids)
1080 pending = filter(lambda m: m in self.pending, msg_ids)
1078 if pending:
1081 if pending:
1079 try:
1082 try:
1080 raise IndexError("msg pending: %r"%pending[0])
1083 raise IndexError("msg pending: %r" % pending[0])
1081 except:
1084 except:
1082 reply = error.wrap_exception()
1085 reply = error.wrap_exception()
1083 else:
1086 else:
1084 try:
1087 try:
1085 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1088 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1086 except Exception:
1089 except Exception:
1087 reply = error.wrap_exception()
1090 reply = error.wrap_exception()
1088
1091
1089 if reply['status'] == 'ok':
1092 if reply['status'] == 'ok':
1090 eids = content.get('engine_ids', [])
1093 eids = content.get('engine_ids', [])
1091 for eid in eids:
1094 for eid in eids:
1092 if eid not in self.engines:
1095 if eid not in self.engines:
1093 try:
1096 try:
1094 raise IndexError("No such engine: %i"%eid)
1097 raise IndexError("No such engine: %i" % eid)
1095 except:
1098 except:
1096 reply = error.wrap_exception()
1099 reply = error.wrap_exception()
1097 break
1100 break
1098 uid = self.engines[eid].queue
1101 uid = self.engines[eid].queue
1099 try:
1102 try:
1100 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1103 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1101 except Exception:
1104 except Exception:
1102 reply = error.wrap_exception()
1105 reply = error.wrap_exception()
1103 break
1106 break
1104
1107
1105 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1108 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1106
1109
1107 def resubmit_task(self, client_id, msg):
1110 def resubmit_task(self, client_id, msg):
1108 """Resubmit one or more tasks."""
1111 """Resubmit one or more tasks."""
1109 def finish(reply):
1112 def finish(reply):
1110 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1113 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1111
1114
1112 content = msg['content']
1115 content = msg['content']
1113 msg_ids = content['msg_ids']
1116 msg_ids = content['msg_ids']
1114 reply = dict(status='ok')
1117 reply = dict(status='ok')
1115 try:
1118 try:
1116 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1119 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1117 'header', 'content', 'buffers'])
1120 'header', 'content', 'buffers'])
1118 except Exception:
1121 except Exception:
1119 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1122 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1120 return finish(error.wrap_exception())
1123 return finish(error.wrap_exception())
1121
1124
1122 # validate msg_ids
1125 # validate msg_ids
1123 found_ids = [ rec['msg_id'] for rec in records ]
1126 found_ids = [ rec['msg_id'] for rec in records ]
1124 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1127 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1125 if len(records) > len(msg_ids):
1128 if len(records) > len(msg_ids):
1126 try:
1129 try:
1127 raise RuntimeError("DB appears to be in an inconsistent state."
1130 raise RuntimeError("DB appears to be in an inconsistent state."
1128 "More matching records were found than should exist")
1131 "More matching records were found than should exist")
1129 except Exception:
1132 except Exception:
1130 return finish(error.wrap_exception())
1133 return finish(error.wrap_exception())
1131 elif len(records) < len(msg_ids):
1134 elif len(records) < len(msg_ids):
1132 missing = [ m for m in msg_ids if m not in found_ids ]
1135 missing = [ m for m in msg_ids if m not in found_ids ]
1133 try:
1136 try:
1134 raise KeyError("No such msg(s): %r"%missing)
1137 raise KeyError("No such msg(s): %r" % missing)
1135 except KeyError:
1138 except KeyError:
1136 return finish(error.wrap_exception())
1139 return finish(error.wrap_exception())
1137 elif invalid_ids:
1140 elif invalid_ids:
1138 msg_id = invalid_ids[0]
1141 msg_id = invalid_ids[0]
1139 try:
1142 try:
1140 raise ValueError("Task %r appears to be inflight"%(msg_id))
1143 raise ValueError("Task %r appears to be inflight" % msg_id)
1141 except Exception:
1144 except Exception:
1142 return finish(error.wrap_exception())
1145 return finish(error.wrap_exception())
1143
1146
1144 # clear the existing records
1147 # clear the existing records
1145 now = datetime.now()
1148 now = datetime.now()
1146 rec = empty_record()
1149 rec = empty_record()
1147 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1150 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1148 rec['resubmitted'] = now
1151 rec['resubmitted'] = now
1149 rec['queue'] = 'task'
1152 rec['queue'] = 'task'
1150 rec['client_uuid'] = client_id[0]
1153 rec['client_uuid'] = client_id[0]
1151 try:
1154 try:
1152 for msg_id in msg_ids:
1155 for msg_id in msg_ids:
1153 self.all_completed.discard(msg_id)
1156 self.all_completed.discard(msg_id)
1154 self.db.update_record(msg_id, rec)
1157 self.db.update_record(msg_id, rec)
1155 except Exception:
1158 except Exception:
1156 self.log.error('db::db error upating record', exc_info=True)
1159 self.log.error('db::db error upating record', exc_info=True)
1157 reply = error.wrap_exception()
1160 reply = error.wrap_exception()
1158 else:
1161 else:
1159 # send the messages
1162 # send the messages
1160 for rec in records:
1163 for rec in records:
1161 header = rec['header']
1164 header = rec['header']
1162 # include resubmitted in header to prevent digest collision
1165 # include resubmitted in header to prevent digest collision
1163 header['resubmitted'] = now
1166 header['resubmitted'] = now
1164 msg = self.session.msg(header['msg_type'])
1167 msg = self.session.msg(header['msg_type'])
1165 msg['content'] = rec['content']
1168 msg['content'] = rec['content']
1166 msg['header'] = header
1169 msg['header'] = header
1167 msg['header']['msg_id'] = rec['msg_id']
1170 msg['header']['msg_id'] = rec['msg_id']
1168 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1171 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1169
1172
1170 finish(dict(status='ok'))
1173 finish(dict(status='ok'))
1171
1174
1172
1175
1173 def _extract_record(self, rec):
1176 def _extract_record(self, rec):
1174 """decompose a TaskRecord dict into subsection of reply for get_result"""
1177 """decompose a TaskRecord dict into subsection of reply for get_result"""
1175 io_dict = {}
1178 io_dict = {}
1176 for key in 'pyin pyout pyerr stdout stderr'.split():
1179 for key in 'pyin pyout pyerr stdout stderr'.split():
1177 io_dict[key] = rec[key]
1180 io_dict[key] = rec[key]
1178 content = { 'result_content': rec['result_content'],
1181 content = { 'result_content': rec['result_content'],
1179 'header': rec['header'],
1182 'header': rec['header'],
1180 'result_header' : rec['result_header'],
1183 'result_header' : rec['result_header'],
1181 'io' : io_dict,
1184 'io' : io_dict,
1182 }
1185 }
1183 if rec['result_buffers']:
1186 if rec['result_buffers']:
1184 buffers = map(bytes, rec['result_buffers'])
1187 buffers = map(bytes, rec['result_buffers'])
1185 else:
1188 else:
1186 buffers = []
1189 buffers = []
1187
1190
1188 return content, buffers
1191 return content, buffers
1189
1192
1190 def get_results(self, client_id, msg):
1193 def get_results(self, client_id, msg):
1191 """Get the result of 1 or more messages."""
1194 """Get the result of 1 or more messages."""
1192 content = msg['content']
1195 content = msg['content']
1193 msg_ids = sorted(set(content['msg_ids']))
1196 msg_ids = sorted(set(content['msg_ids']))
1194 statusonly = content.get('status_only', False)
1197 statusonly = content.get('status_only', False)
1195 pending = []
1198 pending = []
1196 completed = []
1199 completed = []
1197 content = dict(status='ok')
1200 content = dict(status='ok')
1198 content['pending'] = pending
1201 content['pending'] = pending
1199 content['completed'] = completed
1202 content['completed'] = completed
1200 buffers = []
1203 buffers = []
1201 if not statusonly:
1204 if not statusonly:
1202 try:
1205 try:
1203 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1206 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1204 # turn match list into dict, for faster lookup
1207 # turn match list into dict, for faster lookup
1205 records = {}
1208 records = {}
1206 for rec in matches:
1209 for rec in matches:
1207 records[rec['msg_id']] = rec
1210 records[rec['msg_id']] = rec
1208 except Exception:
1211 except Exception:
1209 content = error.wrap_exception()
1212 content = error.wrap_exception()
1210 self.session.send(self.query, "result_reply", content=content,
1213 self.session.send(self.query, "result_reply", content=content,
1211 parent=msg, ident=client_id)
1214 parent=msg, ident=client_id)
1212 return
1215 return
1213 else:
1216 else:
1214 records = {}
1217 records = {}
1215 for msg_id in msg_ids:
1218 for msg_id in msg_ids:
1216 if msg_id in self.pending:
1219 if msg_id in self.pending:
1217 pending.append(msg_id)
1220 pending.append(msg_id)
1218 elif msg_id in self.all_completed:
1221 elif msg_id in self.all_completed:
1219 completed.append(msg_id)
1222 completed.append(msg_id)
1220 if not statusonly:
1223 if not statusonly:
1221 c,bufs = self._extract_record(records[msg_id])
1224 c,bufs = self._extract_record(records[msg_id])
1222 content[msg_id] = c
1225 content[msg_id] = c
1223 buffers.extend(bufs)
1226 buffers.extend(bufs)
1224 elif msg_id in records:
1227 elif msg_id in records:
1225 if rec['completed']:
1228 if rec['completed']:
1226 completed.append(msg_id)
1229 completed.append(msg_id)
1227 c,bufs = self._extract_record(records[msg_id])
1230 c,bufs = self._extract_record(records[msg_id])
1228 content[msg_id] = c
1231 content[msg_id] = c
1229 buffers.extend(bufs)
1232 buffers.extend(bufs)
1230 else:
1233 else:
1231 pending.append(msg_id)
1234 pending.append(msg_id)
1232 else:
1235 else:
1233 try:
1236 try:
1234 raise KeyError('No such message: '+msg_id)
1237 raise KeyError('No such message: '+msg_id)
1235 except:
1238 except:
1236 content = error.wrap_exception()
1239 content = error.wrap_exception()
1237 break
1240 break
1238 self.session.send(self.query, "result_reply", content=content,
1241 self.session.send(self.query, "result_reply", content=content,
1239 parent=msg, ident=client_id,
1242 parent=msg, ident=client_id,
1240 buffers=buffers)
1243 buffers=buffers)
1241
1244
1242 def get_history(self, client_id, msg):
1245 def get_history(self, client_id, msg):
1243 """Get a list of all msg_ids in our DB records"""
1246 """Get a list of all msg_ids in our DB records"""
1244 try:
1247 try:
1245 msg_ids = self.db.get_history()
1248 msg_ids = self.db.get_history()
1246 except Exception as e:
1249 except Exception as e:
1247 content = error.wrap_exception()
1250 content = error.wrap_exception()
1248 else:
1251 else:
1249 content = dict(status='ok', history=msg_ids)
1252 content = dict(status='ok', history=msg_ids)
1250
1253
1251 self.session.send(self.query, "history_reply", content=content,
1254 self.session.send(self.query, "history_reply", content=content,
1252 parent=msg, ident=client_id)
1255 parent=msg, ident=client_id)
1253
1256
1254 def db_query(self, client_id, msg):
1257 def db_query(self, client_id, msg):
1255 """Perform a raw query on the task record database."""
1258 """Perform a raw query on the task record database."""
1256 content = msg['content']
1259 content = msg['content']
1257 query = content.get('query', {})
1260 query = content.get('query', {})
1258 keys = content.get('keys', None)
1261 keys = content.get('keys', None)
1259 buffers = []
1262 buffers = []
1260 empty = list()
1263 empty = list()
1261 try:
1264 try:
1262 records = self.db.find_records(query, keys)
1265 records = self.db.find_records(query, keys)
1263 except Exception as e:
1266 except Exception as e:
1264 content = error.wrap_exception()
1267 content = error.wrap_exception()
1265 else:
1268 else:
1266 # extract buffers from reply content:
1269 # extract buffers from reply content:
1267 if keys is not None:
1270 if keys is not None:
1268 buffer_lens = [] if 'buffers' in keys else None
1271 buffer_lens = [] if 'buffers' in keys else None
1269 result_buffer_lens = [] if 'result_buffers' in keys else None
1272 result_buffer_lens = [] if 'result_buffers' in keys else None
1270 else:
1273 else:
1271 buffer_lens = []
1274 buffer_lens = []
1272 result_buffer_lens = []
1275 result_buffer_lens = []
1273
1276
1274 for rec in records:
1277 for rec in records:
1275 # buffers may be None, so double check
1278 # buffers may be None, so double check
1276 if buffer_lens is not None:
1279 if buffer_lens is not None:
1277 b = rec.pop('buffers', empty) or empty
1280 b = rec.pop('buffers', empty) or empty
1278 buffer_lens.append(len(b))
1281 buffer_lens.append(len(b))
1279 buffers.extend(b)
1282 buffers.extend(b)
1280 if result_buffer_lens is not None:
1283 if result_buffer_lens is not None:
1281 rb = rec.pop('result_buffers', empty) or empty
1284 rb = rec.pop('result_buffers', empty) or empty
1282 result_buffer_lens.append(len(rb))
1285 result_buffer_lens.append(len(rb))
1283 buffers.extend(rb)
1286 buffers.extend(rb)
1284 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1287 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1285 result_buffer_lens=result_buffer_lens)
1288 result_buffer_lens=result_buffer_lens)
1286 # self.log.debug (content)
1289 # self.log.debug (content)
1287 self.session.send(self.query, "db_reply", content=content,
1290 self.session.send(self.query, "db_reply", content=content,
1288 parent=msg, ident=client_id,
1291 parent=msg, ident=client_id,
1289 buffers=buffers)
1292 buffers=buffers)
1290
1293
@@ -1,716 +1,716 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 from __future__ import print_function
22 from __future__ import print_function
23
23
24 import logging
24 import logging
25 import sys
25 import sys
26
26
27 from datetime import datetime, timedelta
27 from datetime import datetime, timedelta
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
31 try:
31 try:
32 import numpy
32 import numpy
33 except ImportError:
33 except ImportError:
34 numpy = None
34 numpy = None
35
35
36 import zmq
36 import zmq
37 from zmq.eventloop import ioloop, zmqstream
37 from zmq.eventloop import ioloop, zmqstream
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44
44
45 from IPython.parallel import error
45 from IPython.parallel import error
46 from IPython.parallel.factory import SessionFactory
46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
48
48
49 from .dependency import Dependency
49 from .dependency import Dependency
50
50
@decorator
def logged(f, self, *args, **kwargs):
    """Trace a scheduler method call at DEBUG level, then invoke it unchanged."""
    self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
    return f(self, *args, **kwargs)
57
57
58 #----------------------------------------------------------------------
58 #----------------------------------------------------------------------
59 # Chooser functions
59 # Chooser functions
60 #----------------------------------------------------------------------
60 #----------------------------------------------------------------------
61
61
def plainrandom(loads):
    """Pick an engine index uniformly at random.

    Only the length of `loads` matters; the load values are ignored.
    """
    return randint(0, len(loads) - 1)
66
66
def lru(loads):
    """Always pick the front of the line.

    The content of `loads` is ignored.  Assumes LRU ordering of loads,
    with the oldest engine first, so index 0 is the least recently used.
    """
    return 0
75
75
def twobin(loads):
    """Pick two candidates uniformly at random; keep the LRU of the two.

    The content of `loads` is ignored.  Assumes LRU ordering of loads
    (oldest first), so the smaller index is the least recently used.
    """
    last = len(loads) - 1
    return min(randint(0, last), randint(0, last))
87
87
def weighted(loads):
    """Pick two engines at random, weighted by inverse load.

    Each pick is drawn with probability inversely proportional to its
    load; the less loaded of the two picks is returned.
    """
    # weight 0 a million times more than 1:
    weights = 1. / (1e-6 + numpy.array(loads))
    cdf = weights.cumsum()
    total = cdf[-1]
    # draw two indices from the weighted CDF by linear scan
    picks = []
    for draw in (random() * total, random() * total):
        i = 0
        while cdf[i] < draw:
            i += 1
        picks.append(i)
    first, second = picks
    # keep whichever pick currently carries less load (higher weight)
    return second if weights[second] > weights[first] else first
109
109
def leastload(loads):
    """Always choose the lowest load.

    Ties go to the earliest index; if `loads` has LRU ordering, this
    means the LRU engine among those with the lowest load is chosen.
    """
    return min(range(len(loads)), key=loads.__getitem__)
118
118
119 #---------------------------------------------------------------------
119 #---------------------------------------------------------------------
120 # Classes
120 # Classes
121 #---------------------------------------------------------------------
121 #---------------------------------------------------------------------
122 # store empty default dependency:
122 # store empty default dependency:
123 MET = Dependency([])
123 MET = Dependency([])
124
124
125 class TaskScheduler(SessionFactory):
125 class TaskScheduler(SessionFactory):
126 """Python TaskScheduler object.
126 """Python TaskScheduler object.
127
127
128 This is the simplest object that supports msg_id based
128 This is the simplest object that supports msg_id based
129 DAG dependencies. *Only* task msg_ids are checked, not
129 DAG dependencies. *Only* task msg_ids are checked, not
130 msg_ids of jobs submitted via the MUX queue.
130 msg_ids of jobs submitted via the MUX queue.
131
131
132 """
132 """
133
133
134 hwm = Integer(0, config=True, shortname='hwm',
134 hwm = Integer(0, config=True, shortname='hwm',
135 help="""specify the High Water Mark (HWM) for the downstream
135 help="""specify the High Water Mark (HWM) for the downstream
136 socket in the Task scheduler. This is the maximum number
136 socket in the Task scheduler. This is the maximum number
137 of allowed outstanding tasks on each engine."""
137 of allowed outstanding tasks on each engine."""
138 )
138 )
139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
140 'leastload', config=True, shortname='scheme', allow_none=False,
140 'leastload', config=True, shortname='scheme', allow_none=False,
141 help="""select the task scheduler scheme [default: Python LRU]
141 help="""select the task scheduler scheme [default: Python LRU]
142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
143 )
143 )
144 def _scheme_name_changed(self, old, new):
144 def _scheme_name_changed(self, old, new):
145 self.log.debug("Using scheme %r"%new)
145 self.log.debug("Using scheme %r"%new)
146 self.scheme = globals()[new]
146 self.scheme = globals()[new]
147
147
148 # input arguments:
148 # input arguments:
149 scheme = Instance(FunctionType) # function for determining the destination
149 scheme = Instance(FunctionType) # function for determining the destination
    def _scheme_default(self):
        # traitlets default: fall back to the leastload chooser defined above
        return leastload
152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
156
156
157 # internals:
157 # internals:
158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
162 pending = Dict() # dict by engine_uuid of submitted tasks
162 pending = Dict() # dict by engine_uuid of submitted tasks
163 completed = Dict() # dict by engine_uuid of completed tasks
163 completed = Dict() # dict by engine_uuid of completed tasks
164 failed = Dict() # dict by engine_uuid of failed tasks
164 failed = Dict() # dict by engine_uuid of failed tasks
165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
166 clients = Dict() # dict by msg_id for who submitted the task
166 clients = Dict() # dict by msg_id for who submitted the task
167 targets = List() # list of target IDENTs
167 targets = List() # list of target IDENTs
168 loads = List() # list of engine loads
168 loads = List() # list of engine loads
169 # full = Set() # set of IDENTs that have HWM outstanding tasks
169 # full = Set() # set of IDENTs that have HWM outstanding tasks
170 all_completed = Set() # set of all completed tasks
170 all_completed = Set() # set of all completed tasks
171 all_failed = Set() # set of all failed tasks
171 all_failed = Set() # set of all failed tasks
172 all_done = Set() # set of all finished tasks=union(completed,failed)
172 all_done = Set() # set of all finished tasks=union(completed,failed)
173 all_ids = Set() # set of all submitted task IDs
173 all_ids = Set() # set of all submitted task IDs
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176
176
177 ident = CBytes() # ZMQ identity. This should just be self.session.session
177 ident = CBytes() # ZMQ identity. This should just be self.session.session
178 # but ensure Bytes
178 # but ensure Bytes
    def _ident_default(self):
        # ZMQ identity defaults to the session's bytes identity (see trait
        # comment: "should just be self.session.session, but ensure Bytes")
        return self.session.bsession
181
181
    def start(self):
        """Begin scheduling: attach stream handlers and start the timeout auditor."""
        # engine replies arrive uncopied for speed
        self.engine_stream.on_recv(self.dispatch_result, copy=False)
        self._notification_handlers = dict(
            registration_notification = self._register_engine,
            unregistration_notification = self._unregister_engine
        )
        self.notifier_stream.on_recv(self.dispatch_notification)
        # check for expired task timeouts every 2000 ms (0.5 Hz; the
        # original "1 Hz" comment was stale)
        self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop)
        self.auditor.start()
        self.log.info("Scheduler started [%s]"%self.scheme_name)
192
192
    def resume_receiving(self):
        """Resume accepting jobs."""
        # re-attach the submission handler; raw client frames are not copied
        self.client_stream.on_recv(self.dispatch_submission, copy=False)
196
196
    def stop_receiving(self):
        """Stop accepting jobs while there are no engines.
        Leave them in the ZMQ queue."""
        # detaching the handler leaves submissions queued inside ZMQ until
        # resume_receiving re-attaches it
        self.client_stream.on_recv(None)
201
201
202 #-----------------------------------------------------------------------
202 #-----------------------------------------------------------------------
203 # [Un]Registration Handling
203 # [Un]Registration Handling
204 #-----------------------------------------------------------------------
204 #-----------------------------------------------------------------------
205
205
    def dispatch_notification(self, msg):
        """dispatch register/unregister events."""
        try:
            idents,msg = self.session.feed_identities(msg)
        except ValueError:
            self.log.warn("task::Invalid Message: %r",msg)
            return
        try:
            msg = self.session.unserialize(msg)
        except ValueError:
            self.log.warn("task::Unauthorized message from: %r"%idents)
            return

        msg_type = msg['header']['msg_type']

        # registration_notification / unregistration_notification (see start())
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            self.log.error("Unhandled message type: %r"%msg_type)
        else:
            try:
                # content['queue'] is the engine's identity; coerce to bytes
                handler(asbytes(msg['content']['queue']))
            except Exception:
                self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
229
229
    def _register_engine(self, uid):
        """New engine with ident `uid` became available."""
        # head of the line: new engines are treated as least recently used
        self.targets.insert(0,uid)
        self.loads.insert(0,0)

        # initialize per-engine bookkeeping sets
        self.completed[uid] = set()
        self.failed[uid] = set()
        self.pending[uid] = {}
        if len(self.targets) == 1:
            # first engine available: start accepting submissions again
            self.resume_receiving()
        # rescan the graph: waiting tasks may now be runnable
        # NOTE(review): placement relative to the `if` reconstructed as
        # unconditional — confirm against upstream source
        self.update_graph(None)
244
244
    def _unregister_engine(self, uid):
        """Existing engine with ident `uid` became unavailable."""
        if len(self.targets) == 1:
            # this was our only engine
            self.stop_receiving()

        # handle any potentially finished tasks:
        self.engine_stream.flush()

        # don't pop destinations, because they might be used later
        # map(self.destinations.pop, self.completed.pop(uid))
        # map(self.destinations.pop, self.failed.pop(uid))

        # prevent this engine from receiving work
        idx = self.targets.index(uid)
        self.targets.pop(idx)
        self.loads.pop(idx)

        # wait 5 seconds before cleaning up pending jobs, since the results might
        # still be incoming
        if self.pending[uid]:
            dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
            dc.start()
        else:
            # nothing outstanding: scrub bookkeeping immediately
            self.completed.pop(uid)
            self.failed.pop(uid)
271
271
272
272
    def handle_stranded_tasks(self, engine):
        """Deal with jobs resident in an engine that died."""
        lost = self.pending[engine]
        for msg_id in lost.keys():
            if msg_id not in self.pending[engine]:
                # prevent double-handling of messages
                continue

            raw_msg = lost[msg_id][0]
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            parent = self.session.unpack(msg[1].bytes)
            # reply routing: engine ident first, then the original client ident
            idents = [engine, idents[0]]

            # build fake error reply via raise/except so wrap_exception sees
            # a live traceback
            try:
                raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
            except:
                content = error.wrap_exception()
            msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
            raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
            # and dispatch it through the normal result path
            self.dispatch_result(raw_reply)

        # finally scrub completed/failed lists
        self.completed.pop(engine)
        self.failed.pop(engine)
299
299
300
300
301 #-----------------------------------------------------------------------
301 #-----------------------------------------------------------------------
302 # Job Submission
302 # Job Submission
303 #-----------------------------------------------------------------------
303 #-----------------------------------------------------------------------
304 def dispatch_submission(self, raw_msg):
304 def dispatch_submission(self, raw_msg):
305 """Dispatch job submission to appropriate handlers."""
305 """Dispatch job submission to appropriate handlers."""
306 # ensure targets up to date:
306 # ensure targets up to date:
307 self.notifier_stream.flush()
307 self.notifier_stream.flush()
308 try:
308 try:
309 idents, msg = self.session.feed_identities(raw_msg, copy=False)
309 idents, msg = self.session.feed_identities(raw_msg, copy=False)
310 msg = self.session.unserialize(msg, content=False, copy=False)
310 msg = self.session.unserialize(msg, content=False, copy=False)
311 except Exception:
311 except Exception:
312 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
312 self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
313 return
313 return
314
314
315
315
316 # send to monitor
316 # send to monitor
317 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
317 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
318
318
319 header = msg['header']
319 header = msg['header']
320 msg_id = header['msg_id']
320 msg_id = header['msg_id']
321 self.all_ids.add(msg_id)
321 self.all_ids.add(msg_id)
322
322
323 # get targets as a set of bytes objects
323 # get targets as a set of bytes objects
324 # from a list of unicode objects
324 # from a list of unicode objects
325 targets = header.get('targets', [])
325 targets = header.get('targets', [])
326 targets = map(asbytes, targets)
326 targets = map(asbytes, targets)
327 targets = set(targets)
327 targets = set(targets)
328
328
329 retries = header.get('retries', 0)
329 retries = header.get('retries', 0)
330 self.retries[msg_id] = retries
330 self.retries[msg_id] = retries
331
331
332 # time dependencies
332 # time dependencies
333 after = header.get('after', None)
333 after = header.get('after', None)
334 if after:
334 if after:
335 after = Dependency(after)
335 after = Dependency(after)
336 if after.all:
336 if after.all:
337 if after.success:
337 if after.success:
338 after = Dependency(after.difference(self.all_completed),
338 after = Dependency(after.difference(self.all_completed),
339 success=after.success,
339 success=after.success,
340 failure=after.failure,
340 failure=after.failure,
341 all=after.all,
341 all=after.all,
342 )
342 )
343 if after.failure:
343 if after.failure:
344 after = Dependency(after.difference(self.all_failed),
344 after = Dependency(after.difference(self.all_failed),
345 success=after.success,
345 success=after.success,
346 failure=after.failure,
346 failure=after.failure,
347 all=after.all,
347 all=after.all,
348 )
348 )
349 if after.check(self.all_completed, self.all_failed):
349 if after.check(self.all_completed, self.all_failed):
350 # recast as empty set, if `after` already met,
350 # recast as empty set, if `after` already met,
351 # to prevent unnecessary set comparisons
351 # to prevent unnecessary set comparisons
352 after = MET
352 after = MET
353 else:
353 else:
354 after = MET
354 after = MET
355
355
356 # location dependencies
356 # location dependencies
357 follow = Dependency(header.get('follow', []))
357 follow = Dependency(header.get('follow', []))
358
358
359 # turn timeouts into datetime objects:
359 # turn timeouts into datetime objects:
360 timeout = header.get('timeout', None)
360 timeout = header.get('timeout', None)
361 if timeout:
361 if timeout:
362 # cast to float, because jsonlib returns floats as decimal.Decimal,
362 # cast to float, because jsonlib returns floats as decimal.Decimal,
363 # which timedelta does not accept
363 # which timedelta does not accept
364 timeout = datetime.now() + timedelta(0,float(timeout),0)
364 timeout = datetime.now() + timedelta(0,float(timeout),0)
365
365
366 args = [raw_msg, targets, after, follow, timeout]
366 args = [raw_msg, targets, after, follow, timeout]
367
367
368 # validate and reduce dependencies:
368 # validate and reduce dependencies:
369 for dep in after,follow:
369 for dep in after,follow:
370 if not dep: # empty dependency
370 if not dep: # empty dependency
371 continue
371 continue
372 # check valid:
372 # check valid:
373 if msg_id in dep or dep.difference(self.all_ids):
373 if msg_id in dep or dep.difference(self.all_ids):
374 self.depending[msg_id] = args
374 self.depending[msg_id] = args
375 return self.fail_unreachable(msg_id, error.InvalidDependency)
375 return self.fail_unreachable(msg_id, error.InvalidDependency)
376 # check if unreachable:
376 # check if unreachable:
377 if dep.unreachable(self.all_completed, self.all_failed):
377 if dep.unreachable(self.all_completed, self.all_failed):
378 self.depending[msg_id] = args
378 self.depending[msg_id] = args
379 return self.fail_unreachable(msg_id)
379 return self.fail_unreachable(msg_id)
380
380
381 if after.check(self.all_completed, self.all_failed):
381 if after.check(self.all_completed, self.all_failed):
382 # time deps already met, try to run
382 # time deps already met, try to run
383 if not self.maybe_run(msg_id, *args):
383 if not self.maybe_run(msg_id, *args):
384 # can't run yet
384 # can't run yet
385 if msg_id not in self.all_failed:
385 if msg_id not in self.all_failed:
386 # could have failed as unreachable
386 # could have failed as unreachable
387 self.save_unmet(msg_id, *args)
387 self.save_unmet(msg_id, *args)
388 else:
388 else:
389 self.save_unmet(msg_id, *args)
389 self.save_unmet(msg_id, *args)
390
390
391 def audit_timeouts(self):
391 def audit_timeouts(self):
392 """Audit all waiting tasks for expired timeouts."""
392 """Audit all waiting tasks for expired timeouts."""
393 now = datetime.now()
393 now = datetime.now()
394 for msg_id in self.depending.keys():
394 for msg_id in self.depending.keys():
395 # must recheck, in case one failure cascaded to another:
395 # must recheck, in case one failure cascaded to another:
396 if msg_id in self.depending:
396 if msg_id in self.depending:
397 raw,after,targets,follow,timeout = self.depending[msg_id]
397 raw,after,targets,follow,timeout = self.depending[msg_id]
398 if timeout and timeout < now:
398 if timeout and timeout < now:
399 self.fail_unreachable(msg_id, error.TaskTimeout)
399 self.fail_unreachable(msg_id, error.TaskTimeout)
400
400
    def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
        """a task has become unreachable, send a reply with an ImpossibleDependency
        error."""
        if msg_id not in self.depending:
            self.log.error("msg %r already failed!", msg_id)
            return
        raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
        # remove this task from the graph entries of everything it waited on
        for mid in follow.union(after):
            if mid in self.graph:
                self.graph[mid].remove(msg_id)

        # FIXME: unpacking a message I've already unpacked, but didn't save:
        idents,msg = self.session.feed_identities(raw_msg, copy=False)
        header = self.session.unpack(msg[1].bytes)

        # raise/except so wrap_exception captures a real traceback
        try:
            raise why()
        except:
            content = error.wrap_exception()

        self.all_done.add(msg_id)
        self.all_failed.add(msg_id)

        # error reply to the client, and a copy to the monitor/hub
        msg = self.session.send(self.client_stream, 'apply_reply', content,
                                                parent=header, ident=idents)
        self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)

        # propagate the failure to tasks that depended on this one
        self.update_graph(msg_id, success=False)
429
429
    def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
        """check location dependencies, and run if they are met.

        Returns True if the task was submitted, False if it could not run
        (in which case it may also have been failed as unreachable).
        """
        blacklist = self.blacklist.setdefault(msg_id, set())
        if follow or targets or blacklist or self.hwm:
            # we need a can_run filter
            def can_run(idx):
                # check hwm: engine already at its outstanding-task limit
                if self.hwm and self.loads[idx] == self.hwm:
                    return False
                target = self.targets[idx]
                # check blacklist: engine previously raised UnmetDependency
                if target in blacklist:
                    return False
                # check targets
                if targets and target not in targets:
                    return False
                # check follow
                return follow.check(self.completed[target], self.failed[target])

            # NOTE: relies on Python 2 filter() returning a list, so the
            # `if not indices` test below sees emptiness, not a lazy iterator
            indices = filter(can_run, range(len(self.targets)))

            if not indices:
                # couldn't run
                if follow.all:
                    # check follow for impossibility: an all-type follow dep
                    # pinned to more than one distinct engine can never be met
                    dests = set()
                    relevant = set()
                    if follow.success:
                        relevant = self.all_completed
                    if follow.failure:
                        relevant = relevant.union(self.all_failed)
                    for m in follow.intersection(relevant):
                        dests.add(self.destinations[m])
                    if len(dests) > 1:
                        self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                        self.fail_unreachable(msg_id)
                        return False
                if targets:
                    # check blacklist+targets for impossibility
                    targets.difference_update(blacklist)
                    if not targets or not targets.intersection(self.targets):
                        self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                        self.fail_unreachable(msg_id)
                        return False
                return False
        else:
            # no constraints: let submit_task choose among all engines
            indices = None

        self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
        return True
480
480
def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
    """Park a task until its `after`/`follow` dependencies are satisfied.

    Stores the full submission arguments under ``msg_id`` and registers
    the task in the reverse dependency graph so it can be rechecked when
    each unfinished dependency completes.
    """
    self.depending[msg_id] = [raw_msg, targets, after, follow, timeout]
    # Track only dependencies that have not finished yet: completed ones
    # can never fire again, so there is nothing to wait for.
    for dep_id in after.union(follow).difference(self.all_done):
        self.graph.setdefault(dep_id, set()).add(msg_id)
489
489
def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
    """Dispatch a ready task to one engine chosen from `indices` (or all).

    The scheduling scheme picks an engine by load; the raw message frames
    are forwarded untouched, local bookkeeping is updated, and the Hub is
    notified of the destination.
    """
    # Restrict the load vector to the allowed engines, let the scheme
    # pick a winner, then translate back to a global engine index.
    if indices:
        candidate_loads = [self.loads[i] for i in indices]
    else:
        candidate_loads = self.loads
    choice = self.scheme(candidate_loads)
    idx = indices[choice] if indices else choice
    target = self.targets[idx]
    # Route the job: identity frame first, then the original message.
    self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
    self.engine_stream.send_multipart(raw_msg, copy=False)
    # Bump the engine's load and remember the task as pending there.
    self.add_job(idx)
    self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
    # Tell the Hub's monitor which engine got the task.
    content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
    self.session.send(self.mon_stream, 'task_destination', content=content,
                      ident=[b'tracktask', self.ident])
511
511
512
512
513 #-----------------------------------------------------------------------
513 #-----------------------------------------------------------------------
514 # Result Handling
514 # Result Handling
515 #-----------------------------------------------------------------------
515 #-----------------------------------------------------------------------
def dispatch_result(self, raw_msg):
    """Dispatch method for result replies arriving from engines.

    Unpacks the reply, updates the load of the source engine, and then
    either relays the result to the client (dependencies met) or routes
    it through the retry / unmet-dependency machinery.
    """
    try:
        idents, msg = self.session.feed_identities(raw_msg, copy=False)
        msg = self.session.unserialize(msg, content=False, copy=False)
        engine = idents[0]
        try:
            idx = self.targets.index(engine)
        except ValueError:
            pass  # skip load-update for dead engines
        else:
            self.finish_job(idx)
    except Exception:
        # fixed typo: was "Invaid result"
        self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
        return

    header = msg['header']
    parent = msg['parent_header']
    if header.get('dependencies_met', True):
        success = (header['status'] == 'ok')
        msg_id = parent['msg_id']
        retries = self.retries[msg_id]
        if not success and retries > 0:
            # failed, but retries remain: decrement and resubmit elsewhere
            self.retries[msg_id] = retries - 1
            self.handle_unmet_dependency(idents, parent)
        else:
            # final outcome: success, or retries exhausted
            del self.retries[msg_id]
            # relay to client and update graph
            self.handle_result(idents, parent, raw_msg, success)
            # send to Hub monitor
            self.mon_stream.send_multipart([b'outtask'] + raw_msg, copy=False)
    else:
        self.handle_unmet_dependency(idents, parent)
550
550
def handle_result(self, idents, parent, raw_msg, success=True):
    """Relay a finished task result to its client and record the outcome."""
    # Swap the routing identities so the XREP-XREP mirror forwards the
    # reply back to the requesting client.
    engine = idents[0]
    client = idents[1]
    raw_msg[:2] = [client, engine]
    self.client_stream.send_multipart(raw_msg, copy=False)

    # Bookkeeping: the task is no longer pending or blacklisted.
    msg_id = parent['msg_id']
    self.blacklist.pop(msg_id, None)
    self.pending[engine].pop(msg_id)

    # File the outcome under the right per-engine and global buckets.
    if success:
        per_engine, overall = self.completed, self.all_completed
    else:
        per_engine, overall = self.failed, self.all_failed
    per_engine[engine].add(msg_id)
    overall.add(msg_id)
    self.all_done.add(msg_id)
    self.destinations[msg_id] = engine

    self.update_graph(msg_id, success)
574
574
def handle_unmet_dependency(self, idents, parent):
    """Handle a reply reporting that an engine could not run a task."""
    engine = idents[0]
    msg_id = parent['msg_id']

    # Blacklist this engine for the task so we never resubmit it there.
    self.blacklist.setdefault(msg_id, set()).add(engine)

    # The task is no longer pending on that engine; recover its args.
    args = self.pending[engine].pop(msg_id)
    raw, targets, after, follow, timeout = args

    if self.blacklist[msg_id] == targets:
        # Every allowed target has refused the task: it is unreachable.
        self.depending[msg_id] = args
        self.fail_unreachable(msg_id)
    elif not self.maybe_run(msg_id, *args):
        # Resubmission failed. Unless the task already failed outright,
        # park it back in the dependency tree for a later retry.
        if msg_id not in self.all_failed:
            self.save_unmet(msg_id, *args)

    if self.hwm:
        # With a high-water mark, this engine may have just dropped below
        # the limit; recheck the whole graph for newly runnable jobs.
        try:
            idx = self.targets.index(engine)
        except ValueError:
            pass  # skip load-update for dead engines
        else:
            if self.loads[idx] == self.hwm - 1:
                self.update_graph(None)
604
604
605
605
606
606
def update_graph(self, dep_id=None, success=True):
    """dep_id just finished. Update our dependency graph and submit any
    jobs that just became runnable.

    Called with dep_id=None to recheck the entire graph (e.g. for hwm),
    without marking any task as finished.
    """
    # jobs that were explicitly waiting on dep_id
    jobs = self.graph.pop(dep_id, [])

    # recheck *all* waiting jobs if
    # a) we have a HWM and an engine just stopped being full, or
    # b) dep_id was given as None
    if dep_id is None or (self.hwm and any(load == self.hwm - 1 for load in self.loads)):
        # Snapshot the keys: the loop below pops from self.depending,
        # and iterating a live keys() view while mutating the dict
        # raises RuntimeError on Python 3.
        jobs = list(self.depending.keys())

    for msg_id in jobs:
        # A job handled earlier in this same pass may already have been
        # submitted or failed (e.g. via fail_unreachable cascades),
        # removing it from self.depending — skip it.
        if msg_id not in self.depending:
            continue
        raw_msg, targets, after, follow, timeout = self.depending[msg_id]

        if after.unreachable(self.all_completed, self.all_failed) \
                or follow.unreachable(self.all_completed, self.all_failed):
            self.fail_unreachable(msg_id)

        elif after.check(self.all_completed, self.all_failed):  # time deps met, maybe run
            if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
                self.depending.pop(msg_id)
                # the job is submitted; drop it from the reverse graph
                for mid in follow.union(after):
                    if mid in self.graph:
                        self.graph[mid].remove(msg_id)
644
644
645 #----------------------------------------------------------------------
645 #----------------------------------------------------------------------
646 # methods to be overridden by subclasses
646 # methods to be overridden by subclasses
647 #----------------------------------------------------------------------
647 #----------------------------------------------------------------------
648
648
def add_job(self, idx):
    """Record that self.targets[idx] just received a job.

    Default policy is simple LRU: bump the engine's load and rotate the
    engine (and its load entry) to the end of the parallel lists, so the
    least-recently-used engine sits at the front. Loads count outstanding
    jobs. Override in subclasses for other orderings.
    """
    self.loads[idx] += 1
    self.targets.append(self.targets.pop(idx))
    self.loads.append(self.loads.pop(idx))
656
656
657
657
def finish_job(self, idx):
    """Record that self.targets[idx] just finished a job.

    Decrements the engine's outstanding-job count. Override in
    subclasses for other load accounting.
    """
    self.loads[idx] = self.loads[idx] - 1
662
662
663
663
664
664
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
                     logname='root', log_url=None, loglevel=logging.DEBUG,
                     identity=b'task', in_thread=False):
    """Build a TaskScheduler wired to the given ZMQ endpoints and start it.

    When `in_thread` is True the scheduler shares the parent's Context and
    IOLoop (the loop is assumed to be driven elsewhere); otherwise a
    private Context/IOLoop pair is created and the loop is run here until
    interrupted.
    """
    ZMQStream = zmqstream.ZMQStream

    # `config` may arrive as a plain dict (e.g. passed across a process
    # boundary); rewrap it so traitlets configuration works.
    if config:
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a separate process, avoid instance() for safety
        # with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()

    # client-facing ROUTER
    ins = ZMQStream(ctx.socket(zmq.ROUTER), loop)
    ins.setsockopt(zmq.IDENTITY, identity)
    ins.bind(in_addr)

    # engine-facing ROUTER
    outs = ZMQStream(ctx.socket(zmq.ROUTER), loop)
    outs.setsockopt(zmq.IDENTITY, identity)
    outs.bind(out_addr)

    # monitor PUB and notification SUB
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
    mons.connect(mon_addr)
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB), loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # logging: reuse the app logger in-thread, otherwise build our own
    if in_thread:
        log = Application.instance().log
    elif log_url:
        log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
    else:
        log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                              mon_stream=mons, notifier_stream=nots,
                              loop=loop, log=log,
                              config=config)
    scheduler.start()
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            print("interrupted, exiting...", file=sys.__stderr__)
716
716
General Comments 0
You need to be logged in to leave comments. Login now