improved logging + Hub, Engine, Scheduler are Configurable
MinRK
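The thrust of the change is twofold: diagnostic output now goes through the stdlib logging module (published over a zmq PUB socket when --logport is given, otherwise written locally), and Hub, Engine, and TaskScheduler are rebuilt on IPython's Configurable/traitlets machinery, so their collaborators are declared as class-level traits and supplied as keyword arguments. A minimal sketch of that pattern, using a hypothetical Worker class in place of the real ones (Python 2, to match the rest of the code):

from IPython.config.configurable import Configurable
from IPython.utils.traitlets import Int, Str, Dict

class Worker(Configurable):
    # declared traits replace ad-hoc attribute assignment in __init__
    id = Int(-1)
    ident = Str()
    user_ns = Dict()

    def __init__(self, **kwargs):
        # Configurable fills the traits in from keyword arguments
        super(Worker, self).__init__(**kwargs)
        if not self.ident:
            self.ident = 'worker-%i' % self.id

# construction is keyword-only, mirroring the new Hub(loop=..., session=..., ...) call in the diff below
w = Worker(id=0, user_ns={'a': 1})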
@@ -21,7 +21,3 b' class EnginePUBHandler(PUBHandler):'
21 21 else:
22 22 return "engine"
23 23
24
25 logger = logging.getLogger('ipzmq')
26 logger.setLevel(logging.DEBUG)
27
@@ -90,11 +90,6 b' def defaultblock(f, self, *args, **kwargs):'
90 90 # Classes
91 91 #--------------------------------------------------------------------------
92 92
93 class AbortedTask(object):
94 """A basic wrapper object describing an aborted task."""
95 def __init__(self, msg_id):
96 self.msg_id = msg_id
97
98 93 class ResultDict(dict):
99 94 """A subclass of dict that raises errors if it has them."""
100 95 def __getitem__(self, key):
@@ -332,10 +327,10 b' class Client(object):'
332 327 msg = ss.Message(msg)
333 328 content = msg.content
334 329 if content.status == 'ok':
335 if content.queue:
330 if content.mux:
336 331 self._mux_socket = self.context.socket(zmq.PAIR)
337 332 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
338 connect_socket(self._mux_socket, content.queue)
333 connect_socket(self._mux_socket, content.mux)
339 334 if content.task:
340 335 self._task_socket = self.context.socket(zmq.PAIR)
341 336 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
@@ -17,6 +17,7 b' from __future__ import print_function'
17 17
18 18 import os
19 19 import time
20 import logging
20 21 from multiprocessing import Process
21 22
22 23 import zmq
@@ -29,7 +30,8 b' from IPython.zmq.entry_point import bind_port'
29 30
30 31 from hub import Hub
31 32 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
32 connect_logger, parse_url, signal_children, generate_exec_key)
33 connect_logger, parse_url, signal_children, generate_exec_key,
34 local_logger)
33 35
34 36
35 37 import streamsession as session
@@ -118,8 +120,6 b' def main(argv=None):'
118 120 ctx = zmq.Context()
119 121 loop = ioloop.IOLoop.instance()
120 122
121 # setup logging
122 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
123 123
124 124 # Registrar socket
125 125 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
@@ -207,7 +207,9 b' def main(argv=None):'
207 207 q.start()
208 208 children.append(q.launcher)
209 209 else:
210 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
210 log_addr = iface%args.logport if args.logport else None
211 sargs = (iface%task[0], iface%task[1], iface%monport, iface%nport,
212 log_addr, args.loglevel, args.scheduler)
211 213 print (sargs)
212 214 q = Process(target=launch_scheduler, args=sargs)
213 215 q.daemon=True
@@ -224,7 +226,7 b' def main(argv=None):'
224 226 # build connection dicts
225 227 engine_addrs = {
226 228 'control' : iface%control[1],
227 'queue': iface%mux[1],
229 'mux': iface%mux[1],
228 230 'heartbeat': (iface%hb[0], iface%hb[1]),
229 231 'task' : iface%task[1],
230 232 'iopub' : iface%iopub[1],
@@ -234,15 +236,24 b' def main(argv=None):'
234 236 client_addrs = {
235 237 'control' : iface%control[0],
236 238 'query': iface%cport,
237 'queue': iface%mux[0],
239 'mux': iface%mux[0],
238 240 'task' : iface%task[0],
239 241 'iopub' : iface%iopub[0],
240 242 'notification': iface%nport
241 243 }
242 244
245 # setup logging
246 if args.logport:
247 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
248 else:
249 local_logger(args.loglevel)
250
243 251 # register relay of signals to the children
244 252 signal_children(children)
245 hub = Hub(loop, thesession, sub, reg, hmon, c, n, db, engine_addrs, client_addrs)
253 hub = Hub(loop=loop, session=thesession, monitor=sub, heartmonitor=hmon,
254 registrar=reg, clientele=c, notifier=n, db=db,
255 engine_addrs=engine_addrs, client_addrs=client_addrs)
256
246 257 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
247 258 dc.start()
248 259 loop.start()
@@ -8,49 +8,48 b' import sys'
8 8 import time
9 9 import traceback
10 10 import uuid
11 import logging
11 12 from pprint import pprint
12 13
13 14 import zmq
14 15 from zmq.eventloop import ioloop, zmqstream
15 16
16 from IPython.utils.traitlets import HasTraits
17 from IPython.utils.localinterfaces import LOCALHOST
17 # internal
18 from IPython.config.configurable import Configurable
19 from IPython.utils.traitlets import Instance, Str, Dict
20 # from IPython.utils.localinterfaces import LOCALHOST
18 21
19 22 from streamsession import Message, StreamSession
20 from client import Client
21 23 from streamkernel import Kernel, make_kernel
22 24 import heartmonitor
23 from entry_point import make_base_argument_parser, connect_logger, parse_url
25 from entry_point import (make_base_argument_parser, connect_engine_logger, parse_url,
26 local_logger)
24 27 # import taskthread
25 # from log import logger
26
28 logger = logging.getLogger()
27 29
28 30 def printer(*msg):
29 pprint(msg, stream=sys.__stdout__)
31 # print (logger.handlers, file=sys.__stdout__)
32 logger.info(str(msg))
30 33
31 class Engine(object):
34 class Engine(Configurable):
32 35 """IPython engine"""
33 36
34 id=None
35 context=None
36 loop=None
37 session=None
38 ident=None
39 registrar=None
40 heart=None
41 37 kernel=None
42 user_ns=None
43
44 def __init__(self, context, loop, session, registrar, client=None, ident=None,
45 heart_id=None, user_ns=None):
46 self.context = context
47 self.loop = loop
48 self.session = session
49 self.registrar = registrar
50 self.client = client
51 self.ident = ident if ident else str(uuid.uuid4())
38 id=None
39
40 # configurables:
41 context=Instance(zmq.Context)
42 loop=Instance(ioloop.IOLoop)
43 session=Instance(StreamSession)
44 ident=Str()
45 registrar=Instance(zmqstream.ZMQStream)
46 user_ns=Dict()
47
48 def __init__(self, **kwargs):
49 super(Engine, self).__init__(**kwargs)
50 if not self.ident:
51 self.ident = str(uuid.uuid4())
52 52 self.registrar.on_send(printer)
53 self.user_ns = user_ns
54 53
55 54 def register(self):
56 55
@@ -64,8 +63,9 b' class Engine(object):'
64 63 idents,msg = self.session.feed_identities(msg)
65 64 msg = Message(self.session.unpack_message(msg))
66 65 if msg.content.status == 'ok':
67 self.session.username = str(msg.content.id)
68 queue_addr = msg.content.queue
66 self.id = int(msg.content.id)
67 self.session.username = 'engine-%i'%self.id
68 queue_addr = msg.content.mux
69 69 shell_addrs = [str(queue_addr)]
70 70 control_addr = str(msg.content.control)
71 71 task_addr = msg.content.task
@@ -75,7 +75,7 b' class Engine(object):'
75 75
76 76 hb_addrs = msg.content.heartbeat
77 77 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
78 k = make_kernel(self.ident, control_addr, shell_addrs, iopub_addr,
78 k = make_kernel(self.id, self.ident, control_addr, shell_addrs, iopub_addr,
79 79 hb_addrs, client_addr=None, loop=self.loop,
80 80 context=self.context, key=self.session.key)[-1]
81 81 self.kernel = k
@@ -84,12 +84,12 b' class Engine(object):'
84 84 self.kernel.user_ns = self.user_ns
85 85
86 86 else:
87 # logger.error("Registration Failed: %s"%msg)
87 logger.error("Registration Failed: %s"%msg)
88 88 raise Exception("Registration Failed: %s"%msg)
89 89
90 # logger.info("engine::completed registration with id %s"%self.session.username)
90 logger.info("completed registration with id %i"%self.id)
91 91
92 print (msg,file=sys.__stdout__)
92 # logger.info(str(msg))
93 93
94 94 def unregister(self):
95 95 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
@@ -97,7 +97,7 b' class Engine(object):'
97 97 sys.exit(0)
98 98
99 99 def start(self):
100 print ("registering",file=sys.__stdout__)
100 logger.info("registering")
101 101 self.register()
102 102
103 103
@@ -118,7 +118,6 b' def main(argv=None, user_ns=None):'
118 118 ctx = zmq.Context()
119 119
120 120 # setup logging
121 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
122 121
123 122 reg_conn = iface % args.regport
124 123 print (reg_conn, file=sys.__stdout__)
@@ -127,10 +126,16 b' def main(argv=None, user_ns=None):'
127 126 reg = ctx.socket(zmq.PAIR)
128 127 reg.connect(reg_conn)
129 128 reg = zmqstream.ZMQStream(reg, loop)
130 client = None
131 129
132 e = Engine(ctx, loop, session, reg, client, args.ident, user_ns=user_ns)
133 dc = ioloop.DelayedCallback(e.start, 100, loop)
130 e = Engine(context=ctx, loop=loop, session=session, registrar=reg,
131 ident=args.ident or '', user_ns=user_ns)
132 if args.logport:
133 print ("connecting logger to %s"%(iface%args.logport), file=sys.__stdout__)
134 connect_engine_logger(ctx, iface%args.logport, e, loglevel=args.loglevel)
135 else:
136 local_logger(args.loglevel)
137
138 dc = ioloop.DelayedCallback(e.start, 0, loop)
134 139 dc.start()
135 140 loop.start()
136 141
@@ -22,7 +22,7 b' from zmq.log import handlers'
22 22 # Local imports.
23 23 from IPython.core.ultratb import FormattedTB
24 24 from IPython.external.argparse import ArgumentParser
25 from IPython.zmq.log import logger
25 from IPython.zmq.log import EnginePUBHandler
26 26
27 27 def split_ports(s, n):
28 28 """Parser helper for multiport strings"""
@@ -82,6 +82,7 b' def make_base_argument_parser():'
82 82 """ Creates an ArgumentParser for the generic arguments supported by all
83 83 ipcluster entry points.
84 84 """
85
85 86 parser = ArgumentParser()
86 87 parser.add_argument('--ip', type=str, default='127.0.0.1',
87 88 help='set the controller\'s IP address [default: local]')
@@ -89,10 +90,10 b' def make_base_argument_parser():'
89 90 help='set the transport to use [default: tcp]')
90 91 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
91 92 help='set the XREP port for registration [default: 10101]')
92 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
93 help='set the PUB port for logging [default: 10201]')
94 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.DEBUG,
95 help='set the log level [default: DEBUG]')
93 parser.add_argument('--logport', type=int, metavar='PORT', default=0,
94 help='set the PUB port for remote logging [default: log to stdout]')
95 parser.add_argument('--loglevel', type=str, metavar='LEVEL', default=logging.INFO,
96 help='set the log level [default: INFO]')
96 97 parser.add_argument('--ident', type=str,
97 98 help='set the ZMQ identity [default: random]')
98 99 parser.add_argument('--packer', type=str, default='json',
@@ -105,17 +106,42 b' def make_base_argument_parser():'
105 106
106 107 return parser
107 108
108
109 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
109 def integer_loglevel(loglevel):
110 110 try:
111 111 loglevel = int(loglevel)
112 112 except ValueError:
113 113 if isinstance(loglevel, str):
114 114 loglevel = getattr(logging, loglevel)
115 return loglevel
116
117 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
118 loglevel = integer_loglevel(loglevel)
115 119 lsock = context.socket(zmq.PUB)
116 120 lsock.connect(iface)
117 121 handler = handlers.PUBHandler(lsock)
118 122 handler.setLevel(loglevel)
119 123 handler.root_topic = root
124 logger = logging.getLogger()
125 logger.addHandler(handler)
126 logger.setLevel(loglevel)
127
128 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
129 logger = logging.getLogger()
130 loglevel = integer_loglevel(loglevel)
131 lsock = context.socket(zmq.PUB)
132 lsock.connect(iface)
133 handler = EnginePUBHandler(engine, lsock)
134 handler.setLevel(loglevel)
120 135 logger.addHandler(handler)
136 logger.setLevel(loglevel)
121 137
138 def local_logger(loglevel=logging.DEBUG):
139 loglevel = integer_loglevel(loglevel)
140 logger = logging.getLogger()
141 if logger.handlers:
142 # if there are any handlers, skip the hookup
143 return
144 handler = logging.StreamHandler()
145 handler.setLevel(loglevel)
146 logger.addHandler(handler)
147 logger.setLevel(loglevel)
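Taken together these helpers give every process the same startup choice: forward records to a central collector, or fall back to plain stream output. A rough usage sketch, assuming entry_point is importable and using an illustrative address and root topic:

import logging
import zmq
from entry_point import connect_logger, local_logger, integer_loglevel

ctx = zmq.Context()
level = integer_loglevel('INFO')   # 'INFO' is resolved to logging.INFO

# with --logport: the root logger publishes each record on a PUB socket,
# prefixed with the given root topic, for a remote subscriber to collect
connect_logger(ctx, 'tcp://127.0.0.1:20202', root='myproc', loglevel=level)

# without --logport the fallback is a plain StreamHandler on the root logger:
# local_logger(level)

logging.getLogger().info('hello from this process')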
@@ -7,13 +7,13 b' and hearts are tracked based on their XREQ identities.'
7 7 from __future__ import print_function
8 8 import time
9 9 import uuid
10 import logging
10 11
11 12 import zmq
12 13 from zmq.devices import ProcessDevice,ThreadDevice
13 14 from zmq.eventloop import ioloop, zmqstream
14 15
15 #internal
16 from IPython.zmq.log import logger
16 logger = logging.getLogger()
17 17
18 18 class Heart(object):
19 19 """A basic heart object for responding to a HeartMonitor.
@@ -53,6 +53,7 b' class HeartMonitor(object):'
53 53 hearts=None
54 54 on_probation=None
55 55 last_ping=None
56 # debug=False
56 57
57 58 def __init__(self, loop, pingstream, pongstream, period=1000):
58 59 self.loop = loop
@@ -85,19 +86,6 b' class HeartMonitor(object):'
85 86 logger.debug("heartbeat::new heart failure handler: %s"%handler)
86 87 self._failure_handlers.add(handler)
87 88
88 # def _flush(self):
89 # """override IOLoop triggers"""
90 # while True:
91 # try:
92 # msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK)
93 # logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled")
94 # except zmq.ZMQError:
95 # return
96 # else:
97 # self.handle_pong(msg)
98 # # print '.'
99 #
100
101 89 def beat(self):
102 90 self.pongstream.flush()
103 91 self.last_ping = self.lifetime
@@ -105,7 +93,7 b' class HeartMonitor(object):'
105 93 toc = time.time()
106 94 self.lifetime += toc-self.tic
107 95 self.tic = toc
108 logger.debug("heartbeat::%s"%self.lifetime)
96 # logger.debug("heartbeat::%s"%self.lifetime)
109 97 goodhearts = self.hearts.intersection(self.responses)
110 98 missed_beats = self.hearts.difference(goodhearts)
111 99 heartfailures = self.on_probation.intersection(missed_beats)
@@ -144,7 +132,7 b' class HeartMonitor(object):'
144 132 "a heart just beat"
145 133 if msg[1] == str(self.lifetime):
146 134 delta = time.time()-self.tic
147 logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
135 # logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
148 136 self.responses.add(msg[0])
149 137 elif msg[1] == str(self.last_ping):
150 138 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
@@ -18,14 +18,19 b' from __future__ import print_function'
18 18 import sys
19 19 from datetime import datetime
20 20 import time
21 import logging
21 22
22 23 import zmq
23 from zmq.eventloop import ioloop
24 from zmq.eventloop import ioloop, zmqstream
24 25
25 26 # internal:
26 from IPython.zmq.log import logger # a Logger object
27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict
29 # from IPython.zmq.log import logger # a Logger object
27 30
28 31 from streamsession import Message, wrap_exception, ISO8601
32 from heartmonitor import HeartMonitor
33 from util import validate_url_container
29 34
30 35 try:
31 36 from pymongo.binary import Binary
@@ -38,6 +43,8 b' else:'
38 43 # Code
39 44 #-----------------------------------------------------------------------------
40 45
46 logger = logging.getLogger()
47
41 48 def _passer(*args, **kwargs):
42 49 return
43 50
@@ -46,7 +53,7 b' def _printer(*args, **kwargs):'
46 53 print (kwargs)
47 54
48 55 def init_record(msg):
49 """return an empty TaskRecord dict, with all keys initialized with None."""
56 """Initialize a TaskRecord based on a request."""
50 57 header = msg['header']
51 58 return {
52 59 'msg_id' : header['msg_id'],
@@ -71,7 +78,7 b' def init_record(msg):'
71 78 }
72 79
73 80
74 class EngineConnector(object):
81 class EngineConnector(HasTraits):
75 82 """A simple object for accessing the various zmq connections of an object.
76 83 Attributes are:
77 84 id (int): engine ID
@@ -80,22 +87,18 b' class EngineConnector(object):'
80 87 registration (str): identity of registration XREQ socket
81 88 heartbeat (str): identity of heartbeat XREQ socket
82 89 """
83 id=0
84 queue=None
85 control=None
86 registration=None
87 heartbeat=None
88 pending=None
89
90 def __init__(self, id, queue, registration, control, heartbeat=None):
91 logger.info("engine::Engine Connected: %i"%id)
92 self.id = id
93 self.queue = queue
94 self.registration = registration
95 self.control = control
96 self.heartbeat = heartbeat
97
98 class Hub(object):
90 id=Int(0)
91 queue=Str()
92 control=Str()
93 registration=Str()
94 heartbeat=Str()
95 pending=Instance(set)
96
97 def __init__(self, **kwargs):
98 super(EngineConnector, self).__init__(**kwargs)
99 logger.info("engine::Engine Connected: %i"%self.id)
100
101 class Hub(Configurable):
99 102 """The IPython Controller Hub with 0MQ connections
100 103
101 104 Parameters
@@ -123,26 +126,25 b' class Hub(object):'
123 126 clients=None
124 127 hearts=None
125 128 pending=None
126 results=None
127 129 tasks=None
128 130 completed=None
129 mia=None
131 # mia=None
130 132 incoming_registrations=None
131 133 registration_timeout=None
132 134
133 135 #objects from constructor:
134 loop=None
135 registrar=None
136 clientelle=None
137 queue=None
138 heartbeat=None
139 notifier=None
140 db=None
141 client_addr=None
142 engine_addrs=None
143
144
145 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
136 loop=Instance(ioloop.IOLoop)
137 registrar=Instance(zmqstream.ZMQStream)
138 clientele=Instance(zmqstream.ZMQStream)
139 monitor=Instance(zmqstream.ZMQStream)
140 heartmonitor=Instance(HeartMonitor)
141 notifier=Instance(zmqstream.ZMQStream)
142 db=Instance(object)
143 client_addrs=Dict()
144 engine_addrs=Dict()
145
146
147 def __init__(self, **kwargs):
146 148 """
147 149 # universal:
148 150 loop: IOLoop for creating future connections
@@ -158,6 +160,8 b' class Hub(object):'
158 160 engine_addrs: zmq address/protocol dict for engine connections
159 161 client_addrs: zmq address/protocol dict for client connections
160 162 """
163
164 super(Hub, self).__init__(**kwargs)
161 165 self.ids = set()
162 166 self.keytable={}
163 167 self.incoming_registrations={}
@@ -166,35 +170,44 b' class Hub(object):'
166 170 self.clients = {}
167 171 self.hearts = {}
168 172 # self.mia = set()
169
173 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
174 # this is the stuff that will move to DB:
175 self.pending = set() # pending messages, keyed by msg_id
176 self.queues = {} # pending msg_ids keyed by engine_id
177 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
178 self.completed = {} # completed msg_ids keyed by engine_id
179 self.all_completed = set()
180 self._idcounter = 0
170 181 # self.sockets = {}
171 self.loop = loop
172 self.session = session
173 self.registrar = registrar
174 self.clientele = clientele
175 self.queue = queue
176 self.heartbeat = heartbeat
177 self.notifier = notifier
178 self.db = db
182 # self.loop = loop
183 # self.session = session
184 # self.registrar = registrar
185 # self.clientele = clientele
186 # self.queue = queue
187 # self.heartmonitor = heartbeat
188 # self.notifier = notifier
189 # self.db = db
179 190
180 191 # validate connection dicts:
181 self.client_addrs = client_addrs
182 assert isinstance(client_addrs['queue'], str)
183 assert isinstance(client_addrs['control'], str)
192 # self.client_addrs = client_addrs
193 validate_url_container(self.client_addrs)
194
195 # assert isinstance(self.client_addrs['queue'], str)
196 # assert isinstance(self.client_addrs['control'], str)
184 197 # self.hb_addrs = hb_addrs
185 self.engine_addrs = engine_addrs
186 assert isinstance(engine_addrs['queue'], str)
187 assert isinstance(client_addrs['control'], str)
188 assert len(engine_addrs['heartbeat']) == 2
198 validate_url_container(self.engine_addrs)
199 # self.engine_addrs = engine_addrs
200 # assert isinstance(self.engine_addrs['queue'], str)
201 # assert isinstance(self.engine_addrs['control'], str)
202 # assert len(engine_addrs['heartbeat']) == 2
189 203
190 204 # register our callbacks
191 205 self.registrar.on_recv(self.dispatch_register_request)
192 206 self.clientele.on_recv(self.dispatch_client_msg)
193 self.queue.on_recv(self.dispatch_monitor_traffic)
207 self.monitor.on_recv(self.dispatch_monitor_traffic)
194 208
195 if heartbeat is not None:
196 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
197 heartbeat.add_new_heart_handler(self.handle_new_heart)
209 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
210 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
198 211
199 212 self.monitor_handlers = { 'in' : self.save_queue_request,
200 213 'out': self.save_queue_result,
@@ -218,25 +231,21 b' class Hub(object):'
218 231 'unregistration_request' : self.unregister_engine,
219 232 'connection_request': self.connection_request,
220 233 }
221 self.registration_timeout = max(5000, 2*self.heartbeat.period)
222 # this is the stuff that will move to DB:
223 # self.results = {} # completed results
224 self.pending = set() # pending messages, keyed by msg_id
225 self.queues = {} # pending msg_ids keyed by engine_id
226 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
227 self.completed = {} # completed msg_ids keyed by engine_id
228 self.all_completed = set()
229 234
230 235 logger.info("controller::created controller")
231 236
232 def _new_id(self):
237 @property
238 def _next_id(self):
233 239 """gemerate a new ID"""
234 newid = 0
235 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
236 # print newid, self.ids, self.incoming_registrations
237 while newid in self.ids or newid in incoming:
238 newid += 1
240 newid = self._idcounter
241 self._idcounter += 1
239 242 return newid
243 # newid = 0
244 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
245 # # print newid, self.ids, self.incoming_registrations
246 # while newid in self.ids or newid in incoming:
247 # newid += 1
248 # return newid
240 249
241 250 #-----------------------------------------------------------------------------
242 251 # message validation
@@ -580,6 +589,9 b' class Hub(object):'
580 589 return
581 590
582 591 parent = msg['parent_header']
592 if not parent:
593 logger.error("iopub::invalid IOPub message: %s"%msg)
594 return
583 595 msg_id = parent['msg_id']
584 596 msg_type = msg['msg_type']
585 597 content = msg['content']
@@ -631,7 +643,7 b' class Hub(object):'
631 643 return
632 644 heart = content.get('heartbeat', None)
633 645 """register a new engine, and create the socket(s) necessary"""
634 eid = self._new_id()
646 eid = self._next_id
635 647 # print (eid, queue, reg, heart)
636 648
637 649 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
@@ -644,10 +656,12 b' class Hub(object):'
644 656 raise KeyError("queue_id %r in use"%queue)
645 657 except:
646 658 content = wrap_exception()
659 logger.error("queue_id %r in use"%queue, exc_info=True)
647 660 elif heart in self.hearts: # need to check unique hearts?
648 661 try:
649 662 raise KeyError("heart_id %r in use"%heart)
650 663 except:
664 logger.error("heart_id %r in use"%heart, exc_info=True)
651 665 content = wrap_exception()
652 666 else:
653 667 for h, pack in self.incoming_registrations.iteritems():
@@ -655,12 +669,14 b' class Hub(object):'
655 669 try:
656 670 raise KeyError("heart_id %r in use"%heart)
657 671 except:
672 logger.error("heart_id %r in use"%heart, exc_info=True)
658 673 content = wrap_exception()
659 674 break
660 675 elif queue == pack[1]:
661 676 try:
662 677 raise KeyError("queue_id %r in use"%queue)
663 678 except:
679 logger.error("queue_id %r in use"%queue, exc_info=True)
664 680 content = wrap_exception()
665 681 break
666 682
@@ -669,15 +685,15 b' class Hub(object):'
669 685 ident=reg)
670 686
671 687 if content['status'] == 'ok':
672 if heart in self.heartbeat.hearts:
688 if heart in self.heartmonitor.hearts:
673 689 # already beating
674 self.incoming_registrations[heart] = (eid,queue,reg,None)
690 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
675 691 self.finish_registration(heart)
676 692 else:
677 693 purge = lambda : self._purge_stalled_registration(heart)
678 694 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
679 695 dc.start()
680 self.incoming_registrations[heart] = (eid,queue,reg,dc)
696 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
681 697 else:
682 698 logger.error("registration::registration %i failed: %s"%(eid, content['evalue']))
683 699 return eid
@@ -718,7 +734,8 b' class Hub(object):'
718 734 control = queue
719 735 self.ids.add(eid)
720 736 self.keytable[eid] = queue
721 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
737 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
738 control=control, heartbeat=heart)
722 739 self.by_ident[queue] = eid
723 740 self.queues[eid] = list()
724 741 self.tasks[eid] = list()
@@ -11,6 +11,8 b' Python Scheduler exists.'
11 11
12 12 from __future__ import print_function
13 13 from random import randint,random
14 import logging
15 from types import FunctionType
14 16
15 17 try:
16 18 import numpy
@@ -21,17 +23,22 b' import zmq'
21 23 from zmq.eventloop import ioloop, zmqstream
22 24
23 25 # local imports
24 from IPython.zmq.log import logger # a Logger object
26 from IPython.external.decorator import decorator
27 from IPython.config.configurable import Configurable
28 from IPython.utils.traitlets import Instance
29
25 30 from client import Client
26 31 from dependency import Dependency
27 32 import streamsession as ss
33 from entry_point import connect_logger, local_logger
28 34
29 from IPython.external.decorator import decorator
35
36 logger = logging.getLogger()
30 37
31 38 @decorator
32 39 def logged(f,self,*args,**kwargs):
33 40 # print ("#--------------------")
34 # print ("%s(*%s,**%s)"%(f.func_name, args, kwargs))
41 logger.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
35 42 # print ("#--")
36 43 return f(self,*args, **kwargs)
37 44
@@ -99,7 +106,7 b' def leastload(loads):'
99 106 #---------------------------------------------------------------------
100 107 # Classes
101 108 #---------------------------------------------------------------------
102 class TaskScheduler(object):
109 class TaskScheduler(Configurable):
103 110 """Python TaskScheduler object.
104 111
105 112 This is the simplest object that supports msg_id based
@@ -108,10 +115,15 b' class TaskScheduler(object):'
108 115
109 116 """
110 117
111 scheme = leastload # function for determining the destination
112 client_stream = None # client-facing stream
113 engine_stream = None # engine-facing stream
114 mon_stream = None # controller-facing stream
118 # configurables:
119 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
120 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
121 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
122 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
123 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
124 io_loop = Instance(ioloop.IOLoop)
125
126 # internals:
115 127 dependencies = None # dict by msg_id of [ msg_ids that depend on key ]
116 128 depending = None # dict by msg_id of (msg_id, raw_msg, after, follow)
117 129 pending = None # dict by engine_uuid of submitted tasks
@@ -123,23 +135,10 b' class TaskScheduler(object):'
123 135 blacklist = None # dict by msg_id of locations where a job has encountered UnmetDependency
124 136
125 137
126 def __init__(self, client_stream, engine_stream, mon_stream,
127 notifier_stream, scheme=None, io_loop=None):
128 if io_loop is None:
129 io_loop = ioloop.IOLoop.instance()
130 self.io_loop = io_loop
131 self.client_stream = client_stream
132 self.engine_stream = engine_stream
133 self.mon_stream = mon_stream
134 self.notifier_stream = notifier_stream
135
136 if scheme is not None:
137 self.scheme = scheme
138 else:
139 self.scheme = TaskScheduler.scheme
138 def __init__(self, **kwargs):
139 super(TaskScheduler, self).__init__(**kwargs)
140 140
141 141 self.session = ss.StreamSession(username="TaskScheduler")
142
143 142 self.dependencies = {}
144 143 self.depending = {}
145 144 self.completed = {}
@@ -150,12 +149,13 b' class TaskScheduler(object):'
150 149 self.targets = []
151 150 self.loads = []
152 151
153 engine_stream.on_recv(self.dispatch_result, copy=False)
152 self.engine_stream.on_recv(self.dispatch_result, copy=False)
154 153 self._notification_handlers = dict(
155 154 registration_notification = self._register_engine,
156 155 unregistration_notification = self._unregister_engine
157 156 )
158 157 self.notifier_stream.on_recv(self.dispatch_notification)
158 logger.info("Scheduler started...%r"%self)
159 159
160 160 def resume_receiving(self):
161 161 """Resume accepting jobs."""
@@ -183,6 +183,7 b' class TaskScheduler(object):'
183 183 handler(str(msg['content']['queue']))
184 184 except KeyError:
185 185 logger.error("task::Invalid notification msg: %s"%msg)
186
186 187 @logged
187 188 def _register_engine(self, uid):
188 189 """New engine with ident `uid` became available."""
@@ -306,7 +307,8 b' class TaskScheduler(object):'
306 307 self.add_job(idx)
307 308 self.pending[target][msg_id] = (msg, follow)
308 309 content = dict(msg_id=msg_id, engine_id=target)
309 self.session.send(self.mon_stream, 'task_destination', content=content, ident='tracktask')
310 self.session.send(self.mon_stream, 'task_destination', content=content,
311 ident=['tracktask',self.session.session])
310 312
311 313 #-----------------------------------------------------------------------
312 314 # Result Handling
@@ -395,7 +397,7 b' class TaskScheduler(object):'
395 397
396 398
397 399
398 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):
400 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
399 401 from zmq.eventloop import ioloop
400 402 from zmq.eventloop.zmqstream import ZMQStream
401 403
@@ -414,7 +416,15 b" def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, scheme='weighted'):"
414 416 nots.setsockopt(zmq.SUBSCRIBE, '')
415 417 nots.connect(not_addr)
416 418
417 scheduler = TaskScheduler(ins,outs,mons,nots,scheme,loop)
419 # setup logging
420 if log_addr:
421 connect_logger(ctx, log_addr, root="scheduler", loglevel=loglevel)
422 else:
423 local_logger(loglevel)
424
425 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
426 mon_stream=mons,notifier_stream=nots,
427 scheme=scheme,io_loop=loop)
418 428
419 429 loop.start()
420 430
@@ -15,6 +15,7 b' import os'
15 15 import sys
16 16 import time
17 17 import traceback
18 import logging
18 19 from datetime import datetime
19 20 from signal import SIGTERM, SIGKILL
20 21 from pprint import pprint
@@ -25,9 +26,8 b' from zmq.eventloop import ioloop, zmqstream'
25 26
26 27 # Local imports.
27 28 from IPython.core import ultratb
28 from IPython.utils.traitlets import HasTraits, Instance, List
29 from IPython.utils.traitlets import HasTraits, Instance, List, Int
29 30 from IPython.zmq.completer import KernelCompleter
30 from IPython.zmq.log import logger # a Logger object
31 31 from IPython.zmq.iostream import OutStream
32 32 from IPython.zmq.displayhook import DisplayHook
33 33
@@ -38,6 +38,8 b' from dependency import UnmetDependency'
38 38 import heartmonitor
39 39 from client import Client
40 40
41 logger = logging.getLogger()
42
41 43 def printer(*args):
42 44 pprint(args, stream=sys.__stdout__)
43 45
@@ -51,8 +53,9 b' class Kernel(HasTraits):'
51 53 # Kernel interface
52 54 #---------------------------------------------------------------------------
53 55
56 id = Int(-1)
54 57 session = Instance(StreamSession)
55 shell_streams = Instance(list)
58 shell_streams = List()
56 59 control_stream = Instance(zmqstream.ZMQStream)
57 60 task_stream = Instance(zmqstream.ZMQStream)
58 61 iopub_stream = Instance(zmqstream.ZMQStream)
@@ -62,7 +65,8 b' class Kernel(HasTraits):'
62 65 def __init__(self, **kwargs):
63 66 super(Kernel, self).__init__(**kwargs)
64 67 self.identity = self.shell_streams[0].getsockopt(zmq.IDENTITY)
65 self.prefix = 'engine.%s'%self.identity
68 self.prefix = 'engine.%s'%self.id
69 logger.root_topic = self.prefix
66 70 self.user_ns = {}
67 71 self.history = []
68 72 self.compiler = CommandCompiler()
@@ -108,8 +112,8 b' class Kernel(HasTraits):'
108 112
109 113 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
110 114 # msg = self.reply_socket.recv_json()
111 print ("Aborting:", file=sys.__stdout__)
112 print (Message(msg), file=sys.__stdout__)
115 logger.info("Aborting:")
116 logger.info(str(msg))
113 117 msg_type = msg['msg_type']
114 118 reply_type = msg_type.split('_')[0] + '_reply'
115 119 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
@@ -117,7 +121,7 b' class Kernel(HasTraits):'
117 121 # self.reply_socket.send_json(reply_msg)
118 122 reply_msg = self.session.send(stream, reply_type,
119 123 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
120 print(Message(reply_msg), file=sys.__stdout__)
124 logger.debug(str(reply_msg))
121 125 # We need to wait a bit for requests to come in. This can probably
122 126 # be set shorter for true asynchronous clients.
123 127 time.sleep(0.05)
@@ -135,7 +139,7 b' class Kernel(HasTraits):'
135 139 content = dict(status='ok')
136 140 reply_msg = self.session.send(stream, 'abort_reply', content=content,
137 141 parent=parent, ident=ident)[0]
138 print(Message(reply_msg), file=sys.__stdout__)
142 logger(Message(reply_msg), file=sys.__stdout__)
139 143
140 144 def shutdown_request(self, stream, ident, parent):
141 145 """kill ourself. This should really be handled in an external process"""
@@ -168,7 +172,7 b' class Kernel(HasTraits):'
168 172
169 173 handler = self.control_handlers.get(msg['msg_type'], None)
170 174 if handler is None:
171 print ("UNKNOWN CONTROL MESSAGE TYPE:", msg, file=sys.__stderr__)
175 logger.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
172 176 else:
173 177 handler(self.control_stream, idents, msg)
174 178
@@ -211,13 +215,12 b' class Kernel(HasTraits):'
211 215 try:
212 216 code = parent[u'content'][u'code']
213 217 except:
214 print("Got bad msg: ", file=sys.__stderr__)
215 print(Message(parent), file=sys.__stderr__)
218 logger.error("Got bad msg: %s"%parent, exc_info=True)
216 219 return
217 220 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
218 221 # self.iopub_stream.send(pyin_msg)
219 222 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
220 ident=self.identity+'.pyin')
223 ident='%s.pyin'%self.prefix)
221 224 started = datetime.now().strftime(ISO8601)
222 225 try:
223 226 comp_code = self.compiler(code, '<zmq-kernel>')
@@ -231,7 +234,7 b' class Kernel(HasTraits):'
231 234 exc_content = self._wrap_exception('execute')
232 235 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
233 236 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
234 ident=self.identity+'.pyerr')
237 ident='%s.pyerr'%self.prefix)
235 238 reply_content = exc_content
236 239 else:
237 240 reply_content = {'status' : 'ok'}
@@ -240,7 +243,7 b' class Kernel(HasTraits):'
240 243 # self.reply_socket.send_json(reply_msg)
241 244 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
242 245 ident=ident, subheader = dict(started=started))
243 print(Message(reply_msg), file=sys.__stdout__)
246 logger.debug(str(reply_msg))
244 247 if reply_msg['content']['status'] == u'error':
245 248 self.abort_queues()
246 249
@@ -262,8 +265,7 b' class Kernel(HasTraits):'
262 265 msg_id = parent['header']['msg_id']
263 266 bound = content.get('bound', False)
264 267 except:
265 print("Got bad msg: ", file=sys.__stderr__)
266 print(Message(parent), file=sys.__stderr__)
268 logger.error("Got bad msg: %s"%parent, exc_info=True)
267 269 return
268 270 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
269 271 # self.iopub_stream.send(pyin_msg)
@@ -316,7 +318,7 b' class Kernel(HasTraits):'
316 318 exc_content = self._wrap_exception('apply')
317 319 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
318 320 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
319 ident=self.identity+'.pyerr')
321 ident='%s.pyerr'%self.prefix)
320 322 reply_content = exc_content
321 323 result_buf = []
322 324
@@ -354,7 +356,7 b' class Kernel(HasTraits):'
354 356 return
355 357 handler = self.shell_handlers.get(msg['msg_type'], None)
356 358 if handler is None:
357 print ("UNKNOWN MESSAGE TYPE:", msg, file=sys.__stderr__)
359 logger.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
358 360 else:
359 361 handler(stream, idents, msg)
360 362
@@ -398,7 +400,7 b' class Kernel(HasTraits):'
398 400 # # don't busywait
399 401 # time.sleep(1e-3)
400 402
401 def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
403 def make_kernel(int_id, identity, control_addr, shell_addrs, iopub_addr, hb_addrs,
402 404 client_addr=None, loop=None, context=None, key=None,
403 405 out_stream_factory=OutStream, display_hook_factory=DisplayHook):
404 406
@@ -410,7 +412,7 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,'
410 412 c = context
411 413 session = StreamSession(key=key)
412 414 # print (session.key)
413 print (control_addr, shell_addrs, iopub_addr, hb_addrs)
415 # print (control_addr, shell_addrs, iopub_addr, hb_addrs)
414 416
415 417 # create Control Stream
416 418 control_stream = zmqstream.ZMQStream(c.socket(zmq.PAIR), loop)
@@ -433,12 +435,12 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,'
433 435 # Redirect input streams and set a display hook.
434 436 if out_stream_factory:
435 437 sys.stdout = out_stream_factory(session, iopub_stream, u'stdout')
436 sys.stdout.topic = identity+'.stdout'
438 sys.stdout.topic = 'engine.%i.stdout'%int_id
437 439 sys.stderr = out_stream_factory(session, iopub_stream, u'stderr')
438 sys.stderr.topic = identity+'.stderr'
440 sys.stderr.topic = 'engine.%i.stderr'%int_id
439 441 if display_hook_factory:
440 442 sys.displayhook = display_hook_factory(session, iopub_stream)
441 sys.displayhook.topic = identity+'.pyout'
443 sys.displayhook.topic = 'engine.%i.pyout'%int_id
442 444
443 445
444 446 # launch heartbeat
@@ -451,7 +453,7 b' def make_kernel(identity, control_addr, shell_addrs, iopub_addr, hb_addrs,'
451 453 else:
452 454 client = None
453 455
454 kernel = Kernel(session=session, control_stream=control_stream,
456 kernel = Kernel(id=int_id, session=session, control_stream=control_stream,
455 457 shell_streams=shell_streams, iopub_stream=iopub_stream,
456 458 client=client, loop=loop)
457 459 kernel.start()
@@ -1,4 +1,5 b''
1 1 """some generic utilities"""
2 import re
2 3
3 4 class ReverseDict(dict):
4 5 """simple double-keyed subset of dict methods."""
@@ -33,3 +34,46 b' class ReverseDict(dict):'
33 34 return default
34 35
35 36
37 def validate_url(url):
38 """validate a url for zeromq"""
39 if not isinstance(url, basestring):
40 raise TypeError("url must be a string, not %r"%type(url))
41 url = url.lower()
42
43 proto_addr = url.split('://')
44 assert len(proto_addr) == 2, 'Invalid url: %r'%url
45 proto, addr = proto_addr
46 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
47
48 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
49 # author: Remi Sabourin
50 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
51
52 if proto == 'tcp':
53 lis = addr.split(':')
54 assert len(lis) == 2, 'Invalid url: %r'%url
55 addr,s_port = lis
56 try:
57 port = int(s_port)
58 except ValueError:
59 raise AssertionError("Invalid port %r in url: %r"%(port, url))
60
61 assert pat.match(addr) is not None, 'Invalid url: %r'%url
62
63 else:
64 # only validate tcp urls currently
65 pass
66
67 return True
68
69
70 def validate_url_container(container):
71 """validate a potentially nested collection of urls."""
72 if isinstance(container, basestring):
73 url = container
74 return validate_url(url)
75 elif isinstance(container, dict):
76 container = container.itervalues()
77
78 for element in container:
79 validate_url_container(element) No newline at end of file
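For reference, a short illustration of what the new validators accept and reject (addresses are made up; validate_url returns True or raises TypeError/AssertionError):

from util import validate_url, validate_url_container

validate_url('tcp://127.0.0.1:10101')        # ok: returns True
validate_url('ipc:///tmp/controller.sock')   # ok: only tcp addresses get the detailed check

# nested containers (dicts, tuples, lists) are walked recursively,
# which is how Hub now checks engine_addrs and client_addrs
validate_url_container({
    'control': 'tcp://127.0.0.1:10102',
    'heartbeat': ('tcp://127.0.0.1:10103', 'tcp://127.0.0.1:10104'),
})

try:
    validate_url('tcp://localhost')          # no port -> rejected
except AssertionError as e:
    print(e)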
@@ -19,6 +19,7 b''
19 19 # You should have received a copy of the Lesser GNU General Public License
20 20 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 21
22 import sys
22 23 import zmq
23 24 logport = 20202
24 25 def main(topics, addrs):
@@ -26,20 +27,27 b' def main(topics, addrs):'
26 27 context = zmq.Context()
27 28 socket = context.socket(zmq.SUB)
28 29 for topic in topics:
30 print "Subscribing to: %r"%topic
29 31 socket.setsockopt(zmq.SUBSCRIBE, topic)
30 32 if addrs:
31 33 for addr in addrs:
32 34 print "Connecting to: ", addr
33 35 socket.connect(addr)
34 36 else:
35 socket.bind('tcp://127.0.0.1:%i'%logport)
37 socket.bind('tcp://*:%i'%logport)
36 38
37 39 while True:
38 40 # topic = socket.recv()
39 41 # print topic
40 topic, msg = socket.recv_multipart()
41 # msg = socket.recv_pyobj()
42 # print 'tic'
43 raw = socket.recv_multipart()
44 if len(raw) != 2:
45 print "!!! invalid log message: %s"%raw
46 else:
47 topic, msg = raw
48 # don't newline, since log messages always newline:
42 49 print "%s | %s " % (topic, msg),
50 sys.stdout.flush()
43 51
44 52 if __name__ == '__main__':
45 53 import sys