IPython.parallel logging cleanup...
MinRK
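The first hunk below makes the application logger stop propagating records to the root logger (`self.log.propagate = False`), which is what caused ipcluster to print duplicate messages during shutdown. A minimal stand-alone sketch of that failure mode, using only the stdlib `logging` module (not part of the commit):

    import logging

    root = logging.getLogger()
    root.addHandler(logging.StreamHandler())

    log = logging.getLogger('ipcluster')
    log.addHandler(logging.StreamHandler())

    log.critical('stopping engines')   # printed twice: once by log's handler, once via root
    log.propagate = False
    log.critical('stopping engines')   # printed once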
@@ -1,241 +1,245 @@
 #!/usr/bin/env python
 # encoding: utf-8
 """
 The Base Application class for IPython.parallel apps

 Authors:

 * Brian Granger
 * Min RK

 """

 #-----------------------------------------------------------------------------
 # Copyright (C) 2008-2011 The IPython Development Team
 #
 # Distributed under the terms of the BSD License. The full license is in
 # the file COPYING, distributed as part of this software.
 #-----------------------------------------------------------------------------

 #-----------------------------------------------------------------------------
 # Imports
 #-----------------------------------------------------------------------------

 from __future__ import with_statement

 import os
 import logging
 import re
 import sys

 from subprocess import Popen, PIPE

 from IPython.core import release
 from IPython.core.crashhandler import CrashHandler
 from IPython.core.application import (
     BaseIPythonApplication,
     base_aliases as base_ip_aliases,
     base_flags as base_ip_flags
 )
 from IPython.utils.path import expand_path

 from IPython.utils.traitlets import Unicode, Bool, Instance, Dict, List

 #-----------------------------------------------------------------------------
 # Module errors
 #-----------------------------------------------------------------------------

 class PIDFileError(Exception):
     pass


 #-----------------------------------------------------------------------------
 # Crash handler for this application
 #-----------------------------------------------------------------------------

 class ParallelCrashHandler(CrashHandler):
     """sys.excepthook for IPython itself, leaves a detailed report on disk."""

     def __init__(self, app):
         contact_name = release.authors['Min'][0]
         contact_email = release.authors['Min'][1]
         bug_tracker = 'http://github.com/ipython/ipython/issues'
         super(ParallelCrashHandler,self).__init__(
             app, contact_name, contact_email, bug_tracker
         )


 #-----------------------------------------------------------------------------
 # Main application
 #-----------------------------------------------------------------------------
 base_aliases = {}
 base_aliases.update(base_ip_aliases)
 base_aliases.update({
     'profile-dir' : 'ProfileDir.location',
     'work-dir' : 'BaseParallelApplication.work_dir',
     'log-to-file' : 'BaseParallelApplication.log_to_file',
     'clean-logs' : 'BaseParallelApplication.clean_logs',
     'log-url' : 'BaseParallelApplication.log_url',
 })

 base_flags = {
     'log-to-file' : (
         {'BaseParallelApplication' : {'log_to_file' : True}},
         "send log output to a file"
     )
 }
 base_flags.update(base_ip_flags)

 class BaseParallelApplication(BaseIPythonApplication):
     """The base Application for IPython.parallel apps

     Principal extensions to BaseIPythonApplication:

     * work_dir
     * remote logging via pyzmq
     * IOLoop instance
     """

     crash_handler_class = ParallelCrashHandler

     def _log_level_default(self):
         # temporarily override default_log_level to INFO
         return logging.INFO

     work_dir = Unicode(os.getcwdu(), config=True,
         help='Set the working dir for the process.'
     )
     def _work_dir_changed(self, name, old, new):
         self.work_dir = unicode(expand_path(new))

     log_to_file = Bool(config=True,
         help="whether to log to a file")

     clean_logs = Bool(False, config=True,
         help="whether to clean up old logfiles before starting")

     log_url = Unicode('', config=True,
         help="The ZMQ URL of the iplogger to aggregate logging.")

     def _config_files_default(self):
         return ['ipcontroller_config.py', 'ipengine_config.py', 'ipcluster_config.py']

     loop = Instance('zmq.eventloop.ioloop.IOLoop')
     def _loop_default(self):
         from zmq.eventloop.ioloop import IOLoop
         return IOLoop.instance()

     aliases = Dict(base_aliases)
     flags = Dict(base_flags)

     def initialize(self, argv=None):
         """initialize the app"""
         super(BaseParallelApplication, self).initialize(argv)
         self.to_work_dir()
         self.reinit_logging()

     def to_work_dir(self):
         wd = self.work_dir
         if unicode(wd) != os.getcwdu():
             os.chdir(wd)
             self.log.info("Changing to working dir: %s" % wd)
         # This is the working dir by now.
         sys.path.insert(0, '')

     def reinit_logging(self):
         # Remove old log files
         log_dir = self.profile_dir.log_dir
         if self.clean_logs:
             for f in os.listdir(log_dir):
                 if re.match(r'%s-\d+\.(log|err|out)'%self.name,f):
                     os.remove(os.path.join(log_dir, f))
         if self.log_to_file:
             # Start logging to the new log file
             log_filename = self.name + u'-' + str(os.getpid()) + u'.log'
             logfile = os.path.join(log_dir, log_filename)
             open_log_file = open(logfile, 'w')
         else:
             open_log_file = None
         if open_log_file is not None:
             self.log.removeHandler(self._log_handler)
             self._log_handler = logging.StreamHandler(open_log_file)
             self._log_formatter = logging.Formatter("[%(name)s] %(message)s")
             self._log_handler.setFormatter(self._log_formatter)
             self.log.addHandler(self._log_handler)
+        # do not propagate log messages to root logger
+        # ipcluster app will sometimes print duplicate messages during shutdown
+        # if this is 1 (default):
+        self.log.propagate = False

     def write_pid_file(self, overwrite=False):
         """Create a .pid file in the pid_dir with my pid.

         This must be called after pre_construct, which sets `self.pid_dir`.
         This raises :exc:`PIDFileError` if the pid file exists already.
         """
         pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
         if os.path.isfile(pid_file):
             pid = self.get_pid_from_file()
             if not overwrite:
                 raise PIDFileError(
                     'The pid file [%s] already exists. \nThis could mean that this '
                     'server is already running with [pid=%s].' % (pid_file, pid)
                 )
         with open(pid_file, 'w') as f:
             self.log.info("Creating pid file: %s" % pid_file)
             f.write(repr(os.getpid())+'\n')

     def remove_pid_file(self):
         """Remove the pid file.

         This should be called at shutdown by registering a callback with
         :func:`reactor.addSystemEventTrigger`. This needs to return
         ``None``.
         """
         pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
         if os.path.isfile(pid_file):
             try:
                 self.log.info("Removing pid file: %s" % pid_file)
                 os.remove(pid_file)
             except:
                 self.log.warn("Error removing the pid file: %s" % pid_file)

     def get_pid_from_file(self):
         """Get the pid from the pid file.

         If the pid file doesn't exist a :exc:`PIDFileError` is raised.
         """
         pid_file = os.path.join(self.profile_dir.pid_dir, self.name + u'.pid')
         if os.path.isfile(pid_file):
             with open(pid_file, 'r') as f:
                 s = f.read().strip()
                 try:
                     pid = int(s)
                 except:
                     raise PIDFileError("invalid pid file: %s (contents: %r)"%(pid_file, s))
                 return pid
         else:
             raise PIDFileError('pid file not found: %s' % pid_file)

     def check_pid(self, pid):
         if os.name == 'nt':
             try:
                 import ctypes
                 # returns 0 if no such process (of ours) exists
                 # positive int otherwise
                 p = ctypes.windll.kernel32.OpenProcess(1,0,pid)
             except Exception:
                 self.log.warn(
                     "Could not determine whether pid %i is running via `OpenProcess`. "
                     " Making the likely assumption that it is."%pid
                 )
                 return True
             return bool(p)
         else:
             try:
                 p = Popen(['ps','x'], stdout=PIPE, stderr=PIPE)
                 output,_ = p.communicate()
             except OSError:
                 self.log.warn(
                     "Could not determine whether pid %i is running via `ps x`. "
                     " Making the likely assumption that it is."%pid
                 )
                 return True
             pids = map(int, re.findall(r'^\W*\d+', output, re.MULTILINE))
             return pid in pids
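The `base_aliases` and `base_flags` tables in the hunk above map command-line names onto traitlets configurables. As a hypothetical usage example (the app name is illustrative; any app built on `BaseParallelApplication` is wired the same way):

    ipcontroller --log-to-file --clean-logs --work-dir=/tmp/run

which is equivalent to the config-file settings `BaseParallelApplication.log_to_file = True`, `BaseParallelApplication.clean_logs = True`, and `BaseParallelApplication.work_dir = u'/tmp/run'`.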
@@ -1,461 +1,463 @@
 """some generic utilities for dealing with classes, urls, and serialization

 Authors:

 * Min RK
 """
 #-----------------------------------------------------------------------------
 # Copyright (C) 2010-2011 The IPython Development Team
 #
 # Distributed under the terms of the BSD License. The full license is in
 # the file COPYING, distributed as part of this software.
 #-----------------------------------------------------------------------------

 #-----------------------------------------------------------------------------
 # Imports
 #-----------------------------------------------------------------------------

 # Standard library imports.
 import logging
 import os
 import re
 import stat
 import socket
 import sys
 from signal import signal, SIGINT, SIGABRT, SIGTERM
 try:
     from signal import SIGKILL
 except ImportError:
     SIGKILL=None

 try:
     import cPickle
     pickle = cPickle
 except:
     cPickle = None
     import pickle

 # System library imports
 import zmq
 from zmq.log import handlers

 # IPython imports
+from IPython.config.application import Application
 from IPython.utils.pickleutil import can, uncan, canSequence, uncanSequence
 from IPython.utils.newserialized import serialize, unserialize
 from IPython.zmq.log import EnginePUBHandler

 #-----------------------------------------------------------------------------
 # Classes
 #-----------------------------------------------------------------------------

 class Namespace(dict):
     """Subclass of dict for attribute access to keys."""

     def __getattr__(self, key):
         """getattr aliased to getitem"""
         if key in self.iterkeys():
             return self[key]
         else:
             raise NameError(key)

     def __setattr__(self, key, value):
         """setattr aliased to setitem, with strict"""
         if hasattr(dict, key):
             raise KeyError("Cannot override dict keys %r"%key)
         self[key] = value


 class ReverseDict(dict):
     """simple double-keyed subset of dict methods."""

     def __init__(self, *args, **kwargs):
         dict.__init__(self, *args, **kwargs)
         self._reverse = dict()
         for key, value in self.iteritems():
             self._reverse[value] = key

     def __getitem__(self, key):
         try:
             return dict.__getitem__(self, key)
         except KeyError:
             return self._reverse[key]

     def __setitem__(self, key, value):
         if key in self._reverse:
             raise KeyError("Can't have key %r on both sides!"%key)
         dict.__setitem__(self, key, value)
         self._reverse[value] = key

     def pop(self, key):
         value = dict.pop(self, key)
         self._reverse.pop(value)
         return value

     def get(self, key, default=None):
         try:
             return self[key]
         except KeyError:
             return default

 #-----------------------------------------------------------------------------
 # Functions
 #-----------------------------------------------------------------------------

 def asbytes(s):
     """ensure that an object is ascii bytes"""
     if isinstance(s, unicode):
         s = s.encode('ascii')
     return s

 def validate_url(url):
     """validate a url for zeromq"""
     if not isinstance(url, basestring):
         raise TypeError("url must be a string, not %r"%type(url))
     url = url.lower()

     proto_addr = url.split('://')
     assert len(proto_addr) == 2, 'Invalid url: %r'%url
     proto, addr = proto_addr
     assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto

     # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
     # author: Remi Sabourin
     pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')

     if proto == 'tcp':
         lis = addr.split(':')
         assert len(lis) == 2, 'Invalid url: %r'%url
         addr,s_port = lis
         try:
             port = int(s_port)
         except ValueError:
             raise AssertionError("Invalid port %r in url: %r"%(s_port, url))

         assert addr == '*' or pat.match(addr) is not None, 'Invalid url: %r'%url

     else:
         # only validate tcp urls currently
         pass

     return True


 def validate_url_container(container):
     """validate a potentially nested collection of urls."""
     if isinstance(container, basestring):
         url = container
         return validate_url(url)
     elif isinstance(container, dict):
         container = container.itervalues()

     for element in container:
         validate_url_container(element)


 def split_url(url):
     """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
     proto_addr = url.split('://')
     assert len(proto_addr) == 2, 'Invalid url: %r'%url
     proto, addr = proto_addr
     lis = addr.split(':')
     assert len(lis) == 2, 'Invalid url: %r'%url
     addr,s_port = lis
     return proto,addr,s_port

 def disambiguate_ip_address(ip, location=None):
     """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
     ones, based on the location (default interpretation of location is localhost)."""
     if ip in ('0.0.0.0', '*'):
         try:
             external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
         except (socket.gaierror, IndexError):
             # couldn't identify this machine, assume localhost
             external_ips = []
         if location is None or location in external_ips or not external_ips:
             # If location is unspecified or cannot be determined, assume local
             ip='127.0.0.1'
         elif location:
             return location
     return ip

 def disambiguate_url(url, location=None):
     """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
     ones, based on the location (default interpretation is localhost).

     This is for zeromq urls, such as tcp://*:10101."""
     try:
         proto,ip,port = split_url(url)
     except AssertionError:
         # probably not tcp url; could be ipc, etc.
         return url

     ip = disambiguate_ip_address(ip,location)

     return "%s://%s:%s"%(proto,ip,port)

 def serialize_object(obj, threshold=64e-6):
     """Serialize an object into a list of sendable buffers.

     Parameters
     ----------

     obj : object
         The object to be serialized
     threshold : float
         The threshold for not double-pickling the content.


     Returns
     -------
     ('pmd', [bufs]) :
         where pmd is the pickled metadata wrapper,
         bufs is a list of data buffers
     """
     databuffers = []
     if isinstance(obj, (list, tuple)):
         clist = canSequence(obj)
         slist = map(serialize, clist)
         for s in slist:
             if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
                 databuffers.append(s.getData())
                 s.data = None
         return pickle.dumps(slist,-1), databuffers
     elif isinstance(obj, dict):
         sobj = {}
         for k in sorted(obj.iterkeys()):
             s = serialize(can(obj[k]))
             if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
                 databuffers.append(s.getData())
                 s.data = None
             sobj[k] = s
         return pickle.dumps(sobj,-1),databuffers
     else:
         s = serialize(can(obj))
         if s.typeDescriptor in ('buffer', 'ndarray') or s.getDataSize() > threshold:
             databuffers.append(s.getData())
             s.data = None
         return pickle.dumps(s,-1),databuffers


 def unserialize_object(bufs):
     """reconstruct an object serialized by serialize_object from data buffers."""
     bufs = list(bufs)
     sobj = pickle.loads(bufs.pop(0))
     if isinstance(sobj, (list, tuple)):
         for s in sobj:
             if s.data is None:
                 s.data = bufs.pop(0)
         return uncanSequence(map(unserialize, sobj)), bufs
     elif isinstance(sobj, dict):
         newobj = {}
         for k in sorted(sobj.iterkeys()):
             s = sobj[k]
             if s.data is None:
                 s.data = bufs.pop(0)
             newobj[k] = uncan(unserialize(s))
         return newobj, bufs
     else:
         if sobj.data is None:
             sobj.data = bufs.pop(0)
         return uncan(unserialize(sobj)), bufs

 def pack_apply_message(f, args, kwargs, threshold=64e-6):
     """pack up a function, args, and kwargs to be sent over the wire
     as a series of buffers. Any object whose data is larger than `threshold`
     will not have its data copied (currently only numpy arrays support zero-copy)"""
     msg = [pickle.dumps(can(f),-1)]
     databuffers = [] # for large objects
     sargs, bufs = serialize_object(args,threshold)
     msg.append(sargs)
     databuffers.extend(bufs)
     skwargs, bufs = serialize_object(kwargs,threshold)
     msg.append(skwargs)
     databuffers.extend(bufs)
     msg.extend(databuffers)
     return msg

 def unpack_apply_message(bufs, g=None, copy=True):
     """unpack f,args,kwargs from buffers packed by pack_apply_message()
     Returns: original f,args,kwargs"""
     bufs = list(bufs) # allow us to pop
     assert len(bufs) >= 3, "not enough buffers!"
     if not copy:
         for i in range(3):
             bufs[i] = bufs[i].bytes
     cf = pickle.loads(bufs.pop(0))
     sargs = list(pickle.loads(bufs.pop(0)))
     skwargs = dict(pickle.loads(bufs.pop(0)))
     # print sargs, skwargs
     f = uncan(cf, g)
     for sa in sargs:
         if sa.data is None:
             m = bufs.pop(0)
             if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
                 # always use a buffer, until memoryviews get sorted out
                 sa.data = buffer(m)
                 # disable memoryview support
                 # if copy:
                 #     sa.data = buffer(m)
                 # else:
                 #     sa.data = m.buffer
             else:
                 if copy:
                     sa.data = m
                 else:
                     sa.data = m.bytes

     args = uncanSequence(map(unserialize, sargs), g)
     kwargs = {}
     for k in sorted(skwargs.iterkeys()):
         sa = skwargs[k]
         if sa.data is None:
             m = bufs.pop(0)
             if sa.getTypeDescriptor() in ('buffer', 'ndarray'):
                 # always use a buffer, until memoryviews get sorted out
                 sa.data = buffer(m)
                 # disable memoryview support
                 # if copy:
                 #     sa.data = buffer(m)
                 # else:
                 #     sa.data = m.buffer
             else:
                 if copy:
                     sa.data = m
                 else:
                     sa.data = m.bytes

         kwargs[k] = uncan(unserialize(sa), g)

     return f,args,kwargs

 #--------------------------------------------------------------------------
 # helpers for implementing old MEC API via view.apply
 #--------------------------------------------------------------------------

 def interactive(f):
     """decorator for making functions appear as interactively defined.
     This results in the function being linked to the user_ns as globals()
     instead of the module globals().
     """
     f.__module__ = '__main__'
     return f

 @interactive
 def _push(ns):
     """helper method for implementing `client.push` via `client.apply`"""
     globals().update(ns)

 @interactive
 def _pull(keys):
     """helper method for implementing `client.pull` via `client.apply`"""
     user_ns = globals()
     if isinstance(keys, (list,tuple, set)):
         for key in keys:
             if not user_ns.has_key(key):
                 raise NameError("name '%s' is not defined"%key)
         return map(user_ns.get, keys)
     else:
         if not user_ns.has_key(keys):
             raise NameError("name '%s' is not defined"%keys)
         return user_ns.get(keys)

 @interactive
 def _execute(code):
     """helper method for implementing `client.execute` via `client.apply`"""
     exec code in globals()

 #--------------------------------------------------------------------------
 # extra process management utilities
 #--------------------------------------------------------------------------

 _random_ports = set()

 def select_random_ports(n):
     """Select and return n random ports that are available."""
     ports = []
     for i in xrange(n):
         sock = socket.socket()
         sock.bind(('', 0))
         while sock.getsockname()[1] in _random_ports:
             sock.close()
             sock = socket.socket()
             sock.bind(('', 0))
         ports.append(sock)
     for i, sock in enumerate(ports):
         port = sock.getsockname()[1]
         sock.close()
         ports[i] = port
         _random_ports.add(port)
     return ports

 def signal_children(children):
     """Relay interrupt/term signals to children, for more solid process cleanup."""
     def terminate_children(sig, frame):
-        logging.critical("Got signal %i, terminating children..."%sig)
+        log = Application.instance().log
+        log.critical("Got signal %i, terminating children..."%sig)
         for child in children:
             child.terminate()

         sys.exit(sig != SIGINT)
         # sys.exit(sig)
     for sig in (SIGINT, SIGABRT, SIGTERM):
         signal(sig, terminate_children)

 def generate_exec_key(keyfile):
     import uuid
     newkey = str(uuid.uuid4())
     with open(keyfile, 'w') as f:
         # f.write('ipython-key ')
         f.write(newkey+'\n')
     # set user-only RW permissions (0600)
     # this will have no effect on Windows
     os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)


 def integer_loglevel(loglevel):
     try:
         loglevel = int(loglevel)
     except ValueError:
         if isinstance(loglevel, str):
             loglevel = getattr(logging, loglevel)
     return loglevel

 def connect_logger(logname, context, iface, root="ip", loglevel=logging.DEBUG):
     logger = logging.getLogger(logname)
     if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
         # don't add a second PUBHandler
         return
     loglevel = integer_loglevel(loglevel)
     lsock = context.socket(zmq.PUB)
     lsock.connect(iface)
     handler = handlers.PUBHandler(lsock)
     handler.setLevel(loglevel)
     handler.root_topic = root
     logger.addHandler(handler)
     logger.setLevel(loglevel)

 def connect_engine_logger(context, iface, engine, loglevel=logging.DEBUG):
     logger = logging.getLogger()
     if any([isinstance(h, handlers.PUBHandler) for h in logger.handlers]):
         # don't add a second PUBHandler
         return
     loglevel = integer_loglevel(loglevel)
     lsock = context.socket(zmq.PUB)
     lsock.connect(iface)
     handler = EnginePUBHandler(engine, lsock)
     handler.setLevel(loglevel)
     logger.addHandler(handler)
     logger.setLevel(loglevel)
     return logger

 def local_logger(logname, loglevel=logging.DEBUG):
     loglevel = integer_loglevel(loglevel)
     logger = logging.getLogger(logname)
     if any([isinstance(h, logging.StreamHandler) for h in logger.handlers]):
         # don't add a second StreamHandler
         return
     handler = logging.StreamHandler()
     handler.setLevel(loglevel)
     logger.addHandler(handler)
     logger.setLevel(loglevel)
     return logger

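For context, `connect_logger` above is the sending half of the `--log-url` aggregation: it attaches a zmq `PUBHandler` to a named logger so records are published to a collector such as iplogger. A minimal sketch, assuming this module is importable as `IPython.parallel.util` and that a subscriber is listening on the (made-up) URL:

    import logging
    import zmq
    from IPython.parallel.util import connect_logger

    ctx = zmq.Context.instance()
    # connects a PUB socket to the collector and installs a PUBHandler
    connect_logger('myapp', ctx, 'tcp://127.0.0.1:20202',
                   root='ip', loglevel=logging.INFO)
    logging.getLogger('myapp').info('sent over zmq')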