##// END OF EJS Templates
added simple cluster entry point
MinRK -
Show More
@@ -0,0 +1,84
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import sys,os
4 from subprocess import Popen, PIPE
5
6 from entry_point import parse_url
7 from controller import make_argument_parser
8
9 def _filter_arg(flag, args):
10 filtered = []
11 if flag in args:
12 filtered.append(flag)
13 idx = args.index(flag)
14 if len(args) > idx+1:
15 if not args[idx+1].startswith('-'):
16 filtered.append(args[idx+1])
17 return filtered
18
19 def filter_args(flags, args=sys.argv[1:]):
20 filtered = []
21 for flag in flags:
22 if isinstance(flag, (list,tuple)):
23 for f in flag:
24 filtered.extend(_filter_arg(f, args))
25 else:
26 filtered.extend(_filter_arg(flag, args))
27 return filtered
28
29 def _strip_arg(flag, args):
30 while flag in args:
31 idx = args.index(flag)
32 args.pop(idx)
33 if len(args) > idx:
34 if not args[idx].startswith('-'):
35 args.pop(idx)
36
37 def strip_args(flags, args=sys.argv[1:]):
38 args = list(args)
39 for flag in flags:
40 if isinstance(flag, (list,tuple)):
41 for f in flag:
42 _strip_arg(f, args)
43 else:
44 _strip_arg(flag, args)
45 return args
46
47
48 def launch_process(mod, args):
49 """Launch a controller or engine in a subprocess."""
50 code = "from IPython.zmq.parallel.%s import main;main()"%mod
51 arguments = [ sys.executable, '-c', code ] + args
52 blackholew = file(os.devnull, 'w')
53 blackholer = file(os.devnull, 'r')
54
55 proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=blackholew)
56 return proc
57
58 def main():
59 parser = make_argument_parser()
60 parser.add_argument('--n', '-n', type=int, default=1,
61 help="The number of engines to start.")
62 args = parser.parse_args()
63 parse_url(args)
64
65 controller_args = strip_args([('--n','-n')])
66 engine_args = filter_args(['--url', '--regport', '--logport', '--ip',
67 '--transport','--loglevel','--packer'])+['--ident']
68
69 controller = launch_process('controller', controller_args)
70 print("Launched Controller at %s"%args.url)
71 engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ]
72 print("%i Engines started"%args.n)
73
74 def wait_quietly(p):
75 try:
76 p.wait()
77 except KeyboardInterrupt:
78 pass
79 wait_quietly(controller)
80 map(wait_quietly, engines)
81 print ("Done")
82
83 if __name__ == '__main__':
84 main() No newline at end of file
@@ -1,919 +1,923
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is the master object that handles connections from engines, clients, and
3 This is the master object that handles connections from engines, clients, and
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010 The IPython Development Team
6 # Copyright (C) 2010 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
9 # the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
16
15 from datetime import datetime
17 from datetime import datetime
16 import logging
18 import logging
17
19
18 import zmq
20 import zmq
19 from zmq.eventloop import zmqstream, ioloop
21 from zmq.eventloop import zmqstream, ioloop
20 import uuid
22 import uuid
21
23
22 # internal:
24 # internal:
23 from IPython.zmq.log import logger # a Logger object
25 from IPython.zmq.log import logger # a Logger object
24 from IPython.zmq.entry_point import bind_port
26 from IPython.zmq.entry_point import bind_port
25
27
26 from streamsession import Message, wrap_exception
28 from streamsession import Message, wrap_exception
27 from entry_point import (make_argument_parser, select_random_ports, split_ports,
29 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
28 connect_logger)
30 connect_logger, parse_url)
29 # from messages import json # use the same import switches
31 # from messages import json # use the same import switches
30
32
31 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
32 # Code
34 # Code
33 #-----------------------------------------------------------------------------
35 #-----------------------------------------------------------------------------
34
36
35 class ReverseDict(dict):
37 class ReverseDict(dict):
36 """simple double-keyed subset of dict methods."""
38 """simple double-keyed subset of dict methods."""
37
39
38 def __init__(self, *args, **kwargs):
40 def __init__(self, *args, **kwargs):
39 dict.__init__(self, *args, **kwargs)
41 dict.__init__(self, *args, **kwargs)
40 self.reverse = dict()
42 self.reverse = dict()
41 for key, value in self.iteritems():
43 for key, value in self.iteritems():
42 self.reverse[value] = key
44 self.reverse[value] = key
43
45
44 def __getitem__(self, key):
46 def __getitem__(self, key):
45 try:
47 try:
46 return dict.__getitem__(self, key)
48 return dict.__getitem__(self, key)
47 except KeyError:
49 except KeyError:
48 return self.reverse[key]
50 return self.reverse[key]
49
51
50 def __setitem__(self, key, value):
52 def __setitem__(self, key, value):
51 if key in self.reverse:
53 if key in self.reverse:
52 raise KeyError("Can't have key %r on both sides!"%key)
54 raise KeyError("Can't have key %r on both sides!"%key)
53 dict.__setitem__(self, key, value)
55 dict.__setitem__(self, key, value)
54 self.reverse[value] = key
56 self.reverse[value] = key
55
57
56 def pop(self, key):
58 def pop(self, key):
57 value = dict.pop(self, key)
59 value = dict.pop(self, key)
58 self.d1.pop(value)
60 self.d1.pop(value)
59 return value
61 return value
60
62
61
63
62 class EngineConnector(object):
64 class EngineConnector(object):
63 """A simple object for accessing the various zmq connections of an object.
65 """A simple object for accessing the various zmq connections of an object.
64 Attributes are:
66 Attributes are:
65 id (int): engine ID
67 id (int): engine ID
66 uuid (str): uuid (unused?)
68 uuid (str): uuid (unused?)
67 queue (str): identity of queue's XREQ socket
69 queue (str): identity of queue's XREQ socket
68 registration (str): identity of registration XREQ socket
70 registration (str): identity of registration XREQ socket
69 heartbeat (str): identity of heartbeat XREQ socket
71 heartbeat (str): identity of heartbeat XREQ socket
70 """
72 """
71 id=0
73 id=0
72 queue=None
74 queue=None
73 control=None
75 control=None
74 registration=None
76 registration=None
75 heartbeat=None
77 heartbeat=None
76 pending=None
78 pending=None
77
79
78 def __init__(self, id, queue, registration, control, heartbeat=None):
80 def __init__(self, id, queue, registration, control, heartbeat=None):
79 logger.info("engine::Engine Connected: %i"%id)
81 logger.info("engine::Engine Connected: %i"%id)
80 self.id = id
82 self.id = id
81 self.queue = queue
83 self.queue = queue
82 self.registration = registration
84 self.registration = registration
83 self.control = control
85 self.control = control
84 self.heartbeat = heartbeat
86 self.heartbeat = heartbeat
85
87
86 class Controller(object):
88 class Controller(object):
87 """The IPython Controller with 0MQ connections
89 """The IPython Controller with 0MQ connections
88
90
89 Parameters
91 Parameters
90 ==========
92 ==========
91 loop: zmq IOLoop instance
93 loop: zmq IOLoop instance
92 session: StreamSession object
94 session: StreamSession object
93 <removed> context: zmq context for creating new connections (?)
95 <removed> context: zmq context for creating new connections (?)
94 registrar: ZMQStream for engine registration requests (XREP)
96 registrar: ZMQStream for engine registration requests (XREP)
95 clientele: ZMQStream for client connections (XREP)
97 clientele: ZMQStream for client connections (XREP)
96 not used for jobs, only query/control commands
98 not used for jobs, only query/control commands
97 queue: ZMQStream for monitoring the command queue (SUB)
99 queue: ZMQStream for monitoring the command queue (SUB)
98 heartbeat: HeartMonitor object checking the pulse of the engines
100 heartbeat: HeartMonitor object checking the pulse of the engines
99 db_stream: connection to db for out of memory logging of commands
101 db_stream: connection to db for out of memory logging of commands
100 NotImplemented
102 NotImplemented
101 queue_addr: zmq connection address of the XREP socket for the queue
103 queue_addr: zmq connection address of the XREP socket for the queue
102 hb_addr: zmq connection address of the PUB socket for heartbeats
104 hb_addr: zmq connection address of the PUB socket for heartbeats
103 task_addr: zmq connection address of the XREQ socket for task queue
105 task_addr: zmq connection address of the XREQ socket for task queue
104 """
106 """
105 # internal data structures:
107 # internal data structures:
106 ids=None # engine IDs
108 ids=None # engine IDs
107 keytable=None
109 keytable=None
108 engines=None
110 engines=None
109 clients=None
111 clients=None
110 hearts=None
112 hearts=None
111 pending=None
113 pending=None
112 results=None
114 results=None
113 tasks=None
115 tasks=None
114 completed=None
116 completed=None
115 mia=None
117 mia=None
116 incoming_registrations=None
118 incoming_registrations=None
117 registration_timeout=None
119 registration_timeout=None
118
120
119 #objects from constructor:
121 #objects from constructor:
120 loop=None
122 loop=None
121 registrar=None
123 registrar=None
122 clientelle=None
124 clientelle=None
123 queue=None
125 queue=None
124 heartbeat=None
126 heartbeat=None
125 notifier=None
127 notifier=None
126 db=None
128 db=None
127 client_addr=None
129 client_addr=None
128 engine_addrs=None
130 engine_addrs=None
129
131
130
132
131 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
133 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
132 """
134 """
133 # universal:
135 # universal:
134 loop: IOLoop for creating future connections
136 loop: IOLoop for creating future connections
135 session: streamsession for sending serialized data
137 session: streamsession for sending serialized data
136 # engine:
138 # engine:
137 queue: ZMQStream for monitoring queue messages
139 queue: ZMQStream for monitoring queue messages
138 registrar: ZMQStream for engine registration
140 registrar: ZMQStream for engine registration
139 heartbeat: HeartMonitor object for tracking engines
141 heartbeat: HeartMonitor object for tracking engines
140 # client:
142 # client:
141 clientele: ZMQStream for client connections
143 clientele: ZMQStream for client connections
142 # extra:
144 # extra:
143 db: ZMQStream for db connection (NotImplemented)
145 db: ZMQStream for db connection (NotImplemented)
144 engine_addrs: zmq address/protocol dict for engine connections
146 engine_addrs: zmq address/protocol dict for engine connections
145 client_addrs: zmq address/protocol dict for client connections
147 client_addrs: zmq address/protocol dict for client connections
146 """
148 """
147 self.ids = set()
149 self.ids = set()
148 self.keytable={}
150 self.keytable={}
149 self.incoming_registrations={}
151 self.incoming_registrations={}
150 self.engines = {}
152 self.engines = {}
151 self.by_ident = {}
153 self.by_ident = {}
152 self.clients = {}
154 self.clients = {}
153 self.hearts = {}
155 self.hearts = {}
154 self.mia = set()
156 self.mia = set()
155
157
156 # self.sockets = {}
158 # self.sockets = {}
157 self.loop = loop
159 self.loop = loop
158 self.session = session
160 self.session = session
159 self.registrar = registrar
161 self.registrar = registrar
160 self.clientele = clientele
162 self.clientele = clientele
161 self.queue = queue
163 self.queue = queue
162 self.heartbeat = heartbeat
164 self.heartbeat = heartbeat
163 self.notifier = notifier
165 self.notifier = notifier
164 self.db = db
166 self.db = db
165
167
166 self.client_addrs = client_addrs
168 self.client_addrs = client_addrs
167 assert isinstance(client_addrs['queue'], str)
169 assert isinstance(client_addrs['queue'], str)
168 # self.hb_addrs = hb_addrs
170 # self.hb_addrs = hb_addrs
169 self.engine_addrs = engine_addrs
171 self.engine_addrs = engine_addrs
170 assert isinstance(engine_addrs['queue'], str)
172 assert isinstance(engine_addrs['queue'], str)
171 assert len(engine_addrs['heartbeat']) == 2
173 assert len(engine_addrs['heartbeat']) == 2
172
174
173
175
174 # register our callbacks
176 # register our callbacks
175 self.registrar.on_recv(self.dispatch_register_request)
177 self.registrar.on_recv(self.dispatch_register_request)
176 self.clientele.on_recv(self.dispatch_client_msg)
178 self.clientele.on_recv(self.dispatch_client_msg)
177 self.queue.on_recv(self.dispatch_queue_traffic)
179 self.queue.on_recv(self.dispatch_queue_traffic)
178
180
179 if heartbeat is not None:
181 if heartbeat is not None:
180 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
182 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
181 heartbeat.add_new_heart_handler(self.handle_new_heart)
183 heartbeat.add_new_heart_handler(self.handle_new_heart)
182
184
183 if self.db is not None:
185 if self.db is not None:
184 self.db.on_recv(self.dispatch_db)
186 self.db.on_recv(self.dispatch_db)
185
187
186 self.client_handlers = {'queue_request': self.queue_status,
188 self.client_handlers = {'queue_request': self.queue_status,
187 'result_request': self.get_results,
189 'result_request': self.get_results,
188 'purge_request': self.purge_results,
190 'purge_request': self.purge_results,
189 'resubmit_request': self.resubmit_task,
191 'resubmit_request': self.resubmit_task,
190 }
192 }
191
193
192 self.registrar_handlers = {'registration_request' : self.register_engine,
194 self.registrar_handlers = {'registration_request' : self.register_engine,
193 'unregistration_request' : self.unregister_engine,
195 'unregistration_request' : self.unregister_engine,
194 'connection_request': self.connection_request,
196 'connection_request': self.connection_request,
195
197
196 }
198 }
197 #
199 #
198 # this is the stuff that will move to DB:
200 # this is the stuff that will move to DB:
199 self.results = {} # completed results
201 self.results = {} # completed results
200 self.pending = {} # pending messages, keyed by msg_id
202 self.pending = {} # pending messages, keyed by msg_id
201 self.queues = {} # pending msg_ids keyed by engine_id
203 self.queues = {} # pending msg_ids keyed by engine_id
202 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
204 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
203 self.completed = {} # completed msg_ids keyed by engine_id
205 self.completed = {} # completed msg_ids keyed by engine_id
204 self.registration_timeout = max(5000, 2*self.heartbeat.period)
206 self.registration_timeout = max(5000, 2*self.heartbeat.period)
205
207
206 logger.info("controller::created controller")
208 logger.info("controller::created controller")
207
209
208 def _new_id(self):
210 def _new_id(self):
209 """gemerate a new ID"""
211 """gemerate a new ID"""
210 newid = 0
212 newid = 0
211 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
213 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
212 # print newid, self.ids, self.incoming_registrations
214 # print newid, self.ids, self.incoming_registrations
213 while newid in self.ids or newid in incoming:
215 while newid in self.ids or newid in incoming:
214 newid += 1
216 newid += 1
215 return newid
217 return newid
216
218
217 #-----------------------------------------------------------------------------
219 #-----------------------------------------------------------------------------
218 # message validation
220 # message validation
219 #-----------------------------------------------------------------------------
221 #-----------------------------------------------------------------------------
220
222
221 def _validate_targets(self, targets):
223 def _validate_targets(self, targets):
222 """turn any valid targets argument into a list of integer ids"""
224 """turn any valid targets argument into a list of integer ids"""
223 if targets is None:
225 if targets is None:
224 # default to all
226 # default to all
225 targets = self.ids
227 targets = self.ids
226
228
227 if isinstance(targets, (int,str,unicode)):
229 if isinstance(targets, (int,str,unicode)):
228 # only one target specified
230 # only one target specified
229 targets = [targets]
231 targets = [targets]
230 _targets = []
232 _targets = []
231 for t in targets:
233 for t in targets:
232 # map raw identities to ids
234 # map raw identities to ids
233 if isinstance(t, (str,unicode)):
235 if isinstance(t, (str,unicode)):
234 t = self.by_ident.get(t, t)
236 t = self.by_ident.get(t, t)
235 _targets.append(t)
237 _targets.append(t)
236 targets = _targets
238 targets = _targets
237 bad_targets = [ t for t in targets if t not in self.ids ]
239 bad_targets = [ t for t in targets if t not in self.ids ]
238 if bad_targets:
240 if bad_targets:
239 raise IndexError("No Such Engine: %r"%bad_targets)
241 raise IndexError("No Such Engine: %r"%bad_targets)
240 if not targets:
242 if not targets:
241 raise IndexError("No Engines Registered")
243 raise IndexError("No Engines Registered")
242 return targets
244 return targets
243
245
244 def _validate_client_msg(self, msg):
246 def _validate_client_msg(self, msg):
245 """validates and unpacks headers of a message. Returns False if invalid,
247 """validates and unpacks headers of a message. Returns False if invalid,
246 (ident, header, parent, content)"""
248 (ident, header, parent, content)"""
247 client_id = msg[0]
249 client_id = msg[0]
248 try:
250 try:
249 msg = self.session.unpack_message(msg[1:], content=True)
251 msg = self.session.unpack_message(msg[1:], content=True)
250 except:
252 except:
251 logger.error("client::Invalid Message %s"%msg)
253 logger.error("client::Invalid Message %s"%msg)
252 return False
254 return False
253
255
254 msg_type = msg.get('msg_type', None)
256 msg_type = msg.get('msg_type', None)
255 if msg_type is None:
257 if msg_type is None:
256 return False
258 return False
257 header = msg.get('header')
259 header = msg.get('header')
258 # session doesn't handle split content for now:
260 # session doesn't handle split content for now:
259 return client_id, msg
261 return client_id, msg
260
262
261
263
262 #-----------------------------------------------------------------------------
264 #-----------------------------------------------------------------------------
263 # dispatch methods (1 per stream)
265 # dispatch methods (1 per stream)
264 #-----------------------------------------------------------------------------
266 #-----------------------------------------------------------------------------
265
267
266 def dispatch_register_request(self, msg):
268 def dispatch_register_request(self, msg):
267 """"""
269 """"""
268 logger.debug("registration::dispatch_register_request(%s)"%msg)
270 logger.debug("registration::dispatch_register_request(%s)"%msg)
269 idents,msg = self.session.feed_identities(msg)
271 idents,msg = self.session.feed_identities(msg)
270 print idents,msg, len(msg)
272 print (idents,msg, len(msg))
271 try:
273 try:
272 msg = self.session.unpack_message(msg,content=True)
274 msg = self.session.unpack_message(msg,content=True)
273 except Exception, e:
275 except Exception, e:
274 logger.error("registration::got bad registration message: %s"%msg)
276 logger.error("registration::got bad registration message: %s"%msg)
275 raise e
277 raise e
276 return
278 return
277
279
278 msg_type = msg['msg_type']
280 msg_type = msg['msg_type']
279 content = msg['content']
281 content = msg['content']
280
282
281 handler = self.registrar_handlers.get(msg_type, None)
283 handler = self.registrar_handlers.get(msg_type, None)
282 if handler is None:
284 if handler is None:
283 logger.error("registration::got bad registration message: %s"%msg)
285 logger.error("registration::got bad registration message: %s"%msg)
284 else:
286 else:
285 handler(idents, msg)
287 handler(idents, msg)
286
288
287 def dispatch_queue_traffic(self, msg):
289 def dispatch_queue_traffic(self, msg):
288 """all ME and Task queue messages come through here"""
290 """all ME and Task queue messages come through here"""
289 logger.debug("queue traffic: %s"%msg[:2])
291 logger.debug("queue traffic: %s"%msg[:2])
290 switch = msg[0]
292 switch = msg[0]
291 idents, msg = self.session.feed_identities(msg[1:])
293 idents, msg = self.session.feed_identities(msg[1:])
292 if switch == 'in':
294 if switch == 'in':
293 self.save_queue_request(idents, msg)
295 self.save_queue_request(idents, msg)
294 elif switch == 'out':
296 elif switch == 'out':
295 self.save_queue_result(idents, msg)
297 self.save_queue_result(idents, msg)
296 elif switch == 'intask':
298 elif switch == 'intask':
297 self.save_task_request(idents, msg)
299 self.save_task_request(idents, msg)
298 elif switch == 'outtask':
300 elif switch == 'outtask':
299 self.save_task_result(idents, msg)
301 self.save_task_result(idents, msg)
300 elif switch == 'tracktask':
302 elif switch == 'tracktask':
301 self.save_task_destination(idents, msg)
303 self.save_task_destination(idents, msg)
302 elif switch in ('incontrol', 'outcontrol'):
304 elif switch in ('incontrol', 'outcontrol'):
303 pass
305 pass
304 else:
306 else:
305 logger.error("Invalid message topic: %s"%switch)
307 logger.error("Invalid message topic: %s"%switch)
306
308
307
309
308 def dispatch_client_msg(self, msg):
310 def dispatch_client_msg(self, msg):
309 """Route messages from clients"""
311 """Route messages from clients"""
310 idents, msg = self.session.feed_identities(msg)
312 idents, msg = self.session.feed_identities(msg)
311 client_id = idents[0]
313 client_id = idents[0]
312 try:
314 try:
313 msg = self.session.unpack_message(msg, content=True)
315 msg = self.session.unpack_message(msg, content=True)
314 except:
316 except:
315 content = wrap_exception()
317 content = wrap_exception()
316 logger.error("Bad Client Message: %s"%msg)
318 logger.error("Bad Client Message: %s"%msg)
317 self.session.send(self.clientele, "controller_error", ident=client_id,
319 self.session.send(self.clientele, "controller_error", ident=client_id,
318 content=content)
320 content=content)
319 return
321 return
320
322
321 # print client_id, header, parent, content
323 # print client_id, header, parent, content
322 #switch on message type:
324 #switch on message type:
323 msg_type = msg['msg_type']
325 msg_type = msg['msg_type']
324 logger.info("client:: client %s requested %s"%(client_id, msg_type))
326 logger.info("client:: client %s requested %s"%(client_id, msg_type))
325 handler = self.client_handlers.get(msg_type, None)
327 handler = self.client_handlers.get(msg_type, None)
326 try:
328 try:
327 assert handler is not None, "Bad Message Type: %s"%msg_type
329 assert handler is not None, "Bad Message Type: %s"%msg_type
328 except:
330 except:
329 content = wrap_exception()
331 content = wrap_exception()
330 logger.error("Bad Message Type: %s"%msg_type)
332 logger.error("Bad Message Type: %s"%msg_type)
331 self.session.send(self.clientele, "controller_error", ident=client_id,
333 self.session.send(self.clientele, "controller_error", ident=client_id,
332 content=content)
334 content=content)
333 return
335 return
334 else:
336 else:
335 handler(client_id, msg)
337 handler(client_id, msg)
336
338
337 def dispatch_db(self, msg):
339 def dispatch_db(self, msg):
338 """"""
340 """"""
339 raise NotImplementedError
341 raise NotImplementedError
340
342
341 #---------------------------------------------------------------------------
343 #---------------------------------------------------------------------------
342 # handler methods (1 per event)
344 # handler methods (1 per event)
343 #---------------------------------------------------------------------------
345 #---------------------------------------------------------------------------
344
346
345 #----------------------- Heartbeat --------------------------------------
347 #----------------------- Heartbeat --------------------------------------
346
348
347 def handle_new_heart(self, heart):
349 def handle_new_heart(self, heart):
348 """handler to attach to heartbeater.
350 """handler to attach to heartbeater.
349 Called when a new heart starts to beat.
351 Called when a new heart starts to beat.
350 Triggers completion of registration."""
352 Triggers completion of registration."""
351 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
353 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
352 if heart not in self.incoming_registrations:
354 if heart not in self.incoming_registrations:
353 logger.info("heartbeat::ignoring new heart: %r"%heart)
355 logger.info("heartbeat::ignoring new heart: %r"%heart)
354 else:
356 else:
355 self.finish_registration(heart)
357 self.finish_registration(heart)
356
358
357
359
358 def handle_heart_failure(self, heart):
360 def handle_heart_failure(self, heart):
359 """handler to attach to heartbeater.
361 """handler to attach to heartbeater.
360 called when a previously registered heart fails to respond to beat request.
362 called when a previously registered heart fails to respond to beat request.
361 triggers unregistration"""
363 triggers unregistration"""
362 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
364 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
363 eid = self.hearts.get(heart, None)
365 eid = self.hearts.get(heart, None)
364 queue = self.engines[eid].queue
366 queue = self.engines[eid].queue
365 if eid is None:
367 if eid is None:
366 logger.info("heartbeat::ignoring heart failure %r"%heart)
368 logger.info("heartbeat::ignoring heart failure %r"%heart)
367 else:
369 else:
368 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
370 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
369
371
370 #----------------------- MUX Queue Traffic ------------------------------
372 #----------------------- MUX Queue Traffic ------------------------------
371
373
372 def save_queue_request(self, idents, msg):
374 def save_queue_request(self, idents, msg):
373 queue_id, client_id = idents[:2]
375 queue_id, client_id = idents[:2]
374
376
375 try:
377 try:
376 msg = self.session.unpack_message(msg, content=False)
378 msg = self.session.unpack_message(msg, content=False)
377 except:
379 except:
378 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
380 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
379 return
381 return
380
382
381 eid = self.by_ident.get(queue_id, None)
383 eid = self.by_ident.get(queue_id, None)
382 if eid is None:
384 if eid is None:
383 logger.error("queue::target %r not registered"%queue_id)
385 logger.error("queue::target %r not registered"%queue_id)
384 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
386 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
385 return
387 return
386
388
387 header = msg['header']
389 header = msg['header']
388 msg_id = header['msg_id']
390 msg_id = header['msg_id']
389 info = dict(submit=datetime.now(),
391 info = dict(submit=datetime.now(),
390 received=None,
392 received=None,
391 engine=(eid, queue_id))
393 engine=(eid, queue_id))
392 self.pending[msg_id] = ( msg, info )
394 self.pending[msg_id] = ( msg, info )
393 self.queues[eid][0].append(msg_id)
395 self.queues[eid][0].append(msg_id)
394
396
395 def save_queue_result(self, idents, msg):
397 def save_queue_result(self, idents, msg):
396 client_id, queue_id = idents[:2]
398 client_id, queue_id = idents[:2]
397
399
398 try:
400 try:
399 msg = self.session.unpack_message(msg, content=False)
401 msg = self.session.unpack_message(msg, content=False)
400 except:
402 except:
401 logger.error("queue::engine %r sent invalid message to %r: %s"%(
403 logger.error("queue::engine %r sent invalid message to %r: %s"%(
402 queue_id,client_id, msg))
404 queue_id,client_id, msg))
403 return
405 return
404
406
405 eid = self.by_ident.get(queue_id, None)
407 eid = self.by_ident.get(queue_id, None)
406 if eid is None:
408 if eid is None:
407 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
409 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
408 logger.debug("queue:: %s"%msg[2:])
410 logger.debug("queue:: %s"%msg[2:])
409 return
411 return
410
412
411 parent = msg['parent_header']
413 parent = msg['parent_header']
412 if not parent:
414 if not parent:
413 return
415 return
414 msg_id = parent['msg_id']
416 msg_id = parent['msg_id']
415 self.results[msg_id] = msg
417 self.results[msg_id] = msg
416 if msg_id in self.pending:
418 if msg_id in self.pending:
417 self.pending.pop(msg_id)
419 self.pending.pop(msg_id)
418 self.queues[eid][0].remove(msg_id)
420 self.queues[eid][0].remove(msg_id)
419 self.completed[eid].append(msg_id)
421 self.completed[eid].append(msg_id)
420 else:
422 else:
421 logger.debug("queue:: unknown msg finished %s"%msg_id)
423 logger.debug("queue:: unknown msg finished %s"%msg_id)
422
424
423 #--------------------- Task Queue Traffic ------------------------------
425 #--------------------- Task Queue Traffic ------------------------------
424
426
425 def save_task_request(self, idents, msg):
427 def save_task_request(self, idents, msg):
426 client_id = idents[0]
428 client_id = idents[0]
427
429
428 try:
430 try:
429 msg = self.session.unpack_message(msg, content=False)
431 msg = self.session.unpack_message(msg, content=False)
430 except:
432 except:
431 logger.error("task::client %r sent invalid task message: %s"%(
433 logger.error("task::client %r sent invalid task message: %s"%(
432 client_id, msg))
434 client_id, msg))
433 return
435 return
434
436
435 header = msg['header']
437 header = msg['header']
436 msg_id = header['msg_id']
438 msg_id = header['msg_id']
437 self.mia.add(msg_id)
439 self.mia.add(msg_id)
438 self.pending[msg_id] = msg
440 self.pending[msg_id] = msg
439 if not self.tasks.has_key(client_id):
441 if not self.tasks.has_key(client_id):
440 self.tasks[client_id] = []
442 self.tasks[client_id] = []
441 self.tasks[client_id].append(msg_id)
443 self.tasks[client_id].append(msg_id)
442
444
443 def save_task_result(self, idents, msg):
445 def save_task_result(self, idents, msg):
444 client_id = idents[0]
446 client_id = idents[0]
445 try:
447 try:
446 msg = self.session.unpack_message(msg, content=False)
448 msg = self.session.unpack_message(msg, content=False)
447 except:
449 except:
448 logger.error("task::invalid task result message send to %r: %s"%(
450 logger.error("task::invalid task result message send to %r: %s"%(
449 client_id, msg))
451 client_id, msg))
450 return
452 return
451
453
452 parent = msg['parent_header']
454 parent = msg['parent_header']
453 if not parent:
455 if not parent:
454 # print msg
456 # print msg
455 # logger.warn("")
457 # logger.warn("")
456 return
458 return
457 msg_id = parent['msg_id']
459 msg_id = parent['msg_id']
458 self.results[msg_id] = msg
460 self.results[msg_id] = msg
459 if msg_id in self.pending:
461 if msg_id in self.pending:
460 self.pending.pop(msg_id)
462 self.pending.pop(msg_id)
461 if msg_id in self.mia:
463 if msg_id in self.mia:
462 self.mia.remove(msg_id)
464 self.mia.remove(msg_id)
463 else:
465 else:
464 logger.debug("task::unknown task %s finished"%msg_id)
466 logger.debug("task::unknown task %s finished"%msg_id)
465
467
466 def save_task_destination(self, idents, msg):
468 def save_task_destination(self, idents, msg):
467 try:
469 try:
468 msg = self.session.unpack_message(msg, content=True)
470 msg = self.session.unpack_message(msg, content=True)
469 except:
471 except:
470 logger.error("task::invalid task tracking message")
472 logger.error("task::invalid task tracking message")
471 return
473 return
472 content = msg['content']
474 content = msg['content']
473 print content
475 print (content)
474 msg_id = content['msg_id']
476 msg_id = content['msg_id']
475 engine_uuid = content['engine_id']
477 engine_uuid = content['engine_id']
476 for eid,queue_id in self.keytable.iteritems():
478 for eid,queue_id in self.keytable.iteritems():
477 if queue_id == engine_uuid:
479 if queue_id == engine_uuid:
478 break
480 break
479
481
480 logger.info("task::task %s arrived on %s"%(msg_id, eid))
482 logger.info("task::task %s arrived on %s"%(msg_id, eid))
481 if msg_id in self.mia:
483 if msg_id in self.mia:
482 self.mia.remove(msg_id)
484 self.mia.remove(msg_id)
483 else:
485 else:
484 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
486 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
485 self.tasks[engine_uuid].append(msg_id)
487 self.tasks[engine_uuid].append(msg_id)
486
488
487 def mia_task_request(self, idents, msg):
489 def mia_task_request(self, idents, msg):
488 client_id = idents[0]
490 client_id = idents[0]
489 content = dict(mia=self.mia,status='ok')
491 content = dict(mia=self.mia,status='ok')
490 self.session.send('mia_reply', content=content, idents=client_id)
492 self.session.send('mia_reply', content=content, idents=client_id)
491
493
492
494
493
495
494 #-------------------- Registration -----------------------------
496 #-------------------- Registration -----------------------------
495
497
496 def connection_request(self, client_id, msg):
498 def connection_request(self, client_id, msg):
497 """reply with connection addresses for clients"""
499 """reply with connection addresses for clients"""
498 logger.info("client::client %s connected"%client_id)
500 logger.info("client::client %s connected"%client_id)
499 content = dict(status='ok')
501 content = dict(status='ok')
500 content.update(self.client_addrs)
502 content.update(self.client_addrs)
501 jsonable = {}
503 jsonable = {}
502 for k,v in self.keytable.iteritems():
504 for k,v in self.keytable.iteritems():
503 jsonable[str(k)] = v
505 jsonable[str(k)] = v
504 content['engines'] = jsonable
506 content['engines'] = jsonable
505 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
507 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
506
508
507 def register_engine(self, reg, msg):
509 def register_engine(self, reg, msg):
508 """register an engine"""
510 """register an engine"""
509 content = msg['content']
511 content = msg['content']
510 try:
512 try:
511 queue = content['queue']
513 queue = content['queue']
512 except KeyError:
514 except KeyError:
513 logger.error("registration::queue not specified")
515 logger.error("registration::queue not specified")
514 return
516 return
515 heart = content.get('heartbeat', None)
517 heart = content.get('heartbeat', None)
516 """register a new engine, and create the socket(s) necessary"""
518 """register a new engine, and create the socket(s) necessary"""
517 eid = self._new_id()
519 eid = self._new_id()
518 # print (eid, queue, reg, heart)
520 # print (eid, queue, reg, heart)
519
521
520 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
522 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
521
523
522 content = dict(id=eid,status='ok')
524 content = dict(id=eid,status='ok')
523 content.update(self.engine_addrs)
525 content.update(self.engine_addrs)
524 # check if requesting available IDs:
526 # check if requesting available IDs:
525 if queue in self.by_ident:
527 if queue in self.by_ident:
526 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
528 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
527 elif heart in self.hearts: # need to check unique hearts?
529 elif heart in self.hearts: # need to check unique hearts?
528 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
530 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
529 else:
531 else:
530 for h, pack in self.incoming_registrations.iteritems():
532 for h, pack in self.incoming_registrations.iteritems():
531 if heart == h:
533 if heart == h:
532 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
534 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
533 break
535 break
534 elif queue == pack[1]:
536 elif queue == pack[1]:
535 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
537 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
536 break
538 break
537
539
538 msg = self.session.send(self.registrar, "registration_reply",
540 msg = self.session.send(self.registrar, "registration_reply",
539 content=content,
541 content=content,
540 ident=reg)
542 ident=reg)
541
543
542 if content['status'] == 'ok':
544 if content['status'] == 'ok':
543 if heart in self.heartbeat.hearts:
545 if heart in self.heartbeat.hearts:
544 # already beating
546 # already beating
545 self.incoming_registrations[heart] = (eid,queue,reg,None)
547 self.incoming_registrations[heart] = (eid,queue,reg,None)
546 self.finish_registration(heart)
548 self.finish_registration(heart)
547 else:
549 else:
548 purge = lambda : self._purge_stalled_registration(heart)
550 purge = lambda : self._purge_stalled_registration(heart)
549 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
551 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
550 dc.start()
552 dc.start()
551 self.incoming_registrations[heart] = (eid,queue,reg,dc)
553 self.incoming_registrations[heart] = (eid,queue,reg,dc)
552 else:
554 else:
553 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
555 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
554 return eid
556 return eid
555
557
556 def unregister_engine(self, ident, msg):
558 def unregister_engine(self, ident, msg):
557 try:
559 try:
558 eid = msg['content']['id']
560 eid = msg['content']['id']
559 except:
561 except:
560 logger.error("registration::bad engine id for unregistration: %s"%ident)
562 logger.error("registration::bad engine id for unregistration: %s"%ident)
561 return
563 return
562 logger.info("registration::unregister_engine(%s)"%eid)
564 logger.info("registration::unregister_engine(%s)"%eid)
563 content=dict(id=eid, queue=self.engines[eid].queue)
565 content=dict(id=eid, queue=self.engines[eid].queue)
564 self.ids.remove(eid)
566 self.ids.remove(eid)
565 self.keytable.pop(eid)
567 self.keytable.pop(eid)
566 ec = self.engines.pop(eid)
568 ec = self.engines.pop(eid)
567 self.hearts.pop(ec.heartbeat)
569 self.hearts.pop(ec.heartbeat)
568 self.by_ident.pop(ec.queue)
570 self.by_ident.pop(ec.queue)
569 self.completed.pop(eid)
571 self.completed.pop(eid)
570 for msg_id in self.queues.pop(eid)[0]:
572 for msg_id in self.queues.pop(eid)[0]:
571 msg = self.pending.pop(msg_id)
573 msg = self.pending.pop(msg_id)
572 ############## TODO: HANDLE IT ################
574 ############## TODO: HANDLE IT ################
573
575
574 if self.notifier:
576 if self.notifier:
575 self.session.send(self.notifier, "unregistration_notification", content=content)
577 self.session.send(self.notifier, "unregistration_notification", content=content)
576
578
577 def finish_registration(self, heart):
579 def finish_registration(self, heart):
578 try:
580 try:
579 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
581 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
580 except KeyError:
582 except KeyError:
581 logger.error("registration::tried to finish nonexistant registration")
583 logger.error("registration::tried to finish nonexistant registration")
582 return
584 return
583 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
585 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
584 if purge is not None:
586 if purge is not None:
585 purge.stop()
587 purge.stop()
586 control = queue
588 control = queue
587 self.ids.add(eid)
589 self.ids.add(eid)
588 self.keytable[eid] = queue
590 self.keytable[eid] = queue
589 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
591 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
590 self.by_ident[queue] = eid
592 self.by_ident[queue] = eid
591 self.queues[eid] = ([],[])
593 self.queues[eid] = ([],[])
592 self.completed[eid] = list()
594 self.completed[eid] = list()
593 self.hearts[heart] = eid
595 self.hearts[heart] = eid
594 content = dict(id=eid, queue=self.engines[eid].queue)
596 content = dict(id=eid, queue=self.engines[eid].queue)
595 if self.notifier:
597 if self.notifier:
596 self.session.send(self.notifier, "registration_notification", content=content)
598 self.session.send(self.notifier, "registration_notification", content=content)
597
599
598 def _purge_stalled_registration(self, heart):
600 def _purge_stalled_registration(self, heart):
599 if heart in self.incoming_registrations:
601 if heart in self.incoming_registrations:
600 eid = self.incoming_registrations.pop(heart)[0]
602 eid = self.incoming_registrations.pop(heart)[0]
601 logger.info("registration::purging stalled registration: %i"%eid)
603 logger.info("registration::purging stalled registration: %i"%eid)
602 else:
604 else:
603 pass
605 pass
604
606
605 #------------------- Client Requests -------------------------------
607 #------------------- Client Requests -------------------------------
606
608
607 def check_load(self, client_id, msg):
609 def check_load(self, client_id, msg):
608 content = msg['content']
610 content = msg['content']
609 try:
611 try:
610 targets = content['targets']
612 targets = content['targets']
611 targets = self._validate_targets(targets)
613 targets = self._validate_targets(targets)
612 except:
614 except:
613 content = wrap_exception()
615 content = wrap_exception()
614 self.session.send(self.clientele, "controller_error",
616 self.session.send(self.clientele, "controller_error",
615 content=content, ident=client_id)
617 content=content, ident=client_id)
616 return
618 return
617
619
618 content = dict(status='ok')
620 content = dict(status='ok')
619 # loads = {}
621 # loads = {}
620 for t in targets:
622 for t in targets:
621 content[str(t)] = len(self.queues[t])
623 content[str(t)] = len(self.queues[t])
622 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
624 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
623
625
624
626
625 def queue_status(self, client_id, msg):
627 def queue_status(self, client_id, msg):
626 """handle queue_status request"""
628 """handle queue_status request"""
627 content = msg['content']
629 content = msg['content']
628 targets = content['targets']
630 targets = content['targets']
629 try:
631 try:
630 targets = self._validate_targets(targets)
632 targets = self._validate_targets(targets)
631 except:
633 except:
632 content = wrap_exception()
634 content = wrap_exception()
633 self.session.send(self.clientele, "controller_error",
635 self.session.send(self.clientele, "controller_error",
634 content=content, ident=client_id)
636 content=content, ident=client_id)
635 return
637 return
636 verbose = msg.get('verbose', False)
638 verbose = msg.get('verbose', False)
637 content = dict()
639 content = dict()
638 for t in targets:
640 for t in targets:
639 queue = self.queues[t]
641 queue = self.queues[t]
640 completed = self.completed[t]
642 completed = self.completed[t]
641 if not verbose:
643 if not verbose:
642 queue = len(queue)
644 queue = len(queue)
643 completed = len(completed)
645 completed = len(completed)
644 content[str(t)] = {'queue': queue, 'completed': completed }
646 content[str(t)] = {'queue': queue, 'completed': completed }
645 # pending
647 # pending
646 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
648 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
647
649
648 def purge_results(self, client_id, msg):
650 def purge_results(self, client_id, msg):
649 content = msg['content']
651 content = msg['content']
650 msg_ids = content.get('msg_ids', [])
652 msg_ids = content.get('msg_ids', [])
651 reply = dict(status='ok')
653 reply = dict(status='ok')
652 if msg_ids == 'all':
654 if msg_ids == 'all':
653 self.results = {}
655 self.results = {}
654 else:
656 else:
655 for msg_id in msg_ids:
657 for msg_id in msg_ids:
656 if msg_id in self.results:
658 if msg_id in self.results:
657 self.results.pop(msg_id)
659 self.results.pop(msg_id)
658 else:
660 else:
659 if msg_id in self.pending:
661 if msg_id in self.pending:
660 reply = dict(status='error', reason="msg pending: %r"%msg_id)
662 reply = dict(status='error', reason="msg pending: %r"%msg_id)
661 else:
663 else:
662 reply = dict(status='error', reason="No such msg: %r"%msg_id)
664 reply = dict(status='error', reason="No such msg: %r"%msg_id)
663 break
665 break
664 eids = content.get('engine_ids', [])
666 eids = content.get('engine_ids', [])
665 for eid in eids:
667 for eid in eids:
666 if eid not in self.engines:
668 if eid not in self.engines:
667 reply = dict(status='error', reason="No such engine: %i"%eid)
669 reply = dict(status='error', reason="No such engine: %i"%eid)
668 break
670 break
669 msg_ids = self.completed.pop(eid)
671 msg_ids = self.completed.pop(eid)
670 for msg_id in msg_ids:
672 for msg_id in msg_ids:
671 self.results.pop(msg_id)
673 self.results.pop(msg_id)
672
674
673 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
675 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
674
676
675 def resubmit_task(self, client_id, msg, buffers):
677 def resubmit_task(self, client_id, msg, buffers):
676 content = msg['content']
678 content = msg['content']
677 header = msg['header']
679 header = msg['header']
678
680
679
681
680 msg_ids = content.get('msg_ids', [])
682 msg_ids = content.get('msg_ids', [])
681 reply = dict(status='ok')
683 reply = dict(status='ok')
682 if msg_ids == 'all':
684 if msg_ids == 'all':
683 self.results = {}
685 self.results = {}
684 else:
686 else:
685 for msg_id in msg_ids:
687 for msg_id in msg_ids:
686 if msg_id in self.results:
688 if msg_id in self.results:
687 self.results.pop(msg_id)
689 self.results.pop(msg_id)
688 else:
690 else:
689 if msg_id in self.pending:
691 if msg_id in self.pending:
690 reply = dict(status='error', reason="msg pending: %r"%msg_id)
692 reply = dict(status='error', reason="msg pending: %r"%msg_id)
691 else:
693 else:
692 reply = dict(status='error', reason="No such msg: %r"%msg_id)
694 reply = dict(status='error', reason="No such msg: %r"%msg_id)
693 break
695 break
694 eids = content.get('engine_ids', [])
696 eids = content.get('engine_ids', [])
695 for eid in eids:
697 for eid in eids:
696 if eid not in self.engines:
698 if eid not in self.engines:
697 reply = dict(status='error', reason="No such engine: %i"%eid)
699 reply = dict(status='error', reason="No such engine: %i"%eid)
698 break
700 break
699 msg_ids = self.completed.pop(eid)
701 msg_ids = self.completed.pop(eid)
700 for msg_id in msg_ids:
702 for msg_id in msg_ids:
701 self.results.pop(msg_id)
703 self.results.pop(msg_id)
702
704
703 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
705 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
704
706
705 def get_results(self, client_id, msg):
707 def get_results(self, client_id, msg):
706 """get the result of 1 or more messages"""
708 """get the result of 1 or more messages"""
707 content = msg['content']
709 content = msg['content']
708 msg_ids = set(content['msg_ids'])
710 msg_ids = set(content['msg_ids'])
709 statusonly = content.get('status_only', False)
711 statusonly = content.get('status_only', False)
710 pending = []
712 pending = []
711 completed = []
713 completed = []
712 content = dict(status='ok')
714 content = dict(status='ok')
713 content['pending'] = pending
715 content['pending'] = pending
714 content['completed'] = completed
716 content['completed'] = completed
715 for msg_id in msg_ids:
717 for msg_id in msg_ids:
716 if msg_id in self.pending:
718 if msg_id in self.pending:
717 pending.append(msg_id)
719 pending.append(msg_id)
718 elif msg_id in self.results:
720 elif msg_id in self.results:
719 completed.append(msg_id)
721 completed.append(msg_id)
720 if not statusonly:
722 if not statusonly:
721 content[msg_id] = self.results[msg_id]['content']
723 content[msg_id] = self.results[msg_id]['content']
722 else:
724 else:
723 content = dict(status='error')
725 content = dict(status='error')
724 content['reason'] = 'no such message: '+msg_id
726 content['reason'] = 'no such message: '+msg_id
725 break
727 break
726 self.session.send(self.clientele, "result_reply", content=content,
728 self.session.send(self.clientele, "result_reply", content=content,
727 parent=msg, ident=client_id)
729 parent=msg, ident=client_id)
728
730
729
731
730
732
731 ############ OLD METHODS for Python Relay Controller ###################
733 ############ OLD METHODS for Python Relay Controller ###################
732 def _validate_engine_msg(self, msg):
734 def _validate_engine_msg(self, msg):
733 """validates and unpacks headers of a message. Returns False if invalid,
735 """validates and unpacks headers of a message. Returns False if invalid,
734 (ident, message)"""
736 (ident, message)"""
735 ident = msg[0]
737 ident = msg[0]
736 try:
738 try:
737 msg = self.session.unpack_message(msg[1:], content=False)
739 msg = self.session.unpack_message(msg[1:], content=False)
738 except:
740 except:
739 logger.error("engine.%s::Invalid Message %s"%(ident, msg))
741 logger.error("engine.%s::Invalid Message %s"%(ident, msg))
740 return False
742 return False
741
743
742 try:
744 try:
743 eid = msg.header.username
745 eid = msg.header.username
744 assert self.engines.has_key(eid)
746 assert self.engines.has_key(eid)
745 except:
747 except:
746 logger.error("engine::Invalid Engine ID %s"%(ident))
748 logger.error("engine::Invalid Engine ID %s"%(ident))
747 return False
749 return False
748
750
749 return eid, msg
751 return eid, msg
750
752
751
753
752 #--------------------
754 #--------------------
753 # Entry Point
755 # Entry Point
754 #--------------------
756 #--------------------
755
757 def make_argument_parser():
756 def main():
758 """Make an argument parser"""
757 import time
759 parser = make_base_argument_parser()
758 from multiprocessing import Process
759
760 from zmq.eventloop.zmqstream import ZMQStream
761 from zmq.devices import ProcessMonitoredQueue
762 from zmq.log import handlers
763
764 import streamsession as session
765 import heartmonitor
766 from scheduler import launch_scheduler
767
768 parser = make_argument_parser()
769
760
770 parser.add_argument('--client', type=int, metavar='PORT', default=0,
761 parser.add_argument('--client', type=int, metavar='PORT', default=0,
771 help='set the XREP port for clients [default: random]')
762 help='set the XREP port for clients [default: random]')
772 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
763 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
773 help='set the PUB socket for registration notification [default: random]')
764 help='set the PUB socket for registration notification [default: random]')
774 parser.add_argument('--hb', type=str, metavar='PORTS',
765 parser.add_argument('--hb', type=str, metavar='PORTS',
775 help='set the 2 ports for heartbeats [default: random]')
766 help='set the 2 ports for heartbeats [default: random]')
776 parser.add_argument('--ping', type=int, default=3000,
767 parser.add_argument('--ping', type=int, default=3000,
777 help='set the heartbeat period in ms [default: 3000]')
768 help='set the heartbeat period in ms [default: 3000]')
778 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
769 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
779 help='set the SUB port for queue monitoring [default: random]')
770 help='set the SUB port for queue monitoring [default: random]')
780 parser.add_argument('--mux', type=str, metavar='PORTS',
771 parser.add_argument('--mux', type=str, metavar='PORTS',
781 help='set the XREP ports for the MUX queue [default: random]')
772 help='set the XREP ports for the MUX queue [default: random]')
782 parser.add_argument('--task', type=str, metavar='PORTS',
773 parser.add_argument('--task', type=str, metavar='PORTS',
783 help='set the XREP/XREQ ports for the task queue [default: random]')
774 help='set the XREP/XREQ ports for the task queue [default: random]')
784 parser.add_argument('--control', type=str, metavar='PORTS',
775 parser.add_argument('--control', type=str, metavar='PORTS',
785 help='set the XREP ports for the control queue [default: random]')
776 help='set the XREP ports for the control queue [default: random]')
786 parser.add_argument('--scheduler', type=str, default='pure',
777 parser.add_argument('--scheduler', type=str, default='pure',
787 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
778 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
788 help='select the task scheduler [default: pure ZMQ]')
779 help='select the task scheduler [default: pure ZMQ]')
789
780
790 args = parser.parse_args()
781 return parser
791
782
792 if args.url:
783 def main():
793 args.transport,iface = args.url.split('://')
784 import time
794 iface = iface.split(':')
785 from multiprocessing import Process
795 args.ip = iface[0]
786
796 if iface[1]:
787 from zmq.eventloop.zmqstream import ZMQStream
797 args.regport = iface[1]
788 from zmq.devices import ProcessMonitoredQueue
789 from zmq.log import handlers
790
791 import streamsession as session
792 import heartmonitor
793 from scheduler import launch_scheduler
794
795 parser = make_argument_parser()
796
797 args = parser.parse_args()
798 parse_url(args)
798
799
799 iface="%s://%s"%(args.transport,args.ip)+':%i'
800 iface="%s://%s"%(args.transport,args.ip)+':%i'
800
801
801 random_ports = 0
802 random_ports = 0
802 if args.hb:
803 if args.hb:
803 hb = split_ports(args.hb, 2)
804 hb = split_ports(args.hb, 2)
804 else:
805 else:
805 hb = select_random_ports(2)
806 hb = select_random_ports(2)
806 if args.mux:
807 if args.mux:
807 mux = split_ports(args.mux, 2)
808 mux = split_ports(args.mux, 2)
808 else:
809 else:
809 mux = None
810 mux = None
810 random_ports += 2
811 random_ports += 2
811 if args.task:
812 if args.task:
812 task = split_ports(args.task, 2)
813 task = split_ports(args.task, 2)
813 else:
814 else:
814 task = None
815 task = None
815 random_ports += 2
816 random_ports += 2
816 if args.control:
817 if args.control:
817 control = split_ports(args.control, 2)
818 control = split_ports(args.control, 2)
818 else:
819 else:
819 control = None
820 control = None
820 random_ports += 2
821 random_ports += 2
821
822
822 ctx = zmq.Context()
823 ctx = zmq.Context()
823 loop = ioloop.IOLoop.instance()
824 loop = ioloop.IOLoop.instance()
824
825
825 # setup logging
826 # setup logging
826 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
827 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
827
828
828 # Registrar socket
829 # Registrar socket
829 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
830 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
830 regport = bind_port(reg, args.ip, args.regport)
831 regport = bind_port(reg, args.ip, args.regport)
831
832
832 ### Engine connections ###
833 ### Engine connections ###
833
834
834 # heartbeat
835 # heartbeat
835 hpub = ctx.socket(zmq.PUB)
836 hpub = ctx.socket(zmq.PUB)
836 bind_port(hpub, args.ip, hb[0])
837 bind_port(hpub, args.ip, hb[0])
837 hrep = ctx.socket(zmq.XREP)
838 hrep = ctx.socket(zmq.XREP)
838 bind_port(hrep, args.ip, hb[1])
839 bind_port(hrep, args.ip, hb[1])
839
840
840 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
841 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
841 hmon.start()
842 hmon.start()
842
843
843 ### Client connections ###
844 ### Client connections ###
844 # Clientele socket
845 # Clientele socket
845 c = ZMQStream(ctx.socket(zmq.XREP), loop)
846 c = ZMQStream(ctx.socket(zmq.XREP), loop)
846 cport = bind_port(c, args.ip, args.client)
847 cport = bind_port(c, args.ip, args.client)
847 # Notifier socket
848 # Notifier socket
848 n = ZMQStream(ctx.socket(zmq.PUB), loop)
849 n = ZMQStream(ctx.socket(zmq.PUB), loop)
849 nport = bind_port(n, args.ip, args.notice)
850 nport = bind_port(n, args.ip, args.notice)
850
851
851 thesession = session.StreamSession(username=args.ident or "controller")
852 thesession = session.StreamSession(username=args.ident or "controller")
852
853
853 ### build and launch the queues ###
854 ### build and launch the queues ###
854
855
855 # monitor socket
856 # monitor socket
856 sub = ctx.socket(zmq.SUB)
857 sub = ctx.socket(zmq.SUB)
857 sub.setsockopt(zmq.SUBSCRIBE, "")
858 sub.setsockopt(zmq.SUBSCRIBE, "")
858 monport = bind_port(sub, args.ip, args.monitor)
859 monport = bind_port(sub, args.ip, args.monitor)
859 sub = ZMQStream(sub, loop)
860 sub = ZMQStream(sub, loop)
860
861
861 ports = select_random_ports(random_ports)
862 ports = select_random_ports(random_ports)
862 # Multiplexer Queue (in a Process)
863 # Multiplexer Queue (in a Process)
863 if not mux:
864 if not mux:
864 mux = (ports.pop(),ports.pop())
865 mux = (ports.pop(),ports.pop())
865 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
866 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
866 q.bind_in(iface%mux[0])
867 q.bind_in(iface%mux[0])
867 q.bind_out(iface%mux[1])
868 q.bind_out(iface%mux[1])
868 q.connect_mon(iface%monport)
869 q.connect_mon(iface%monport)
869 q.daemon=True
870 q.daemon=True
870 q.start()
871 q.start()
871
872
872 # Control Queue (in a Process)
873 # Control Queue (in a Process)
873 if not control:
874 if not control:
874 control = (ports.pop(),ports.pop())
875 control = (ports.pop(),ports.pop())
875 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
876 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
876 q.bind_in(iface%control[0])
877 q.bind_in(iface%control[0])
877 q.bind_out(iface%control[1])
878 q.bind_out(iface%control[1])
878 q.connect_mon(iface%monport)
879 q.connect_mon(iface%monport)
879 q.daemon=True
880 q.daemon=True
880 q.start()
881 q.start()
881
882
882 # Task Queue (in a Process)
883 # Task Queue (in a Process)
883 if not task:
884 if not task:
884 task = (ports.pop(),ports.pop())
885 task = (ports.pop(),ports.pop())
885 if args.scheduler == 'pure':
886 if args.scheduler == 'pure':
886 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
887 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
887 q.bind_in(iface%task[0])
888 q.bind_in(iface%task[0])
888 q.bind_out(iface%task[1])
889 q.bind_out(iface%task[1])
889 q.connect_mon(iface%monport)
890 q.connect_mon(iface%monport)
890 q.daemon=True
891 q.daemon=True
891 q.start()
892 q.start()
892 else:
893 else:
893 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
894 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
894 print sargs
895 print (sargs)
895 p = Process(target=launch_scheduler, args=sargs)
896 p = Process(target=launch_scheduler, args=sargs)
896 p.daemon=True
897 p.daemon=True
897 p.start()
898 p.start()
898
899
899 time.sleep(.25)
900 time.sleep(.25)
900
901
901 # build connection dicts
902 # build connection dicts
902 engine_addrs = {
903 engine_addrs = {
903 'control' : iface%control[1],
904 'control' : iface%control[1],
904 'queue': iface%mux[1],
905 'queue': iface%mux[1],
905 'heartbeat': (iface%hb[0], iface%hb[1]),
906 'heartbeat': (iface%hb[0], iface%hb[1]),
906 'task' : iface%task[1],
907 'task' : iface%task[1],
907 'monitor' : iface%monport,
908 'monitor' : iface%monport,
908 }
909 }
909
910
910 client_addrs = {
911 client_addrs = {
911 'control' : iface%control[0],
912 'control' : iface%control[0],
912 'query': iface%cport,
913 'query': iface%cport,
913 'queue': iface%mux[0],
914 'queue': iface%mux[0],
914 'task' : iface%task[0],
915 'task' : iface%task[0],
915 'notification': iface%nport
916 'notification': iface%nport
916 }
917 }
917 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
918 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
919 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
918 loop.start()
920 loop.start()
919
921
922 if __name__ == '__main__':
923 main()
@@ -1,151 +1,148
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple engine that talks to a controller over 0MQ.
2 """A simple engine that talks to a controller over 0MQ.
3 it handles registration, etc. and launches a kernel
3 it handles registration, etc. and launches a kernel
4 connected to the Controller's queue(s).
4 connected to the Controller's queue(s).
5 """
5 """
6 from __future__ import print_function
6 import sys
7 import sys
7 import time
8 import time
8 import traceback
9 import traceback
9 import uuid
10 import uuid
10 from pprint import pprint
11 from pprint import pprint
11
12
12 import zmq
13 import zmq
13 from zmq.eventloop import ioloop, zmqstream
14 from zmq.eventloop import ioloop, zmqstream
14
15
15 from streamsession import Message, StreamSession
16 from streamsession import Message, StreamSession
16 from client import Client
17 from client import Client
17 import streamkernel as kernel
18 import streamkernel as kernel
18 import heartmonitor
19 import heartmonitor
19 from entry_point import make_argument_parser, connect_logger
20 from entry_point import make_base_argument_parser, connect_logger, parse_url
20 # import taskthread
21 # import taskthread
21 # from log import logger
22 # from log import logger
22
23
23
24
24 def printer(*msg):
25 def printer(*msg):
25 pprint(msg)
26 pprint(msg)
26
27
27 class Engine(object):
28 class Engine(object):
28 """IPython engine"""
29 """IPython engine"""
29
30
30 id=None
31 id=None
31 context=None
32 context=None
32 loop=None
33 loop=None
33 session=None
34 session=None
34 ident=None
35 ident=None
35 registrar=None
36 registrar=None
36 heart=None
37 heart=None
37 kernel=None
38 kernel=None
38
39
39 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
40 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
40 self.context = context
41 self.context = context
41 self.loop = loop
42 self.loop = loop
42 self.session = session
43 self.session = session
43 self.registrar = registrar
44 self.registrar = registrar
44 self.client = client
45 self.client = client
45 self.ident = ident if ident else str(uuid.uuid4())
46 self.ident = ident if ident else str(uuid.uuid4())
46 self.registrar.on_send(printer)
47 self.registrar.on_send(printer)
47
48
48 def register(self):
49 def register(self):
49
50
50 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
51 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
51 self.registrar.on_recv(self.complete_registration)
52 self.registrar.on_recv(self.complete_registration)
52 self.session.send(self.registrar, "registration_request",content=content)
53 self.session.send(self.registrar, "registration_request",content=content)
53
54
54 def complete_registration(self, msg):
55 def complete_registration(self, msg):
55 # print msg
56 # print msg
56 idents,msg = self.session.feed_identities(msg)
57 idents,msg = self.session.feed_identities(msg)
57 msg = Message(self.session.unpack_message(msg))
58 msg = Message(self.session.unpack_message(msg))
58 if msg.content.status == 'ok':
59 if msg.content.status == 'ok':
59 self.session.username = str(msg.content.id)
60 self.session.username = str(msg.content.id)
60 queue_addr = msg.content.queue
61 queue_addr = msg.content.queue
61 if queue_addr:
62 if queue_addr:
62 queue = self.context.socket(zmq.PAIR)
63 queue = self.context.socket(zmq.PAIR)
63 queue.setsockopt(zmq.IDENTITY, self.ident)
64 queue.setsockopt(zmq.IDENTITY, self.ident)
64 queue.connect(str(queue_addr))
65 queue.connect(str(queue_addr))
65 self.queue = zmqstream.ZMQStream(queue, self.loop)
66 self.queue = zmqstream.ZMQStream(queue, self.loop)
66
67
67 control_addr = msg.content.control
68 control_addr = msg.content.control
68 if control_addr:
69 if control_addr:
69 control = self.context.socket(zmq.PAIR)
70 control = self.context.socket(zmq.PAIR)
70 control.setsockopt(zmq.IDENTITY, self.ident)
71 control.setsockopt(zmq.IDENTITY, self.ident)
71 control.connect(str(control_addr))
72 control.connect(str(control_addr))
72 self.control = zmqstream.ZMQStream(control, self.loop)
73 self.control = zmqstream.ZMQStream(control, self.loop)
73
74
74 task_addr = msg.content.task
75 task_addr = msg.content.task
75 print task_addr
76 print (task_addr)
76 if task_addr:
77 if task_addr:
77 # task as stream:
78 # task as stream:
78 task = self.context.socket(zmq.PAIR)
79 task = self.context.socket(zmq.PAIR)
79 task.setsockopt(zmq.IDENTITY, self.ident)
80 task.setsockopt(zmq.IDENTITY, self.ident)
80 task.connect(str(task_addr))
81 task.connect(str(task_addr))
81 self.task_stream = zmqstream.ZMQStream(task, self.loop)
82 self.task_stream = zmqstream.ZMQStream(task, self.loop)
82 # TaskThread:
83 # TaskThread:
83 # mon_addr = msg.content.monitor
84 # mon_addr = msg.content.monitor
84 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
85 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
85 # task.connect_in(str(task_addr))
86 # task.connect_in(str(task_addr))
86 # task.connect_out(str(mon_addr))
87 # task.connect_out(str(mon_addr))
87 # self.task_stream = taskthread.QueueStream(*task.queues)
88 # self.task_stream = taskthread.QueueStream(*task.queues)
88 # task.start()
89 # task.start()
89
90
90 hbs = msg.content.heartbeat
91 hbs = msg.content.heartbeat
91 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
92 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
92 self.heart.start()
93 self.heart.start()
93 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
94 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
94 # placeholder for now:
95 # placeholder for now:
95 pub = self.context.socket(zmq.PUB)
96 pub = self.context.socket(zmq.PUB)
96 pub = zmqstream.ZMQStream(pub, self.loop)
97 pub = zmqstream.ZMQStream(pub, self.loop)
97 # create and start the kernel
98 # create and start the kernel
98 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
99 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
99 self.kernel.start()
100 self.kernel.start()
100 else:
101 else:
101 # logger.error("Registration Failed: %s"%msg)
102 # logger.error("Registration Failed: %s"%msg)
102 raise Exception("Registration Failed: %s"%msg)
103 raise Exception("Registration Failed: %s"%msg)
103
104
104 # logger.info("engine::completed registration with id %s"%self.session.username)
105 # logger.info("engine::completed registration with id %s"%self.session.username)
105
106
106 print msg
107 print (msg)
107
108
108 def unregister(self):
109 def unregister(self):
109 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
110 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
110 time.sleep(1)
111 time.sleep(1)
111 sys.exit(0)
112 sys.exit(0)
112
113
113 def start(self):
114 def start(self):
114 print "registering"
115 print ("registering")
115 self.register()
116 self.register()
116
117
117
118
118 def main():
119 def main():
119
120
120 parser = make_argument_parser()
121 parser = make_base_argument_parser()
121
122
122 args = parser.parse_args()
123 args = parser.parse_args()
123
124
124 if args.url:
125 parse_url(args)
125 args.transport,iface = args.url.split('://')
126 iface = iface.split(':')
127 args.ip = iface[0]
128 if iface[1]:
129 args.regport = iface[1]
130
126
131 iface="%s://%s"%(args.transport,args.ip)+':%i'
127 iface="%s://%s"%(args.transport,args.ip)+':%i'
128
132 loop = ioloop.IOLoop.instance()
129 loop = ioloop.IOLoop.instance()
133 session = StreamSession()
130 session = StreamSession()
134 ctx = zmq.Context()
131 ctx = zmq.Context()
135
132
136 # setup logging
133 # setup logging
137 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
134 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
138
135
139 reg_conn = iface % args.regport
136 reg_conn = iface % args.regport
140 print reg_conn
137 print (reg_conn)
141 print >>sys.__stdout__, "Starting the engine..."
138 print ("Starting the engine...", file=sys.__stderr__)
142
139
143 reg = ctx.socket(zmq.PAIR)
140 reg = ctx.socket(zmq.PAIR)
144 reg.connect(reg_conn)
141 reg.connect(reg_conn)
145 reg = zmqstream.ZMQStream(reg, loop)
142 reg = zmqstream.ZMQStream(reg, loop)
146 client = Client(reg_conn)
143 client = Client(reg_conn)
147
144
148 e = Engine(ctx, loop, session, reg, client, args.ident)
145 e = Engine(ctx, loop, session, reg, client, args.ident)
149 dc = ioloop.DelayedCallback(e.start, 100, loop)
146 dc = ioloop.DelayedCallback(e.start, 100, loop)
150 dc.start()
147 dc.start()
151 loop.start() No newline at end of file
148 loop.start()
@@ -1,74 +1,89
1 """ Defines helper functions for creating kernel entry points and process
1 """ Defines helper functions for creating kernel entry points and process
2 launchers.
2 launchers.
3 """
3 """
4
4
5 # Standard library imports.
5 # Standard library imports.
6 import logging
6 import logging
7 import atexit
7 import atexit
8 import os
8 import os
9 import socket
9 import socket
10 from subprocess import Popen, PIPE
10 from subprocess import Popen, PIPE
11 import sys
11 import sys
12
12
13 # System library imports.
13 # System library imports.
14 import zmq
14 import zmq
15 from zmq.log import handlers
15 from zmq.log import handlers
16 # Local imports.
16 # Local imports.
17 from IPython.core.ultratb import FormattedTB
17 from IPython.core.ultratb import FormattedTB
18 from IPython.external.argparse import ArgumentParser
18 from IPython.external.argparse import ArgumentParser
19 from IPython.zmq.log import logger
19 from IPython.zmq.log import logger
20
20
21 def split_ports(s, n):
21 def split_ports(s, n):
22 """Parser helper for multiport strings"""
22 """Parser helper for multiport strings"""
23 if not s:
23 if not s:
24 return tuple([0]*n)
24 return tuple([0]*n)
25 ports = map(int, s.split(','))
25 ports = map(int, s.split(','))
26 if len(ports) != n:
26 if len(ports) != n:
27 raise ValueError
27 raise ValueError
28 return ports
28 return ports
29
29
30 def select_random_ports(n):
30 def select_random_ports(n):
31 """Selects and return n random ports that are open."""
31 """Selects and return n random ports that are open."""
32 ports = []
32 ports = []
33 for i in xrange(n):
33 for i in xrange(n):
34 sock = socket.socket()
34 sock = socket.socket()
35 sock.bind(('', 0))
35 sock.bind(('', 0))
36 ports.append(sock)
36 ports.append(sock)
37 for i, sock in enumerate(ports):
37 for i, sock in enumerate(ports):
38 port = sock.getsockname()[1]
38 port = sock.getsockname()[1]
39 sock.close()
39 sock.close()
40 ports[i] = port
40 ports[i] = port
41 return ports
41 return ports
42
43 def parse_url(args):
44 if args.url:
45 iface = args.url.split('://',1)
46 if len(args) == 2:
47 args.transport,iface = iface
48 iface = iface.split(':')
49 args.ip = iface[0]
50 if iface[1]:
51 args.regport = iface[1]
52 args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
53
42
54
43
55
44 def make_argument_parser():
56 def make_base_argument_parser():
45 """ Creates an ArgumentParser for the generic arguments supported by all
57 """ Creates an ArgumentParser for the generic arguments supported by all
46 ipcluster entry points.
58 ipcluster entry points.
47 """
59 """
48 parser = ArgumentParser()
60 parser = ArgumentParser()
49 parser.add_argument('--ip', type=str, default='127.0.0.1',
61 parser.add_argument('--ip', type=str, default='127.0.0.1',
50 help='set the controller\'s IP address [default: local]')
62 help='set the controller\'s IP address [default: local]')
51 parser.add_argument('--transport', type=str, default='tcp',
63 parser.add_argument('--transport', type=str, default='tcp',
52 help='set the transport to use [default: tcp]')
64 help='set the transport to use [default: tcp]')
53 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
65 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
54 help='set the XREP port for registration [default: 10101]')
66 help='set the XREP port for registration [default: 10101]')
55 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
67 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
56 help='set the PUB port for logging [default: 10201]')
68 help='set the PUB port for logging [default: 10201]')
57 parser.add_argument('--loglevel', type=int, metavar='LEVEL', default=logging.DEBUG,
69 parser.add_argument('--loglevel', type=int, metavar='LEVEL', default=logging.DEBUG,
58 help='set the log level [default: DEBUG]')
70 help='set the log level [default: DEBUG]')
59 parser.add_argument('--ident', type=str,
71 parser.add_argument('--ident', type=str,
60 help='set the ZMQ identity [default: random]')
72 help='set the ZMQ identity [default: random]')
73 parser.add_argument('--packer', type=str, default='json',
74 choices=['json','pickle'],
75 help='set the message format method [default: json]')
61 parser.add_argument('--url', type=str,
76 parser.add_argument('--url', type=str,
62 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
77 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
63
78
64 return parser
79 return parser
65
80
66
81
67 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
82 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
68 lsock = context.socket(zmq.PUB)
83 lsock = context.socket(zmq.PUB)
69 lsock.connect(iface)
84 lsock.connect(iface)
70 handler = handlers.PUBHandler(lsock)
85 handler = handlers.PUBHandler(lsock)
71 handler.setLevel(loglevel)
86 handler.setLevel(loglevel)
72 handler.root_topic = root
87 handler.root_topic = root
73 logger.addHandler(handler)
88 logger.addHandler(handler)
74 No newline at end of file
89
@@ -1,249 +1,250
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
2 # -*- coding: utf-8 -*-
3 """Setup script for IPython.
3 """Setup script for IPython.
4
4
5 Under Posix environments it works like a typical setup.py script.
5 Under Posix environments it works like a typical setup.py script.
6 Under Windows, the command sdist is not supported, since IPython
6 Under Windows, the command sdist is not supported, since IPython
7 requires utilities which are not available under Windows."""
7 requires utilities which are not available under Windows."""
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (c) 2008-2010, IPython Development Team.
10 # Copyright (c) 2008-2010, IPython Development Team.
11 # Copyright (c) 2001-2007, Fernando Perez <fernando.perez@colorado.edu>
11 # Copyright (c) 2001-2007, Fernando Perez <fernando.perez@colorado.edu>
12 # Copyright (c) 2001, Janko Hauser <jhauser@zscout.de>
12 # Copyright (c) 2001, Janko Hauser <jhauser@zscout.de>
13 # Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu>
13 # Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu>
14 #
14 #
15 # Distributed under the terms of the Modified BSD License.
15 # Distributed under the terms of the Modified BSD License.
16 #
16 #
17 # The full license is in the file COPYING.txt, distributed with this software.
17 # The full license is in the file COPYING.txt, distributed with this software.
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21 # Minimal Python version sanity check
21 # Minimal Python version sanity check
22 #-----------------------------------------------------------------------------
22 #-----------------------------------------------------------------------------
23
23
24 import sys
24 import sys
25
25
26 # This check is also made in IPython/__init__, don't forget to update both when
26 # This check is also made in IPython/__init__, don't forget to update both when
27 # changing Python version requirements.
27 # changing Python version requirements.
28 if sys.version[0:3] < '2.6':
28 if sys.version[0:3] < '2.6':
29 error = """\
29 error = """\
30 ERROR: 'IPython requires Python Version 2.6 or above.'
30 ERROR: 'IPython requires Python Version 2.6 or above.'
31 Exiting."""
31 Exiting."""
32 print >> sys.stderr, error
32 print >> sys.stderr, error
33 sys.exit(1)
33 sys.exit(1)
34
34
35 # At least we're on the python version we need, move on.
35 # At least we're on the python version we need, move on.
36
36
37 #-------------------------------------------------------------------------------
37 #-------------------------------------------------------------------------------
38 # Imports
38 # Imports
39 #-------------------------------------------------------------------------------
39 #-------------------------------------------------------------------------------
40
40
41 # Stdlib imports
41 # Stdlib imports
42 import os
42 import os
43 import shutil
43 import shutil
44
44
45 from glob import glob
45 from glob import glob
46
46
47 # BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
47 # BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
48 # update it when the contents of directories change.
48 # update it when the contents of directories change.
49 if os.path.exists('MANIFEST'): os.remove('MANIFEST')
49 if os.path.exists('MANIFEST'): os.remove('MANIFEST')
50
50
51 from distutils.core import setup
51 from distutils.core import setup
52
52
53 # Our own imports
53 # Our own imports
54 from IPython.utils.path import target_update
54 from IPython.utils.path import target_update
55
55
56 from setupbase import (
56 from setupbase import (
57 setup_args,
57 setup_args,
58 find_packages,
58 find_packages,
59 find_package_data,
59 find_package_data,
60 find_scripts,
60 find_scripts,
61 find_data_files,
61 find_data_files,
62 check_for_dependencies,
62 check_for_dependencies,
63 record_commit_info,
63 record_commit_info,
64 )
64 )
65
65
66 isfile = os.path.isfile
66 isfile = os.path.isfile
67 pjoin = os.path.join
67 pjoin = os.path.join
68
68
69 #-----------------------------------------------------------------------------
69 #-----------------------------------------------------------------------------
70 # Function definitions
70 # Function definitions
71 #-----------------------------------------------------------------------------
71 #-----------------------------------------------------------------------------
72
72
73 def cleanup():
73 def cleanup():
74 """Clean up the junk left around by the build process"""
74 """Clean up the junk left around by the build process"""
75 if "develop" not in sys.argv:
75 if "develop" not in sys.argv:
76 try:
76 try:
77 shutil.rmtree('ipython.egg-info')
77 shutil.rmtree('ipython.egg-info')
78 except:
78 except:
79 try:
79 try:
80 os.unlink('ipython.egg-info')
80 os.unlink('ipython.egg-info')
81 except:
81 except:
82 pass
82 pass
83
83
84 #-------------------------------------------------------------------------------
84 #-------------------------------------------------------------------------------
85 # Handle OS specific things
85 # Handle OS specific things
86 #-------------------------------------------------------------------------------
86 #-------------------------------------------------------------------------------
87
87
88 if os.name == 'posix':
88 if os.name == 'posix':
89 os_name = 'posix'
89 os_name = 'posix'
90 elif os.name in ['nt','dos']:
90 elif os.name in ['nt','dos']:
91 os_name = 'windows'
91 os_name = 'windows'
92 else:
92 else:
93 print 'Unsupported operating system:',os.name
93 print 'Unsupported operating system:',os.name
94 sys.exit(1)
94 sys.exit(1)
95
95
96 # Under Windows, 'sdist' has not been supported. Now that the docs build with
96 # Under Windows, 'sdist' has not been supported. Now that the docs build with
97 # Sphinx it might work, but let's not turn it on until someone confirms that it
97 # Sphinx it might work, but let's not turn it on until someone confirms that it
98 # actually works.
98 # actually works.
99 if os_name == 'windows' and 'sdist' in sys.argv:
99 if os_name == 'windows' and 'sdist' in sys.argv:
100 print 'The sdist command is not available under Windows. Exiting.'
100 print 'The sdist command is not available under Windows. Exiting.'
101 sys.exit(1)
101 sys.exit(1)
102
102
103 #-------------------------------------------------------------------------------
103 #-------------------------------------------------------------------------------
104 # Things related to the IPython documentation
104 # Things related to the IPython documentation
105 #-------------------------------------------------------------------------------
105 #-------------------------------------------------------------------------------
106
106
107 # update the manuals when building a source dist
107 # update the manuals when building a source dist
108 if len(sys.argv) >= 2 and sys.argv[1] in ('sdist','bdist_rpm'):
108 if len(sys.argv) >= 2 and sys.argv[1] in ('sdist','bdist_rpm'):
109 import textwrap
109 import textwrap
110
110
111 # List of things to be updated. Each entry is a triplet of args for
111 # List of things to be updated. Each entry is a triplet of args for
112 # target_update()
112 # target_update()
113 to_update = [
113 to_update = [
114 # FIXME - Disabled for now: we need to redo an automatic way
114 # FIXME - Disabled for now: we need to redo an automatic way
115 # of generating the magic info inside the rst.
115 # of generating the magic info inside the rst.
116 #('docs/magic.tex',
116 #('docs/magic.tex',
117 #['IPython/Magic.py'],
117 #['IPython/Magic.py'],
118 #"cd doc && ./update_magic.sh" ),
118 #"cd doc && ./update_magic.sh" ),
119
119
120 ('docs/man/ipcluster.1.gz',
120 ('docs/man/ipcluster.1.gz',
121 ['docs/man/ipcluster.1'],
121 ['docs/man/ipcluster.1'],
122 'cd docs/man && gzip -9c ipcluster.1 > ipcluster.1.gz'),
122 'cd docs/man && gzip -9c ipcluster.1 > ipcluster.1.gz'),
123
123
124 ('docs/man/ipcontroller.1.gz',
124 ('docs/man/ipcontroller.1.gz',
125 ['docs/man/ipcontroller.1'],
125 ['docs/man/ipcontroller.1'],
126 'cd docs/man && gzip -9c ipcontroller.1 > ipcontroller.1.gz'),
126 'cd docs/man && gzip -9c ipcontroller.1 > ipcontroller.1.gz'),
127
127
128 ('docs/man/ipengine.1.gz',
128 ('docs/man/ipengine.1.gz',
129 ['docs/man/ipengine.1'],
129 ['docs/man/ipengine.1'],
130 'cd docs/man && gzip -9c ipengine.1 > ipengine.1.gz'),
130 'cd docs/man && gzip -9c ipengine.1 > ipengine.1.gz'),
131
131
132 ('docs/man/ipython.1.gz',
132 ('docs/man/ipython.1.gz',
133 ['docs/man/ipython.1'],
133 ['docs/man/ipython.1'],
134 'cd docs/man && gzip -9c ipython.1 > ipython.1.gz'),
134 'cd docs/man && gzip -9c ipython.1 > ipython.1.gz'),
135
135
136 ('docs/man/ipython-wx.1.gz',
136 ('docs/man/ipython-wx.1.gz',
137 ['docs/man/ipython-wx.1'],
137 ['docs/man/ipython-wx.1'],
138 'cd docs/man && gzip -9c ipython-wx.1 > ipython-wx.1.gz'),
138 'cd docs/man && gzip -9c ipython-wx.1 > ipython-wx.1.gz'),
139
139
140 ('docs/man/ipythonx.1.gz',
140 ('docs/man/ipythonx.1.gz',
141 ['docs/man/ipythonx.1'],
141 ['docs/man/ipythonx.1'],
142 'cd docs/man && gzip -9c ipythonx.1 > ipythonx.1.gz'),
142 'cd docs/man && gzip -9c ipythonx.1 > ipythonx.1.gz'),
143
143
144 ('docs/man/irunner.1.gz',
144 ('docs/man/irunner.1.gz',
145 ['docs/man/irunner.1'],
145 ['docs/man/irunner.1'],
146 'cd docs/man && gzip -9c irunner.1 > irunner.1.gz'),
146 'cd docs/man && gzip -9c irunner.1 > irunner.1.gz'),
147
147
148 ('docs/man/pycolor.1.gz',
148 ('docs/man/pycolor.1.gz',
149 ['docs/man/pycolor.1'],
149 ['docs/man/pycolor.1'],
150 'cd docs/man && gzip -9c pycolor.1 > pycolor.1.gz'),
150 'cd docs/man && gzip -9c pycolor.1 > pycolor.1.gz'),
151 ]
151 ]
152
152
153 # Only build the docs if sphinx is present
153 # Only build the docs if sphinx is present
154 try:
154 try:
155 import sphinx
155 import sphinx
156 except ImportError:
156 except ImportError:
157 pass
157 pass
158 else:
158 else:
159 # The Makefile calls the do_sphinx scripts to build html and pdf, so
159 # The Makefile calls the do_sphinx scripts to build html and pdf, so
160 # just one target is enough to cover all manual generation
160 # just one target is enough to cover all manual generation
161
161
162 # First, compute all the dependencies that can force us to rebuild the
162 # First, compute all the dependencies that can force us to rebuild the
163 # docs. Start with the main release file that contains metadata
163 # docs. Start with the main release file that contains metadata
164 docdeps = ['IPython/core/release.py']
164 docdeps = ['IPython/core/release.py']
165 # Inculde all the reST sources
165 # Inculde all the reST sources
166 pjoin = os.path.join
166 pjoin = os.path.join
167 for dirpath,dirnames,filenames in os.walk('docs/source'):
167 for dirpath,dirnames,filenames in os.walk('docs/source'):
168 if dirpath in ['_static','_templates']:
168 if dirpath in ['_static','_templates']:
169 continue
169 continue
170 docdeps += [ pjoin(dirpath,f) for f in filenames
170 docdeps += [ pjoin(dirpath,f) for f in filenames
171 if f.endswith('.txt') ]
171 if f.endswith('.txt') ]
172 # and the examples
172 # and the examples
173 for dirpath,dirnames,filenames in os.walk('docs/example'):
173 for dirpath,dirnames,filenames in os.walk('docs/example'):
174 docdeps += [ pjoin(dirpath,f) for f in filenames
174 docdeps += [ pjoin(dirpath,f) for f in filenames
175 if not f.endswith('~') ]
175 if not f.endswith('~') ]
176 # then, make them all dependencies for the main PDF (the html will get
176 # then, make them all dependencies for the main PDF (the html will get
177 # auto-generated as well).
177 # auto-generated as well).
178 to_update.append(
178 to_update.append(
179 ('docs/dist/ipython.pdf',
179 ('docs/dist/ipython.pdf',
180 docdeps,
180 docdeps,
181 "cd docs && make dist")
181 "cd docs && make dist")
182 )
182 )
183
183
184 [ target_update(*t) for t in to_update ]
184 [ target_update(*t) for t in to_update ]
185
185
186 #---------------------------------------------------------------------------
186 #---------------------------------------------------------------------------
187 # Find all the packages, package data, scripts and data_files
187 # Find all the packages, package data, scripts and data_files
188 #---------------------------------------------------------------------------
188 #---------------------------------------------------------------------------
189
189
190 packages = find_packages()
190 packages = find_packages()
191 package_data = find_package_data()
191 package_data = find_package_data()
192 scripts = find_scripts()
192 scripts = find_scripts()
193 data_files = find_data_files()
193 data_files = find_data_files()
194
194
195 #---------------------------------------------------------------------------
195 #---------------------------------------------------------------------------
196 # Handle dependencies and setuptools specific things
196 # Handle dependencies and setuptools specific things
197 #---------------------------------------------------------------------------
197 #---------------------------------------------------------------------------
198
198
199 # For some commands, use setuptools. Note that we do NOT list install here!
199 # For some commands, use setuptools. Note that we do NOT list install here!
200 # If you want a setuptools-enhanced install, just run 'setupegg.py install'
200 # If you want a setuptools-enhanced install, just run 'setupegg.py install'
201 if len(set(('develop', 'sdist', 'release', 'bdist_egg', 'bdist_rpm',
201 if len(set(('develop', 'sdist', 'release', 'bdist_egg', 'bdist_rpm',
202 'bdist', 'bdist_dumb', 'bdist_wininst', 'install_egg_info',
202 'bdist', 'bdist_dumb', 'bdist_wininst', 'install_egg_info',
203 'build_sphinx', 'egg_info', 'easy_install', 'upload',
203 'build_sphinx', 'egg_info', 'easy_install', 'upload',
204 )).intersection(sys.argv)) > 0:
204 )).intersection(sys.argv)) > 0:
205 import setuptools
205 import setuptools
206
206
207 # This dict is used for passing extra arguments that are setuptools
207 # This dict is used for passing extra arguments that are setuptools
208 # specific to setup
208 # specific to setup
209 setuptools_extra_args = {}
209 setuptools_extra_args = {}
210
210
211 if 'setuptools' in sys.modules:
211 if 'setuptools' in sys.modules:
212 setuptools_extra_args['zip_safe'] = False
212 setuptools_extra_args['zip_safe'] = False
213 setuptools_extra_args['entry_points'] = {
213 setuptools_extra_args['entry_points'] = {
214 'console_scripts': [
214 'console_scripts': [
215 'ipython = IPython.frontend.terminal.ipapp:launch_new_instance',
215 'ipython = IPython.frontend.terminal.ipapp:launch_new_instance',
216 'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main',
216 'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main',
217 'pycolor = IPython.utils.PyColorize:main',
217 'pycolor = IPython.utils.PyColorize:main',
218 'ipcontrollerz = IPython.zmq.parallel.controller:main',
218 'ipcontrollerz = IPython.zmq.parallel.controller:main',
219 'ipenginez = IPython.zmq.parallel.engine:main',
219 'ipenginez = IPython.zmq.parallel.engine:main',
220 'ipclusterz = IPython.zmq.parallel.ipcluster:main',
220 'iptest = IPython.testing.iptest:main',
221 'iptest = IPython.testing.iptest:main',
221 'irunner = IPython.lib.irunner:main'
222 'irunner = IPython.lib.irunner:main'
222 ]
223 ]
223 }
224 }
224 setup_args['extras_require'] = dict(
225 setup_args['extras_require'] = dict(
225 doc='Sphinx>=0.3',
226 doc='Sphinx>=0.3',
226 test='nose>=0.10.1',
227 test='nose>=0.10.1',
227 security='pyOpenSSL>=0.6'
228 security='pyOpenSSL>=0.6'
228 )
229 )
229 else:
230 else:
230 # If we are running without setuptools, call this function which will
231 # If we are running without setuptools, call this function which will
231 # check for dependencies an inform the user what is needed. This is
232 # check for dependencies an inform the user what is needed. This is
232 # just to make life easy for users.
233 # just to make life easy for users.
233 check_for_dependencies()
234 check_for_dependencies()
234
235
235 #---------------------------------------------------------------------------
236 #---------------------------------------------------------------------------
236 # Do the actual setup now
237 # Do the actual setup now
237 #---------------------------------------------------------------------------
238 #---------------------------------------------------------------------------
238
239
239 setup_args['cmdclass'] = {'build_py': record_commit_info('IPython')}
240 setup_args['cmdclass'] = {'build_py': record_commit_info('IPython')}
240 setup_args['packages'] = packages
241 setup_args['packages'] = packages
241 setup_args['package_data'] = package_data
242 setup_args['package_data'] = package_data
242 setup_args['scripts'] = scripts
243 setup_args['scripts'] = scripts
243 setup_args['data_files'] = data_files
244 setup_args['data_files'] = data_files
244 setup_args.update(setuptools_extra_args)
245 setup_args.update(setuptools_extra_args)
245
246
246
247
247 if __name__ == '__main__':
248 if __name__ == '__main__':
248 setup(**setup_args)
249 setup(**setup_args)
249 cleanup()
250 cleanup()
General Comments 0
You need to be logged in to leave comments. Login now