##// END OF EJS Templates
match log format in Scheduler to rest of parallel apps
MinRK -
Show More
@@ -1,472 +1,476 b''
1 """some generic utilities for dealing with classes, urls, and serialization
1 """some generic utilities for dealing with classes, urls, and serialization
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 # Standard library imports.
18 # Standard library imports.
19 import logging
19 import logging
20 import os
20 import os
21 import re
21 import re
22 import stat
22 import stat
23 import socket
23 import socket
24 import sys
24 import sys
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
25 from signal import signal, SIGINT, SIGABRT, SIGTERM
26 try:
26 try:
27 from signal import SIGKILL
27 from signal import SIGKILL
28 except ImportError:
28 except ImportError:
29 SIGKILL=None
29 SIGKILL=None
30
30
31 try:
31 try:
32 import cPickle
32 import cPickle
33 pickle = cPickle
33 pickle = cPickle
34 except:
34 except:
35 cPickle = None
35 cPickle = None
36 import pickle
36 import pickle
37
37
38 # System library imports
38 # System library imports
39 import zmq
39 import zmq
40 from zmq.log import handlers
40 from zmq.log import handlers
41
41
42 # IPython imports
42 # IPython imports
43 from IPython.config.application import Application
43 from IPython.config.application import Application
44 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
44 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
45 from IPython.utils.newserialized import serialize, unserialize
45 from IPython.utils.newserialized import serialize, unserialize
46 from IPython.zmq.log import EnginePUBHandler
46 from IPython.zmq.log import EnginePUBHandler
47
47
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49 # Classes
49 # Classes
50 #-----------------------------------------------------------------------------
50 #-----------------------------------------------------------------------------
51
51
class Namespace(dict):
    """Subclass of dict for attribute access to keys.

    ``ns.foo`` reads and ``ns.foo = x`` writes are aliased to item
    access (``ns['foo']``).
    """

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # membership test on the dict itself is O(1);
        # the previous `key in self.iterkeys()` scanned all keys (O(n))
        if key in self:
            return self[key]
        else:
            raise NameError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        # refuse to shadow any real dict attribute (keys, items, ...)
        if hasattr(dict, key):
            raise KeyError("Cannot override dict keys %r"%key)
        self[key] = value
67
67
68
68
class ReverseDict(dict):
    """simple double-keyed subset of dict methods.

    Lookups fall back to the reverse mapping (value -> key), so both
    ``d[key]`` and ``d[value]`` resolve.  Keys and values must therefore
    live in disjoint sets.
    """

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self._reverse = dict()
        # items() instead of the py2-only iteritems(): identical
        # contents/order in py2, and also valid on py3
        for key, value in self.items():
            self._reverse[value] = key

    def __getitem__(self, key):
        try:
            return dict.__getitem__(self, key)
        except KeyError:
            # fall back to value -> key lookup
            return self._reverse[key]

    def __setitem__(self, key, value):
        if key in self._reverse:
            raise KeyError("Can't have key %r on both sides!"%key)
        dict.__setitem__(self, key, value)
        self._reverse[value] = key

    def pop(self, key):
        # keep the reverse map in sync on removal
        value = dict.pop(self, key)
        self._reverse.pop(value)
        return value

    def get(self, key, default=None):
        try:
            return self[key]
        except KeyError:
            return default
100
100
101 #-----------------------------------------------------------------------------
101 #-----------------------------------------------------------------------------
102 # Functions
102 # Functions
103 #-----------------------------------------------------------------------------
103 #-----------------------------------------------------------------------------
104
104
def asbytes(s):
    """ensure that an object is ascii bytes"""
    # unicode strings get encoded; everything else passes through untouched
    return s.encode('ascii') if isinstance(s, unicode) else s
110
110
def is_url(url):
    """boolean check for whether a string is a zmq url"""
    if '://' not in url:
        return False
    # only the transport part matters for this check
    transport = url.split('://', 1)[0]
    return transport.lower() in ('tcp', 'pgm', 'epgm', 'ipc', 'inproc')
119
119
def validate_url(url):
    """validate a url for zeromq

    Raises TypeError for non-string input and AssertionError for
    malformed urls; returns True otherwise.  Only tcp urls are fully
    validated; other transports are accepted as-is.
    """
    if not isinstance(url, basestring):
        raise TypeError("url must be a string, not %r"%type(url))
    url = url.lower()

    proto_addr = url.split('://')
    assert len(proto_addr) == 2, 'Invalid url: %r'%url
    proto, addr = proto_addr
    assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto

    # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
    # author: Remi Sabourin
    pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')

    if proto == 'tcp':
        lis = addr.split(':')
        assert len(lis) == 2, 'Invalid url: %r'%url
        addr, s_port = lis
        try:
            int(s_port)  # value unused; we only validate it parses
        except ValueError:
            # BUG FIX: the message previously interpolated `port`, which is
            # unbound when int() fails, turning the intended AssertionError
            # into a NameError.  Report the offending string instead.
            raise AssertionError("Invalid port %r in url: %r"%(s_port, url))

        assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url

    else:
        # only validate tcp urls currently
        pass

    return True
151
151
152
152
def validate_url_container(container):
    """validate a potentially nested collection of urls."""
    # a bare string is a single url
    if isinstance(container, basestring):
        return validate_url(container)
    # for dicts, the urls live in the values
    if isinstance(container, dict):
        container = container.itervalues()

    # recurse into each element of the collection
    for element in container:
        validate_url_container(element)
163
163
164
164
def split_url(url):
    """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
    pieces = url.split('://')
    assert len(pieces) == 2, 'Invalid url: %r'%url
    transport, rest = pieces
    host_port = rest.split(':')
    assert len(host_port) == 2, 'Invalid url: %r'%url
    host, port = host_port
    return transport, host, port
174
174
def disambiguate_ip_address(ip, location=None):
    """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
    ones, based on the location (default interpretation of location is localhost)."""
    # concrete addresses need no disambiguation
    if ip not in ('0.0.0.0', '*'):
        return ip
    try:
        external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
    except (socket.gaierror, IndexError):
        # couldn't identify this machine, assume localhost
        external_ips = []
    if location is None or location in external_ips or not external_ips:
        # If location is unspecified or cannot be determined, assume local
        return '127.0.0.1'
    elif location:
        return location
    return ip
190
190
def disambiguate_url(url, location=None):
    """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
    ones, based on the location (default interpretation is localhost).

    This is for zeromq urls, such as tcp://*:10101."""
    try:
        proto, ip, port = split_url(url)
    except AssertionError:
        # probably not tcp url; could be ipc, etc.
        return url

    # rebuild the url around the resolved address
    return "%s://%s:%s" % (proto, disambiguate_ip_address(ip, location), port)
205
205
def serialize_object(obj, threshold=64e-6):
    """Serialize an object into a list of sendable buffers.

    Parameters
    ----------

    obj : object
        The object to be serialized
    threshold : float
        The threshold for not double-pickling the content.


    Returns
    -------
    ('pmd', [bufs]) :
        where pmd is the pickled metadata wrapper,
        bufs is a list of data buffers
    """
    # payloads extracted from the serialized objects, to travel as
    # separate out-of-band frames rather than inside the pickle
    databuffers = []
    if isinstance(obj, (list, tuple)):
        # can each element, then serialize the canned sequence
        clist = canSequence(obj)
        slist = map(serialize, clist)
        for s in slist:
            # binary payloads (or anything above threshold) are detached;
            # s.data = None marks "data follows in a later frame"
            # NOTE(review): threshold units presumably match getDataSize() —
            # confirm against IPython.utils.newserialized
            if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
                databuffers.append(s.getData())
                s.data = None
        return pickle.dumps(slist,-1), databuffers
    elif isinstance(obj, dict):
        sobj = {}
        # sorted keys => deterministic buffer order for the receiver,
        # which must pop buffers back in the same order
        for k in sorted(obj.iterkeys()):
            s = serialize(can(obj[k]))
            if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
                databuffers.append(s.getData())
                s.data = None
            sobj[k] = s
        return pickle.dumps(sobj,-1),databuffers
    else:
        # single object
        s = serialize(can(obj))
        if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
            databuffers.append(s.getData())
            s.data = None
        return pickle.dumps(s,-1),databuffers
248
248
249
249
def unserialize_object(bufs):
    """reconstruct an object serialized by serialize_object from data buffers.

    Returns (obj, remaining_bufs): unconsumed buffers are handed back so
    callers can continue parsing a longer message.
    """
    bufs = list(bufs)
    # first buffer is the pickled wrapper written by serialize_object
    sobj = pickle.loads(bufs.pop(0))
    if isinstance(sobj, (list, tuple)):
        # reattach out-of-band data, in packing order
        for s in sobj:
            if s.data is None:
                s.data = bufs.pop(0)
        return uncanSequence(map(unserialize, sobj)), bufs
    elif isinstance(sobj, dict):
        newobj = {}
        # must use the same sorted-key order serialize_object used,
        # since buffers are matched purely by position
        for k in sorted(sobj.iterkeys()):
            s = sobj[k]
            if s.data is None:
                s.data = bufs.pop(0)
            newobj[k] = uncan(unserialize(s))
        return newobj, bufs
    else:
        # single object
        if sobj.data is None:
            sobj.data = bufs.pop(0)
        return uncan(unserialize(sobj)), bufs
271
271
def pack_apply_message(f, args, kwargs, threshold=64e-6):
    """pack up a function, args, and kwargs to be sent over the wire
    as a series of buffers. Any object whose data is larger than `threshold`
    will not have their data copied (currently only numpy arrays support zero-copy)"""
    # frame 0: the canned, pickled function
    msg = [pickle.dumps(can(f),-1)]
    databuffers = [] # for large objects
    # frame 1: serialized args wrapper (its big payloads go to databuffers)
    sargs, bufs = serialize_object(args,threshold)
    msg.append(sargs)
    databuffers.extend(bufs)
    # frame 2: serialized kwargs wrapper
    skwargs, bufs = serialize_object(kwargs,threshold)
    msg.append(skwargs)
    databuffers.extend(bufs)
    # trailing frames: raw data buffers — args' buffers before kwargs',
    # the order unpack_apply_message relies on
    msg.extend(databuffers)
    return msg
286
286
def _fill_serialized_data(sa, m, copy):
    """Attach raw message frame `m` as the data of serialized object `sa`.

    buffer/ndarray payloads are always wrapped in a buffer() view
    (memoryview support is deliberately disabled for now); other payloads
    are stored directly (copy=True) or as the raw bytes of the zmq frame
    (copy=False).
    """
    if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
        # always use a buffer, until memoryviews get sorted out
        sa.data = buffer(m)
    else:
        if copy:
            sa.data = m
        else:
            sa.data = m.bytes


def unpack_apply_message(bufs, g=None, copy=True):
    """unpack f,args,kwargs from buffers packed by pack_apply_message()
    Returns: original f,args,kwargs

    The buffer-reattachment logic, previously copy-pasted for the args
    and kwargs loops, now lives in _fill_serialized_data.
    """
    bufs = list(bufs) # allow us to pop
    assert len(bufs) >= 3, "not enough buffers!"
    if not copy:
        # the first three frames are always pickles; extract their bytes
        for i in range(3):
            bufs[i] = bufs[i].bytes
    cf = pickle.loads(bufs.pop(0))
    sargs = list(pickle.loads(bufs.pop(0)))
    skwargs = dict(pickle.loads(bufs.pop(0)))
    f = uncan(cf, g)
    # reattach out-of-band buffers in the order they were packed:
    # args' buffers first, then kwargs' (sorted by key)
    for sa in sargs:
        if sa.data is None:
            _fill_serialized_data(sa, bufs.pop(0), copy)

    args = uncanSequence(map(unserialize, sargs), g)
    kwargs = {}
    for k in sorted(skwargs.iterkeys()):
        sa = skwargs[k]
        if sa.data is None:
            _fill_serialized_data(sa, bufs.pop(0), copy)

        kwargs[k] = uncan(unserialize(sa), g)

    return f,args,kwargs
340
340
341 #--------------------------------------------------------------------------
341 #--------------------------------------------------------------------------
342 # helpers for implementing old MEC API via view.apply
342 # helpers for implementing old MEC API via view.apply
343 #--------------------------------------------------------------------------
343 #--------------------------------------------------------------------------
344
344
def interactive(func):
    """decorator for making functions appear as interactively defined.
    This results in the function being linked to the user_ns as globals()
    instead of the module globals().
    """
    # tagging the function as belonging to __main__ is what triggers
    # the user-namespace globals resolution on the engine
    func.__module__ = '__main__'
    return func
352
352
@interactive
def _push(ns):
    """helper method for implementing `client.push` via `client.apply`"""
    # merge the pushed mapping into the (user) global namespace
    g = globals()
    g.update(ns)
357
357
@interactive
def _pull(keys):
    """helper method for implementing `client.pull` via `client.apply`

    `keys` may be a single name or a list/tuple/set of names; raises
    NameError for any name missing from the (user) global namespace.
    """
    user_ns = globals()
    if isinstance(keys, (list, tuple, set)):
        for key in keys:
            # `in` instead of the deprecated dict.has_key()
            if key not in user_ns:
                raise NameError("name '%s' is not defined"%key)
        return map(user_ns.get, keys)
    else:
        if keys not in user_ns:
            raise NameError("name '%s' is not defined"%keys)
        return user_ns.get(keys)
371
371
@interactive
def _execute(code):
    """helper method for implementing `client.execute` via `client.apply`"""
    # py2 exec statement: run `code` with the user namespace as its globals
    exec code in globals()
376
376
377 #--------------------------------------------------------------------------
377 #--------------------------------------------------------------------------
378 # extra process management utilities
378 # extra process management utilities
379 #--------------------------------------------------------------------------
379 #--------------------------------------------------------------------------
380
380
381 _random_ports = set()
381 _random_ports = set()
382
382
def select_random_ports(n):
    """Selects and return n random ports that are available.

    Ports previously handed out by this function (module-level
    _random_ports) are never returned again within this process.
    """
    ports = []
    for i in xrange(n):
        # bind to port 0 to let the OS pick a free ephemeral port
        sock = socket.socket()
        sock.bind(('', 0))
        # rebind until we land on a port we haven't handed out before
        while sock.getsockname()[1] in _random_ports:
            sock.close()
            sock = socket.socket()
            sock.bind(('', 0))
        ports.append(sock)
    # keep every socket open until all n are chosen, so the OS cannot
    # reassign the same port twice; only then close and record each port
    for i, sock in enumerate(ports):
        port = sock.getsockname()[1]
        sock.close()
        ports[i] = port
        _random_ports.add(port)
    return ports
400
400
def signal_children(children):
    """Relay interupt/term signals to children, for more solid process cleanup."""
    def relay(sig, frame):
        # log through the running Application, then take everyone down
        log = Application.instance().log
        log.critical("Got signal %i, terminating children..."%sig)
        for proc in children:
            proc.terminate()

        # exit status 0 for a plain Ctrl-C, nonzero for ABRT/TERM
        sys.exit(sig != SIGINT)

    for signum in (SIGINT, SIGABRT, SIGTERM):
        signal(signum, relay)
413
413
def generate_exec_key(keyfile):
    """Write a fresh random execution key (uuid4) to `keyfile`."""
    import uuid
    key = str(uuid.uuid4())
    with open(keyfile, 'w') as f:
        f.write(key + '\n')
    # set user-only RW permissions (0600)
    # this will have no effect on Windows
    os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
423
423
424
424
def integer_loglevel(loglevel):
    """Coerce a loglevel given as an int, numeric string, or level name
    (e.g. 'DEBUG') into its integer value."""
    try:
        return int(loglevel)
    except ValueError:
        pass
    # not numeric: resolve a level name against the logging module
    if isinstance(loglevel, str):
        loglevel = getattr(logging, loglevel)
    return loglevel
432
432
def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
    """Attach a zmq PUBHandler publishing to `iface` to logger `logname`.

    No-op if the logger already has a PUBHandler attached.
    """
    logger = logging.getLogger(logname)
    if any(isinstance(h, handlers.PUBHandler) for h in logger.handlers):
        # don't add a second PUBHandler
        return
    loglevel = integer_loglevel(loglevel)
    pub_socket = context.socket(zmq.PUB)
    pub_socket.connect(iface)
    handler = handlers.PUBHandler(pub_socket)
    handler.setLevel(loglevel)
    handler.root_topic = root
    logger.addHandler(handler)
    logger.setLevel(loglevel)
446
446
def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
    """Attach an EnginePUBHandler for `engine` (publishing to `iface`)
    to the root logger, and return that logger.

    Fix: the early-exit path (handler already attached) used to return
    None while the normal path returned the logger; both now return the
    logger for a consistent contract.
    """
    logger = logging.getLogger()
    if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
        # don't add a second PUBHandler, but still hand back the logger
        return logger
    loglevel = integer_loglevel(loglevel)
    lsock = context.socket(zmq.PUB)
    lsock.connect(iface)
    handler = EnginePUBHandler(engine, lsock)
    handler.setLevel(loglevel)
    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
460
460
def local_logger(logname, loglevel=logging.DEBUG):
    """Attach a StreamHandler (using the shared parallel-apps log format)
    to logger `logname`, and return the logger.

    Fix: the early-exit path (StreamHandler already attached) used to
    return None while the normal path returned the logger; both now
    return the logger for a consistent contract.
    """
    loglevel = integer_loglevel(loglevel)
    logger = logging.getLogger(logname)
    if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
        # don't add a second StreamHandler, but still hand back the logger
        return logger
    handler = logging.StreamHandler()
    handler.setLevel(loglevel)
    # match the log format used by the rest of the parallel apps
    formatter = logging.Formatter("%(asctime)s.%(msecs).03d [%(name)s] %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S")
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(loglevel)
    return logger
472
476
General Comments 0
You need to be logged in to leave comments. Login now