##// END OF EJS Templates
added simple cluster entry point
MinRK -
Show More
@@ -0,0 +1,84
1 #!/usr/bin/env python
2 from __future__ import print_function
3 import sys,os
4 from subprocess import Popen, PIPE
5
6 from entry_point import parse_url
7 from controller import make_argument_parser
8
9 def _filter_arg(flag, args):
10 filtered = []
11 if flag in args:
12 filtered.append(flag)
13 idx = args.index(flag)
14 if len(args) > idx+1:
15 if not args[idx+1].startswith('-'):
16 filtered.append(args[idx+1])
17 return filtered
18
19 def filter_args(flags, args=sys.argv[1:]):
20 filtered = []
21 for flag in flags:
22 if isinstance(flag, (list,tuple)):
23 for f in flag:
24 filtered.extend(_filter_arg(f, args))
25 else:
26 filtered.extend(_filter_arg(flag, args))
27 return filtered
28
29 def _strip_arg(flag, args):
30 while flag in args:
31 idx = args.index(flag)
32 args.pop(idx)
33 if len(args) > idx:
34 if not args[idx].startswith('-'):
35 args.pop(idx)
36
37 def strip_args(flags, args=sys.argv[1:]):
38 args = list(args)
39 for flag in flags:
40 if isinstance(flag, (list,tuple)):
41 for f in flag:
42 _strip_arg(f, args)
43 else:
44 _strip_arg(flag, args)
45 return args
46
47
48 def launch_process(mod, args):
49 """Launch a controller or engine in a subprocess."""
50 code = "from IPython.zmq.parallel.%s import main;main()"%mod
51 arguments = [ sys.executable, '-c', code ] + args
52 blackholew = file(os.devnull, 'w')
53 blackholer = file(os.devnull, 'r')
54
55 proc = Popen(arguments, stdin=blackholer, stdout=blackholew, stderr=blackholew)
56 return proc
57
58 def main():
59 parser = make_argument_parser()
60 parser.add_argument('--n', '-n', type=int, default=1,
61 help="The number of engines to start.")
62 args = parser.parse_args()
63 parse_url(args)
64
65 controller_args = strip_args([('--n','-n')])
66 engine_args = filter_args(['--url', '--regport', '--logport', '--ip',
67 '--transport','--loglevel','--packer'])+['--ident']
68
69 controller = launch_process('controller', controller_args)
70 print("Launched Controller at %s"%args.url)
71 engines = [ launch_process('engine', engine_args+['engine-%i'%i]) for i in range(args.n) ]
72 print("%i Engines started"%args.n)
73
74 def wait_quietly(p):
75 try:
76 p.wait()
77 except KeyboardInterrupt:
78 pass
79 wait_quietly(controller)
80 map(wait_quietly, engines)
81 print ("Done")
82
83 if __name__ == '__main__':
84 main() No newline at end of file
@@ -1,919 +1,923
1 1 #!/usr/bin/env python
2 2 """The IPython Controller with 0MQ
3 3 This is the master object that handles connections from engines, clients, and
4 4 """
5 5 #-----------------------------------------------------------------------------
6 6 # Copyright (C) 2010 The IPython Development Team
7 7 #
8 8 # Distributed under the terms of the BSD License. The full license is in
9 9 # the file COPYING, distributed as part of this software.
10 10 #-----------------------------------------------------------------------------
11 11
12 12 #-----------------------------------------------------------------------------
13 13 # Imports
14 14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
16
15 17 from datetime import datetime
16 18 import logging
17 19
18 20 import zmq
19 21 from zmq.eventloop import zmqstream, ioloop
20 22 import uuid
21 23
22 24 # internal:
23 25 from IPython.zmq.log import logger # a Logger object
24 26 from IPython.zmq.entry_point import bind_port
25 27
26 28 from streamsession import Message, wrap_exception
27 from entry_point import (make_argument_parser, select_random_ports, split_ports,
28 connect_logger)
29 from entry_point import (make_base_argument_parser, select_random_ports, split_ports,
30 connect_logger, parse_url)
29 31 # from messages import json # use the same import switches
30 32
31 33 #-----------------------------------------------------------------------------
32 34 # Code
33 35 #-----------------------------------------------------------------------------
34 36
35 37 class ReverseDict(dict):
36 38 """simple double-keyed subset of dict methods."""
37 39
38 40 def __init__(self, *args, **kwargs):
39 41 dict.__init__(self, *args, **kwargs)
40 42 self.reverse = dict()
41 43 for key, value in self.iteritems():
42 44 self.reverse[value] = key
43 45
44 46 def __getitem__(self, key):
45 47 try:
46 48 return dict.__getitem__(self, key)
47 49 except KeyError:
48 50 return self.reverse[key]
49 51
50 52 def __setitem__(self, key, value):
51 53 if key in self.reverse:
52 54 raise KeyError("Can't have key %r on both sides!"%key)
53 55 dict.__setitem__(self, key, value)
54 56 self.reverse[value] = key
55 57
56 58 def pop(self, key):
57 59 value = dict.pop(self, key)
58 60 self.d1.pop(value)
59 61 return value
60 62
61 63
62 64 class EngineConnector(object):
63 65 """A simple object for accessing the various zmq connections of an object.
64 66 Attributes are:
65 67 id (int): engine ID
66 68 uuid (str): uuid (unused?)
67 69 queue (str): identity of queue's XREQ socket
68 70 registration (str): identity of registration XREQ socket
69 71 heartbeat (str): identity of heartbeat XREQ socket
70 72 """
71 73 id=0
72 74 queue=None
73 75 control=None
74 76 registration=None
75 77 heartbeat=None
76 78 pending=None
77 79
78 80 def __init__(self, id, queue, registration, control, heartbeat=None):
79 81 logger.info("engine::Engine Connected: %i"%id)
80 82 self.id = id
81 83 self.queue = queue
82 84 self.registration = registration
83 85 self.control = control
84 86 self.heartbeat = heartbeat
85 87
86 88 class Controller(object):
87 89 """The IPython Controller with 0MQ connections
88 90
89 91 Parameters
90 92 ==========
91 93 loop: zmq IOLoop instance
92 94 session: StreamSession object
93 95 <removed> context: zmq context for creating new connections (?)
94 96 registrar: ZMQStream for engine registration requests (XREP)
95 97 clientele: ZMQStream for client connections (XREP)
96 98 not used for jobs, only query/control commands
97 99 queue: ZMQStream for monitoring the command queue (SUB)
98 100 heartbeat: HeartMonitor object checking the pulse of the engines
99 101 db_stream: connection to db for out of memory logging of commands
100 102 NotImplemented
101 103 queue_addr: zmq connection address of the XREP socket for the queue
102 104 hb_addr: zmq connection address of the PUB socket for heartbeats
103 105 task_addr: zmq connection address of the XREQ socket for task queue
104 106 """
105 107 # internal data structures:
106 108 ids=None # engine IDs
107 109 keytable=None
108 110 engines=None
109 111 clients=None
110 112 hearts=None
111 113 pending=None
112 114 results=None
113 115 tasks=None
114 116 completed=None
115 117 mia=None
116 118 incoming_registrations=None
117 119 registration_timeout=None
118 120
119 121 #objects from constructor:
120 122 loop=None
121 123 registrar=None
122 124 clientelle=None
123 125 queue=None
124 126 heartbeat=None
125 127 notifier=None
126 128 db=None
127 129 client_addr=None
128 130 engine_addrs=None
129 131
130 132
131 133 def __init__(self, loop, session, queue, registrar, heartbeat, clientele, notifier, db, engine_addrs, client_addrs):
132 134 """
133 135 # universal:
134 136 loop: IOLoop for creating future connections
135 137 session: streamsession for sending serialized data
136 138 # engine:
137 139 queue: ZMQStream for monitoring queue messages
138 140 registrar: ZMQStream for engine registration
139 141 heartbeat: HeartMonitor object for tracking engines
140 142 # client:
141 143 clientele: ZMQStream for client connections
142 144 # extra:
143 145 db: ZMQStream for db connection (NotImplemented)
144 146 engine_addrs: zmq address/protocol dict for engine connections
145 147 client_addrs: zmq address/protocol dict for client connections
146 148 """
147 149 self.ids = set()
148 150 self.keytable={}
149 151 self.incoming_registrations={}
150 152 self.engines = {}
151 153 self.by_ident = {}
152 154 self.clients = {}
153 155 self.hearts = {}
154 156 self.mia = set()
155 157
156 158 # self.sockets = {}
157 159 self.loop = loop
158 160 self.session = session
159 161 self.registrar = registrar
160 162 self.clientele = clientele
161 163 self.queue = queue
162 164 self.heartbeat = heartbeat
163 165 self.notifier = notifier
164 166 self.db = db
165 167
166 168 self.client_addrs = client_addrs
167 169 assert isinstance(client_addrs['queue'], str)
168 170 # self.hb_addrs = hb_addrs
169 171 self.engine_addrs = engine_addrs
170 172 assert isinstance(engine_addrs['queue'], str)
171 173 assert len(engine_addrs['heartbeat']) == 2
172 174
173 175
174 176 # register our callbacks
175 177 self.registrar.on_recv(self.dispatch_register_request)
176 178 self.clientele.on_recv(self.dispatch_client_msg)
177 179 self.queue.on_recv(self.dispatch_queue_traffic)
178 180
179 181 if heartbeat is not None:
180 182 heartbeat.add_heart_failure_handler(self.handle_heart_failure)
181 183 heartbeat.add_new_heart_handler(self.handle_new_heart)
182 184
183 185 if self.db is not None:
184 186 self.db.on_recv(self.dispatch_db)
185 187
186 188 self.client_handlers = {'queue_request': self.queue_status,
187 189 'result_request': self.get_results,
188 190 'purge_request': self.purge_results,
189 191 'resubmit_request': self.resubmit_task,
190 192 }
191 193
192 194 self.registrar_handlers = {'registration_request' : self.register_engine,
193 195 'unregistration_request' : self.unregister_engine,
194 196 'connection_request': self.connection_request,
195 197
196 198 }
197 199 #
198 200 # this is the stuff that will move to DB:
199 201 self.results = {} # completed results
200 202 self.pending = {} # pending messages, keyed by msg_id
201 203 self.queues = {} # pending msg_ids keyed by engine_id
202 204 self.tasks = {} # pending msg_ids submitted as tasks, keyed by client_id
203 205 self.completed = {} # completed msg_ids keyed by engine_id
204 206 self.registration_timeout = max(5000, 2*self.heartbeat.period)
205 207
206 208 logger.info("controller::created controller")
207 209
208 210 def _new_id(self):
209 211 """gemerate a new ID"""
210 212 newid = 0
211 213 incoming = [id[0] for id in self.incoming_registrations.itervalues()]
212 214 # print newid, self.ids, self.incoming_registrations
213 215 while newid in self.ids or newid in incoming:
214 216 newid += 1
215 217 return newid
216 218
217 219 #-----------------------------------------------------------------------------
218 220 # message validation
219 221 #-----------------------------------------------------------------------------
220 222
221 223 def _validate_targets(self, targets):
222 224 """turn any valid targets argument into a list of integer ids"""
223 225 if targets is None:
224 226 # default to all
225 227 targets = self.ids
226 228
227 229 if isinstance(targets, (int,str,unicode)):
228 230 # only one target specified
229 231 targets = [targets]
230 232 _targets = []
231 233 for t in targets:
232 234 # map raw identities to ids
233 235 if isinstance(t, (str,unicode)):
234 236 t = self.by_ident.get(t, t)
235 237 _targets.append(t)
236 238 targets = _targets
237 239 bad_targets = [ t for t in targets if t not in self.ids ]
238 240 if bad_targets:
239 241 raise IndexError("No Such Engine: %r"%bad_targets)
240 242 if not targets:
241 243 raise IndexError("No Engines Registered")
242 244 return targets
243 245
244 246 def _validate_client_msg(self, msg):
245 247 """validates and unpacks headers of a message. Returns False if invalid,
246 248 (ident, header, parent, content)"""
247 249 client_id = msg[0]
248 250 try:
249 251 msg = self.session.unpack_message(msg[1:], content=True)
250 252 except:
251 253 logger.error("client::Invalid Message %s"%msg)
252 254 return False
253 255
254 256 msg_type = msg.get('msg_type', None)
255 257 if msg_type is None:
256 258 return False
257 259 header = msg.get('header')
258 260 # session doesn't handle split content for now:
259 261 return client_id, msg
260 262
261 263
262 264 #-----------------------------------------------------------------------------
263 265 # dispatch methods (1 per stream)
264 266 #-----------------------------------------------------------------------------
265 267
266 268 def dispatch_register_request(self, msg):
267 269 """"""
268 270 logger.debug("registration::dispatch_register_request(%s)"%msg)
269 271 idents,msg = self.session.feed_identities(msg)
270 print idents,msg, len(msg)
272 print (idents,msg, len(msg))
271 273 try:
272 274 msg = self.session.unpack_message(msg,content=True)
273 275 except Exception, e:
274 276 logger.error("registration::got bad registration message: %s"%msg)
275 277 raise e
276 278 return
277 279
278 280 msg_type = msg['msg_type']
279 281 content = msg['content']
280 282
281 283 handler = self.registrar_handlers.get(msg_type, None)
282 284 if handler is None:
283 285 logger.error("registration::got bad registration message: %s"%msg)
284 286 else:
285 287 handler(idents, msg)
286 288
287 289 def dispatch_queue_traffic(self, msg):
288 290 """all ME and Task queue messages come through here"""
289 291 logger.debug("queue traffic: %s"%msg[:2])
290 292 switch = msg[0]
291 293 idents, msg = self.session.feed_identities(msg[1:])
292 294 if switch == 'in':
293 295 self.save_queue_request(idents, msg)
294 296 elif switch == 'out':
295 297 self.save_queue_result(idents, msg)
296 298 elif switch == 'intask':
297 299 self.save_task_request(idents, msg)
298 300 elif switch == 'outtask':
299 301 self.save_task_result(idents, msg)
300 302 elif switch == 'tracktask':
301 303 self.save_task_destination(idents, msg)
302 304 elif switch in ('incontrol', 'outcontrol'):
303 305 pass
304 306 else:
305 307 logger.error("Invalid message topic: %s"%switch)
306 308
307 309
308 310 def dispatch_client_msg(self, msg):
309 311 """Route messages from clients"""
310 312 idents, msg = self.session.feed_identities(msg)
311 313 client_id = idents[0]
312 314 try:
313 315 msg = self.session.unpack_message(msg, content=True)
314 316 except:
315 317 content = wrap_exception()
316 318 logger.error("Bad Client Message: %s"%msg)
317 319 self.session.send(self.clientele, "controller_error", ident=client_id,
318 320 content=content)
319 321 return
320 322
321 323 # print client_id, header, parent, content
322 324 #switch on message type:
323 325 msg_type = msg['msg_type']
324 326 logger.info("client:: client %s requested %s"%(client_id, msg_type))
325 327 handler = self.client_handlers.get(msg_type, None)
326 328 try:
327 329 assert handler is not None, "Bad Message Type: %s"%msg_type
328 330 except:
329 331 content = wrap_exception()
330 332 logger.error("Bad Message Type: %s"%msg_type)
331 333 self.session.send(self.clientele, "controller_error", ident=client_id,
332 334 content=content)
333 335 return
334 336 else:
335 337 handler(client_id, msg)
336 338
337 339 def dispatch_db(self, msg):
338 340 """"""
339 341 raise NotImplementedError
340 342
341 343 #---------------------------------------------------------------------------
342 344 # handler methods (1 per event)
343 345 #---------------------------------------------------------------------------
344 346
345 347 #----------------------- Heartbeat --------------------------------------
346 348
347 349 def handle_new_heart(self, heart):
348 350 """handler to attach to heartbeater.
349 351 Called when a new heart starts to beat.
350 352 Triggers completion of registration."""
351 353 logger.debug("heartbeat::handle_new_heart(%r)"%heart)
352 354 if heart not in self.incoming_registrations:
353 355 logger.info("heartbeat::ignoring new heart: %r"%heart)
354 356 else:
355 357 self.finish_registration(heart)
356 358
357 359
358 360 def handle_heart_failure(self, heart):
359 361 """handler to attach to heartbeater.
360 362 called when a previously registered heart fails to respond to beat request.
361 363 triggers unregistration"""
362 364 logger.debug("heartbeat::handle_heart_failure(%r)"%heart)
363 365 eid = self.hearts.get(heart, None)
364 366 queue = self.engines[eid].queue
365 367 if eid is None:
366 368 logger.info("heartbeat::ignoring heart failure %r"%heart)
367 369 else:
368 370 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
369 371
370 372 #----------------------- MUX Queue Traffic ------------------------------
371 373
372 374 def save_queue_request(self, idents, msg):
373 375 queue_id, client_id = idents[:2]
374 376
375 377 try:
376 378 msg = self.session.unpack_message(msg, content=False)
377 379 except:
378 380 logger.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg))
379 381 return
380 382
381 383 eid = self.by_ident.get(queue_id, None)
382 384 if eid is None:
383 385 logger.error("queue::target %r not registered"%queue_id)
384 386 logger.debug("queue:: valid are: %s"%(self.by_ident.keys()))
385 387 return
386 388
387 389 header = msg['header']
388 390 msg_id = header['msg_id']
389 391 info = dict(submit=datetime.now(),
390 392 received=None,
391 393 engine=(eid, queue_id))
392 394 self.pending[msg_id] = ( msg, info )
393 395 self.queues[eid][0].append(msg_id)
394 396
395 397 def save_queue_result(self, idents, msg):
396 398 client_id, queue_id = idents[:2]
397 399
398 400 try:
399 401 msg = self.session.unpack_message(msg, content=False)
400 402 except:
401 403 logger.error("queue::engine %r sent invalid message to %r: %s"%(
402 404 queue_id,client_id, msg))
403 405 return
404 406
405 407 eid = self.by_ident.get(queue_id, None)
406 408 if eid is None:
407 409 logger.error("queue::unknown engine %r is sending a reply: "%queue_id)
408 410 logger.debug("queue:: %s"%msg[2:])
409 411 return
410 412
411 413 parent = msg['parent_header']
412 414 if not parent:
413 415 return
414 416 msg_id = parent['msg_id']
415 417 self.results[msg_id] = msg
416 418 if msg_id in self.pending:
417 419 self.pending.pop(msg_id)
418 420 self.queues[eid][0].remove(msg_id)
419 421 self.completed[eid].append(msg_id)
420 422 else:
421 423 logger.debug("queue:: unknown msg finished %s"%msg_id)
422 424
423 425 #--------------------- Task Queue Traffic ------------------------------
424 426
425 427 def save_task_request(self, idents, msg):
426 428 client_id = idents[0]
427 429
428 430 try:
429 431 msg = self.session.unpack_message(msg, content=False)
430 432 except:
431 433 logger.error("task::client %r sent invalid task message: %s"%(
432 434 client_id, msg))
433 435 return
434 436
435 437 header = msg['header']
436 438 msg_id = header['msg_id']
437 439 self.mia.add(msg_id)
438 440 self.pending[msg_id] = msg
439 441 if not self.tasks.has_key(client_id):
440 442 self.tasks[client_id] = []
441 443 self.tasks[client_id].append(msg_id)
442 444
443 445 def save_task_result(self, idents, msg):
444 446 client_id = idents[0]
445 447 try:
446 448 msg = self.session.unpack_message(msg, content=False)
447 449 except:
448 450 logger.error("task::invalid task result message send to %r: %s"%(
449 451 client_id, msg))
450 452 return
451 453
452 454 parent = msg['parent_header']
453 455 if not parent:
454 456 # print msg
455 457 # logger.warn("")
456 458 return
457 459 msg_id = parent['msg_id']
458 460 self.results[msg_id] = msg
459 461 if msg_id in self.pending:
460 462 self.pending.pop(msg_id)
461 463 if msg_id in self.mia:
462 464 self.mia.remove(msg_id)
463 465 else:
464 466 logger.debug("task::unknown task %s finished"%msg_id)
465 467
466 468 def save_task_destination(self, idents, msg):
467 469 try:
468 470 msg = self.session.unpack_message(msg, content=True)
469 471 except:
470 472 logger.error("task::invalid task tracking message")
471 473 return
472 474 content = msg['content']
473 print content
475 print (content)
474 476 msg_id = content['msg_id']
475 477 engine_uuid = content['engine_id']
476 478 for eid,queue_id in self.keytable.iteritems():
477 479 if queue_id == engine_uuid:
478 480 break
479 481
480 482 logger.info("task::task %s arrived on %s"%(msg_id, eid))
481 483 if msg_id in self.mia:
482 484 self.mia.remove(msg_id)
483 485 else:
484 486 logger.debug("task::task %s not listed as MIA?!"%(msg_id))
485 487 self.tasks[engine_uuid].append(msg_id)
486 488
487 489 def mia_task_request(self, idents, msg):
488 490 client_id = idents[0]
489 491 content = dict(mia=self.mia,status='ok')
490 492 self.session.send('mia_reply', content=content, idents=client_id)
491 493
492 494
493 495
494 496 #-------------------- Registration -----------------------------
495 497
496 498 def connection_request(self, client_id, msg):
497 499 """reply with connection addresses for clients"""
498 500 logger.info("client::client %s connected"%client_id)
499 501 content = dict(status='ok')
500 502 content.update(self.client_addrs)
501 503 jsonable = {}
502 504 for k,v in self.keytable.iteritems():
503 505 jsonable[str(k)] = v
504 506 content['engines'] = jsonable
505 507 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
506 508
507 509 def register_engine(self, reg, msg):
508 510 """register an engine"""
509 511 content = msg['content']
510 512 try:
511 513 queue = content['queue']
512 514 except KeyError:
513 515 logger.error("registration::queue not specified")
514 516 return
515 517 heart = content.get('heartbeat', None)
516 518 """register a new engine, and create the socket(s) necessary"""
517 519 eid = self._new_id()
518 520 # print (eid, queue, reg, heart)
519 521
520 522 logger.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
521 523
522 524 content = dict(id=eid,status='ok')
523 525 content.update(self.engine_addrs)
524 526 # check if requesting available IDs:
525 527 if queue in self.by_ident:
526 528 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
527 529 elif heart in self.hearts: # need to check unique hearts?
528 530 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
529 531 else:
530 532 for h, pack in self.incoming_registrations.iteritems():
531 533 if heart == h:
532 534 content = {'status': 'error', 'reason': "heart_id %r in use"%heart}
533 535 break
534 536 elif queue == pack[1]:
535 537 content = {'status': 'error', 'reason': "queue_id %r in use"%queue}
536 538 break
537 539
538 540 msg = self.session.send(self.registrar, "registration_reply",
539 541 content=content,
540 542 ident=reg)
541 543
542 544 if content['status'] == 'ok':
543 545 if heart in self.heartbeat.hearts:
544 546 # already beating
545 547 self.incoming_registrations[heart] = (eid,queue,reg,None)
546 548 self.finish_registration(heart)
547 549 else:
548 550 purge = lambda : self._purge_stalled_registration(heart)
549 551 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
550 552 dc.start()
551 553 self.incoming_registrations[heart] = (eid,queue,reg,dc)
552 554 else:
553 555 logger.error("registration::registration %i failed: %s"%(eid, content['reason']))
554 556 return eid
555 557
556 558 def unregister_engine(self, ident, msg):
557 559 try:
558 560 eid = msg['content']['id']
559 561 except:
560 562 logger.error("registration::bad engine id for unregistration: %s"%ident)
561 563 return
562 564 logger.info("registration::unregister_engine(%s)"%eid)
563 565 content=dict(id=eid, queue=self.engines[eid].queue)
564 566 self.ids.remove(eid)
565 567 self.keytable.pop(eid)
566 568 ec = self.engines.pop(eid)
567 569 self.hearts.pop(ec.heartbeat)
568 570 self.by_ident.pop(ec.queue)
569 571 self.completed.pop(eid)
570 572 for msg_id in self.queues.pop(eid)[0]:
571 573 msg = self.pending.pop(msg_id)
572 574 ############## TODO: HANDLE IT ################
573 575
574 576 if self.notifier:
575 577 self.session.send(self.notifier, "unregistration_notification", content=content)
576 578
577 579 def finish_registration(self, heart):
578 580 try:
579 581 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
580 582 except KeyError:
581 583 logger.error("registration::tried to finish nonexistant registration")
582 584 return
583 585 logger.info("registration::finished registering engine %i:%r"%(eid,queue))
584 586 if purge is not None:
585 587 purge.stop()
586 588 control = queue
587 589 self.ids.add(eid)
588 590 self.keytable[eid] = queue
589 591 self.engines[eid] = EngineConnector(eid, queue, reg, control, heart)
590 592 self.by_ident[queue] = eid
591 593 self.queues[eid] = ([],[])
592 594 self.completed[eid] = list()
593 595 self.hearts[heart] = eid
594 596 content = dict(id=eid, queue=self.engines[eid].queue)
595 597 if self.notifier:
596 598 self.session.send(self.notifier, "registration_notification", content=content)
597 599
598 600 def _purge_stalled_registration(self, heart):
599 601 if heart in self.incoming_registrations:
600 602 eid = self.incoming_registrations.pop(heart)[0]
601 603 logger.info("registration::purging stalled registration: %i"%eid)
602 604 else:
603 605 pass
604 606
605 607 #------------------- Client Requests -------------------------------
606 608
607 609 def check_load(self, client_id, msg):
608 610 content = msg['content']
609 611 try:
610 612 targets = content['targets']
611 613 targets = self._validate_targets(targets)
612 614 except:
613 615 content = wrap_exception()
614 616 self.session.send(self.clientele, "controller_error",
615 617 content=content, ident=client_id)
616 618 return
617 619
618 620 content = dict(status='ok')
619 621 # loads = {}
620 622 for t in targets:
621 623 content[str(t)] = len(self.queues[t])
622 624 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
623 625
624 626
625 627 def queue_status(self, client_id, msg):
626 628 """handle queue_status request"""
627 629 content = msg['content']
628 630 targets = content['targets']
629 631 try:
630 632 targets = self._validate_targets(targets)
631 633 except:
632 634 content = wrap_exception()
633 635 self.session.send(self.clientele, "controller_error",
634 636 content=content, ident=client_id)
635 637 return
636 638 verbose = msg.get('verbose', False)
637 639 content = dict()
638 640 for t in targets:
639 641 queue = self.queues[t]
640 642 completed = self.completed[t]
641 643 if not verbose:
642 644 queue = len(queue)
643 645 completed = len(completed)
644 646 content[str(t)] = {'queue': queue, 'completed': completed }
645 647 # pending
646 648 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
647 649
648 650 def purge_results(self, client_id, msg):
649 651 content = msg['content']
650 652 msg_ids = content.get('msg_ids', [])
651 653 reply = dict(status='ok')
652 654 if msg_ids == 'all':
653 655 self.results = {}
654 656 else:
655 657 for msg_id in msg_ids:
656 658 if msg_id in self.results:
657 659 self.results.pop(msg_id)
658 660 else:
659 661 if msg_id in self.pending:
660 662 reply = dict(status='error', reason="msg pending: %r"%msg_id)
661 663 else:
662 664 reply = dict(status='error', reason="No such msg: %r"%msg_id)
663 665 break
664 666 eids = content.get('engine_ids', [])
665 667 for eid in eids:
666 668 if eid not in self.engines:
667 669 reply = dict(status='error', reason="No such engine: %i"%eid)
668 670 break
669 671 msg_ids = self.completed.pop(eid)
670 672 for msg_id in msg_ids:
671 673 self.results.pop(msg_id)
672 674
673 675 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
674 676
675 677 def resubmit_task(self, client_id, msg, buffers):
676 678 content = msg['content']
677 679 header = msg['header']
678 680
679 681
680 682 msg_ids = content.get('msg_ids', [])
681 683 reply = dict(status='ok')
682 684 if msg_ids == 'all':
683 685 self.results = {}
684 686 else:
685 687 for msg_id in msg_ids:
686 688 if msg_id in self.results:
687 689 self.results.pop(msg_id)
688 690 else:
689 691 if msg_id in self.pending:
690 692 reply = dict(status='error', reason="msg pending: %r"%msg_id)
691 693 else:
692 694 reply = dict(status='error', reason="No such msg: %r"%msg_id)
693 695 break
694 696 eids = content.get('engine_ids', [])
695 697 for eid in eids:
696 698 if eid not in self.engines:
697 699 reply = dict(status='error', reason="No such engine: %i"%eid)
698 700 break
699 701 msg_ids = self.completed.pop(eid)
700 702 for msg_id in msg_ids:
701 703 self.results.pop(msg_id)
702 704
703 705 self.sesison.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
704 706
705 707 def get_results(self, client_id, msg):
706 708 """get the result of 1 or more messages"""
707 709 content = msg['content']
708 710 msg_ids = set(content['msg_ids'])
709 711 statusonly = content.get('status_only', False)
710 712 pending = []
711 713 completed = []
712 714 content = dict(status='ok')
713 715 content['pending'] = pending
714 716 content['completed'] = completed
715 717 for msg_id in msg_ids:
716 718 if msg_id in self.pending:
717 719 pending.append(msg_id)
718 720 elif msg_id in self.results:
719 721 completed.append(msg_id)
720 722 if not statusonly:
721 723 content[msg_id] = self.results[msg_id]['content']
722 724 else:
723 725 content = dict(status='error')
724 726 content['reason'] = 'no such message: '+msg_id
725 727 break
726 728 self.session.send(self.clientele, "result_reply", content=content,
727 729 parent=msg, ident=client_id)
728 730
729 731
730 732
731 733 ############ OLD METHODS for Python Relay Controller ###################
732 734 def _validate_engine_msg(self, msg):
733 735 """validates and unpacks headers of a message. Returns False if invalid,
734 736 (ident, message)"""
735 737 ident = msg[0]
736 738 try:
737 739 msg = self.session.unpack_message(msg[1:], content=False)
738 740 except:
739 741 logger.error("engine.%s::Invalid Message %s"%(ident, msg))
740 742 return False
741 743
742 744 try:
743 745 eid = msg.header.username
744 746 assert self.engines.has_key(eid)
745 747 except:
746 748 logger.error("engine::Invalid Engine ID %s"%(ident))
747 749 return False
748 750
749 751 return eid, msg
750 752
751 753
752 754 #--------------------
753 755 # Entry Point
754 756 #--------------------
755
756 def main():
757 import time
758 from multiprocessing import Process
759
760 from zmq.eventloop.zmqstream import ZMQStream
761 from zmq.devices import ProcessMonitoredQueue
762 from zmq.log import handlers
763
764 import streamsession as session
765 import heartmonitor
766 from scheduler import launch_scheduler
767
768 parser = make_argument_parser()
757 def make_argument_parser():
758 """Make an argument parser"""
759 parser = make_base_argument_parser()
769 760
770 761 parser.add_argument('--client', type=int, metavar='PORT', default=0,
771 762 help='set the XREP port for clients [default: random]')
772 763 parser.add_argument('--notice', type=int, metavar='PORT', default=0,
773 764 help='set the PUB socket for registration notification [default: random]')
774 765 parser.add_argument('--hb', type=str, metavar='PORTS',
775 766 help='set the 2 ports for heartbeats [default: random]')
776 767 parser.add_argument('--ping', type=int, default=3000,
777 768 help='set the heartbeat period in ms [default: 3000]')
778 769 parser.add_argument('--monitor', type=int, metavar='PORT', default=0,
779 770 help='set the SUB port for queue monitoring [default: random]')
780 771 parser.add_argument('--mux', type=str, metavar='PORTS',
781 772 help='set the XREP ports for the MUX queue [default: random]')
782 773 parser.add_argument('--task', type=str, metavar='PORTS',
783 774 help='set the XREP/XREQ ports for the task queue [default: random]')
784 775 parser.add_argument('--control', type=str, metavar='PORTS',
785 776 help='set the XREP ports for the control queue [default: random]')
786 777 parser.add_argument('--scheduler', type=str, default='pure',
787 778 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
788 779 help='select the task scheduler [default: pure ZMQ]')
789 780
790 args = parser.parse_args()
781 return parser
791 782
792 if args.url:
793 args.transport,iface = args.url.split('://')
794 iface = iface.split(':')
795 args.ip = iface[0]
796 if iface[1]:
797 args.regport = iface[1]
783 def main():
784 import time
785 from multiprocessing import Process
786
787 from zmq.eventloop.zmqstream import ZMQStream
788 from zmq.devices import ProcessMonitoredQueue
789 from zmq.log import handlers
790
791 import streamsession as session
792 import heartmonitor
793 from scheduler import launch_scheduler
794
795 parser = make_argument_parser()
796
797 args = parser.parse_args()
798 parse_url(args)
798 799
799 800 iface="%s://%s"%(args.transport,args.ip)+':%i'
800 801
801 802 random_ports = 0
802 803 if args.hb:
803 804 hb = split_ports(args.hb, 2)
804 805 else:
805 806 hb = select_random_ports(2)
806 807 if args.mux:
807 808 mux = split_ports(args.mux, 2)
808 809 else:
809 810 mux = None
810 811 random_ports += 2
811 812 if args.task:
812 813 task = split_ports(args.task, 2)
813 814 else:
814 815 task = None
815 816 random_ports += 2
816 817 if args.control:
817 818 control = split_ports(args.control, 2)
818 819 else:
819 820 control = None
820 821 random_ports += 2
821 822
822 823 ctx = zmq.Context()
823 824 loop = ioloop.IOLoop.instance()
824 825
825 826 # setup logging
826 827 connect_logger(ctx, iface%args.logport, root="controller", loglevel=args.loglevel)
827 828
828 829 # Registrar socket
829 830 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
830 831 regport = bind_port(reg, args.ip, args.regport)
831 832
832 833 ### Engine connections ###
833 834
834 835 # heartbeat
835 836 hpub = ctx.socket(zmq.PUB)
836 837 bind_port(hpub, args.ip, hb[0])
837 838 hrep = ctx.socket(zmq.XREP)
838 839 bind_port(hrep, args.ip, hb[1])
839 840
840 841 hmon = heartmonitor.HeartMonitor(loop, ZMQStream(hpub,loop), ZMQStream(hrep,loop),args.ping)
841 842 hmon.start()
842 843
843 844 ### Client connections ###
844 845 # Clientele socket
845 846 c = ZMQStream(ctx.socket(zmq.XREP), loop)
846 847 cport = bind_port(c, args.ip, args.client)
847 848 # Notifier socket
848 849 n = ZMQStream(ctx.socket(zmq.PUB), loop)
849 850 nport = bind_port(n, args.ip, args.notice)
850 851
851 852 thesession = session.StreamSession(username=args.ident or "controller")
852 853
853 854 ### build and launch the queues ###
854 855
855 856 # monitor socket
856 857 sub = ctx.socket(zmq.SUB)
857 858 sub.setsockopt(zmq.SUBSCRIBE, "")
858 859 monport = bind_port(sub, args.ip, args.monitor)
859 860 sub = ZMQStream(sub, loop)
860 861
861 862 ports = select_random_ports(random_ports)
862 863 # Multiplexer Queue (in a Process)
863 864 if not mux:
864 865 mux = (ports.pop(),ports.pop())
865 866 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
866 867 q.bind_in(iface%mux[0])
867 868 q.bind_out(iface%mux[1])
868 869 q.connect_mon(iface%monport)
869 870 q.daemon=True
870 871 q.start()
871 872
872 873 # Control Queue (in a Process)
873 874 if not control:
874 875 control = (ports.pop(),ports.pop())
875 876 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
876 877 q.bind_in(iface%control[0])
877 878 q.bind_out(iface%control[1])
878 879 q.connect_mon(iface%monport)
879 880 q.daemon=True
880 881 q.start()
881 882
882 883 # Task Queue (in a Process)
883 884 if not task:
884 885 task = (ports.pop(),ports.pop())
885 886 if args.scheduler == 'pure':
886 887 q = ProcessMonitoredQueue(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
887 888 q.bind_in(iface%task[0])
888 889 q.bind_out(iface%task[1])
889 890 q.connect_mon(iface%monport)
890 891 q.daemon=True
891 892 q.start()
892 893 else:
893 894 sargs = (iface%task[0],iface%task[1],iface%monport,iface%nport,args.scheduler)
894 print sargs
895 print (sargs)
895 896 p = Process(target=launch_scheduler, args=sargs)
896 897 p.daemon=True
897 898 p.start()
898 899
899 900 time.sleep(.25)
900 901
901 902 # build connection dicts
902 903 engine_addrs = {
903 904 'control' : iface%control[1],
904 905 'queue': iface%mux[1],
905 906 'heartbeat': (iface%hb[0], iface%hb[1]),
906 907 'task' : iface%task[1],
907 908 'monitor' : iface%monport,
908 909 }
909 910
910 911 client_addrs = {
911 912 'control' : iface%control[0],
912 913 'query': iface%cport,
913 914 'queue': iface%mux[0],
914 915 'task' : iface%task[0],
915 916 'notification': iface%nport
916 917 }
917 918 con = Controller(loop, thesession, sub, reg, hmon, c, n, None, engine_addrs, client_addrs)
919 dc = ioloop.DelayedCallback(lambda : print("Controller started..."), 100, loop)
918 920 loop.start()
919 921
922 if __name__ == '__main__':
923 main()
@@ -1,151 +1,148
1 1 #!/usr/bin/env python
2 2 """A simple engine that talks to a controller over 0MQ.
3 3 it handles registration, etc. and launches a kernel
4 4 connected to the Controller's queue(s).
5 5 """
6 from __future__ import print_function
6 7 import sys
7 8 import time
8 9 import traceback
9 10 import uuid
10 11 from pprint import pprint
11 12
12 13 import zmq
13 14 from zmq.eventloop import ioloop, zmqstream
14 15
15 16 from streamsession import Message, StreamSession
16 17 from client import Client
17 18 import streamkernel as kernel
18 19 import heartmonitor
19 from entry_point import make_argument_parser, connect_logger
20 from entry_point import make_base_argument_parser, connect_logger, parse_url
20 21 # import taskthread
21 22 # from log import logger
22 23
23 24
24 25 def printer(*msg):
25 26 pprint(msg)
26 27
27 28 class Engine(object):
28 29 """IPython engine"""
29 30
30 31 id=None
31 32 context=None
32 33 loop=None
33 34 session=None
34 35 ident=None
35 36 registrar=None
36 37 heart=None
37 38 kernel=None
38 39
39 40 def __init__(self, context, loop, session, registrar, client, ident=None, heart_id=None):
40 41 self.context = context
41 42 self.loop = loop
42 43 self.session = session
43 44 self.registrar = registrar
44 45 self.client = client
45 46 self.ident = ident if ident else str(uuid.uuid4())
46 47 self.registrar.on_send(printer)
47 48
48 49 def register(self):
49 50
50 51 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
51 52 self.registrar.on_recv(self.complete_registration)
52 53 self.session.send(self.registrar, "registration_request",content=content)
53 54
54 55 def complete_registration(self, msg):
55 56 # print msg
56 57 idents,msg = self.session.feed_identities(msg)
57 58 msg = Message(self.session.unpack_message(msg))
58 59 if msg.content.status == 'ok':
59 60 self.session.username = str(msg.content.id)
60 61 queue_addr = msg.content.queue
61 62 if queue_addr:
62 63 queue = self.context.socket(zmq.PAIR)
63 64 queue.setsockopt(zmq.IDENTITY, self.ident)
64 65 queue.connect(str(queue_addr))
65 66 self.queue = zmqstream.ZMQStream(queue, self.loop)
66 67
67 68 control_addr = msg.content.control
68 69 if control_addr:
69 70 control = self.context.socket(zmq.PAIR)
70 71 control.setsockopt(zmq.IDENTITY, self.ident)
71 72 control.connect(str(control_addr))
72 73 self.control = zmqstream.ZMQStream(control, self.loop)
73 74
74 75 task_addr = msg.content.task
75 print task_addr
76 print (task_addr)
76 77 if task_addr:
77 78 # task as stream:
78 79 task = self.context.socket(zmq.PAIR)
79 80 task.setsockopt(zmq.IDENTITY, self.ident)
80 81 task.connect(str(task_addr))
81 82 self.task_stream = zmqstream.ZMQStream(task, self.loop)
82 83 # TaskThread:
83 84 # mon_addr = msg.content.monitor
84 85 # task = taskthread.TaskThread(zmq.PAIR, zmq.PUB, self.ident)
85 86 # task.connect_in(str(task_addr))
86 87 # task.connect_out(str(mon_addr))
87 88 # self.task_stream = taskthread.QueueStream(*task.queues)
88 89 # task.start()
89 90
90 91 hbs = msg.content.heartbeat
91 92 self.heart = heartmonitor.Heart(*map(str, hbs), heart_id=self.ident)
92 93 self.heart.start()
93 94 # ioloop.DelayedCallback(self.heart.start, 1000, self.loop).start()
94 95 # placeholder for now:
95 96 pub = self.context.socket(zmq.PUB)
96 97 pub = zmqstream.ZMQStream(pub, self.loop)
97 98 # create and start the kernel
98 99 self.kernel = kernel.Kernel(self.session, self.control, self.queue, pub, self.task_stream, self.client)
99 100 self.kernel.start()
100 101 else:
101 102 # logger.error("Registration Failed: %s"%msg)
102 103 raise Exception("Registration Failed: %s"%msg)
103 104
104 105 # logger.info("engine::completed registration with id %s"%self.session.username)
105 106
106 print msg
107 print (msg)
107 108
108 109 def unregister(self):
109 110 self.session.send(self.registrar, "unregistration_request", content=dict(id=int(self.session.username)))
110 111 time.sleep(1)
111 112 sys.exit(0)
112 113
113 114 def start(self):
114 print "registering"
115 print ("registering")
115 116 self.register()
116 117
117 118
118 119 def main():
119 120
120 parser = make_argument_parser()
121 parser = make_base_argument_parser()
121 122
122 123 args = parser.parse_args()
123 124
124 if args.url:
125 args.transport,iface = args.url.split('://')
126 iface = iface.split(':')
127 args.ip = iface[0]
128 if iface[1]:
129 args.regport = iface[1]
125 parse_url(args)
130 126
131 127 iface="%s://%s"%(args.transport,args.ip)+':%i'
128
132 129 loop = ioloop.IOLoop.instance()
133 130 session = StreamSession()
134 131 ctx = zmq.Context()
135 132
136 133 # setup logging
137 134 connect_logger(ctx, iface%args.logport, root="engine", loglevel=args.loglevel)
138 135
139 136 reg_conn = iface % args.regport
140 print reg_conn
141 print >>sys.__stdout__, "Starting the engine..."
137 print (reg_conn)
138 print ("Starting the engine...", file=sys.__stderr__)
142 139
143 140 reg = ctx.socket(zmq.PAIR)
144 141 reg.connect(reg_conn)
145 142 reg = zmqstream.ZMQStream(reg, loop)
146 143 client = Client(reg_conn)
147 144
148 145 e = Engine(ctx, loop, session, reg, client, args.ident)
149 146 dc = ioloop.DelayedCallback(e.start, 100, loop)
150 147 dc.start()
151 148 loop.start() No newline at end of file
@@ -1,74 +1,89
1 1 """ Defines helper functions for creating kernel entry points and process
2 2 launchers.
3 3 """
4 4
5 5 # Standard library imports.
6 6 import logging
7 7 import atexit
8 8 import os
9 9 import socket
10 10 from subprocess import Popen, PIPE
11 11 import sys
12 12
13 13 # System library imports.
14 14 import zmq
15 15 from zmq.log import handlers
16 16 # Local imports.
17 17 from IPython.core.ultratb import FormattedTB
18 18 from IPython.external.argparse import ArgumentParser
19 19 from IPython.zmq.log import logger
20 20
21 21 def split_ports(s, n):
22 22 """Parser helper for multiport strings"""
23 23 if not s:
24 24 return tuple([0]*n)
25 25 ports = map(int, s.split(','))
26 26 if len(ports) != n:
27 27 raise ValueError
28 28 return ports
29 29
30 30 def select_random_ports(n):
31 31 """Selects and return n random ports that are open."""
32 32 ports = []
33 33 for i in xrange(n):
34 34 sock = socket.socket()
35 35 sock.bind(('', 0))
36 36 ports.append(sock)
37 37 for i, sock in enumerate(ports):
38 38 port = sock.getsockname()[1]
39 39 sock.close()
40 40 ports[i] = port
41 41 return ports
42 42
43 def parse_url(args):
44 if args.url:
45 iface = args.url.split('://',1)
46 if len(args) == 2:
47 args.transport,iface = iface
48 iface = iface.split(':')
49 args.ip = iface[0]
50 if iface[1]:
51 args.regport = iface[1]
52 args.url = "%s://%s:%i"%(args.transport, args.ip,args.regport)
43 53
44 def make_argument_parser():
54
55
56 def make_base_argument_parser():
45 57 """ Creates an ArgumentParser for the generic arguments supported by all
46 58 ipcluster entry points.
47 59 """
48 60 parser = ArgumentParser()
49 61 parser.add_argument('--ip', type=str, default='127.0.0.1',
50 62 help='set the controller\'s IP address [default: local]')
51 63 parser.add_argument('--transport', type=str, default='tcp',
52 64 help='set the transport to use [default: tcp]')
53 65 parser.add_argument('--regport', type=int, metavar='PORT', default=10101,
54 66 help='set the XREP port for registration [default: 10101]')
55 67 parser.add_argument('--logport', type=int, metavar='PORT', default=20202,
56 68 help='set the PUB port for logging [default: 10201]')
57 69 parser.add_argument('--loglevel', type=int, metavar='LEVEL', default=logging.DEBUG,
58 70 help='set the log level [default: DEBUG]')
59 71 parser.add_argument('--ident', type=str,
60 72 help='set the ZMQ identity [default: random]')
73 parser.add_argument('--packer', type=str, default='json',
74 choices=['json','pickle'],
75 help='set the message format method [default: json]')
61 76 parser.add_argument('--url', type=str,
62 77 help='set transport,ip,regport in one arg, e.g. tcp://127.0.0.1:10101')
63 78
64 79 return parser
65 80
66 81
67 82 def connect_logger(context, iface, root="ip", loglevel=logging.DEBUG):
68 83 lsock = context.socket(zmq.PUB)
69 84 lsock.connect(iface)
70 85 handler = handlers.PUBHandler(lsock)
71 86 handler.setLevel(loglevel)
72 87 handler.root_topic = root
73 88 logger.addHandler(handler)
74 89 No newline at end of file
@@ -1,249 +1,250
1 1 #!/usr/bin/env python
2 2 # -*- coding: utf-8 -*-
3 3 """Setup script for IPython.
4 4
5 5 Under Posix environments it works like a typical setup.py script.
6 6 Under Windows, the command sdist is not supported, since IPython
7 7 requires utilities which are not available under Windows."""
8 8
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (c) 2008-2010, IPython Development Team.
11 11 # Copyright (c) 2001-2007, Fernando Perez <fernando.perez@colorado.edu>
12 12 # Copyright (c) 2001, Janko Hauser <jhauser@zscout.de>
13 13 # Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu>
14 14 #
15 15 # Distributed under the terms of the Modified BSD License.
16 16 #
17 17 # The full license is in the file COPYING.txt, distributed with this software.
18 18 #-----------------------------------------------------------------------------
19 19
20 20 #-----------------------------------------------------------------------------
21 21 # Minimal Python version sanity check
22 22 #-----------------------------------------------------------------------------
23 23
24 24 import sys
25 25
26 26 # This check is also made in IPython/__init__, don't forget to update both when
27 27 # changing Python version requirements.
28 28 if sys.version[0:3] < '2.6':
29 29 error = """\
30 30 ERROR: 'IPython requires Python Version 2.6 or above.'
31 31 Exiting."""
32 32 print >> sys.stderr, error
33 33 sys.exit(1)
34 34
35 35 # At least we're on the python version we need, move on.
36 36
37 37 #-------------------------------------------------------------------------------
38 38 # Imports
39 39 #-------------------------------------------------------------------------------
40 40
41 41 # Stdlib imports
42 42 import os
43 43 import shutil
44 44
45 45 from glob import glob
46 46
47 47 # BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
48 48 # update it when the contents of directories change.
49 49 if os.path.exists('MANIFEST'): os.remove('MANIFEST')
50 50
51 51 from distutils.core import setup
52 52
53 53 # Our own imports
54 54 from IPython.utils.path import target_update
55 55
56 56 from setupbase import (
57 57 setup_args,
58 58 find_packages,
59 59 find_package_data,
60 60 find_scripts,
61 61 find_data_files,
62 62 check_for_dependencies,
63 63 record_commit_info,
64 64 )
65 65
66 66 isfile = os.path.isfile
67 67 pjoin = os.path.join
68 68
69 69 #-----------------------------------------------------------------------------
70 70 # Function definitions
71 71 #-----------------------------------------------------------------------------
72 72
73 73 def cleanup():
74 74 """Clean up the junk left around by the build process"""
75 75 if "develop" not in sys.argv:
76 76 try:
77 77 shutil.rmtree('ipython.egg-info')
78 78 except:
79 79 try:
80 80 os.unlink('ipython.egg-info')
81 81 except:
82 82 pass
83 83
84 84 #-------------------------------------------------------------------------------
85 85 # Handle OS specific things
86 86 #-------------------------------------------------------------------------------
87 87
88 88 if os.name == 'posix':
89 89 os_name = 'posix'
90 90 elif os.name in ['nt','dos']:
91 91 os_name = 'windows'
92 92 else:
93 93 print 'Unsupported operating system:',os.name
94 94 sys.exit(1)
95 95
96 96 # Under Windows, 'sdist' has not been supported. Now that the docs build with
97 97 # Sphinx it might work, but let's not turn it on until someone confirms that it
98 98 # actually works.
99 99 if os_name == 'windows' and 'sdist' in sys.argv:
100 100 print 'The sdist command is not available under Windows. Exiting.'
101 101 sys.exit(1)
102 102
103 103 #-------------------------------------------------------------------------------
104 104 # Things related to the IPython documentation
105 105 #-------------------------------------------------------------------------------
106 106
107 107 # update the manuals when building a source dist
108 108 if len(sys.argv) >= 2 and sys.argv[1] in ('sdist','bdist_rpm'):
109 109 import textwrap
110 110
111 111 # List of things to be updated. Each entry is a triplet of args for
112 112 # target_update()
113 113 to_update = [
114 114 # FIXME - Disabled for now: we need to redo an automatic way
115 115 # of generating the magic info inside the rst.
116 116 #('docs/magic.tex',
117 117 #['IPython/Magic.py'],
118 118 #"cd doc && ./update_magic.sh" ),
119 119
120 120 ('docs/man/ipcluster.1.gz',
121 121 ['docs/man/ipcluster.1'],
122 122 'cd docs/man && gzip -9c ipcluster.1 > ipcluster.1.gz'),
123 123
124 124 ('docs/man/ipcontroller.1.gz',
125 125 ['docs/man/ipcontroller.1'],
126 126 'cd docs/man && gzip -9c ipcontroller.1 > ipcontroller.1.gz'),
127 127
128 128 ('docs/man/ipengine.1.gz',
129 129 ['docs/man/ipengine.1'],
130 130 'cd docs/man && gzip -9c ipengine.1 > ipengine.1.gz'),
131 131
132 132 ('docs/man/ipython.1.gz',
133 133 ['docs/man/ipython.1'],
134 134 'cd docs/man && gzip -9c ipython.1 > ipython.1.gz'),
135 135
136 136 ('docs/man/ipython-wx.1.gz',
137 137 ['docs/man/ipython-wx.1'],
138 138 'cd docs/man && gzip -9c ipython-wx.1 > ipython-wx.1.gz'),
139 139
140 140 ('docs/man/ipythonx.1.gz',
141 141 ['docs/man/ipythonx.1'],
142 142 'cd docs/man && gzip -9c ipythonx.1 > ipythonx.1.gz'),
143 143
144 144 ('docs/man/irunner.1.gz',
145 145 ['docs/man/irunner.1'],
146 146 'cd docs/man && gzip -9c irunner.1 > irunner.1.gz'),
147 147
148 148 ('docs/man/pycolor.1.gz',
149 149 ['docs/man/pycolor.1'],
150 150 'cd docs/man && gzip -9c pycolor.1 > pycolor.1.gz'),
151 151 ]
152 152
153 153 # Only build the docs if sphinx is present
154 154 try:
155 155 import sphinx
156 156 except ImportError:
157 157 pass
158 158 else:
159 159 # The Makefile calls the do_sphinx scripts to build html and pdf, so
160 160 # just one target is enough to cover all manual generation
161 161
162 162 # First, compute all the dependencies that can force us to rebuild the
163 163 # docs. Start with the main release file that contains metadata
164 164 docdeps = ['IPython/core/release.py']
165 165 # Inculde all the reST sources
166 166 pjoin = os.path.join
167 167 for dirpath,dirnames,filenames in os.walk('docs/source'):
168 168 if dirpath in ['_static','_templates']:
169 169 continue
170 170 docdeps += [ pjoin(dirpath,f) for f in filenames
171 171 if f.endswith('.txt') ]
172 172 # and the examples
173 173 for dirpath,dirnames,filenames in os.walk('docs/example'):
174 174 docdeps += [ pjoin(dirpath,f) for f in filenames
175 175 if not f.endswith('~') ]
176 176 # then, make them all dependencies for the main PDF (the html will get
177 177 # auto-generated as well).
178 178 to_update.append(
179 179 ('docs/dist/ipython.pdf',
180 180 docdeps,
181 181 "cd docs && make dist")
182 182 )
183 183
184 184 [ target_update(*t) for t in to_update ]
185 185
186 186 #---------------------------------------------------------------------------
187 187 # Find all the packages, package data, scripts and data_files
188 188 #---------------------------------------------------------------------------
189 189
190 190 packages = find_packages()
191 191 package_data = find_package_data()
192 192 scripts = find_scripts()
193 193 data_files = find_data_files()
194 194
195 195 #---------------------------------------------------------------------------
196 196 # Handle dependencies and setuptools specific things
197 197 #---------------------------------------------------------------------------
198 198
199 199 # For some commands, use setuptools. Note that we do NOT list install here!
200 200 # If you want a setuptools-enhanced install, just run 'setupegg.py install'
201 201 if len(set(('develop', 'sdist', 'release', 'bdist_egg', 'bdist_rpm',
202 202 'bdist', 'bdist_dumb', 'bdist_wininst', 'install_egg_info',
203 203 'build_sphinx', 'egg_info', 'easy_install', 'upload',
204 204 )).intersection(sys.argv)) > 0:
205 205 import setuptools
206 206
207 207 # This dict is used for passing extra arguments that are setuptools
208 208 # specific to setup
209 209 setuptools_extra_args = {}
210 210
211 211 if 'setuptools' in sys.modules:
212 212 setuptools_extra_args['zip_safe'] = False
213 213 setuptools_extra_args['entry_points'] = {
214 214 'console_scripts': [
215 215 'ipython = IPython.frontend.terminal.ipapp:launch_new_instance',
216 216 'ipython-qtconsole = IPython.frontend.qt.console.ipythonqt:main',
217 217 'pycolor = IPython.utils.PyColorize:main',
218 218 'ipcontrollerz = IPython.zmq.parallel.controller:main',
219 219 'ipenginez = IPython.zmq.parallel.engine:main',
220 'ipclusterz = IPython.zmq.parallel.ipcluster:main',
220 221 'iptest = IPython.testing.iptest:main',
221 222 'irunner = IPython.lib.irunner:main'
222 223 ]
223 224 }
224 225 setup_args['extras_require'] = dict(
225 226 doc='Sphinx>=0.3',
226 227 test='nose>=0.10.1',
227 228 security='pyOpenSSL>=0.6'
228 229 )
229 230 else:
230 231 # If we are running without setuptools, call this function which will
231 232 # check for dependencies an inform the user what is needed. This is
232 233 # just to make life easy for users.
233 234 check_for_dependencies()
234 235
235 236 #---------------------------------------------------------------------------
236 237 # Do the actual setup now
237 238 #---------------------------------------------------------------------------
238 239
239 240 setup_args['cmdclass'] = {'build_py': record_commit_info('IPython')}
240 241 setup_args['packages'] = packages
241 242 setup_args['package_data'] = package_data
242 243 setup_args['scripts'] = scripts
243 244 setup_args['data_files'] = data_files
244 245 setup_args.update(setuptools_extra_args)
245 246
246 247
247 248 if __name__ == '__main__':
248 249 setup(**setup_args)
249 250 cleanup()
General Comments 0
You need to be logged in to leave comments. Login now