##// END OF EJS Templates
parallel.util: Use utils.localinterfaces...
W. Trevor King -
Show More
@@ -1,355 +1,351 b''
1 """some generic utilities for dealing with classes, urls, and serialization
1 """some generic utilities for dealing with classes, urls, and serialization
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23 import socket
23 import socket
24 import sys
24 import sys
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
26 try:
26 try:
27 from signal import SIGKILL
27 from signal import SIGKILL
28 except ImportError:
28 except ImportError:
29 SIGKILL=None
29 SIGKILL=None
30
30
31 try:
31 try:
32 import cPickle
32 import cPickle
33 pickle = cPickle
33 pickle = cPickle
34 except:
34 except:
35 cPickle = None
35 cPickle = None
36 import pickle
36 import pickle
37
37
38 # System library imports
38 # System library imports
39 import zmq
39 import zmq
40 from zmq.log import handlers
40 from zmq.log import handlers
41
41
42 from IPython.external.decorator import decorator
42 from IPython.external.decorator import decorator
43
43
44 # IPython imports
44 # IPython imports
45 from IPython.config.application import Application
45 from IPython.config.application import Application
46 from IPython.utils.localinterfaces import LOCALHOST, PUBLIC_IPS
46 from IPython.zmq.log import EnginePUBHandler
47 from IPython.zmq.log import EnginePUBHandler
47 from IPython.zmq.serialize import (
48 from IPython.zmq.serialize import (
48 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
49 unserialize_object, serialize_object, pack_apply_message, unpack_apply_message
49 )
50 )
50
51
51 #-----------------------------------------------------------------------------
52 #-----------------------------------------------------------------------------
52 # Classes
53 # Classes
53 #-----------------------------------------------------------------------------
54 #-----------------------------------------------------------------------------
54
55
55 class Namespace(dict):
56 class Namespace(dict):
56 """Subclass of dict for attribute access to keys."""
57 """Subclass of dict for attribute access to keys."""
57
58
58 def __getattr__(self, key):
59 def __getattr__(self, key):
59 """getattr aliased to getitem"""
60 """getattr aliased to getitem"""
60 if key in self.iterkeys():
61 if key in self.iterkeys():
61 return self[key]
62 return self[key]
62 else:
63 else:
63 raise NameError(key)
64 raise NameError(key)
64
65
65 def __setattr__(self, key, value):
66 def __setattr__(self, key, value):
66 """setattr aliased to setitem, with strict"""
67 """setattr aliased to setitem, with strict"""
67 if hasattr(dict, key):
68 if hasattr(dict, key):
68 raise KeyError("Cannot override dict keys %r"%key)
69 raise KeyError("Cannot override dict keys %r"%key)
69 self[key] = value
70 self[key] = value
70
71
71
72
72 class ReverseDict(dict):
73 class ReverseDict(dict):
73 """simple double-keyed subset of dict methods."""
74 """simple double-keyed subset of dict methods."""
74
75
75 def __init__(self, *args, **kwargs):
76 def __init__(self, *args, **kwargs):
76 dict.__init__(self, *args, **kwargs)
77 dict.__init__(self, *args, **kwargs)
77 self._reverse = dict()
78 self._reverse = dict()
78 for key, value in self.iteritems():
79 for key, value in self.iteritems():
79 self._reverse[value] = key
80 self._reverse[value] = key
80
81
81 def __getitem__(self, key):
82 def __getitem__(self, key):
82 try:
83 try:
83 return dict.__getitem__(self, key)
84 return dict.__getitem__(self, key)
84 except KeyError:
85 except KeyError:
85 return self._reverse[key]
86 return self._reverse[key]
86
87
87 def __setitem__(self, key, value):
88 def __setitem__(self, key, value):
88 if key in self._reverse:
89 if key in self._reverse:
89 raise KeyError("Can't have key %r on both sides!"%key)
90 raise KeyError("Can't have key %r on both sides!"%key)
90 dict.__setitem__(self, key, value)
91 dict.__setitem__(self, key, value)
91 self._reverse[value] = key
92 self._reverse[value] = key
92
93
93 def pop(self, key):
94 def pop(self, key):
94 value = dict.pop(self, key)
95 value = dict.pop(self, key)
95 self._reverse.pop(value)
96 self._reverse.pop(value)
96 return value
97 return value
97
98
98 def get(self, key, default=None):
99 def get(self, key, default=None):
99 try:
100 try:
100 return self[key]
101 return self[key]
101 except KeyError:
102 except KeyError:
102 return default
103 return default
103
104
104 #-----------------------------------------------------------------------------
105 #-----------------------------------------------------------------------------
105 # Functions
106 # Functions
106 #-----------------------------------------------------------------------------
107 #-----------------------------------------------------------------------------
107
108
108 @decorator
109 @decorator
109 def log_errors(f, self, *args, **kwargs):
110 def log_errors(f, self, *args, **kwargs):
110 """decorator to log unhandled exceptions raised in a method.
111 """decorator to log unhandled exceptions raised in a method.
111
112
112 For use wrapping on_recv callbacks, so that exceptions
113 For use wrapping on_recv callbacks, so that exceptions
113 do not cause the stream to be closed.
114 do not cause the stream to be closed.
114 """
115 """
115 try:
116 try:
116 return f(self, *args, **kwargs)
117 return f(self, *args, **kwargs)
117 except Exception:
118 except Exception:
118 self.log.error("Uncaught exception in %r" % f, exc_info=True)
119 self.log.error("Uncaught exception in %r" % f, exc_info=True)
119
120
120
121
121 def is_url(url):
122 def is_url(url):
122 """boolean check for whether a string is a zmq url"""
123 """boolean check for whether a string is a zmq url"""
123 if '://' not in url:
124 if '://' not in url:
124 return False
125 return False
125 proto, addr = url.split('://', 1)
126 proto, addr = url.split('://', 1)
126 if proto.lower() not in ['tcp','pgm','epgm','ipc','inproc']:
127 if proto.lower() not in ['tcp','pgm','epgm','ipc','inproc']:
127 return False
128 return False
128 return True
129 return True
129
130
130 def validate_url(url):
131 def validate_url(url):
131 """validate a url for zeromq"""
132 """validate a url for zeromq"""
132 if not isinstance(url, basestring):
133 if not isinstance(url, basestring):
133 raise TypeError("url must be a string, not %r"%type(url))
134 raise TypeError("url must be a string, not %r"%type(url))
134 url = url.lower()
135 url = url.lower()
135
136
136 proto_addr = url.split('://')
137 proto_addr = url.split('://')
137 assert len(proto_addr) == 2, 'Invalid url: %r'%url
138 assert len(proto_addr) == 2, 'Invalid url: %r'%url
138 proto, addr = proto_addr
139 proto, addr = proto_addr
139 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
140 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
140
141
141 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
142 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
142 # author: Remi Sabourin
143 # author: Remi Sabourin
143 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
144 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
144
145
145 if proto == 'tcp':
146 if proto == 'tcp':
146 lis = addr.split(':')
147 lis = addr.split(':')
147 assert len(lis) == 2, 'Invalid url: %r'%url
148 assert len(lis) == 2, 'Invalid url: %r'%url
148 addr,s_port = lis
149 addr,s_port = lis
149 try:
150 try:
150 port = int(s_port)
151 port = int(s_port)
151 except ValueError:
152 except ValueError:
152 raise AssertionError("Invalid port %r in url: %r"%(port, url))
153 raise AssertionError("Invalid port %r in url: %r"%(port, url))
153
154
154 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
155 assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url
155
156
156 else:
157 else:
157 # only validate tcp urls currently
158 # only validate tcp urls currently
158 pass
159 pass
159
160
160 return True
161 return True
161
162
162
163
163 def validate_url_container(container):
164 def validate_url_container(container):
164 """validate a potentially nested collection of urls."""
165 """validate a potentially nested collection of urls."""
165 if isinstance(container, basestring):
166 if isinstance(container, basestring):
166 url = container
167 url = container
167 return validate_url(url)
168 return validate_url(url)
168 elif isinstance(container, dict):
169 elif isinstance(container, dict):
169 container = container.itervalues()
170 container = container.itervalues()
170
171
171 for element in container:
172 for element in container:
172 validate_url_container(element)
173 validate_url_container(element)
173
174
174
175
175 def split_url(url):
176 def split_url(url):
176 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
177 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
177 proto_addr = url.split('://')
178 proto_addr = url.split('://')
178 assert len(proto_addr) == 2, 'Invalid url: %r'%url
179 assert len(proto_addr) == 2, 'Invalid url: %r'%url
179 proto, addr = proto_addr
180 proto, addr = proto_addr
180 lis = addr.split(':')
181 lis = addr.split(':')
181 assert len(lis) == 2, 'Invalid url: %r'%url
182 assert len(lis) == 2, 'Invalid url: %r'%url
182 addr,s_port = lis
183 addr,s_port = lis
183 return proto,addr,s_port
184 return proto,addr,s_port
184
185
185 def disambiguate_ip_address(ip, location=None):
186 def disambiguate_ip_address(ip, location=None):
186 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
187 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
187 ones, based on the location (default interpretation of location is localhost)."""
188 ones, based on the location (default interpretation of location is localhost)."""
188 if ip in ('0.0.0.0', '*'):
189 if ip in ('0.0.0.0', '*'):
189 try:
190 if location is None or location in PUBLIC_IPS or not PUBLIC_IPS:
190 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
191 except (socket.gaierror, IndexError):
192 # couldn't identify this machine, assume localhost
193 external_ips = []
194 if location is None or location in external_ips or not external_ips:
195 # If location is unspecified or cannot be determined, assume local
191 # If location is unspecified or cannot be determined, assume local
196 ip='127.0.0.1'
192 ip = LOCALHOST
197 elif location:
193 elif location:
198 return location
194 return location
199 return ip
195 return ip
200
196
201 def disambiguate_url(url, location=None):
197 def disambiguate_url(url, location=None):
202 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
198 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
203 ones, based on the location (default interpretation is localhost).
199 ones, based on the location (default interpretation is localhost).
204
200
205 This is for zeromq urls, such as tcp://*:10101."""
201 This is for zeromq urls, such as tcp://*:10101."""
206 try:
202 try:
207 proto,ip,port = split_url(url)
203 proto,ip,port = split_url(url)
208 except AssertionError:
204 except AssertionError:
209 # probably not tcp url; could be ipc, etc.
205 # probably not tcp url; could be ipc, etc.
210 return url
206 return url
211
207
212 ip = disambiguate_ip_address(ip,location)
208 ip = disambiguate_ip_address(ip,location)
213
209
214 return "%s://%s:%s"%(proto,ip,port)
210 return "%s://%s:%s"%(proto,ip,port)
215
211
216
212
217 #--------------------------------------------------------------------------
213 #--------------------------------------------------------------------------
218 # helpers for implementing old MEC API via view.apply
214 # helpers for implementing old MEC API via view.apply
219 #--------------------------------------------------------------------------
215 #--------------------------------------------------------------------------
220
216
221 def interactive(f):
217 def interactive(f):
222 """decorator for making functions appear as interactively defined.
218 """decorator for making functions appear as interactively defined.
223 This results in the function being linked to the user_ns as globals()
219 This results in the function being linked to the user_ns as globals()
224 instead of the module globals().
220 instead of the module globals().
225 """
221 """
226 f.__module__ = '__main__'
222 f.__module__ = '__main__'
227 return f
223 return f
228
224
229 @interactive
225 @interactive
230 def _push(**ns):
226 def _push(**ns):
231 """helper method for implementing `client.push` via `client.apply`"""
227 """helper method for implementing `client.push` via `client.apply`"""
232 user_ns = globals()
228 user_ns = globals()
233 tmp = '_IP_PUSH_TMP_'
229 tmp = '_IP_PUSH_TMP_'
234 while tmp in user_ns:
230 while tmp in user_ns:
235 tmp = tmp + '_'
231 tmp = tmp + '_'
236 try:
232 try:
237 for name, value in ns.iteritems():
233 for name, value in ns.iteritems():
238 user_ns[tmp] = value
234 user_ns[tmp] = value
239 exec "%s = %s" % (name, tmp) in user_ns
235 exec "%s = %s" % (name, tmp) in user_ns
240 finally:
236 finally:
241 user_ns.pop(tmp, None)
237 user_ns.pop(tmp, None)
242
238
243 @interactive
239 @interactive
244 def _pull(keys):
240 def _pull(keys):
245 """helper method for implementing `client.pull` via `client.apply`"""
241 """helper method for implementing `client.pull` via `client.apply`"""
246 if isinstance(keys, (list,tuple, set)):
242 if isinstance(keys, (list,tuple, set)):
247 return map(lambda key: eval(key, globals()), keys)
243 return map(lambda key: eval(key, globals()), keys)
248 else:
244 else:
249 return eval(keys, globals())
245 return eval(keys, globals())
250
246
251 @interactive
247 @interactive
252 def _execute(code):
248 def _execute(code):
253 """helper method for implementing `client.execute` via `client.apply`"""
249 """helper method for implementing `client.execute` via `client.apply`"""
254 exec code in globals()
250 exec code in globals()
255
251
256 #--------------------------------------------------------------------------
252 #--------------------------------------------------------------------------
257 # extra process management utilities
253 # extra process management utilities
258 #--------------------------------------------------------------------------
254 #--------------------------------------------------------------------------
259
255
260 _random_ports = set()
256 _random_ports = set()
261
257
262 def select_random_ports(n):
258 def select_random_ports(n):
263 """Selects and return n random ports that are available."""
259 """Selects and return n random ports that are available."""
264 ports = []
260 ports = []
265 for i in xrange(n):
261 for i in xrange(n):
266 sock = socket.socket()
262 sock = socket.socket()
267 sock.bind(('', 0))
263 sock.bind(('', 0))
268 while sock.getsockname()[1] in _random_ports:
264 while sock.getsockname()[1] in _random_ports:
269 sock.close()
265 sock.close()
270 sock = socket.socket()
266 sock = socket.socket()
271 sock.bind(('', 0))
267 sock.bind(('', 0))
272 ports.append(sock)
268 ports.append(sock)
273 for i, sock in enumerate(ports):
269 for i, sock in enumerate(ports):
274 port = sock.getsockname()[1]
270 port = sock.getsockname()[1]
275 sock.close()
271 sock.close()
276 ports[i] = port
272 ports[i] = port
277 _random_ports.add(port)
273 _random_ports.add(port)
278 return ports
274 return ports
279
275
280 def signal_children(children):
276 def signal_children(children):
281 """Relay interupt/term signals to children, for more solid process cleanup."""
277 """Relay interupt/term signals to children, for more solid process cleanup."""
282 def terminate_children(sig, frame):
278 def terminate_children(sig, frame):
283 log = Application.instance().log
279 log = Application.instance().log
284 log.critical("Got signal %i, terminating children..."%sig)
280 log.critical("Got signal %i, terminating children..."%sig)
285 for child in children:
281 for child in children:
286 child.terminate()
282 child.terminate()
287
283
288 sys.exit(sig != SIGINT)
284 sys.exit(sig != SIGINT)
289 # sys.exit(sig)
285 # sys.exit(sig)
290 for sig in (SIGINT, SIGABRT, SIGTERM):
286 for sig in (SIGINT, SIGABRT, SIGTERM):
291 signal(sig, terminate_children)
287 signal(sig, terminate_children)
292
288
293 def generate_exec_key(keyfile):
289 def generate_exec_key(keyfile):
294 import uuid
290 import uuid
295 newkey = str(uuid.uuid4())
291 newkey = str(uuid.uuid4())
296 with open(keyfile, 'w') as f:
292 with open(keyfile, 'w') as f:
297 # f.write('ipython-key ')
293 # f.write('ipython-key ')
298 f.write(newkey+'\n')
294 f.write(newkey+'\n')
299 # set user-only RW permissions (0600)
295 # set user-only RW permissions (0600)
300 # this will have no effect on Windows
296 # this will have no effect on Windows
301 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
297 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
302
298
303
299
304 def integer_loglevel(loglevel):
300 def integer_loglevel(loglevel):
305 try:
301 try:
306 loglevel = int(loglevel)
302 loglevel = int(loglevel)
307 except ValueError:
303 except ValueError:
308 if isinstance(loglevel, str):
304 if isinstance(loglevel, str):
309 loglevel = getattr(logging, loglevel)
305 loglevel = getattr(logging, loglevel)
310 return loglevel
306 return loglevel
311
307
312 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
308 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
313 logger = logging.getLogger(logname)
309 logger = logging.getLogger(logname)
314 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
310 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
315 # don't add a second PUBHandler
311 # don't add a second PUBHandler
316 return
312 return
317 loglevel = integer_loglevel(loglevel)
313 loglevel = integer_loglevel(loglevel)
318 lsock = context.socket(zmq.PUB)
314 lsock = context.socket(zmq.PUB)
319 lsock.connect(iface)
315 lsock.connect(iface)
320 handler = handlers.PUBHandler(lsock)
316 handler = handlers.PUBHandler(lsock)
321 handler.setLevel(loglevel)
317 handler.setLevel(loglevel)
322 handler.root_topic = root
318 handler.root_topic = root
323 logger.addHandler(handler)
319 logger.addHandler(handler)
324 logger.setLevel(loglevel)
320 logger.setLevel(loglevel)
325
321
326 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
322 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
327 logger = logging.getLogger()
323 logger = logging.getLogger()
328 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
324 if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
329 # don't add a second PUBHandler
325 # don't add a second PUBHandler
330 return
326 return
331 loglevel = integer_loglevel(loglevel)
327 loglevel = integer_loglevel(loglevel)
332 lsock = context.socket(zmq.PUB)
328 lsock = context.socket(zmq.PUB)
333 lsock.connect(iface)
329 lsock.connect(iface)
334 handler = EnginePUBHandler(engine, lsock)
330 handler = EnginePUBHandler(engine, lsock)
335 handler.setLevel(loglevel)
331 handler.setLevel(loglevel)
336 logger.addHandler(handler)
332 logger.addHandler(handler)
337 logger.setLevel(loglevel)
333 logger.setLevel(loglevel)
338 return logger
334 return logger
339
335
340 def local_logger(logname, loglevel=logging.DEBUG):
336 def local_logger(logname, loglevel=logging.DEBUG):
341 loglevel = integer_loglevel(loglevel)
337 loglevel = integer_loglevel(loglevel)
342 logger = logging.getLogger(logname)
338 logger = logging.getLogger(logname)
343 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
339 if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
344 # don't add a second StreamHandler
340 # don't add a second StreamHandler
345 return
341 return
346 handler = logging.StreamHandler()
342 handler = logging.StreamHandler()
347 handler.setLevel(loglevel)
343 handler.setLevel(loglevel)
348 formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
344 formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
349 datefmt="%Y-%m-%d %H:%M:%S")
345 datefmt="%Y-%m-%d %H:%M:%S")
350 handler.setFormatter(formatter)
346 handler.setFormatter(formatter)
351
347
352 logger.addHandler(handler)
348 logger.addHandler(handler)
353 logger.setLevel(loglevel)
349 logger.setLevel(loglevel)
354 return logger
350 return logger
355
351
General Comments 0
You need to be logged in to leave comments. Login now