##// END OF EJS Templates
fsmonitor: correct an error message...
Jun Wu -
r33764:dd35abc4 default
parent child Browse files
Show More
@@ -1,1021 +1,1021 b''
1 # Copyright 2014-present Facebook, Inc.
1 # Copyright 2014-present Facebook, Inc.
2 # All rights reserved.
2 # All rights reserved.
3 #
3 #
4 # Redistribution and use in source and binary forms, with or without
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are met:
5 # modification, are permitted provided that the following conditions are met:
6 #
6 #
7 # * Redistributions of source code must retain the above copyright notice,
7 # * Redistributions of source code must retain the above copyright notice,
8 # this list of conditions and the following disclaimer.
8 # this list of conditions and the following disclaimer.
9 #
9 #
10 # * Redistributions in binary form must reproduce the above copyright notice,
10 # * Redistributions in binary form must reproduce the above copyright notice,
11 # this list of conditions and the following disclaimer in the documentation
11 # this list of conditions and the following disclaimer in the documentation
12 # and/or other materials provided with the distribution.
12 # and/or other materials provided with the distribution.
13 #
13 #
14 # * Neither the name Facebook nor the names of its contributors may be used to
14 # * Neither the name Facebook nor the names of its contributors may be used to
15 # endorse or promote products derived from this software without specific
15 # endorse or promote products derived from this software without specific
16 # prior written permission.
16 # prior written permission.
17 #
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
28
29 from __future__ import absolute_import
29 from __future__ import absolute_import
30 from __future__ import division
30 from __future__ import division
31 from __future__ import print_function
31 from __future__ import print_function
32 # no unicode literals
32 # no unicode literals
33
33
34 import inspect
34 import inspect
35 import math
35 import math
36 import os
36 import os
37 import socket
37 import socket
38 import subprocess
38 import subprocess
39 import time
39 import time
40
40
41 # Sometimes it's really hard to get Python extensions to compile,
41 # Sometimes it's really hard to get Python extensions to compile,
42 # so fall back to a pure Python implementation.
42 # so fall back to a pure Python implementation.
43 try:
43 try:
44 from . import bser
44 from . import bser
45 # Demandimport causes modules to be loaded lazily. Force the load now
45 # Demandimport causes modules to be loaded lazily. Force the load now
46 # so that we can fall back on pybser if bser doesn't exist
46 # so that we can fall back on pybser if bser doesn't exist
47 bser.pdu_info
47 bser.pdu_info
48 except ImportError:
48 except ImportError:
49 from . import pybser as bser
49 from . import pybser as bser
50
50
51 from . import (
51 from . import (
52 capabilities,
52 capabilities,
53 compat,
53 compat,
54 encoding,
54 encoding,
55 load,
55 load,
56 )
56 )
57
57
58
58
59 if os.name == 'nt':
59 if os.name == 'nt':
60 import ctypes
60 import ctypes
61 import ctypes.wintypes
61 import ctypes.wintypes
62
62
63 wintypes = ctypes.wintypes
63 wintypes = ctypes.wintypes
64 GENERIC_READ = 0x80000000
64 GENERIC_READ = 0x80000000
65 GENERIC_WRITE = 0x40000000
65 GENERIC_WRITE = 0x40000000
66 FILE_FLAG_OVERLAPPED = 0x40000000
66 FILE_FLAG_OVERLAPPED = 0x40000000
67 OPEN_EXISTING = 3
67 OPEN_EXISTING = 3
68 INVALID_HANDLE_VALUE = -1
68 INVALID_HANDLE_VALUE = -1
69 FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
69 FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
70 FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
70 FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
71 FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
71 FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
72 WAIT_FAILED = 0xFFFFFFFF
72 WAIT_FAILED = 0xFFFFFFFF
73 WAIT_TIMEOUT = 0x00000102
73 WAIT_TIMEOUT = 0x00000102
74 WAIT_OBJECT_0 = 0x00000000
74 WAIT_OBJECT_0 = 0x00000000
75 WAIT_IO_COMPLETION = 0x000000C0
75 WAIT_IO_COMPLETION = 0x000000C0
76 INFINITE = 0xFFFFFFFF
76 INFINITE = 0xFFFFFFFF
77
77
78 # Overlapped I/O operation is in progress. (997)
78 # Overlapped I/O operation is in progress. (997)
79 ERROR_IO_PENDING = 0x000003E5
79 ERROR_IO_PENDING = 0x000003E5
80
80
81 # The pointer size follows the architecture
81 # The pointer size follows the architecture
82 # We use WPARAM since this type is already conditionally defined
82 # We use WPARAM since this type is already conditionally defined
83 ULONG_PTR = ctypes.wintypes.WPARAM
83 ULONG_PTR = ctypes.wintypes.WPARAM
84
84
85 class OVERLAPPED(ctypes.Structure):
85 class OVERLAPPED(ctypes.Structure):
86 _fields_ = [
86 _fields_ = [
87 ("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR),
87 ("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR),
88 ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
88 ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
89 ("hEvent", wintypes.HANDLE)
89 ("hEvent", wintypes.HANDLE)
90 ]
90 ]
91
91
92 def __init__(self):
92 def __init__(self):
93 self.Internal = 0
93 self.Internal = 0
94 self.InternalHigh = 0
94 self.InternalHigh = 0
95 self.Offset = 0
95 self.Offset = 0
96 self.OffsetHigh = 0
96 self.OffsetHigh = 0
97 self.hEvent = 0
97 self.hEvent = 0
98
98
99 LPDWORD = ctypes.POINTER(wintypes.DWORD)
99 LPDWORD = ctypes.POINTER(wintypes.DWORD)
100
100
101 CreateFile = ctypes.windll.kernel32.CreateFileA
101 CreateFile = ctypes.windll.kernel32.CreateFileA
102 CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD,
102 CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD,
103 wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
103 wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
104 wintypes.HANDLE]
104 wintypes.HANDLE]
105 CreateFile.restype = wintypes.HANDLE
105 CreateFile.restype = wintypes.HANDLE
106
106
107 CloseHandle = ctypes.windll.kernel32.CloseHandle
107 CloseHandle = ctypes.windll.kernel32.CloseHandle
108 CloseHandle.argtypes = [wintypes.HANDLE]
108 CloseHandle.argtypes = [wintypes.HANDLE]
109 CloseHandle.restype = wintypes.BOOL
109 CloseHandle.restype = wintypes.BOOL
110
110
111 ReadFile = ctypes.windll.kernel32.ReadFile
111 ReadFile = ctypes.windll.kernel32.ReadFile
112 ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
112 ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
113 LPDWORD, ctypes.POINTER(OVERLAPPED)]
113 LPDWORD, ctypes.POINTER(OVERLAPPED)]
114 ReadFile.restype = wintypes.BOOL
114 ReadFile.restype = wintypes.BOOL
115
115
116 WriteFile = ctypes.windll.kernel32.WriteFile
116 WriteFile = ctypes.windll.kernel32.WriteFile
117 WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
117 WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
118 LPDWORD, ctypes.POINTER(OVERLAPPED)]
118 LPDWORD, ctypes.POINTER(OVERLAPPED)]
119 WriteFile.restype = wintypes.BOOL
119 WriteFile.restype = wintypes.BOOL
120
120
121 GetLastError = ctypes.windll.kernel32.GetLastError
121 GetLastError = ctypes.windll.kernel32.GetLastError
122 GetLastError.argtypes = []
122 GetLastError.argtypes = []
123 GetLastError.restype = wintypes.DWORD
123 GetLastError.restype = wintypes.DWORD
124
124
125 SetLastError = ctypes.windll.kernel32.SetLastError
125 SetLastError = ctypes.windll.kernel32.SetLastError
126 SetLastError.argtypes = [wintypes.DWORD]
126 SetLastError.argtypes = [wintypes.DWORD]
127 SetLastError.restype = None
127 SetLastError.restype = None
128
128
129 FormatMessage = ctypes.windll.kernel32.FormatMessageA
129 FormatMessage = ctypes.windll.kernel32.FormatMessageA
130 FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
130 FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
131 wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
131 wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
132 wintypes.DWORD, wintypes.LPVOID]
132 wintypes.DWORD, wintypes.LPVOID]
133 FormatMessage.restype = wintypes.DWORD
133 FormatMessage.restype = wintypes.DWORD
134
134
135 LocalFree = ctypes.windll.kernel32.LocalFree
135 LocalFree = ctypes.windll.kernel32.LocalFree
136
136
137 GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
137 GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
138 GetOverlappedResult.argtypes = [wintypes.HANDLE,
138 GetOverlappedResult.argtypes = [wintypes.HANDLE,
139 ctypes.POINTER(OVERLAPPED), LPDWORD,
139 ctypes.POINTER(OVERLAPPED), LPDWORD,
140 wintypes.BOOL]
140 wintypes.BOOL]
141 GetOverlappedResult.restype = wintypes.BOOL
141 GetOverlappedResult.restype = wintypes.BOOL
142
142
143 GetOverlappedResultEx = getattr(ctypes.windll.kernel32,
143 GetOverlappedResultEx = getattr(ctypes.windll.kernel32,
144 'GetOverlappedResultEx', None)
144 'GetOverlappedResultEx', None)
145 if GetOverlappedResultEx is not None:
145 if GetOverlappedResultEx is not None:
146 GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
146 GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
147 ctypes.POINTER(OVERLAPPED), LPDWORD,
147 ctypes.POINTER(OVERLAPPED), LPDWORD,
148 wintypes.DWORD, wintypes.BOOL]
148 wintypes.DWORD, wintypes.BOOL]
149 GetOverlappedResultEx.restype = wintypes.BOOL
149 GetOverlappedResultEx.restype = wintypes.BOOL
150
150
151 WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
151 WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
152 WaitForSingleObjectEx.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.BOOL]
152 WaitForSingleObjectEx.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.BOOL]
153 WaitForSingleObjectEx.restype = wintypes.DWORD
153 WaitForSingleObjectEx.restype = wintypes.DWORD
154
154
155 CreateEvent = ctypes.windll.kernel32.CreateEventA
155 CreateEvent = ctypes.windll.kernel32.CreateEventA
156 CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL,
156 CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL,
157 wintypes.LPSTR]
157 wintypes.LPSTR]
158 CreateEvent.restype = wintypes.HANDLE
158 CreateEvent.restype = wintypes.HANDLE
159
159
160 # Windows Vista is the minimum supported client for CancelIoEx.
160 # Windows Vista is the minimum supported client for CancelIoEx.
161 CancelIoEx = ctypes.windll.kernel32.CancelIoEx
161 CancelIoEx = ctypes.windll.kernel32.CancelIoEx
162 CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
162 CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
163 CancelIoEx.restype = wintypes.BOOL
163 CancelIoEx.restype = wintypes.BOOL
164
164
165 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
165 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
166 sniff_len = 13
166 sniff_len = 13
167
167
168 # This is a helper for debugging the client.
168 # This is a helper for debugging the client.
169 _debugging = False
169 _debugging = False
170 if _debugging:
170 if _debugging:
171
171
172 def log(fmt, *args):
172 def log(fmt, *args):
173 print('[%s] %s' %
173 print('[%s] %s' %
174 (time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
174 (time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
175 fmt % args[:]))
175 fmt % args[:]))
176 else:
176 else:
177
177
178 def log(fmt, *args):
178 def log(fmt, *args):
179 pass
179 pass
180
180
181
181
182 def _win32_strerror(err):
182 def _win32_strerror(err):
183 """ expand a win32 error code into a human readable message """
183 """ expand a win32 error code into a human readable message """
184
184
185 # FormatMessage will allocate memory and assign it here
185 # FormatMessage will allocate memory and assign it here
186 buf = ctypes.c_char_p()
186 buf = ctypes.c_char_p()
187 FormatMessage(
187 FormatMessage(
188 FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
188 FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
189 | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
189 | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
190 try:
190 try:
191 return buf.value
191 return buf.value
192 finally:
192 finally:
193 LocalFree(buf)
193 LocalFree(buf)
194
194
195
195
196 class WatchmanError(Exception):
196 class WatchmanError(Exception):
197 def __init__(self, msg=None, cmd=None):
197 def __init__(self, msg=None, cmd=None):
198 self.msg = msg
198 self.msg = msg
199 self.cmd = cmd
199 self.cmd = cmd
200
200
201 def setCommand(self, cmd):
201 def setCommand(self, cmd):
202 self.cmd = cmd
202 self.cmd = cmd
203
203
204 def __str__(self):
204 def __str__(self):
205 if self.cmd:
205 if self.cmd:
206 return '%s, while executing %s' % (self.msg, self.cmd)
206 return '%s, while executing %s' % (self.msg, self.cmd)
207 return self.msg
207 return self.msg
208
208
209
209
210 class WatchmanEnvironmentError(WatchmanError):
210 class WatchmanEnvironmentError(WatchmanError):
211 def __init__(self, msg, errno, errmsg, cmd=None):
211 def __init__(self, msg, errno, errmsg, cmd=None):
212 super(WatchmanEnvironmentError, self).__init__(
212 super(WatchmanEnvironmentError, self).__init__(
213 '{0}: errno={1} errmsg={2}'.format(msg, errno, errmsg),
213 '{0}: errno={1} errmsg={2}'.format(msg, errno, errmsg),
214 cmd)
214 cmd)
215
215
216
216
217 class SocketConnectError(WatchmanError):
217 class SocketConnectError(WatchmanError):
218 def __init__(self, sockpath, exc):
218 def __init__(self, sockpath, exc):
219 super(SocketConnectError, self).__init__(
219 super(SocketConnectError, self).__init__(
220 'unable to connect to %s: %s' % (sockpath, exc))
220 'unable to connect to %s: %s' % (sockpath, exc))
221 self.sockpath = sockpath
221 self.sockpath = sockpath
222 self.exc = exc
222 self.exc = exc
223
223
224
224
225 class SocketTimeout(WatchmanError):
225 class SocketTimeout(WatchmanError):
226 """A specialized exception raised for socket timeouts during communication to/from watchman.
226 """A specialized exception raised for socket timeouts during communication to/from watchman.
227 This makes it easier to implement non-blocking loops as callers can easily distinguish
227 This makes it easier to implement non-blocking loops as callers can easily distinguish
228 between a routine timeout and an actual error condition.
228 between a routine timeout and an actual error condition.
229
229
230 Note that catching WatchmanError will also catch this as it is a super-class, so backwards
230 Note that catching WatchmanError will also catch this as it is a super-class, so backwards
231 compatibility in exception handling is preserved.
231 compatibility in exception handling is preserved.
232 """
232 """
233
233
234
234
235 class CommandError(WatchmanError):
235 class CommandError(WatchmanError):
236 """error returned by watchman
236 """error returned by watchman
237
237
238 self.msg is the message returned by watchman.
238 self.msg is the message returned by watchman.
239 """
239 """
240 def __init__(self, msg, cmd=None):
240 def __init__(self, msg, cmd=None):
241 super(CommandError, self).__init__(
241 super(CommandError, self).__init__(
242 'watchman command error: %s' % (msg, ),
242 'watchman command error: %s' % (msg, ),
243 cmd,
243 cmd,
244 )
244 )
245
245
246
246
247 class Transport(object):
247 class Transport(object):
248 """ communication transport to the watchman server """
248 """ communication transport to the watchman server """
249 buf = None
249 buf = None
250
250
251 def close(self):
251 def close(self):
252 """ tear it down """
252 """ tear it down """
253 raise NotImplementedError()
253 raise NotImplementedError()
254
254
255 def readBytes(self, size):
255 def readBytes(self, size):
256 """ read size bytes """
256 """ read size bytes """
257 raise NotImplementedError()
257 raise NotImplementedError()
258
258
259 def write(self, buf):
259 def write(self, buf):
260 """ write some data """
260 """ write some data """
261 raise NotImplementedError()
261 raise NotImplementedError()
262
262
263 def setTimeout(self, value):
263 def setTimeout(self, value):
264 pass
264 pass
265
265
266 def readLine(self):
266 def readLine(self):
267 """ read a line
267 """ read a line
268 Maintains its own buffer, callers of the transport should not mix
268 Maintains its own buffer, callers of the transport should not mix
269 calls to readBytes and readLine.
269 calls to readBytes and readLine.
270 """
270 """
271 if self.buf is None:
271 if self.buf is None:
272 self.buf = []
272 self.buf = []
273
273
274 # Buffer may already have a line if we've received unilateral
274 # Buffer may already have a line if we've received unilateral
275 # response(s) from the server
275 # response(s) from the server
276 if len(self.buf) == 1 and b"\n" in self.buf[0]:
276 if len(self.buf) == 1 and b"\n" in self.buf[0]:
277 (line, b) = self.buf[0].split(b"\n", 1)
277 (line, b) = self.buf[0].split(b"\n", 1)
278 self.buf = [b]
278 self.buf = [b]
279 return line
279 return line
280
280
281 while True:
281 while True:
282 b = self.readBytes(4096)
282 b = self.readBytes(4096)
283 if b"\n" in b:
283 if b"\n" in b:
284 result = b''.join(self.buf)
284 result = b''.join(self.buf)
285 (line, b) = b.split(b"\n", 1)
285 (line, b) = b.split(b"\n", 1)
286 self.buf = [b]
286 self.buf = [b]
287 return result + line
287 return result + line
288 self.buf.append(b)
288 self.buf.append(b)
289
289
290
290
291 class Codec(object):
291 class Codec(object):
292 """ communication encoding for the watchman server """
292 """ communication encoding for the watchman server """
293 transport = None
293 transport = None
294
294
295 def __init__(self, transport):
295 def __init__(self, transport):
296 self.transport = transport
296 self.transport = transport
297
297
298 def receive(self):
298 def receive(self):
299 raise NotImplementedError()
299 raise NotImplementedError()
300
300
301 def send(self, *args):
301 def send(self, *args):
302 raise NotImplementedError()
302 raise NotImplementedError()
303
303
304 def setTimeout(self, value):
304 def setTimeout(self, value):
305 self.transport.setTimeout(value)
305 self.transport.setTimeout(value)
306
306
307
307
308 class UnixSocketTransport(Transport):
308 class UnixSocketTransport(Transport):
309 """ local unix domain socket transport """
309 """ local unix domain socket transport """
310 sock = None
310 sock = None
311
311
312 def __init__(self, sockpath, timeout):
312 def __init__(self, sockpath, timeout):
313 self.sockpath = sockpath
313 self.sockpath = sockpath
314 self.timeout = timeout
314 self.timeout = timeout
315
315
316 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
316 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
317 try:
317 try:
318 sock.settimeout(self.timeout)
318 sock.settimeout(self.timeout)
319 sock.connect(self.sockpath)
319 sock.connect(self.sockpath)
320 self.sock = sock
320 self.sock = sock
321 except socket.error as e:
321 except socket.error as e:
322 sock.close()
322 sock.close()
323 raise SocketConnectError(self.sockpath, e)
323 raise SocketConnectError(self.sockpath, e)
324
324
325 def close(self):
325 def close(self):
326 self.sock.close()
326 self.sock.close()
327 self.sock = None
327 self.sock = None
328
328
329 def setTimeout(self, value):
329 def setTimeout(self, value):
330 self.timeout = value
330 self.timeout = value
331 self.sock.settimeout(self.timeout)
331 self.sock.settimeout(self.timeout)
332
332
333 def readBytes(self, size):
333 def readBytes(self, size):
334 try:
334 try:
335 buf = [self.sock.recv(size)]
335 buf = [self.sock.recv(size)]
336 if not buf[0]:
336 if not buf[0]:
337 raise WatchmanError('empty watchman response')
337 raise WatchmanError('empty watchman response')
338 return buf[0]
338 return buf[0]
339 except socket.timeout:
339 except socket.timeout:
340 raise SocketTimeout('timed out waiting for response')
340 raise SocketTimeout('timed out waiting for response')
341
341
342 def write(self, data):
342 def write(self, data):
343 try:
343 try:
344 self.sock.sendall(data)
344 self.sock.sendall(data)
345 except socket.timeout:
345 except socket.timeout:
346 raise SocketTimeout('timed out sending query command')
346 raise SocketTimeout('timed out sending query command')
347
347
348
348
349 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
349 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
350 """ Windows 7 and earlier does not support GetOverlappedResultEx. The
350 """ Windows 7 and earlier does not support GetOverlappedResultEx. The
351 alternative is to use GetOverlappedResult and wait for read or write
351 alternative is to use GetOverlappedResult and wait for read or write
352 operation to complete. This is done be using CreateEvent and
352 operation to complete. This is done be using CreateEvent and
353 WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
353 WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
354 and GetOverlappedResult are all part of Windows API since WindowsXP.
354 and GetOverlappedResult are all part of Windows API since WindowsXP.
355 This is the exact same implementation that can be found in the watchman
355 This is the exact same implementation that can be found in the watchman
356 source code (see get_overlapped_result_ex_impl in stream_win.c). This
356 source code (see get_overlapped_result_ex_impl in stream_win.c). This
357 way, maintenance should be simplified.
357 way, maintenance should be simplified.
358 """
358 """
359 log('Preparing to wait for maximum %dms', millis )
359 log('Preparing to wait for maximum %dms', millis )
360 if millis != 0:
360 if millis != 0:
361 waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
361 waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
362 if waitReturnCode == WAIT_OBJECT_0:
362 if waitReturnCode == WAIT_OBJECT_0:
363 # Event is signaled, overlapped IO operation result should be available.
363 # Event is signaled, overlapped IO operation result should be available.
364 pass
364 pass
365 elif waitReturnCode == WAIT_IO_COMPLETION:
365 elif waitReturnCode == WAIT_IO_COMPLETION:
366 # WaitForSingleObjectEx returnes because the system added an I/O completion
366 # WaitForSingleObjectEx returnes because the system added an I/O completion
367 # routine or an asynchronous procedure call (APC) to the thread queue.
367 # routine or an asynchronous procedure call (APC) to the thread queue.
368 SetLastError(WAIT_IO_COMPLETION)
368 SetLastError(WAIT_IO_COMPLETION)
369 pass
369 pass
370 elif waitReturnCode == WAIT_TIMEOUT:
370 elif waitReturnCode == WAIT_TIMEOUT:
371 # We reached the maximum allowed wait time, the IO operation failed
371 # We reached the maximum allowed wait time, the IO operation failed
372 # to complete in timely fashion.
372 # to complete in timely fashion.
373 SetLastError(WAIT_TIMEOUT)
373 SetLastError(WAIT_TIMEOUT)
374 return False
374 return False
375 elif waitReturnCode == WAIT_FAILED:
375 elif waitReturnCode == WAIT_FAILED:
376 # something went wrong calling WaitForSingleObjectEx
376 # something went wrong calling WaitForSingleObjectEx
377 err = GetLastError()
377 err = GetLastError()
378 log('WaitForSingleObjectEx failed: %s', _win32_strerror(err))
378 log('WaitForSingleObjectEx failed: %s', _win32_strerror(err))
379 return False
379 return False
380 else:
380 else:
381 # unexpected situation deserving investigation.
381 # unexpected situation deserving investigation.
382 err = GetLastError()
382 err = GetLastError()
383 log('Unexpected error: %s', _win32_strerror(err))
383 log('Unexpected error: %s', _win32_strerror(err))
384 return False
384 return False
385
385
386 return GetOverlappedResult(pipe, olap, nbytes, False)
386 return GetOverlappedResult(pipe, olap, nbytes, False)
387
387
388
388
389 class WindowsNamedPipeTransport(Transport):
389 class WindowsNamedPipeTransport(Transport):
390 """ connect to a named pipe """
390 """ connect to a named pipe """
391
391
392 def __init__(self, sockpath, timeout):
392 def __init__(self, sockpath, timeout):
393 self.sockpath = sockpath
393 self.sockpath = sockpath
394 self.timeout = int(math.ceil(timeout * 1000))
394 self.timeout = int(math.ceil(timeout * 1000))
395 self._iobuf = None
395 self._iobuf = None
396
396
397 self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None,
397 self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None,
398 OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None)
398 OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None)
399
399
400 if self.pipe == INVALID_HANDLE_VALUE:
400 if self.pipe == INVALID_HANDLE_VALUE:
401 self.pipe = None
401 self.pipe = None
402 self._raise_win_err('failed to open pipe %s' % sockpath,
402 self._raise_win_err('failed to open pipe %s' % sockpath,
403 GetLastError())
403 GetLastError())
404
404
405 # event for the overlapped I/O operations
405 # event for the overlapped I/O operations
406 self._waitable = CreateEvent(None, True, False, None)
406 self._waitable = CreateEvent(None, True, False, None)
407 if self._waitable is None:
407 if self._waitable is None:
408 self._raise_win_err('CreateEvent failed', GetLastError())
408 self._raise_win_err('CreateEvent failed', GetLastError())
409
409
410 self._get_overlapped_result_ex = GetOverlappedResultEx
410 self._get_overlapped_result_ex = GetOverlappedResultEx
411 if (os.getenv('WATCHMAN_WIN7_COMPAT') == '1' or
411 if (os.getenv('WATCHMAN_WIN7_COMPAT') == '1' or
412 self._get_overlapped_result_ex is None):
412 self._get_overlapped_result_ex is None):
413 self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
413 self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
414
414
415 def _raise_win_err(self, msg, err):
415 def _raise_win_err(self, msg, err):
416 raise IOError('%s win32 error code: %d %s' %
416 raise IOError('%s win32 error code: %d %s' %
417 (msg, err, _win32_strerror(err)))
417 (msg, err, _win32_strerror(err)))
418
418
419 def close(self):
419 def close(self):
420 if self.pipe:
420 if self.pipe:
421 log('Closing pipe')
421 log('Closing pipe')
422 CloseHandle(self.pipe)
422 CloseHandle(self.pipe)
423 self.pipe = None
423 self.pipe = None
424
424
425 if self._waitable is not None:
425 if self._waitable is not None:
426 # We release the handle for the event
426 # We release the handle for the event
427 CloseHandle(self._waitable)
427 CloseHandle(self._waitable)
428 self._waitable = None
428 self._waitable = None
429
429
430 def setTimeout(self, value):
430 def setTimeout(self, value):
431 # convert to milliseconds
431 # convert to milliseconds
432 self.timeout = int(value * 1000)
432 self.timeout = int(value * 1000)
433
433
434 def readBytes(self, size):
434 def readBytes(self, size):
435 """ A read can block for an unbounded amount of time, even if the
435 """ A read can block for an unbounded amount of time, even if the
436 kernel reports that the pipe handle is signalled, so we need to
436 kernel reports that the pipe handle is signalled, so we need to
437 always perform our reads asynchronously
437 always perform our reads asynchronously
438 """
438 """
439
439
440 # try to satisfy the read from any buffered data
440 # try to satisfy the read from any buffered data
441 if self._iobuf:
441 if self._iobuf:
442 if size >= len(self._iobuf):
442 if size >= len(self._iobuf):
443 res = self._iobuf
443 res = self._iobuf
444 self.buf = None
444 self.buf = None
445 return res
445 return res
446 res = self._iobuf[:size]
446 res = self._iobuf[:size]
447 self._iobuf = self._iobuf[size:]
447 self._iobuf = self._iobuf[size:]
448 return res
448 return res
449
449
450 # We need to initiate a read
450 # We need to initiate a read
451 buf = ctypes.create_string_buffer(size)
451 buf = ctypes.create_string_buffer(size)
452 olap = OVERLAPPED()
452 olap = OVERLAPPED()
453 olap.hEvent = self._waitable
453 olap.hEvent = self._waitable
454
454
455 log('made read buff of size %d', size)
455 log('made read buff of size %d', size)
456
456
457 # ReadFile docs warn against sending in the nread parameter for async
457 # ReadFile docs warn against sending in the nread parameter for async
458 # operations, so we always collect it via GetOverlappedResultEx
458 # operations, so we always collect it via GetOverlappedResultEx
459 immediate = ReadFile(self.pipe, buf, size, None, olap)
459 immediate = ReadFile(self.pipe, buf, size, None, olap)
460
460
461 if not immediate:
461 if not immediate:
462 err = GetLastError()
462 err = GetLastError()
463 if err != ERROR_IO_PENDING:
463 if err != ERROR_IO_PENDING:
464 self._raise_win_err('failed to read %d bytes' % size,
464 self._raise_win_err('failed to read %d bytes' % size,
465 GetLastError())
465 GetLastError())
466
466
467 nread = wintypes.DWORD()
467 nread = wintypes.DWORD()
468 if not self._get_overlapped_result_ex(self.pipe, olap, nread,
468 if not self._get_overlapped_result_ex(self.pipe, olap, nread,
469 0 if immediate else self.timeout,
469 0 if immediate else self.timeout,
470 True):
470 True):
471 err = GetLastError()
471 err = GetLastError()
472 CancelIoEx(self.pipe, olap)
472 CancelIoEx(self.pipe, olap)
473
473
474 if err == WAIT_TIMEOUT:
474 if err == WAIT_TIMEOUT:
475 log('GetOverlappedResultEx timedout')
475 log('GetOverlappedResultEx timedout')
476 raise SocketTimeout('timed out after waiting %dms for read' %
476 raise SocketTimeout('timed out after waiting %dms for read' %
477 self.timeout)
477 self.timeout)
478
478
479 log('GetOverlappedResultEx reports error %d', err)
479 log('GetOverlappedResultEx reports error %d', err)
480 self._raise_win_err('error while waiting for read', err)
480 self._raise_win_err('error while waiting for read', err)
481
481
482 nread = nread.value
482 nread = nread.value
483 if nread == 0:
483 if nread == 0:
484 # Docs say that named pipes return 0 byte when the other end did
484 # Docs say that named pipes return 0 byte when the other end did
485 # a zero byte write. Since we don't ever do that, the only
485 # a zero byte write. Since we don't ever do that, the only
486 # other way this shows up is if the client has gotten in a weird
486 # other way this shows up is if the client has gotten in a weird
487 # state, so let's bail out
487 # state, so let's bail out
488 CancelIoEx(self.pipe, olap)
488 CancelIoEx(self.pipe, olap)
489 raise IOError('Async read yielded 0 bytes; unpossible!')
489 raise IOError('Async read yielded 0 bytes; unpossible!')
490
490
491 # Holds precisely the bytes that we read from the prior request
491 # Holds precisely the bytes that we read from the prior request
492 buf = buf[:nread]
492 buf = buf[:nread]
493
493
494 returned_size = min(nread, size)
494 returned_size = min(nread, size)
495 if returned_size == nread:
495 if returned_size == nread:
496 return buf
496 return buf
497
497
498 # keep any left-overs around for a later read to consume
498 # keep any left-overs around for a later read to consume
499 self._iobuf = buf[returned_size:]
499 self._iobuf = buf[returned_size:]
500 return buf[:returned_size]
500 return buf[:returned_size]
501
501
502 def write(self, data):
502 def write(self, data):
503 olap = OVERLAPPED()
503 olap = OVERLAPPED()
504 olap.hEvent = self._waitable
504 olap.hEvent = self._waitable
505
505
506 immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
506 immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
507 None, olap)
507 None, olap)
508
508
509 if not immediate:
509 if not immediate:
510 err = GetLastError()
510 err = GetLastError()
511 if err != ERROR_IO_PENDING:
511 if err != ERROR_IO_PENDING:
512 self._raise_win_err('failed to write %d bytes' % len(data),
512 self._raise_win_err('failed to write %d bytes' % len(data),
513 GetLastError())
513 GetLastError())
514
514
515 # Obtain results, waiting if needed
515 # Obtain results, waiting if needed
516 nwrote = wintypes.DWORD()
516 nwrote = wintypes.DWORD()
517 if self._get_overlapped_result_ex(self.pipe, olap, nwrote,
517 if self._get_overlapped_result_ex(self.pipe, olap, nwrote,
518 0 if immediate else self.timeout,
518 0 if immediate else self.timeout,
519 True):
519 True):
520 log('made write of %d bytes', nwrote.value)
520 log('made write of %d bytes', nwrote.value)
521 return nwrote.value
521 return nwrote.value
522
522
523 err = GetLastError()
523 err = GetLastError()
524
524
525 # It's potentially unsafe to allow the write to continue after
525 # It's potentially unsafe to allow the write to continue after
526 # we unwind, so let's make a best effort to avoid that happening
526 # we unwind, so let's make a best effort to avoid that happening
527 CancelIoEx(self.pipe, olap)
527 CancelIoEx(self.pipe, olap)
528
528
529 if err == WAIT_TIMEOUT:
529 if err == WAIT_TIMEOUT:
530 raise SocketTimeout('timed out after waiting %dms for write' %
530 raise SocketTimeout('timed out after waiting %dms for write' %
531 self.timeout)
531 self.timeout)
532 self._raise_win_err('error while waiting for write of %d bytes' %
532 self._raise_win_err('error while waiting for write of %d bytes' %
533 len(data), err)
533 len(data), err)
534
534
535
535
536 class CLIProcessTransport(Transport):
536 class CLIProcessTransport(Transport):
537 """ open a pipe to the cli to talk to the service
537 """ open a pipe to the cli to talk to the service
538 This intended to be used only in the test harness!
538 This intended to be used only in the test harness!
539
539
540 The CLI is an oddball because we only support JSON input
540 The CLI is an oddball because we only support JSON input
541 and cannot send multiple commands through the same instance,
541 and cannot send multiple commands through the same instance,
542 so we spawn a new process for each command.
542 so we spawn a new process for each command.
543
543
544 We disable server spawning for this implementation, again, because
544 We disable server spawning for this implementation, again, because
545 it is intended to be used only in our test harness. You really
545 it is intended to be used only in our test harness. You really
546 should not need to use the CLI transport for anything real.
546 should not need to use the CLI transport for anything real.
547
547
548 While the CLI can output in BSER, our Transport interface doesn't
548 While the CLI can output in BSER, our Transport interface doesn't
549 support telling this instance that it should do so. That effectively
549 support telling this instance that it should do so. That effectively
550 limits this implementation to JSON input and output only at this time.
550 limits this implementation to JSON input and output only at this time.
551
551
552 It is the responsibility of the caller to set the send and
552 It is the responsibility of the caller to set the send and
553 receive codecs appropriately.
553 receive codecs appropriately.
554 """
554 """
555 proc = None
555 proc = None
556 closed = True
556 closed = True
557
557
558 def __init__(self, sockpath, timeout):
558 def __init__(self, sockpath, timeout):
559 self.sockpath = sockpath
559 self.sockpath = sockpath
560 self.timeout = timeout
560 self.timeout = timeout
561
561
562 def close(self):
562 def close(self):
563 if self.proc:
563 if self.proc:
564 if self.proc.pid is not None:
564 if self.proc.pid is not None:
565 self.proc.kill()
565 self.proc.kill()
566 self.proc.stdin.close()
566 self.proc.stdin.close()
567 self.proc.stdout.close()
567 self.proc.stdout.close()
568 self.proc = None
568 self.proc = None
569
569
570 def _connect(self):
570 def _connect(self):
571 if self.proc:
571 if self.proc:
572 return self.proc
572 return self.proc
573 args = [
573 args = [
574 'watchman',
574 'watchman',
575 '--sockname={0}'.format(self.sockpath),
575 '--sockname={0}'.format(self.sockpath),
576 '--logfile=/BOGUS',
576 '--logfile=/BOGUS',
577 '--statefile=/BOGUS',
577 '--statefile=/BOGUS',
578 '--no-spawn',
578 '--no-spawn',
579 '--no-local',
579 '--no-local',
580 '--no-pretty',
580 '--no-pretty',
581 '-j',
581 '-j',
582 ]
582 ]
583 self.proc = subprocess.Popen(args,
583 self.proc = subprocess.Popen(args,
584 stdin=subprocess.PIPE,
584 stdin=subprocess.PIPE,
585 stdout=subprocess.PIPE)
585 stdout=subprocess.PIPE)
586 return self.proc
586 return self.proc
587
587
588 def readBytes(self, size):
588 def readBytes(self, size):
589 self._connect()
589 self._connect()
590 res = self.proc.stdout.read(size)
590 res = self.proc.stdout.read(size)
591 if res == '':
591 if res == '':
592 raise WatchmanError('EOF on CLI process transport')
592 raise WatchmanError('EOF on CLI process transport')
593 return res
593 return res
594
594
595 def write(self, data):
595 def write(self, data):
596 if self.closed:
596 if self.closed:
597 self.close()
597 self.close()
598 self.closed = False
598 self.closed = False
599 self._connect()
599 self._connect()
600 res = self.proc.stdin.write(data)
600 res = self.proc.stdin.write(data)
601 self.proc.stdin.close()
601 self.proc.stdin.close()
602 self.closed = True
602 self.closed = True
603 return res
603 return res
604
604
605
605
606 class BserCodec(Codec):
606 class BserCodec(Codec):
607 """ use the BSER encoding. This is the default, preferred codec """
607 """ use the BSER encoding. This is the default, preferred codec """
608
608
609 def _loads(self, response):
609 def _loads(self, response):
610 return bser.loads(response) # Defaults to BSER v1
610 return bser.loads(response) # Defaults to BSER v1
611
611
612 def receive(self):
612 def receive(self):
613 buf = [self.transport.readBytes(sniff_len)]
613 buf = [self.transport.readBytes(sniff_len)]
614 if not buf[0]:
614 if not buf[0]:
615 raise WatchmanError('empty watchman response')
615 raise WatchmanError('empty watchman response')
616
616
617 _1, _2, elen = bser.pdu_info(buf[0])
617 _1, _2, elen = bser.pdu_info(buf[0])
618
618
619 rlen = len(buf[0])
619 rlen = len(buf[0])
620 while elen > rlen:
620 while elen > rlen:
621 buf.append(self.transport.readBytes(elen - rlen))
621 buf.append(self.transport.readBytes(elen - rlen))
622 rlen += len(buf[-1])
622 rlen += len(buf[-1])
623
623
624 response = b''.join(buf)
624 response = b''.join(buf)
625 try:
625 try:
626 res = self._loads(response)
626 res = self._loads(response)
627 return res
627 return res
628 except ValueError as e:
628 except ValueError as e:
629 raise WatchmanError('watchman response decode error: %s' % e)
629 raise WatchmanError('watchman response decode error: %s' % e)
630
630
631 def send(self, *args):
631 def send(self, *args):
632 cmd = bser.dumps(*args) # Defaults to BSER v1
632 cmd = bser.dumps(*args) # Defaults to BSER v1
633 self.transport.write(cmd)
633 self.transport.write(cmd)
634
634
635
635
636 class ImmutableBserCodec(BserCodec):
636 class ImmutableBserCodec(BserCodec):
637 """ use the BSER encoding, decoding values using the newer
637 """ use the BSER encoding, decoding values using the newer
638 immutable object support """
638 immutable object support """
639
639
640 def _loads(self, response):
640 def _loads(self, response):
641 return bser.loads(response, False) # Defaults to BSER v1
641 return bser.loads(response, False) # Defaults to BSER v1
642
642
643
643
644 class Bser2WithFallbackCodec(BserCodec):
644 class Bser2WithFallbackCodec(BserCodec):
645 """ use BSER v2 encoding """
645 """ use BSER v2 encoding """
646
646
647 def __init__(self, transport):
647 def __init__(self, transport):
648 super(Bser2WithFallbackCodec, self).__init__(transport)
648 super(Bser2WithFallbackCodec, self).__init__(transport)
649 # Once the server advertises support for bser-v2 we should switch this
649 # Once the server advertises support for bser-v2 we should switch this
650 # to 'required' on Python 3.
650 # to 'required' on Python 3.
651 self.send(["version", {"optional": ["bser-v2"]}])
651 self.send(["version", {"optional": ["bser-v2"]}])
652
652
653 capabilities = self.receive()
653 capabilities = self.receive()
654
654
655 if 'error' in capabilities:
655 if 'error' in capabilities:
656 raise Exception('Unsupported BSER version')
656 raise Exception('Unsupported BSER version')
657
657
658 if capabilities['capabilities']['bser-v2']:
658 if capabilities['capabilities']['bser-v2']:
659 self.bser_version = 2
659 self.bser_version = 2
660 self.bser_capabilities = 0
660 self.bser_capabilities = 0
661 else:
661 else:
662 self.bser_version = 1
662 self.bser_version = 1
663 self.bser_capabilities = 0
663 self.bser_capabilities = 0
664
664
665 def _loads(self, response):
665 def _loads(self, response):
666 return bser.loads(response)
666 return bser.loads(response)
667
667
668 def receive(self):
668 def receive(self):
669 buf = [self.transport.readBytes(sniff_len)]
669 buf = [self.transport.readBytes(sniff_len)]
670 if not buf[0]:
670 if not buf[0]:
671 raise WatchmanError('empty watchman response')
671 raise WatchmanError('empty watchman response')
672
672
673 recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
673 recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
674
674
675 if hasattr(self, 'bser_version'):
675 if hasattr(self, 'bser_version'):
676 # Readjust BSER version and capabilities if necessary
676 # Readjust BSER version and capabilities if necessary
677 self.bser_version = max(self.bser_version, recv_bser_version)
677 self.bser_version = max(self.bser_version, recv_bser_version)
678 self.capabilities = self.bser_capabilities & recv_bser_capabilities
678 self.capabilities = self.bser_capabilities & recv_bser_capabilities
679
679
680 rlen = len(buf[0])
680 rlen = len(buf[0])
681 while elen > rlen:
681 while elen > rlen:
682 buf.append(self.transport.readBytes(elen - rlen))
682 buf.append(self.transport.readBytes(elen - rlen))
683 rlen += len(buf[-1])
683 rlen += len(buf[-1])
684
684
685 response = b''.join(buf)
685 response = b''.join(buf)
686 try:
686 try:
687 res = self._loads(response)
687 res = self._loads(response)
688 return res
688 return res
689 except ValueError as e:
689 except ValueError as e:
690 raise WatchmanError('watchman response decode error: %s' % e)
690 raise WatchmanError('watchman response decode error: %s' % e)
691
691
692 def send(self, *args):
692 def send(self, *args):
693 if hasattr(self, 'bser_version'):
693 if hasattr(self, 'bser_version'):
694 cmd = bser.dumps(*args, version=self.bser_version,
694 cmd = bser.dumps(*args, version=self.bser_version,
695 capabilities=self.bser_capabilities)
695 capabilities=self.bser_capabilities)
696 else:
696 else:
697 cmd = bser.dumps(*args)
697 cmd = bser.dumps(*args)
698 self.transport.write(cmd)
698 self.transport.write(cmd)
699
699
700
700
701 class JsonCodec(Codec):
701 class JsonCodec(Codec):
702 """ Use json codec. This is here primarily for testing purposes """
702 """ Use json codec. This is here primarily for testing purposes """
703 json = None
703 json = None
704
704
705 def __init__(self, transport):
705 def __init__(self, transport):
706 super(JsonCodec, self).__init__(transport)
706 super(JsonCodec, self).__init__(transport)
707 # optional dep on json, only if JsonCodec is used
707 # optional dep on json, only if JsonCodec is used
708 import json
708 import json
709 self.json = json
709 self.json = json
710
710
711 def receive(self):
711 def receive(self):
712 line = self.transport.readLine()
712 line = self.transport.readLine()
713 try:
713 try:
714 # In Python 3, json.loads is a transformation from Unicode string to
714 # In Python 3, json.loads is a transformation from Unicode string to
715 # objects possibly containing Unicode strings. We typically expect
715 # objects possibly containing Unicode strings. We typically expect
716 # the JSON blob to be ASCII-only with non-ASCII characters escaped,
716 # the JSON blob to be ASCII-only with non-ASCII characters escaped,
717 # but it's possible we might get non-ASCII bytes that are valid
717 # but it's possible we might get non-ASCII bytes that are valid
718 # UTF-8.
718 # UTF-8.
719 if compat.PYTHON3:
719 if compat.PYTHON3:
720 line = line.decode('utf-8')
720 line = line.decode('utf-8')
721 return self.json.loads(line)
721 return self.json.loads(line)
722 except Exception as e:
722 except Exception as e:
723 print(e, line)
723 print(e, line)
724 raise
724 raise
725
725
726 def send(self, *args):
726 def send(self, *args):
727 cmd = self.json.dumps(*args)
727 cmd = self.json.dumps(*args)
728 # In Python 3, json.dumps is a transformation from objects possibly
728 # In Python 3, json.dumps is a transformation from objects possibly
729 # containing Unicode strings to Unicode string. Even with (the default)
729 # containing Unicode strings to Unicode string. Even with (the default)
730 # ensure_ascii=True, dumps returns a Unicode string.
730 # ensure_ascii=True, dumps returns a Unicode string.
731 if compat.PYTHON3:
731 if compat.PYTHON3:
732 cmd = cmd.encode('ascii')
732 cmd = cmd.encode('ascii')
733 self.transport.write(cmd + b"\n")
733 self.transport.write(cmd + b"\n")
734
734
735
735
736 class client(object):
736 class client(object):
737 """ Handles the communication with the watchman service """
737 """ Handles the communication with the watchman service """
738 sockpath = None
738 sockpath = None
739 transport = None
739 transport = None
740 sendCodec = None
740 sendCodec = None
741 recvCodec = None
741 recvCodec = None
742 sendConn = None
742 sendConn = None
743 recvConn = None
743 recvConn = None
744 subs = {} # Keyed by subscription name
744 subs = {} # Keyed by subscription name
745 sub_by_root = {} # Keyed by root, then by subscription name
745 sub_by_root = {} # Keyed by root, then by subscription name
746 logs = [] # When log level is raised
746 logs = [] # When log level is raised
747 unilateral = ['log', 'subscription']
747 unilateral = ['log', 'subscription']
748 tport = None
748 tport = None
749 useImmutableBser = None
749 useImmutableBser = None
750
750
751 def __init__(self,
751 def __init__(self,
752 sockpath=None,
752 sockpath=None,
753 timeout=1.0,
753 timeout=1.0,
754 transport=None,
754 transport=None,
755 sendEncoding=None,
755 sendEncoding=None,
756 recvEncoding=None,
756 recvEncoding=None,
757 useImmutableBser=False):
757 useImmutableBser=False):
758 self.sockpath = sockpath
758 self.sockpath = sockpath
759 self.timeout = timeout
759 self.timeout = timeout
760 self.useImmutableBser = useImmutableBser
760 self.useImmutableBser = useImmutableBser
761
761
762 if inspect.isclass(transport) and issubclass(transport, Transport):
762 if inspect.isclass(transport) and issubclass(transport, Transport):
763 self.transport = transport
763 self.transport = transport
764 else:
764 else:
765 transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
765 transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
766 if transport == 'local' and os.name == 'nt':
766 if transport == 'local' and os.name == 'nt':
767 self.transport = WindowsNamedPipeTransport
767 self.transport = WindowsNamedPipeTransport
768 elif transport == 'local':
768 elif transport == 'local':
769 self.transport = UnixSocketTransport
769 self.transport = UnixSocketTransport
770 elif transport == 'cli':
770 elif transport == 'cli':
771 self.transport = CLIProcessTransport
771 self.transport = CLIProcessTransport
772 if sendEncoding is None:
772 if sendEncoding is None:
773 sendEncoding = 'json'
773 sendEncoding = 'json'
774 if recvEncoding is None:
774 if recvEncoding is None:
775 recvEncoding = sendEncoding
775 recvEncoding = sendEncoding
776 else:
776 else:
777 raise WatchmanError('invalid transport %s' % transport)
777 raise WatchmanError('invalid transport %s' % transport)
778
778
779 sendEncoding = str(sendEncoding or os.getenv('WATCHMAN_ENCODING') or
779 sendEncoding = str(sendEncoding or os.getenv('WATCHMAN_ENCODING') or
780 'bser')
780 'bser')
781 recvEncoding = str(recvEncoding or os.getenv('WATCHMAN_ENCODING') or
781 recvEncoding = str(recvEncoding or os.getenv('WATCHMAN_ENCODING') or
782 'bser')
782 'bser')
783
783
784 self.recvCodec = self._parseEncoding(recvEncoding)
784 self.recvCodec = self._parseEncoding(recvEncoding)
785 self.sendCodec = self._parseEncoding(sendEncoding)
785 self.sendCodec = self._parseEncoding(sendEncoding)
786
786
787 def _parseEncoding(self, enc):
787 def _parseEncoding(self, enc):
788 if enc == 'bser':
788 if enc == 'bser':
789 if self.useImmutableBser:
789 if self.useImmutableBser:
790 return ImmutableBserCodec
790 return ImmutableBserCodec
791 return BserCodec
791 return BserCodec
792 elif enc == 'experimental-bser-v2':
792 elif enc == 'experimental-bser-v2':
793 return Bser2WithFallbackCodec
793 return Bser2WithFallbackCodec
794 elif enc == 'json':
794 elif enc == 'json':
795 return JsonCodec
795 return JsonCodec
796 else:
796 else:
797 raise WatchmanError('invalid encoding %s' % enc)
797 raise WatchmanError('invalid encoding %s' % enc)
798
798
799 def _hasprop(self, result, name):
799 def _hasprop(self, result, name):
800 if self.useImmutableBser:
800 if self.useImmutableBser:
801 return hasattr(result, name)
801 return hasattr(result, name)
802 return name in result
802 return name in result
803
803
804 def _resolvesockname(self):
804 def _resolvesockname(self):
805 # if invoked via a trigger, watchman will set this env var; we
805 # if invoked via a trigger, watchman will set this env var; we
806 # should use it unless explicitly set otherwise
806 # should use it unless explicitly set otherwise
807 path = os.getenv('WATCHMAN_SOCK')
807 path = os.getenv('WATCHMAN_SOCK')
808 if path:
808 if path:
809 return path
809 return path
810
810
811 cmd = ['watchman', '--output-encoding=bser', 'get-sockname']
811 cmd = ['watchman', '--output-encoding=bser', 'get-sockname']
812 try:
812 try:
813 args = dict(stdout=subprocess.PIPE,
813 args = dict(stdout=subprocess.PIPE,
814 stderr=subprocess.PIPE,
814 stderr=subprocess.PIPE,
815 close_fds=os.name != 'nt')
815 close_fds=os.name != 'nt')
816
816
817 if os.name == 'nt':
817 if os.name == 'nt':
818 # if invoked via an application with graphical user interface,
818 # if invoked via an application with graphical user interface,
819 # this call will cause a brief command window pop-up.
819 # this call will cause a brief command window pop-up.
820 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
820 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
821 startupinfo = subprocess.STARTUPINFO()
821 startupinfo = subprocess.STARTUPINFO()
822 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
822 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
823 args['startupinfo'] = startupinfo
823 args['startupinfo'] = startupinfo
824
824
825 p = subprocess.Popen(cmd, **args)
825 p = subprocess.Popen(cmd, **args)
826
826
827 except OSError as e:
827 except OSError as e:
828 raise WatchmanError('"watchman" executable not in PATH (%s)', e)
828 raise WatchmanError('"watchman" executable not in PATH (%s)' % e)
829
829
830 stdout, stderr = p.communicate()
830 stdout, stderr = p.communicate()
831 exitcode = p.poll()
831 exitcode = p.poll()
832
832
833 if exitcode:
833 if exitcode:
834 raise WatchmanError("watchman exited with code %d" % exitcode)
834 raise WatchmanError("watchman exited with code %d" % exitcode)
835
835
836 result = bser.loads(stdout)
836 result = bser.loads(stdout)
837 if b'error' in result:
837 if b'error' in result:
838 raise WatchmanError('get-sockname error: %s' % result['error'])
838 raise WatchmanError('get-sockname error: %s' % result['error'])
839
839
840 return result[b'sockname']
840 return result[b'sockname']
841
841
842 def _connect(self):
842 def _connect(self):
843 """ establish transport connection """
843 """ establish transport connection """
844
844
845 if self.recvConn:
845 if self.recvConn:
846 return
846 return
847
847
848 if self.sockpath is None:
848 if self.sockpath is None:
849 self.sockpath = self._resolvesockname()
849 self.sockpath = self._resolvesockname()
850
850
851 self.tport = self.transport(self.sockpath, self.timeout)
851 self.tport = self.transport(self.sockpath, self.timeout)
852 self.sendConn = self.sendCodec(self.tport)
852 self.sendConn = self.sendCodec(self.tport)
853 self.recvConn = self.recvCodec(self.tport)
853 self.recvConn = self.recvCodec(self.tport)
854
854
855 def __del__(self):
855 def __del__(self):
856 self.close()
856 self.close()
857
857
858 def close(self):
858 def close(self):
859 if self.tport:
859 if self.tport:
860 self.tport.close()
860 self.tport.close()
861 self.tport = None
861 self.tport = None
862 self.recvConn = None
862 self.recvConn = None
863 self.sendConn = None
863 self.sendConn = None
864
864
865 def receive(self):
865 def receive(self):
866 """ receive the next PDU from the watchman service
866 """ receive the next PDU from the watchman service
867
867
868 If the client has activated subscriptions or logs then
868 If the client has activated subscriptions or logs then
869 this PDU may be a unilateral PDU sent by the service to
869 this PDU may be a unilateral PDU sent by the service to
870 inform the client of a log event or subscription change.
870 inform the client of a log event or subscription change.
871
871
872 It may also simply be the response portion of a request
872 It may also simply be the response portion of a request
873 initiated by query.
873 initiated by query.
874
874
875 There are clients in production that subscribe and call
875 There are clients in production that subscribe and call
876 this in a loop to retrieve all subscription responses,
876 this in a loop to retrieve all subscription responses,
877 so care should be taken when making changes here.
877 so care should be taken when making changes here.
878 """
878 """
879
879
880 self._connect()
880 self._connect()
881 result = self.recvConn.receive()
881 result = self.recvConn.receive()
882 if self._hasprop(result, 'error'):
882 if self._hasprop(result, 'error'):
883 error = result['error']
883 error = result['error']
884 if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
884 if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
885 error = result['error'].decode('utf-8', 'surrogateescape')
885 error = result['error'].decode('utf-8', 'surrogateescape')
886 raise CommandError(error)
886 raise CommandError(error)
887
887
888 if self._hasprop(result, 'log'):
888 if self._hasprop(result, 'log'):
889 log = result['log']
889 log = result['log']
890 if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
890 if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
891 log = log.decode('utf-8', 'surrogateescape')
891 log = log.decode('utf-8', 'surrogateescape')
892 self.logs.append(log)
892 self.logs.append(log)
893
893
894 if self._hasprop(result, 'subscription'):
894 if self._hasprop(result, 'subscription'):
895 sub = result['subscription']
895 sub = result['subscription']
896 if not (sub in self.subs):
896 if not (sub in self.subs):
897 self.subs[sub] = []
897 self.subs[sub] = []
898 self.subs[sub].append(result)
898 self.subs[sub].append(result)
899
899
900 # also accumulate in {root,sub} keyed store
900 # also accumulate in {root,sub} keyed store
901 root = os.path.normcase(result['root'])
901 root = os.path.normcase(result['root'])
902 if not root in self.sub_by_root:
902 if not root in self.sub_by_root:
903 self.sub_by_root[root] = {}
903 self.sub_by_root[root] = {}
904 if not sub in self.sub_by_root[root]:
904 if not sub in self.sub_by_root[root]:
905 self.sub_by_root[root][sub] = []
905 self.sub_by_root[root][sub] = []
906 self.sub_by_root[root][sub].append(result)
906 self.sub_by_root[root][sub].append(result)
907
907
908 return result
908 return result
909
909
910 def isUnilateralResponse(self, res):
910 def isUnilateralResponse(self, res):
911 if 'unilateral' in res and res['unilateral']:
911 if 'unilateral' in res and res['unilateral']:
912 return True
912 return True
913 # Fall back to checking for known unilateral responses
913 # Fall back to checking for known unilateral responses
914 for k in self.unilateral:
914 for k in self.unilateral:
915 if k in res:
915 if k in res:
916 return True
916 return True
917 return False
917 return False
918
918
919 def getLog(self, remove=True):
919 def getLog(self, remove=True):
920 """ Retrieve buffered log data
920 """ Retrieve buffered log data
921
921
922 If remove is true the data will be removed from the buffer.
922 If remove is true the data will be removed from the buffer.
923 Otherwise it will be left in the buffer
923 Otherwise it will be left in the buffer
924 """
924 """
925 res = self.logs
925 res = self.logs
926 if remove:
926 if remove:
927 self.logs = []
927 self.logs = []
928 return res
928 return res
929
929
930 def getSubscription(self, name, remove=True, root=None):
930 def getSubscription(self, name, remove=True, root=None):
931 """ Retrieve the data associated with a named subscription
931 """ Retrieve the data associated with a named subscription
932
932
933 If remove is True (the default), the subscription data is removed
933 If remove is True (the default), the subscription data is removed
934 from the buffer. Otherwise the data is returned but left in
934 from the buffer. Otherwise the data is returned but left in
935 the buffer.
935 the buffer.
936
936
937 Returns None if there is no data associated with `name`
937 Returns None if there is no data associated with `name`
938
938
939 If root is not None, then only return the subscription
939 If root is not None, then only return the subscription
940 data that matches both root and name. When used in this way,
940 data that matches both root and name. When used in this way,
941 remove processing impacts both the unscoped and scoped stores
941 remove processing impacts both the unscoped and scoped stores
942 for the subscription data.
942 for the subscription data.
943 """
943 """
944 if compat.PYTHON3 and issubclass(self.recvCodec, BserCodec):
944 if compat.PYTHON3 and issubclass(self.recvCodec, BserCodec):
945 # People may pass in Unicode strings here -- but currently BSER only
945 # People may pass in Unicode strings here -- but currently BSER only
946 # returns bytestrings. Deal with that.
946 # returns bytestrings. Deal with that.
947 if isinstance(root, str):
947 if isinstance(root, str):
948 root = encoding.encode_local(root)
948 root = encoding.encode_local(root)
949 if isinstance(name, str):
949 if isinstance(name, str):
950 name = name.encode('utf-8')
950 name = name.encode('utf-8')
951
951
952 if root is not None:
952 if root is not None:
953 if not root in self.sub_by_root:
953 if not root in self.sub_by_root:
954 return None
954 return None
955 if not name in self.sub_by_root[root]:
955 if not name in self.sub_by_root[root]:
956 return None
956 return None
957 sub = self.sub_by_root[root][name]
957 sub = self.sub_by_root[root][name]
958 if remove:
958 if remove:
959 del self.sub_by_root[root][name]
959 del self.sub_by_root[root][name]
960 # don't let this grow unbounded
960 # don't let this grow unbounded
961 if name in self.subs:
961 if name in self.subs:
962 del self.subs[name]
962 del self.subs[name]
963 return sub
963 return sub
964
964
965 if not (name in self.subs):
965 if not (name in self.subs):
966 return None
966 return None
967 sub = self.subs[name]
967 sub = self.subs[name]
968 if remove:
968 if remove:
969 del self.subs[name]
969 del self.subs[name]
970 return sub
970 return sub
971
971
972 def query(self, *args):
972 def query(self, *args):
973 """ Send a query to the watchman service and return the response
973 """ Send a query to the watchman service and return the response
974
974
975 This call will block until the response is returned.
975 This call will block until the response is returned.
976 If any unilateral responses are sent by the service in between
976 If any unilateral responses are sent by the service in between
977 the request-response they will be buffered up in the client object
977 the request-response they will be buffered up in the client object
978 and NOT returned via this method.
978 and NOT returned via this method.
979 """
979 """
980
980
981 log('calling client.query')
981 log('calling client.query')
982 self._connect()
982 self._connect()
983 try:
983 try:
984 self.sendConn.send(args)
984 self.sendConn.send(args)
985
985
986 res = self.receive()
986 res = self.receive()
987 while self.isUnilateralResponse(res):
987 while self.isUnilateralResponse(res):
988 res = self.receive()
988 res = self.receive()
989
989
990 return res
990 return res
991 except EnvironmentError as ee:
991 except EnvironmentError as ee:
992 # When we can depend on Python 3, we can use PEP 3134
992 # When we can depend on Python 3, we can use PEP 3134
993 # exception chaining here.
993 # exception chaining here.
994 raise WatchmanEnvironmentError(
994 raise WatchmanEnvironmentError(
995 'I/O error communicating with watchman daemon',
995 'I/O error communicating with watchman daemon',
996 ee.errno,
996 ee.errno,
997 ee.strerror,
997 ee.strerror,
998 args)
998 args)
999 except WatchmanError as ex:
999 except WatchmanError as ex:
1000 ex.setCommand(args)
1000 ex.setCommand(args)
1001 raise
1001 raise
1002
1002
1003 def capabilityCheck(self, optional=None, required=None):
1003 def capabilityCheck(self, optional=None, required=None):
1004 """ Perform a server capability check """
1004 """ Perform a server capability check """
1005 res = self.query('version', {
1005 res = self.query('version', {
1006 'optional': optional or [],
1006 'optional': optional or [],
1007 'required': required or []
1007 'required': required or []
1008 })
1008 })
1009
1009
1010 if not self._hasprop(res, 'capabilities'):
1010 if not self._hasprop(res, 'capabilities'):
1011 # Server doesn't support capabilities, so we need to
1011 # Server doesn't support capabilities, so we need to
1012 # synthesize the results based on the version
1012 # synthesize the results based on the version
1013 capabilities.synthesize(res, optional)
1013 capabilities.synthesize(res, optional)
1014 if 'error' in res:
1014 if 'error' in res:
1015 raise CommandError(res['error'])
1015 raise CommandError(res['error'])
1016
1016
1017 return res
1017 return res
1018
1018
1019 def setTimeout(self, value):
1019 def setTimeout(self, value):
1020 self.recvConn.setTimeout(value)
1020 self.recvConn.setTimeout(value)
1021 self.sendConn.setTimeout(value)
1021 self.sendConn.setTimeout(value)
General Comments 0
You need to be logged in to leave comments. Login now