##// END OF EJS Templates
fsmonitor: reapply dd35abc409ee...
Gregory Szorc -
r43705:9eed959c stable
parent child Browse files
Show More
@@ -1,1195 +1,1195 b''
1 # Copyright 2014-present Facebook, Inc.
1 # Copyright 2014-present Facebook, Inc.
2 # All rights reserved.
2 # All rights reserved.
3 #
3 #
4 # Redistribution and use in source and binary forms, with or without
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are met:
5 # modification, are permitted provided that the following conditions are met:
6 #
6 #
7 # * Redistributions of source code must retain the above copyright notice,
7 # * Redistributions of source code must retain the above copyright notice,
8 # this list of conditions and the following disclaimer.
8 # this list of conditions and the following disclaimer.
9 #
9 #
10 # * Redistributions in binary form must reproduce the above copyright notice,
10 # * Redistributions in binary form must reproduce the above copyright notice,
11 # this list of conditions and the following disclaimer in the documentation
11 # this list of conditions and the following disclaimer in the documentation
12 # and/or other materials provided with the distribution.
12 # and/or other materials provided with the distribution.
13 #
13 #
14 # * Neither the name Facebook nor the names of its contributors may be used to
14 # * Neither the name Facebook nor the names of its contributors may be used to
15 # endorse or promote products derived from this software without specific
15 # endorse or promote products derived from this software without specific
16 # prior written permission.
16 # prior written permission.
17 #
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
28
29 # no unicode literals
29 # no unicode literals
30 from __future__ import absolute_import, division, print_function
30 from __future__ import absolute_import, division, print_function
31
31
32 import inspect
32 import inspect
33 import math
33 import math
34 import os
34 import os
35 import socket
35 import socket
36 import subprocess
36 import subprocess
37 import time
37 import time
38
38
39 from . import capabilities, compat, encoding, load
39 from . import capabilities, compat, encoding, load
40
40
41
41
42 # Sometimes it's really hard to get Python extensions to compile,
42 # Sometimes it's really hard to get Python extensions to compile,
43 # so fall back to a pure Python implementation.
43 # so fall back to a pure Python implementation.
44 try:
44 try:
45 from . import bser
45 from . import bser
46
46
47 # Demandimport causes modules to be loaded lazily. Force the load now
47 # Demandimport causes modules to be loaded lazily. Force the load now
48 # so that we can fall back on pybser if bser doesn't exist
48 # so that we can fall back on pybser if bser doesn't exist
49 bser.pdu_info
49 bser.pdu_info
50 except ImportError:
50 except ImportError:
51 from . import pybser as bser
51 from . import pybser as bser
52
52
53
53
54 if os.name == "nt":
54 if os.name == "nt":
55 import ctypes
55 import ctypes
56 import ctypes.wintypes
56 import ctypes.wintypes
57
57
58 wintypes = ctypes.wintypes
58 wintypes = ctypes.wintypes
59 GENERIC_READ = 0x80000000
59 GENERIC_READ = 0x80000000
60 GENERIC_WRITE = 0x40000000
60 GENERIC_WRITE = 0x40000000
61 FILE_FLAG_OVERLAPPED = 0x40000000
61 FILE_FLAG_OVERLAPPED = 0x40000000
62 OPEN_EXISTING = 3
62 OPEN_EXISTING = 3
63 INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
63 INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
64 FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
64 FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
65 FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
65 FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
66 FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
66 FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
67 WAIT_FAILED = 0xFFFFFFFF
67 WAIT_FAILED = 0xFFFFFFFF
68 WAIT_TIMEOUT = 0x00000102
68 WAIT_TIMEOUT = 0x00000102
69 WAIT_OBJECT_0 = 0x00000000
69 WAIT_OBJECT_0 = 0x00000000
70 WAIT_IO_COMPLETION = 0x000000C0
70 WAIT_IO_COMPLETION = 0x000000C0
71 INFINITE = 0xFFFFFFFF
71 INFINITE = 0xFFFFFFFF
72
72
73 # Overlapped I/O operation is in progress. (997)
73 # Overlapped I/O operation is in progress. (997)
74 ERROR_IO_PENDING = 0x000003E5
74 ERROR_IO_PENDING = 0x000003E5
75
75
76 # The pointer size follows the architecture
76 # The pointer size follows the architecture
77 # We use WPARAM since this type is already conditionally defined
77 # We use WPARAM since this type is already conditionally defined
78 ULONG_PTR = ctypes.wintypes.WPARAM
78 ULONG_PTR = ctypes.wintypes.WPARAM
79
79
80 class OVERLAPPED(ctypes.Structure):
80 class OVERLAPPED(ctypes.Structure):
81 _fields_ = [
81 _fields_ = [
82 ("Internal", ULONG_PTR),
82 ("Internal", ULONG_PTR),
83 ("InternalHigh", ULONG_PTR),
83 ("InternalHigh", ULONG_PTR),
84 ("Offset", wintypes.DWORD),
84 ("Offset", wintypes.DWORD),
85 ("OffsetHigh", wintypes.DWORD),
85 ("OffsetHigh", wintypes.DWORD),
86 ("hEvent", wintypes.HANDLE),
86 ("hEvent", wintypes.HANDLE),
87 ]
87 ]
88
88
89 def __init__(self):
89 def __init__(self):
90 self.Internal = 0
90 self.Internal = 0
91 self.InternalHigh = 0
91 self.InternalHigh = 0
92 self.Offset = 0
92 self.Offset = 0
93 self.OffsetHigh = 0
93 self.OffsetHigh = 0
94 self.hEvent = 0
94 self.hEvent = 0
95
95
96 LPDWORD = ctypes.POINTER(wintypes.DWORD)
96 LPDWORD = ctypes.POINTER(wintypes.DWORD)
97
97
98 CreateFile = ctypes.windll.kernel32.CreateFileA
98 CreateFile = ctypes.windll.kernel32.CreateFileA
99 CreateFile.argtypes = [
99 CreateFile.argtypes = [
100 wintypes.LPSTR,
100 wintypes.LPSTR,
101 wintypes.DWORD,
101 wintypes.DWORD,
102 wintypes.DWORD,
102 wintypes.DWORD,
103 wintypes.LPVOID,
103 wintypes.LPVOID,
104 wintypes.DWORD,
104 wintypes.DWORD,
105 wintypes.DWORD,
105 wintypes.DWORD,
106 wintypes.HANDLE,
106 wintypes.HANDLE,
107 ]
107 ]
108 CreateFile.restype = wintypes.HANDLE
108 CreateFile.restype = wintypes.HANDLE
109
109
110 CloseHandle = ctypes.windll.kernel32.CloseHandle
110 CloseHandle = ctypes.windll.kernel32.CloseHandle
111 CloseHandle.argtypes = [wintypes.HANDLE]
111 CloseHandle.argtypes = [wintypes.HANDLE]
112 CloseHandle.restype = wintypes.BOOL
112 CloseHandle.restype = wintypes.BOOL
113
113
114 ReadFile = ctypes.windll.kernel32.ReadFile
114 ReadFile = ctypes.windll.kernel32.ReadFile
115 ReadFile.argtypes = [
115 ReadFile.argtypes = [
116 wintypes.HANDLE,
116 wintypes.HANDLE,
117 wintypes.LPVOID,
117 wintypes.LPVOID,
118 wintypes.DWORD,
118 wintypes.DWORD,
119 LPDWORD,
119 LPDWORD,
120 ctypes.POINTER(OVERLAPPED),
120 ctypes.POINTER(OVERLAPPED),
121 ]
121 ]
122 ReadFile.restype = wintypes.BOOL
122 ReadFile.restype = wintypes.BOOL
123
123
124 WriteFile = ctypes.windll.kernel32.WriteFile
124 WriteFile = ctypes.windll.kernel32.WriteFile
125 WriteFile.argtypes = [
125 WriteFile.argtypes = [
126 wintypes.HANDLE,
126 wintypes.HANDLE,
127 wintypes.LPVOID,
127 wintypes.LPVOID,
128 wintypes.DWORD,
128 wintypes.DWORD,
129 LPDWORD,
129 LPDWORD,
130 ctypes.POINTER(OVERLAPPED),
130 ctypes.POINTER(OVERLAPPED),
131 ]
131 ]
132 WriteFile.restype = wintypes.BOOL
132 WriteFile.restype = wintypes.BOOL
133
133
134 GetLastError = ctypes.windll.kernel32.GetLastError
134 GetLastError = ctypes.windll.kernel32.GetLastError
135 GetLastError.argtypes = []
135 GetLastError.argtypes = []
136 GetLastError.restype = wintypes.DWORD
136 GetLastError.restype = wintypes.DWORD
137
137
138 SetLastError = ctypes.windll.kernel32.SetLastError
138 SetLastError = ctypes.windll.kernel32.SetLastError
139 SetLastError.argtypes = [wintypes.DWORD]
139 SetLastError.argtypes = [wintypes.DWORD]
140 SetLastError.restype = None
140 SetLastError.restype = None
141
141
142 FormatMessage = ctypes.windll.kernel32.FormatMessageA
142 FormatMessage = ctypes.windll.kernel32.FormatMessageA
143 FormatMessage.argtypes = [
143 FormatMessage.argtypes = [
144 wintypes.DWORD,
144 wintypes.DWORD,
145 wintypes.LPVOID,
145 wintypes.LPVOID,
146 wintypes.DWORD,
146 wintypes.DWORD,
147 wintypes.DWORD,
147 wintypes.DWORD,
148 ctypes.POINTER(wintypes.LPSTR),
148 ctypes.POINTER(wintypes.LPSTR),
149 wintypes.DWORD,
149 wintypes.DWORD,
150 wintypes.LPVOID,
150 wintypes.LPVOID,
151 ]
151 ]
152 FormatMessage.restype = wintypes.DWORD
152 FormatMessage.restype = wintypes.DWORD
153
153
154 LocalFree = ctypes.windll.kernel32.LocalFree
154 LocalFree = ctypes.windll.kernel32.LocalFree
155
155
156 GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
156 GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
157 GetOverlappedResult.argtypes = [
157 GetOverlappedResult.argtypes = [
158 wintypes.HANDLE,
158 wintypes.HANDLE,
159 ctypes.POINTER(OVERLAPPED),
159 ctypes.POINTER(OVERLAPPED),
160 LPDWORD,
160 LPDWORD,
161 wintypes.BOOL,
161 wintypes.BOOL,
162 ]
162 ]
163 GetOverlappedResult.restype = wintypes.BOOL
163 GetOverlappedResult.restype = wintypes.BOOL
164
164
165 GetOverlappedResultEx = getattr(
165 GetOverlappedResultEx = getattr(
166 ctypes.windll.kernel32, "GetOverlappedResultEx", None
166 ctypes.windll.kernel32, "GetOverlappedResultEx", None
167 )
167 )
168 if GetOverlappedResultEx is not None:
168 if GetOverlappedResultEx is not None:
169 GetOverlappedResultEx.argtypes = [
169 GetOverlappedResultEx.argtypes = [
170 wintypes.HANDLE,
170 wintypes.HANDLE,
171 ctypes.POINTER(OVERLAPPED),
171 ctypes.POINTER(OVERLAPPED),
172 LPDWORD,
172 LPDWORD,
173 wintypes.DWORD,
173 wintypes.DWORD,
174 wintypes.BOOL,
174 wintypes.BOOL,
175 ]
175 ]
176 GetOverlappedResultEx.restype = wintypes.BOOL
176 GetOverlappedResultEx.restype = wintypes.BOOL
177
177
178 WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
178 WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
179 WaitForSingleObjectEx.argtypes = [
179 WaitForSingleObjectEx.argtypes = [
180 wintypes.HANDLE,
180 wintypes.HANDLE,
181 wintypes.DWORD,
181 wintypes.DWORD,
182 wintypes.BOOL,
182 wintypes.BOOL,
183 ]
183 ]
184 WaitForSingleObjectEx.restype = wintypes.DWORD
184 WaitForSingleObjectEx.restype = wintypes.DWORD
185
185
186 CreateEvent = ctypes.windll.kernel32.CreateEventA
186 CreateEvent = ctypes.windll.kernel32.CreateEventA
187 CreateEvent.argtypes = [
187 CreateEvent.argtypes = [
188 LPDWORD,
188 LPDWORD,
189 wintypes.BOOL,
189 wintypes.BOOL,
190 wintypes.BOOL,
190 wintypes.BOOL,
191 wintypes.LPSTR,
191 wintypes.LPSTR,
192 ]
192 ]
193 CreateEvent.restype = wintypes.HANDLE
193 CreateEvent.restype = wintypes.HANDLE
194
194
195 # Windows Vista is the minimum supported client for CancelIoEx.
195 # Windows Vista is the minimum supported client for CancelIoEx.
196 CancelIoEx = ctypes.windll.kernel32.CancelIoEx
196 CancelIoEx = ctypes.windll.kernel32.CancelIoEx
197 CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
197 CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
198 CancelIoEx.restype = wintypes.BOOL
198 CancelIoEx.restype = wintypes.BOOL
199
199
200 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
200 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
201 sniff_len = 13
201 sniff_len = 13
202
202
203 # This is a helper for debugging the client.
203 # This is a helper for debugging the client.
204 _debugging = False
204 _debugging = False
205 if _debugging:
205 if _debugging:
206
206
207 def log(fmt, *args):
207 def log(fmt, *args):
208 print(
208 print(
209 "[%s] %s"
209 "[%s] %s"
210 % (
210 % (
211 time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
211 time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
212 fmt % args[:],
212 fmt % args[:],
213 )
213 )
214 )
214 )
215
215
216
216
217 else:
217 else:
218
218
219 def log(fmt, *args):
219 def log(fmt, *args):
220 pass
220 pass
221
221
222
222
223 def _win32_strerror(err):
223 def _win32_strerror(err):
224 """ expand a win32 error code into a human readable message """
224 """ expand a win32 error code into a human readable message """
225
225
226 # FormatMessage will allocate memory and assign it here
226 # FormatMessage will allocate memory and assign it here
227 buf = ctypes.c_char_p()
227 buf = ctypes.c_char_p()
228 FormatMessage(
228 FormatMessage(
229 FORMAT_MESSAGE_FROM_SYSTEM
229 FORMAT_MESSAGE_FROM_SYSTEM
230 | FORMAT_MESSAGE_ALLOCATE_BUFFER
230 | FORMAT_MESSAGE_ALLOCATE_BUFFER
231 | FORMAT_MESSAGE_IGNORE_INSERTS,
231 | FORMAT_MESSAGE_IGNORE_INSERTS,
232 None,
232 None,
233 err,
233 err,
234 0,
234 0,
235 buf,
235 buf,
236 0,
236 0,
237 None,
237 None,
238 )
238 )
239 try:
239 try:
240 return buf.value
240 return buf.value
241 finally:
241 finally:
242 LocalFree(buf)
242 LocalFree(buf)
243
243
244
244
245 class WatchmanError(Exception):
245 class WatchmanError(Exception):
246 def __init__(self, msg=None, cmd=None):
246 def __init__(self, msg=None, cmd=None):
247 self.msg = msg
247 self.msg = msg
248 self.cmd = cmd
248 self.cmd = cmd
249
249
250 def setCommand(self, cmd):
250 def setCommand(self, cmd):
251 self.cmd = cmd
251 self.cmd = cmd
252
252
253 def __str__(self):
253 def __str__(self):
254 if self.cmd:
254 if self.cmd:
255 return "%s, while executing %s" % (self.msg, self.cmd)
255 return "%s, while executing %s" % (self.msg, self.cmd)
256 return self.msg
256 return self.msg
257
257
258
258
259 class BSERv1Unsupported(WatchmanError):
259 class BSERv1Unsupported(WatchmanError):
260 pass
260 pass
261
261
262
262
263 class UseAfterFork(WatchmanError):
263 class UseAfterFork(WatchmanError):
264 pass
264 pass
265
265
266
266
267 class WatchmanEnvironmentError(WatchmanError):
267 class WatchmanEnvironmentError(WatchmanError):
268 def __init__(self, msg, errno, errmsg, cmd=None):
268 def __init__(self, msg, errno, errmsg, cmd=None):
269 super(WatchmanEnvironmentError, self).__init__(
269 super(WatchmanEnvironmentError, self).__init__(
270 "{0}: errno={1} errmsg={2}".format(msg, errno, errmsg), cmd
270 "{0}: errno={1} errmsg={2}".format(msg, errno, errmsg), cmd
271 )
271 )
272
272
273
273
274 class SocketConnectError(WatchmanError):
274 class SocketConnectError(WatchmanError):
275 def __init__(self, sockpath, exc):
275 def __init__(self, sockpath, exc):
276 super(SocketConnectError, self).__init__(
276 super(SocketConnectError, self).__init__(
277 "unable to connect to %s: %s" % (sockpath, exc)
277 "unable to connect to %s: %s" % (sockpath, exc)
278 )
278 )
279 self.sockpath = sockpath
279 self.sockpath = sockpath
280 self.exc = exc
280 self.exc = exc
281
281
282
282
283 class SocketTimeout(WatchmanError):
283 class SocketTimeout(WatchmanError):
284 """A specialized exception raised for socket timeouts during communication to/from watchman.
284 """A specialized exception raised for socket timeouts during communication to/from watchman.
285 This makes it easier to implement non-blocking loops as callers can easily distinguish
285 This makes it easier to implement non-blocking loops as callers can easily distinguish
286 between a routine timeout and an actual error condition.
286 between a routine timeout and an actual error condition.
287
287
288 Note that catching WatchmanError will also catch this as it is a super-class, so backwards
288 Note that catching WatchmanError will also catch this as it is a super-class, so backwards
289 compatibility in exception handling is preserved.
289 compatibility in exception handling is preserved.
290 """
290 """
291
291
292
292
293 class CommandError(WatchmanError):
293 class CommandError(WatchmanError):
294 """error returned by watchman
294 """error returned by watchman
295
295
296 self.msg is the message returned by watchman.
296 self.msg is the message returned by watchman.
297 """
297 """
298
298
299 def __init__(self, msg, cmd=None):
299 def __init__(self, msg, cmd=None):
300 super(CommandError, self).__init__(
300 super(CommandError, self).__init__(
301 "watchman command error: %s" % (msg,), cmd
301 "watchman command error: %s" % (msg,), cmd
302 )
302 )
303
303
304
304
305 class Transport(object):
305 class Transport(object):
306 """ communication transport to the watchman server """
306 """ communication transport to the watchman server """
307
307
308 buf = None
308 buf = None
309
309
310 def close(self):
310 def close(self):
311 """ tear it down """
311 """ tear it down """
312 raise NotImplementedError()
312 raise NotImplementedError()
313
313
314 def readBytes(self, size):
314 def readBytes(self, size):
315 """ read size bytes """
315 """ read size bytes """
316 raise NotImplementedError()
316 raise NotImplementedError()
317
317
318 def write(self, buf):
318 def write(self, buf):
319 """ write some data """
319 """ write some data """
320 raise NotImplementedError()
320 raise NotImplementedError()
321
321
322 def setTimeout(self, value):
322 def setTimeout(self, value):
323 pass
323 pass
324
324
325 def readLine(self):
325 def readLine(self):
326 """ read a line
326 """ read a line
327 Maintains its own buffer, callers of the transport should not mix
327 Maintains its own buffer, callers of the transport should not mix
328 calls to readBytes and readLine.
328 calls to readBytes and readLine.
329 """
329 """
330 if self.buf is None:
330 if self.buf is None:
331 self.buf = []
331 self.buf = []
332
332
333 # Buffer may already have a line if we've received unilateral
333 # Buffer may already have a line if we've received unilateral
334 # response(s) from the server
334 # response(s) from the server
335 if len(self.buf) == 1 and b"\n" in self.buf[0]:
335 if len(self.buf) == 1 and b"\n" in self.buf[0]:
336 (line, b) = self.buf[0].split(b"\n", 1)
336 (line, b) = self.buf[0].split(b"\n", 1)
337 self.buf = [b]
337 self.buf = [b]
338 return line
338 return line
339
339
340 while True:
340 while True:
341 b = self.readBytes(4096)
341 b = self.readBytes(4096)
342 if b"\n" in b:
342 if b"\n" in b:
343 result = b"".join(self.buf)
343 result = b"".join(self.buf)
344 (line, b) = b.split(b"\n", 1)
344 (line, b) = b.split(b"\n", 1)
345 self.buf = [b]
345 self.buf = [b]
346 return result + line
346 return result + line
347 self.buf.append(b)
347 self.buf.append(b)
348
348
349
349
350 class Codec(object):
350 class Codec(object):
351 """ communication encoding for the watchman server """
351 """ communication encoding for the watchman server """
352
352
353 transport = None
353 transport = None
354
354
355 def __init__(self, transport):
355 def __init__(self, transport):
356 self.transport = transport
356 self.transport = transport
357
357
358 def receive(self):
358 def receive(self):
359 raise NotImplementedError()
359 raise NotImplementedError()
360
360
361 def send(self, *args):
361 def send(self, *args):
362 raise NotImplementedError()
362 raise NotImplementedError()
363
363
364 def setTimeout(self, value):
364 def setTimeout(self, value):
365 self.transport.setTimeout(value)
365 self.transport.setTimeout(value)
366
366
367
367
368 class UnixSocketTransport(Transport):
368 class UnixSocketTransport(Transport):
369 """ local unix domain socket transport """
369 """ local unix domain socket transport """
370
370
371 sock = None
371 sock = None
372
372
373 def __init__(self, sockpath, timeout):
373 def __init__(self, sockpath, timeout):
374 self.sockpath = sockpath
374 self.sockpath = sockpath
375 self.timeout = timeout
375 self.timeout = timeout
376
376
377 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
377 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
378 try:
378 try:
379 sock.settimeout(self.timeout)
379 sock.settimeout(self.timeout)
380 sock.connect(self.sockpath)
380 sock.connect(self.sockpath)
381 self.sock = sock
381 self.sock = sock
382 except socket.error as e:
382 except socket.error as e:
383 sock.close()
383 sock.close()
384 raise SocketConnectError(self.sockpath, e)
384 raise SocketConnectError(self.sockpath, e)
385
385
386 def close(self):
386 def close(self):
387 if self.sock:
387 if self.sock:
388 self.sock.close()
388 self.sock.close()
389 self.sock = None
389 self.sock = None
390
390
391 def setTimeout(self, value):
391 def setTimeout(self, value):
392 self.timeout = value
392 self.timeout = value
393 self.sock.settimeout(self.timeout)
393 self.sock.settimeout(self.timeout)
394
394
395 def readBytes(self, size):
395 def readBytes(self, size):
396 try:
396 try:
397 buf = [self.sock.recv(size)]
397 buf = [self.sock.recv(size)]
398 if not buf[0]:
398 if not buf[0]:
399 raise WatchmanError("empty watchman response")
399 raise WatchmanError("empty watchman response")
400 return buf[0]
400 return buf[0]
401 except socket.timeout:
401 except socket.timeout:
402 raise SocketTimeout("timed out waiting for response")
402 raise SocketTimeout("timed out waiting for response")
403
403
404 def write(self, data):
404 def write(self, data):
405 try:
405 try:
406 self.sock.sendall(data)
406 self.sock.sendall(data)
407 except socket.timeout:
407 except socket.timeout:
408 raise SocketTimeout("timed out sending query command")
408 raise SocketTimeout("timed out sending query command")
409
409
410
410
411 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
411 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
412 """ Windows 7 and earlier does not support GetOverlappedResultEx. The
412 """ Windows 7 and earlier does not support GetOverlappedResultEx. The
413 alternative is to use GetOverlappedResult and wait for read or write
413 alternative is to use GetOverlappedResult and wait for read or write
414 operation to complete. This is done be using CreateEvent and
414 operation to complete. This is done be using CreateEvent and
415 WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
415 WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
416 and GetOverlappedResult are all part of Windows API since WindowsXP.
416 and GetOverlappedResult are all part of Windows API since WindowsXP.
417 This is the exact same implementation that can be found in the watchman
417 This is the exact same implementation that can be found in the watchman
418 source code (see get_overlapped_result_ex_impl in stream_win.c). This
418 source code (see get_overlapped_result_ex_impl in stream_win.c). This
419 way, maintenance should be simplified.
419 way, maintenance should be simplified.
420 """
420 """
421 log("Preparing to wait for maximum %dms", millis)
421 log("Preparing to wait for maximum %dms", millis)
422 if millis != 0:
422 if millis != 0:
423 waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
423 waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
424 if waitReturnCode == WAIT_OBJECT_0:
424 if waitReturnCode == WAIT_OBJECT_0:
425 # Event is signaled, overlapped IO operation result should be available.
425 # Event is signaled, overlapped IO operation result should be available.
426 pass
426 pass
427 elif waitReturnCode == WAIT_IO_COMPLETION:
427 elif waitReturnCode == WAIT_IO_COMPLETION:
428 # WaitForSingleObjectEx returnes because the system added an I/O completion
428 # WaitForSingleObjectEx returnes because the system added an I/O completion
429 # routine or an asynchronous procedure call (APC) to the thread queue.
429 # routine or an asynchronous procedure call (APC) to the thread queue.
430 SetLastError(WAIT_IO_COMPLETION)
430 SetLastError(WAIT_IO_COMPLETION)
431 pass
431 pass
432 elif waitReturnCode == WAIT_TIMEOUT:
432 elif waitReturnCode == WAIT_TIMEOUT:
433 # We reached the maximum allowed wait time, the IO operation failed
433 # We reached the maximum allowed wait time, the IO operation failed
434 # to complete in timely fashion.
434 # to complete in timely fashion.
435 SetLastError(WAIT_TIMEOUT)
435 SetLastError(WAIT_TIMEOUT)
436 return False
436 return False
437 elif waitReturnCode == WAIT_FAILED:
437 elif waitReturnCode == WAIT_FAILED:
438 # something went wrong calling WaitForSingleObjectEx
438 # something went wrong calling WaitForSingleObjectEx
439 err = GetLastError()
439 err = GetLastError()
440 log("WaitForSingleObjectEx failed: %s", _win32_strerror(err))
440 log("WaitForSingleObjectEx failed: %s", _win32_strerror(err))
441 return False
441 return False
442 else:
442 else:
443 # unexpected situation deserving investigation.
443 # unexpected situation deserving investigation.
444 err = GetLastError()
444 err = GetLastError()
445 log("Unexpected error: %s", _win32_strerror(err))
445 log("Unexpected error: %s", _win32_strerror(err))
446 return False
446 return False
447
447
448 return GetOverlappedResult(pipe, olap, nbytes, False)
448 return GetOverlappedResult(pipe, olap, nbytes, False)
449
449
450
450
451 class WindowsNamedPipeTransport(Transport):
451 class WindowsNamedPipeTransport(Transport):
452 """ connect to a named pipe """
452 """ connect to a named pipe """
453
453
454 def __init__(self, sockpath, timeout):
454 def __init__(self, sockpath, timeout):
455 self.sockpath = sockpath
455 self.sockpath = sockpath
456 self.timeout = int(math.ceil(timeout * 1000))
456 self.timeout = int(math.ceil(timeout * 1000))
457 self._iobuf = None
457 self._iobuf = None
458
458
459 if compat.PYTHON3:
459 if compat.PYTHON3:
460 sockpath = os.fsencode(sockpath)
460 sockpath = os.fsencode(sockpath)
461 self.pipe = CreateFile(
461 self.pipe = CreateFile(
462 sockpath,
462 sockpath,
463 GENERIC_READ | GENERIC_WRITE,
463 GENERIC_READ | GENERIC_WRITE,
464 0,
464 0,
465 None,
465 None,
466 OPEN_EXISTING,
466 OPEN_EXISTING,
467 FILE_FLAG_OVERLAPPED,
467 FILE_FLAG_OVERLAPPED,
468 None,
468 None,
469 )
469 )
470
470
471 err = GetLastError()
471 err = GetLastError()
472 if self.pipe == INVALID_HANDLE_VALUE or self.pipe == 0:
472 if self.pipe == INVALID_HANDLE_VALUE or self.pipe == 0:
473 self.pipe = None
473 self.pipe = None
474 raise SocketConnectError(self.sockpath, self._make_win_err("", err))
474 raise SocketConnectError(self.sockpath, self._make_win_err("", err))
475
475
476 # event for the overlapped I/O operations
476 # event for the overlapped I/O operations
477 self._waitable = CreateEvent(None, True, False, None)
477 self._waitable = CreateEvent(None, True, False, None)
478 err = GetLastError()
478 err = GetLastError()
479 if self._waitable is None:
479 if self._waitable is None:
480 self._raise_win_err("CreateEvent failed", err)
480 self._raise_win_err("CreateEvent failed", err)
481
481
482 self._get_overlapped_result_ex = GetOverlappedResultEx
482 self._get_overlapped_result_ex = GetOverlappedResultEx
483 if (
483 if (
484 os.getenv("WATCHMAN_WIN7_COMPAT") == "1"
484 os.getenv("WATCHMAN_WIN7_COMPAT") == "1"
485 or self._get_overlapped_result_ex is None
485 or self._get_overlapped_result_ex is None
486 ):
486 ):
487 self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
487 self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
488
488
489 def _raise_win_err(self, msg, err):
489 def _raise_win_err(self, msg, err):
490 raise self._make_win_err(msg, err)
490 raise self._make_win_err(msg, err)
491
491
492 def _make_win_err(self, msg, err):
492 def _make_win_err(self, msg, err):
493 return IOError(
493 return IOError(
494 "%s win32 error code: %d %s" % (msg, err, _win32_strerror(err))
494 "%s win32 error code: %d %s" % (msg, err, _win32_strerror(err))
495 )
495 )
496
496
497 def close(self):
497 def close(self):
498 if self.pipe:
498 if self.pipe:
499 log("Closing pipe")
499 log("Closing pipe")
500 CloseHandle(self.pipe)
500 CloseHandle(self.pipe)
501 self.pipe = None
501 self.pipe = None
502
502
503 if self._waitable is not None:
503 if self._waitable is not None:
504 # We release the handle for the event
504 # We release the handle for the event
505 CloseHandle(self._waitable)
505 CloseHandle(self._waitable)
506 self._waitable = None
506 self._waitable = None
507
507
508 def setTimeout(self, value):
508 def setTimeout(self, value):
509 # convert to milliseconds
509 # convert to milliseconds
510 self.timeout = int(value * 1000)
510 self.timeout = int(value * 1000)
511
511
512 def readBytes(self, size):
512 def readBytes(self, size):
513 """ A read can block for an unbounded amount of time, even if the
513 """ A read can block for an unbounded amount of time, even if the
514 kernel reports that the pipe handle is signalled, so we need to
514 kernel reports that the pipe handle is signalled, so we need to
515 always perform our reads asynchronously
515 always perform our reads asynchronously
516 """
516 """
517
517
518 # try to satisfy the read from any buffered data
518 # try to satisfy the read from any buffered data
519 if self._iobuf:
519 if self._iobuf:
520 if size >= len(self._iobuf):
520 if size >= len(self._iobuf):
521 res = self._iobuf
521 res = self._iobuf
522 self.buf = None
522 self.buf = None
523 return res
523 return res
524 res = self._iobuf[:size]
524 res = self._iobuf[:size]
525 self._iobuf = self._iobuf[size:]
525 self._iobuf = self._iobuf[size:]
526 return res
526 return res
527
527
528 # We need to initiate a read
528 # We need to initiate a read
529 buf = ctypes.create_string_buffer(size)
529 buf = ctypes.create_string_buffer(size)
530 olap = OVERLAPPED()
530 olap = OVERLAPPED()
531 olap.hEvent = self._waitable
531 olap.hEvent = self._waitable
532
532
533 log("made read buff of size %d", size)
533 log("made read buff of size %d", size)
534
534
535 # ReadFile docs warn against sending in the nread parameter for async
535 # ReadFile docs warn against sending in the nread parameter for async
536 # operations, so we always collect it via GetOverlappedResultEx
536 # operations, so we always collect it via GetOverlappedResultEx
537 immediate = ReadFile(self.pipe, buf, size, None, olap)
537 immediate = ReadFile(self.pipe, buf, size, None, olap)
538
538
539 if not immediate:
539 if not immediate:
540 err = GetLastError()
540 err = GetLastError()
541 if err != ERROR_IO_PENDING:
541 if err != ERROR_IO_PENDING:
542 self._raise_win_err("failed to read %d bytes" % size, err)
542 self._raise_win_err("failed to read %d bytes" % size, err)
543
543
544 nread = wintypes.DWORD()
544 nread = wintypes.DWORD()
545 if not self._get_overlapped_result_ex(
545 if not self._get_overlapped_result_ex(
546 self.pipe, olap, nread, 0 if immediate else self.timeout, True
546 self.pipe, olap, nread, 0 if immediate else self.timeout, True
547 ):
547 ):
548 err = GetLastError()
548 err = GetLastError()
549 CancelIoEx(self.pipe, olap)
549 CancelIoEx(self.pipe, olap)
550
550
551 if err == WAIT_TIMEOUT:
551 if err == WAIT_TIMEOUT:
552 log("GetOverlappedResultEx timedout")
552 log("GetOverlappedResultEx timedout")
553 raise SocketTimeout(
553 raise SocketTimeout(
554 "timed out after waiting %dms for read" % self.timeout
554 "timed out after waiting %dms for read" % self.timeout
555 )
555 )
556
556
557 log("GetOverlappedResultEx reports error %d", err)
557 log("GetOverlappedResultEx reports error %d", err)
558 self._raise_win_err("error while waiting for read", err)
558 self._raise_win_err("error while waiting for read", err)
559
559
560 nread = nread.value
560 nread = nread.value
561 if nread == 0:
561 if nread == 0:
562 # Docs say that named pipes return 0 byte when the other end did
562 # Docs say that named pipes return 0 byte when the other end did
563 # a zero byte write. Since we don't ever do that, the only
563 # a zero byte write. Since we don't ever do that, the only
564 # other way this shows up is if the client has gotten in a weird
564 # other way this shows up is if the client has gotten in a weird
565 # state, so let's bail out
565 # state, so let's bail out
566 CancelIoEx(self.pipe, olap)
566 CancelIoEx(self.pipe, olap)
567 raise IOError("Async read yielded 0 bytes; unpossible!")
567 raise IOError("Async read yielded 0 bytes; unpossible!")
568
568
569 # Holds precisely the bytes that we read from the prior request
569 # Holds precisely the bytes that we read from the prior request
570 buf = buf[:nread]
570 buf = buf[:nread]
571
571
572 returned_size = min(nread, size)
572 returned_size = min(nread, size)
573 if returned_size == nread:
573 if returned_size == nread:
574 return buf
574 return buf
575
575
576 # keep any left-overs around for a later read to consume
576 # keep any left-overs around for a later read to consume
577 self._iobuf = buf[returned_size:]
577 self._iobuf = buf[returned_size:]
578 return buf[:returned_size]
578 return buf[:returned_size]
579
579
580 def write(self, data):
580 def write(self, data):
581 olap = OVERLAPPED()
581 olap = OVERLAPPED()
582 olap.hEvent = self._waitable
582 olap.hEvent = self._waitable
583
583
584 immediate = WriteFile(
584 immediate = WriteFile(
585 self.pipe, ctypes.c_char_p(data), len(data), None, olap
585 self.pipe, ctypes.c_char_p(data), len(data), None, olap
586 )
586 )
587
587
588 if not immediate:
588 if not immediate:
589 err = GetLastError()
589 err = GetLastError()
590 if err != ERROR_IO_PENDING:
590 if err != ERROR_IO_PENDING:
591 self._raise_win_err(
591 self._raise_win_err(
592 "failed to write %d bytes to handle %r"
592 "failed to write %d bytes to handle %r"
593 % (len(data), self.pipe),
593 % (len(data), self.pipe),
594 err,
594 err,
595 )
595 )
596
596
597 # Obtain results, waiting if needed
597 # Obtain results, waiting if needed
598 nwrote = wintypes.DWORD()
598 nwrote = wintypes.DWORD()
599 if self._get_overlapped_result_ex(
599 if self._get_overlapped_result_ex(
600 self.pipe, olap, nwrote, 0 if immediate else self.timeout, True
600 self.pipe, olap, nwrote, 0 if immediate else self.timeout, True
601 ):
601 ):
602 log("made write of %d bytes", nwrote.value)
602 log("made write of %d bytes", nwrote.value)
603 return nwrote.value
603 return nwrote.value
604
604
605 err = GetLastError()
605 err = GetLastError()
606
606
607 # It's potentially unsafe to allow the write to continue after
607 # It's potentially unsafe to allow the write to continue after
608 # we unwind, so let's make a best effort to avoid that happening
608 # we unwind, so let's make a best effort to avoid that happening
609 CancelIoEx(self.pipe, olap)
609 CancelIoEx(self.pipe, olap)
610
610
611 if err == WAIT_TIMEOUT:
611 if err == WAIT_TIMEOUT:
612 raise SocketTimeout(
612 raise SocketTimeout(
613 "timed out after waiting %dms for write" % self.timeout
613 "timed out after waiting %dms for write" % self.timeout
614 )
614 )
615 self._raise_win_err(
615 self._raise_win_err(
616 "error while waiting for write of %d bytes" % len(data), err
616 "error while waiting for write of %d bytes" % len(data), err
617 )
617 )
618
618
619
619
620 def _default_binpath(binpath=None):
620 def _default_binpath(binpath=None):
621 if binpath:
621 if binpath:
622 return binpath
622 return binpath
623 # The test harness sets WATCHMAN_BINARY to the binary under test,
623 # The test harness sets WATCHMAN_BINARY to the binary under test,
624 # so we use that by default, otherwise, allow resolving watchman
624 # so we use that by default, otherwise, allow resolving watchman
625 # from the users PATH.
625 # from the users PATH.
626 return os.environ.get("WATCHMAN_BINARY", "watchman")
626 return os.environ.get("WATCHMAN_BINARY", "watchman")
627
627
628
628
629 class CLIProcessTransport(Transport):
629 class CLIProcessTransport(Transport):
630 """ open a pipe to the cli to talk to the service
630 """ open a pipe to the cli to talk to the service
631 This intended to be used only in the test harness!
631 This intended to be used only in the test harness!
632
632
633 The CLI is an oddball because we only support JSON input
633 The CLI is an oddball because we only support JSON input
634 and cannot send multiple commands through the same instance,
634 and cannot send multiple commands through the same instance,
635 so we spawn a new process for each command.
635 so we spawn a new process for each command.
636
636
637 We disable server spawning for this implementation, again, because
637 We disable server spawning for this implementation, again, because
638 it is intended to be used only in our test harness. You really
638 it is intended to be used only in our test harness. You really
639 should not need to use the CLI transport for anything real.
639 should not need to use the CLI transport for anything real.
640
640
641 While the CLI can output in BSER, our Transport interface doesn't
641 While the CLI can output in BSER, our Transport interface doesn't
642 support telling this instance that it should do so. That effectively
642 support telling this instance that it should do so. That effectively
643 limits this implementation to JSON input and output only at this time.
643 limits this implementation to JSON input and output only at this time.
644
644
645 It is the responsibility of the caller to set the send and
645 It is the responsibility of the caller to set the send and
646 receive codecs appropriately.
646 receive codecs appropriately.
647 """
647 """
648
648
649 proc = None
649 proc = None
650 closed = True
650 closed = True
651
651
652 def __init__(self, sockpath, timeout, binpath=None):
652 def __init__(self, sockpath, timeout, binpath=None):
653 self.sockpath = sockpath
653 self.sockpath = sockpath
654 self.timeout = timeout
654 self.timeout = timeout
655 self.binpath = _default_binpath(binpath)
655 self.binpath = _default_binpath(binpath)
656
656
657 def close(self):
657 def close(self):
658 if self.proc:
658 if self.proc:
659 if self.proc.pid is not None:
659 if self.proc.pid is not None:
660 self.proc.kill()
660 self.proc.kill()
661 self.proc.stdin.close()
661 self.proc.stdin.close()
662 self.proc.stdout.close()
662 self.proc.stdout.close()
663 self.proc.wait()
663 self.proc.wait()
664 self.proc = None
664 self.proc = None
665
665
666 def _connect(self):
666 def _connect(self):
667 if self.proc:
667 if self.proc:
668 return self.proc
668 return self.proc
669 args = [
669 args = [
670 self.binpath,
670 self.binpath,
671 "--sockname={0}".format(self.sockpath),
671 "--sockname={0}".format(self.sockpath),
672 "--logfile=/BOGUS",
672 "--logfile=/BOGUS",
673 "--statefile=/BOGUS",
673 "--statefile=/BOGUS",
674 "--no-spawn",
674 "--no-spawn",
675 "--no-local",
675 "--no-local",
676 "--no-pretty",
676 "--no-pretty",
677 "-j",
677 "-j",
678 ]
678 ]
679 self.proc = subprocess.Popen(
679 self.proc = subprocess.Popen(
680 args, stdin=subprocess.PIPE, stdout=subprocess.PIPE
680 args, stdin=subprocess.PIPE, stdout=subprocess.PIPE
681 )
681 )
682 return self.proc
682 return self.proc
683
683
684 def readBytes(self, size):
684 def readBytes(self, size):
685 self._connect()
685 self._connect()
686 res = self.proc.stdout.read(size)
686 res = self.proc.stdout.read(size)
687 if not res:
687 if not res:
688 raise WatchmanError("EOF on CLI process transport")
688 raise WatchmanError("EOF on CLI process transport")
689 return res
689 return res
690
690
691 def write(self, data):
691 def write(self, data):
692 if self.closed:
692 if self.closed:
693 self.close()
693 self.close()
694 self.closed = False
694 self.closed = False
695 self._connect()
695 self._connect()
696 res = self.proc.stdin.write(data)
696 res = self.proc.stdin.write(data)
697 self.proc.stdin.close()
697 self.proc.stdin.close()
698 self.closed = True
698 self.closed = True
699 return res
699 return res
700
700
701
701
702 class BserCodec(Codec):
702 class BserCodec(Codec):
703 """ use the BSER encoding. This is the default, preferred codec """
703 """ use the BSER encoding. This is the default, preferred codec """
704
704
705 def __init__(self, transport, value_encoding, value_errors):
705 def __init__(self, transport, value_encoding, value_errors):
706 super(BserCodec, self).__init__(transport)
706 super(BserCodec, self).__init__(transport)
707 self._value_encoding = value_encoding
707 self._value_encoding = value_encoding
708 self._value_errors = value_errors
708 self._value_errors = value_errors
709
709
710 def _loads(self, response):
710 def _loads(self, response):
711 return bser.loads(
711 return bser.loads(
712 response,
712 response,
713 value_encoding=self._value_encoding,
713 value_encoding=self._value_encoding,
714 value_errors=self._value_errors,
714 value_errors=self._value_errors,
715 )
715 )
716
716
717 def receive(self):
717 def receive(self):
718 buf = [self.transport.readBytes(sniff_len)]
718 buf = [self.transport.readBytes(sniff_len)]
719 if not buf[0]:
719 if not buf[0]:
720 raise WatchmanError("empty watchman response")
720 raise WatchmanError("empty watchman response")
721
721
722 _1, _2, elen = bser.pdu_info(buf[0])
722 _1, _2, elen = bser.pdu_info(buf[0])
723
723
724 rlen = len(buf[0])
724 rlen = len(buf[0])
725 while elen > rlen:
725 while elen > rlen:
726 buf.append(self.transport.readBytes(elen - rlen))
726 buf.append(self.transport.readBytes(elen - rlen))
727 rlen += len(buf[-1])
727 rlen += len(buf[-1])
728
728
729 response = b"".join(buf)
729 response = b"".join(buf)
730 try:
730 try:
731 res = self._loads(response)
731 res = self._loads(response)
732 return res
732 return res
733 except ValueError as e:
733 except ValueError as e:
734 raise WatchmanError("watchman response decode error: %s" % e)
734 raise WatchmanError("watchman response decode error: %s" % e)
735
735
736 def send(self, *args):
736 def send(self, *args):
737 cmd = bser.dumps(*args) # Defaults to BSER v1
737 cmd = bser.dumps(*args) # Defaults to BSER v1
738 self.transport.write(cmd)
738 self.transport.write(cmd)
739
739
740
740
741 class ImmutableBserCodec(BserCodec):
741 class ImmutableBserCodec(BserCodec):
742 """ use the BSER encoding, decoding values using the newer
742 """ use the BSER encoding, decoding values using the newer
743 immutable object support """
743 immutable object support """
744
744
745 def _loads(self, response):
745 def _loads(self, response):
746 return bser.loads(
746 return bser.loads(
747 response,
747 response,
748 False,
748 False,
749 value_encoding=self._value_encoding,
749 value_encoding=self._value_encoding,
750 value_errors=self._value_errors,
750 value_errors=self._value_errors,
751 )
751 )
752
752
753
753
754 class Bser2WithFallbackCodec(BserCodec):
754 class Bser2WithFallbackCodec(BserCodec):
755 """ use BSER v2 encoding """
755 """ use BSER v2 encoding """
756
756
757 def __init__(self, transport, value_encoding, value_errors):
757 def __init__(self, transport, value_encoding, value_errors):
758 super(Bser2WithFallbackCodec, self).__init__(
758 super(Bser2WithFallbackCodec, self).__init__(
759 transport, value_encoding, value_errors
759 transport, value_encoding, value_errors
760 )
760 )
761 if compat.PYTHON3:
761 if compat.PYTHON3:
762 bserv2_key = "required"
762 bserv2_key = "required"
763 else:
763 else:
764 bserv2_key = "optional"
764 bserv2_key = "optional"
765
765
766 self.send(["version", {bserv2_key: ["bser-v2"]}])
766 self.send(["version", {bserv2_key: ["bser-v2"]}])
767
767
768 capabilities = self.receive()
768 capabilities = self.receive()
769
769
770 if "error" in capabilities:
770 if "error" in capabilities:
771 raise BSERv1Unsupported(
771 raise BSERv1Unsupported(
772 "The watchman server version does not support Python 3. Please "
772 "The watchman server version does not support Python 3. Please "
773 "upgrade your watchman server."
773 "upgrade your watchman server."
774 )
774 )
775
775
776 if capabilities["capabilities"]["bser-v2"]:
776 if capabilities["capabilities"]["bser-v2"]:
777 self.bser_version = 2
777 self.bser_version = 2
778 self.bser_capabilities = 0
778 self.bser_capabilities = 0
779 else:
779 else:
780 self.bser_version = 1
780 self.bser_version = 1
781 self.bser_capabilities = 0
781 self.bser_capabilities = 0
782
782
783 def receive(self):
783 def receive(self):
784 buf = [self.transport.readBytes(sniff_len)]
784 buf = [self.transport.readBytes(sniff_len)]
785 if not buf[0]:
785 if not buf[0]:
786 raise WatchmanError("empty watchman response")
786 raise WatchmanError("empty watchman response")
787
787
788 recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
788 recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
789
789
790 if hasattr(self, "bser_version"):
790 if hasattr(self, "bser_version"):
791 # Readjust BSER version and capabilities if necessary
791 # Readjust BSER version and capabilities if necessary
792 self.bser_version = max(self.bser_version, recv_bser_version)
792 self.bser_version = max(self.bser_version, recv_bser_version)
793 self.capabilities = self.bser_capabilities & recv_bser_capabilities
793 self.capabilities = self.bser_capabilities & recv_bser_capabilities
794
794
795 rlen = len(buf[0])
795 rlen = len(buf[0])
796 while elen > rlen:
796 while elen > rlen:
797 buf.append(self.transport.readBytes(elen - rlen))
797 buf.append(self.transport.readBytes(elen - rlen))
798 rlen += len(buf[-1])
798 rlen += len(buf[-1])
799
799
800 response = b"".join(buf)
800 response = b"".join(buf)
801 try:
801 try:
802 res = self._loads(response)
802 res = self._loads(response)
803 return res
803 return res
804 except ValueError as e:
804 except ValueError as e:
805 raise WatchmanError("watchman response decode error: %s" % e)
805 raise WatchmanError("watchman response decode error: %s" % e)
806
806
807 def send(self, *args):
807 def send(self, *args):
808 if hasattr(self, "bser_version"):
808 if hasattr(self, "bser_version"):
809 cmd = bser.dumps(
809 cmd = bser.dumps(
810 *args,
810 *args,
811 version=self.bser_version,
811 version=self.bser_version,
812 capabilities=self.bser_capabilities
812 capabilities=self.bser_capabilities
813 )
813 )
814 else:
814 else:
815 cmd = bser.dumps(*args)
815 cmd = bser.dumps(*args)
816 self.transport.write(cmd)
816 self.transport.write(cmd)
817
817
818
818
819 class ImmutableBser2Codec(Bser2WithFallbackCodec, ImmutableBserCodec):
819 class ImmutableBser2Codec(Bser2WithFallbackCodec, ImmutableBserCodec):
820 """ use the BSER encoding, decoding values using the newer
820 """ use the BSER encoding, decoding values using the newer
821 immutable object support """
821 immutable object support """
822
822
823 pass
823 pass
824
824
825
825
826 class JsonCodec(Codec):
826 class JsonCodec(Codec):
827 """ Use json codec. This is here primarily for testing purposes """
827 """ Use json codec. This is here primarily for testing purposes """
828
828
829 json = None
829 json = None
830
830
831 def __init__(self, transport):
831 def __init__(self, transport):
832 super(JsonCodec, self).__init__(transport)
832 super(JsonCodec, self).__init__(transport)
833 # optional dep on json, only if JsonCodec is used
833 # optional dep on json, only if JsonCodec is used
834 import json
834 import json
835
835
836 self.json = json
836 self.json = json
837
837
838 def receive(self):
838 def receive(self):
839 line = self.transport.readLine()
839 line = self.transport.readLine()
840 try:
840 try:
841 # In Python 3, json.loads is a transformation from Unicode string to
841 # In Python 3, json.loads is a transformation from Unicode string to
842 # objects possibly containing Unicode strings. We typically expect
842 # objects possibly containing Unicode strings. We typically expect
843 # the JSON blob to be ASCII-only with non-ASCII characters escaped,
843 # the JSON blob to be ASCII-only with non-ASCII characters escaped,
844 # but it's possible we might get non-ASCII bytes that are valid
844 # but it's possible we might get non-ASCII bytes that are valid
845 # UTF-8.
845 # UTF-8.
846 if compat.PYTHON3:
846 if compat.PYTHON3:
847 line = line.decode("utf-8")
847 line = line.decode("utf-8")
848 return self.json.loads(line)
848 return self.json.loads(line)
849 except Exception as e:
849 except Exception as e:
850 print(e, line)
850 print(e, line)
851 raise
851 raise
852
852
853 def send(self, *args):
853 def send(self, *args):
854 cmd = self.json.dumps(*args)
854 cmd = self.json.dumps(*args)
855 # In Python 3, json.dumps is a transformation from objects possibly
855 # In Python 3, json.dumps is a transformation from objects possibly
856 # containing Unicode strings to Unicode string. Even with (the default)
856 # containing Unicode strings to Unicode string. Even with (the default)
857 # ensure_ascii=True, dumps returns a Unicode string.
857 # ensure_ascii=True, dumps returns a Unicode string.
858 if compat.PYTHON3:
858 if compat.PYTHON3:
859 cmd = cmd.encode("ascii")
859 cmd = cmd.encode("ascii")
860 self.transport.write(cmd + b"\n")
860 self.transport.write(cmd + b"\n")
861
861
862
862
863 class client(object):
863 class client(object):
864 """ Handles the communication with the watchman service """
864 """ Handles the communication with the watchman service """
865
865
866 sockpath = None
866 sockpath = None
867 transport = None
867 transport = None
868 sendCodec = None
868 sendCodec = None
869 recvCodec = None
869 recvCodec = None
870 sendConn = None
870 sendConn = None
871 recvConn = None
871 recvConn = None
872 subs = {} # Keyed by subscription name
872 subs = {} # Keyed by subscription name
873 sub_by_root = {} # Keyed by root, then by subscription name
873 sub_by_root = {} # Keyed by root, then by subscription name
874 logs = [] # When log level is raised
874 logs = [] # When log level is raised
875 unilateral = ["log", "subscription"]
875 unilateral = ["log", "subscription"]
876 tport = None
876 tport = None
877 useImmutableBser = None
877 useImmutableBser = None
878 pid = None
878 pid = None
879
879
880 def __init__(
880 def __init__(
881 self,
881 self,
882 sockpath=None,
882 sockpath=None,
883 timeout=1.0,
883 timeout=1.0,
884 transport=None,
884 transport=None,
885 sendEncoding=None,
885 sendEncoding=None,
886 recvEncoding=None,
886 recvEncoding=None,
887 useImmutableBser=False,
887 useImmutableBser=False,
888 # use False for these two because None has a special
888 # use False for these two because None has a special
889 # meaning
889 # meaning
890 valueEncoding=False,
890 valueEncoding=False,
891 valueErrors=False,
891 valueErrors=False,
892 binpath=None,
892 binpath=None,
893 ):
893 ):
894 self.sockpath = sockpath
894 self.sockpath = sockpath
895 self.timeout = timeout
895 self.timeout = timeout
896 self.useImmutableBser = useImmutableBser
896 self.useImmutableBser = useImmutableBser
897 self.binpath = _default_binpath(binpath)
897 self.binpath = _default_binpath(binpath)
898
898
899 if inspect.isclass(transport) and issubclass(transport, Transport):
899 if inspect.isclass(transport) and issubclass(transport, Transport):
900 self.transport = transport
900 self.transport = transport
901 else:
901 else:
902 transport = transport or os.getenv("WATCHMAN_TRANSPORT") or "local"
902 transport = transport or os.getenv("WATCHMAN_TRANSPORT") or "local"
903 if transport == "local" and os.name == "nt":
903 if transport == "local" and os.name == "nt":
904 self.transport = WindowsNamedPipeTransport
904 self.transport = WindowsNamedPipeTransport
905 elif transport == "local":
905 elif transport == "local":
906 self.transport = UnixSocketTransport
906 self.transport = UnixSocketTransport
907 elif transport == "cli":
907 elif transport == "cli":
908 self.transport = CLIProcessTransport
908 self.transport = CLIProcessTransport
909 if sendEncoding is None:
909 if sendEncoding is None:
910 sendEncoding = "json"
910 sendEncoding = "json"
911 if recvEncoding is None:
911 if recvEncoding is None:
912 recvEncoding = sendEncoding
912 recvEncoding = sendEncoding
913 else:
913 else:
914 raise WatchmanError("invalid transport %s" % transport)
914 raise WatchmanError("invalid transport %s" % transport)
915
915
916 sendEncoding = str(
916 sendEncoding = str(
917 sendEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
917 sendEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
918 )
918 )
919 recvEncoding = str(
919 recvEncoding = str(
920 recvEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
920 recvEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
921 )
921 )
922
922
923 self.recvCodec = self._parseEncoding(recvEncoding)
923 self.recvCodec = self._parseEncoding(recvEncoding)
924 self.sendCodec = self._parseEncoding(sendEncoding)
924 self.sendCodec = self._parseEncoding(sendEncoding)
925
925
926 # We want to act like the native OS methods as much as possible. This
926 # We want to act like the native OS methods as much as possible. This
927 # means returning bytestrings on Python 2 by default and Unicode
927 # means returning bytestrings on Python 2 by default and Unicode
928 # strings on Python 3. However we take an optional argument that lets
928 # strings on Python 3. However we take an optional argument that lets
929 # users override this.
929 # users override this.
930 if valueEncoding is False:
930 if valueEncoding is False:
931 if compat.PYTHON3:
931 if compat.PYTHON3:
932 self.valueEncoding = encoding.get_local_encoding()
932 self.valueEncoding = encoding.get_local_encoding()
933 self.valueErrors = encoding.default_local_errors
933 self.valueErrors = encoding.default_local_errors
934 else:
934 else:
935 self.valueEncoding = None
935 self.valueEncoding = None
936 self.valueErrors = None
936 self.valueErrors = None
937 else:
937 else:
938 self.valueEncoding = valueEncoding
938 self.valueEncoding = valueEncoding
939 if valueErrors is False:
939 if valueErrors is False:
940 self.valueErrors = encoding.default_local_errors
940 self.valueErrors = encoding.default_local_errors
941 else:
941 else:
942 self.valueErrors = valueErrors
942 self.valueErrors = valueErrors
943
943
944 def _makeBSERCodec(self, codec):
944 def _makeBSERCodec(self, codec):
945 def make_codec(transport):
945 def make_codec(transport):
946 return codec(transport, self.valueEncoding, self.valueErrors)
946 return codec(transport, self.valueEncoding, self.valueErrors)
947
947
948 return make_codec
948 return make_codec
949
949
950 def _parseEncoding(self, enc):
950 def _parseEncoding(self, enc):
951 if enc == "bser":
951 if enc == "bser":
952 if self.useImmutableBser:
952 if self.useImmutableBser:
953 return self._makeBSERCodec(ImmutableBser2Codec)
953 return self._makeBSERCodec(ImmutableBser2Codec)
954 return self._makeBSERCodec(Bser2WithFallbackCodec)
954 return self._makeBSERCodec(Bser2WithFallbackCodec)
955 elif enc == "bser-v1":
955 elif enc == "bser-v1":
956 if compat.PYTHON3:
956 if compat.PYTHON3:
957 raise BSERv1Unsupported(
957 raise BSERv1Unsupported(
958 "Python 3 does not support the BSER v1 encoding: specify "
958 "Python 3 does not support the BSER v1 encoding: specify "
959 '"bser" or omit the sendEncoding and recvEncoding '
959 '"bser" or omit the sendEncoding and recvEncoding '
960 "arguments"
960 "arguments"
961 )
961 )
962 if self.useImmutableBser:
962 if self.useImmutableBser:
963 return self._makeBSERCodec(ImmutableBserCodec)
963 return self._makeBSERCodec(ImmutableBserCodec)
964 return self._makeBSERCodec(BserCodec)
964 return self._makeBSERCodec(BserCodec)
965 elif enc == "json":
965 elif enc == "json":
966 return JsonCodec
966 return JsonCodec
967 else:
967 else:
968 raise WatchmanError("invalid encoding %s" % enc)
968 raise WatchmanError("invalid encoding %s" % enc)
969
969
970 def _hasprop(self, result, name):
970 def _hasprop(self, result, name):
971 if self.useImmutableBser:
971 if self.useImmutableBser:
972 return hasattr(result, name)
972 return hasattr(result, name)
973 return name in result
973 return name in result
974
974
975 def _resolvesockname(self):
975 def _resolvesockname(self):
976 # if invoked via a trigger, watchman will set this env var; we
976 # if invoked via a trigger, watchman will set this env var; we
977 # should use it unless explicitly set otherwise
977 # should use it unless explicitly set otherwise
978 path = os.getenv("WATCHMAN_SOCK")
978 path = os.getenv("WATCHMAN_SOCK")
979 if path:
979 if path:
980 return path
980 return path
981
981
982 cmd = [self.binpath, "--output-encoding=bser", "get-sockname"]
982 cmd = [self.binpath, "--output-encoding=bser", "get-sockname"]
983 try:
983 try:
984 args = dict(
984 args = dict(
985 stdout=subprocess.PIPE, stderr=subprocess.PIPE
985 stdout=subprocess.PIPE, stderr=subprocess.PIPE
986 ) # noqa: C408
986 ) # noqa: C408
987
987
988 if os.name == "nt":
988 if os.name == "nt":
989 # if invoked via an application with graphical user interface,
989 # if invoked via an application with graphical user interface,
990 # this call will cause a brief command window pop-up.
990 # this call will cause a brief command window pop-up.
991 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
991 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
992 startupinfo = subprocess.STARTUPINFO()
992 startupinfo = subprocess.STARTUPINFO()
993 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
993 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
994 args["startupinfo"] = startupinfo
994 args["startupinfo"] = startupinfo
995
995
996 p = subprocess.Popen(cmd, **args)
996 p = subprocess.Popen(cmd, **args)
997
997
998 except OSError as e:
998 except OSError as e:
999 raise WatchmanError('"watchman" executable not in PATH (%s)', e)
999 raise WatchmanError('"watchman" executable not in PATH (%s)' % e)
1000
1000
1001 stdout, stderr = p.communicate()
1001 stdout, stderr = p.communicate()
1002 exitcode = p.poll()
1002 exitcode = p.poll()
1003
1003
1004 if exitcode:
1004 if exitcode:
1005 raise WatchmanError("watchman exited with code %d" % exitcode)
1005 raise WatchmanError("watchman exited with code %d" % exitcode)
1006
1006
1007 result = bser.loads(stdout)
1007 result = bser.loads(stdout)
1008 if "error" in result:
1008 if "error" in result:
1009 raise WatchmanError("get-sockname error: %s" % result["error"])
1009 raise WatchmanError("get-sockname error: %s" % result["error"])
1010
1010
1011 return result["sockname"]
1011 return result["sockname"]
1012
1012
1013 def _connect(self):
1013 def _connect(self):
1014 """ establish transport connection """
1014 """ establish transport connection """
1015
1015
1016 if self.recvConn:
1016 if self.recvConn:
1017 if self.pid != os.getpid():
1017 if self.pid != os.getpid():
1018 raise UseAfterFork(
1018 raise UseAfterFork(
1019 "do not re-use a connection after fork; open a new client instead"
1019 "do not re-use a connection after fork; open a new client instead"
1020 )
1020 )
1021 return
1021 return
1022
1022
1023 if self.sockpath is None:
1023 if self.sockpath is None:
1024 self.sockpath = self._resolvesockname()
1024 self.sockpath = self._resolvesockname()
1025
1025
1026 kwargs = {}
1026 kwargs = {}
1027 if self.transport == CLIProcessTransport:
1027 if self.transport == CLIProcessTransport:
1028 kwargs["binpath"] = self.binpath
1028 kwargs["binpath"] = self.binpath
1029
1029
1030 self.tport = self.transport(self.sockpath, self.timeout, **kwargs)
1030 self.tport = self.transport(self.sockpath, self.timeout, **kwargs)
1031 self.sendConn = self.sendCodec(self.tport)
1031 self.sendConn = self.sendCodec(self.tport)
1032 self.recvConn = self.recvCodec(self.tport)
1032 self.recvConn = self.recvCodec(self.tport)
1033 self.pid = os.getpid()
1033 self.pid = os.getpid()
1034
1034
1035 def __del__(self):
1035 def __del__(self):
1036 self.close()
1036 self.close()
1037
1037
1038 def __enter__(self):
1038 def __enter__(self):
1039 self._connect()
1039 self._connect()
1040 return self
1040 return self
1041
1041
1042 def __exit__(self, exc_type, exc_value, exc_traceback):
1042 def __exit__(self, exc_type, exc_value, exc_traceback):
1043 self.close()
1043 self.close()
1044
1044
1045 def close(self):
1045 def close(self):
1046 if self.tport:
1046 if self.tport:
1047 self.tport.close()
1047 self.tport.close()
1048 self.tport = None
1048 self.tport = None
1049 self.recvConn = None
1049 self.recvConn = None
1050 self.sendConn = None
1050 self.sendConn = None
1051
1051
1052 def receive(self):
1052 def receive(self):
1053 """ receive the next PDU from the watchman service
1053 """ receive the next PDU from the watchman service
1054
1054
1055 If the client has activated subscriptions or logs then
1055 If the client has activated subscriptions or logs then
1056 this PDU may be a unilateral PDU sent by the service to
1056 this PDU may be a unilateral PDU sent by the service to
1057 inform the client of a log event or subscription change.
1057 inform the client of a log event or subscription change.
1058
1058
1059 It may also simply be the response portion of a request
1059 It may also simply be the response portion of a request
1060 initiated by query.
1060 initiated by query.
1061
1061
1062 There are clients in production that subscribe and call
1062 There are clients in production that subscribe and call
1063 this in a loop to retrieve all subscription responses,
1063 this in a loop to retrieve all subscription responses,
1064 so care should be taken when making changes here.
1064 so care should be taken when making changes here.
1065 """
1065 """
1066
1066
1067 self._connect()
1067 self._connect()
1068 result = self.recvConn.receive()
1068 result = self.recvConn.receive()
1069 if self._hasprop(result, "error"):
1069 if self._hasprop(result, "error"):
1070 raise CommandError(result["error"])
1070 raise CommandError(result["error"])
1071
1071
1072 if self._hasprop(result, "log"):
1072 if self._hasprop(result, "log"):
1073 self.logs.append(result["log"])
1073 self.logs.append(result["log"])
1074
1074
1075 if self._hasprop(result, "subscription"):
1075 if self._hasprop(result, "subscription"):
1076 sub = result["subscription"]
1076 sub = result["subscription"]
1077 if not (sub in self.subs):
1077 if not (sub in self.subs):
1078 self.subs[sub] = []
1078 self.subs[sub] = []
1079 self.subs[sub].append(result)
1079 self.subs[sub].append(result)
1080
1080
1081 # also accumulate in {root,sub} keyed store
1081 # also accumulate in {root,sub} keyed store
1082 root = os.path.normpath(os.path.normcase(result["root"]))
1082 root = os.path.normpath(os.path.normcase(result["root"]))
1083 if not root in self.sub_by_root:
1083 if not root in self.sub_by_root:
1084 self.sub_by_root[root] = {}
1084 self.sub_by_root[root] = {}
1085 if not sub in self.sub_by_root[root]:
1085 if not sub in self.sub_by_root[root]:
1086 self.sub_by_root[root][sub] = []
1086 self.sub_by_root[root][sub] = []
1087 self.sub_by_root[root][sub].append(result)
1087 self.sub_by_root[root][sub].append(result)
1088
1088
1089 return result
1089 return result
1090
1090
1091 def isUnilateralResponse(self, res):
1091 def isUnilateralResponse(self, res):
1092 if "unilateral" in res and res["unilateral"]:
1092 if "unilateral" in res and res["unilateral"]:
1093 return True
1093 return True
1094 # Fall back to checking for known unilateral responses
1094 # Fall back to checking for known unilateral responses
1095 for k in self.unilateral:
1095 for k in self.unilateral:
1096 if k in res:
1096 if k in res:
1097 return True
1097 return True
1098 return False
1098 return False
1099
1099
1100 def getLog(self, remove=True):
1100 def getLog(self, remove=True):
1101 """ Retrieve buffered log data
1101 """ Retrieve buffered log data
1102
1102
1103 If remove is true the data will be removed from the buffer.
1103 If remove is true the data will be removed from the buffer.
1104 Otherwise it will be left in the buffer
1104 Otherwise it will be left in the buffer
1105 """
1105 """
1106 res = self.logs
1106 res = self.logs
1107 if remove:
1107 if remove:
1108 self.logs = []
1108 self.logs = []
1109 return res
1109 return res
1110
1110
1111 def getSubscription(self, name, remove=True, root=None):
1111 def getSubscription(self, name, remove=True, root=None):
1112 """ Retrieve the data associated with a named subscription
1112 """ Retrieve the data associated with a named subscription
1113
1113
1114 If remove is True (the default), the subscription data is removed
1114 If remove is True (the default), the subscription data is removed
1115 from the buffer. Otherwise the data is returned but left in
1115 from the buffer. Otherwise the data is returned but left in
1116 the buffer.
1116 the buffer.
1117
1117
1118 Returns None if there is no data associated with `name`
1118 Returns None if there is no data associated with `name`
1119
1119
1120 If root is not None, then only return the subscription
1120 If root is not None, then only return the subscription
1121 data that matches both root and name. When used in this way,
1121 data that matches both root and name. When used in this way,
1122 remove processing impacts both the unscoped and scoped stores
1122 remove processing impacts both the unscoped and scoped stores
1123 for the subscription data.
1123 for the subscription data.
1124 """
1124 """
1125 if root is not None:
1125 if root is not None:
1126 root = os.path.normpath(os.path.normcase(root))
1126 root = os.path.normpath(os.path.normcase(root))
1127 if root not in self.sub_by_root:
1127 if root not in self.sub_by_root:
1128 return None
1128 return None
1129 if name not in self.sub_by_root[root]:
1129 if name not in self.sub_by_root[root]:
1130 return None
1130 return None
1131 sub = self.sub_by_root[root][name]
1131 sub = self.sub_by_root[root][name]
1132 if remove:
1132 if remove:
1133 del self.sub_by_root[root][name]
1133 del self.sub_by_root[root][name]
1134 # don't let this grow unbounded
1134 # don't let this grow unbounded
1135 if name in self.subs:
1135 if name in self.subs:
1136 del self.subs[name]
1136 del self.subs[name]
1137 return sub
1137 return sub
1138
1138
1139 if name not in self.subs:
1139 if name not in self.subs:
1140 return None
1140 return None
1141 sub = self.subs[name]
1141 sub = self.subs[name]
1142 if remove:
1142 if remove:
1143 del self.subs[name]
1143 del self.subs[name]
1144 return sub
1144 return sub
1145
1145
1146 def query(self, *args):
1146 def query(self, *args):
1147 """ Send a query to the watchman service and return the response
1147 """ Send a query to the watchman service and return the response
1148
1148
1149 This call will block until the response is returned.
1149 This call will block until the response is returned.
1150 If any unilateral responses are sent by the service in between
1150 If any unilateral responses are sent by the service in between
1151 the request-response they will be buffered up in the client object
1151 the request-response they will be buffered up in the client object
1152 and NOT returned via this method.
1152 and NOT returned via this method.
1153 """
1153 """
1154
1154
1155 log("calling client.query")
1155 log("calling client.query")
1156 self._connect()
1156 self._connect()
1157 try:
1157 try:
1158 self.sendConn.send(args)
1158 self.sendConn.send(args)
1159
1159
1160 res = self.receive()
1160 res = self.receive()
1161 while self.isUnilateralResponse(res):
1161 while self.isUnilateralResponse(res):
1162 res = self.receive()
1162 res = self.receive()
1163
1163
1164 return res
1164 return res
1165 except EnvironmentError as ee:
1165 except EnvironmentError as ee:
1166 # When we can depend on Python 3, we can use PEP 3134
1166 # When we can depend on Python 3, we can use PEP 3134
1167 # exception chaining here.
1167 # exception chaining here.
1168 raise WatchmanEnvironmentError(
1168 raise WatchmanEnvironmentError(
1169 "I/O error communicating with watchman daemon",
1169 "I/O error communicating with watchman daemon",
1170 ee.errno,
1170 ee.errno,
1171 ee.strerror,
1171 ee.strerror,
1172 args,
1172 args,
1173 )
1173 )
1174 except WatchmanError as ex:
1174 except WatchmanError as ex:
1175 ex.setCommand(args)
1175 ex.setCommand(args)
1176 raise
1176 raise
1177
1177
1178 def capabilityCheck(self, optional=None, required=None):
1178 def capabilityCheck(self, optional=None, required=None):
1179 """ Perform a server capability check """
1179 """ Perform a server capability check """
1180 res = self.query(
1180 res = self.query(
1181 "version", {"optional": optional or [], "required": required or []}
1181 "version", {"optional": optional or [], "required": required or []}
1182 )
1182 )
1183
1183
1184 if not self._hasprop(res, "capabilities"):
1184 if not self._hasprop(res, "capabilities"):
1185 # Server doesn't support capabilities, so we need to
1185 # Server doesn't support capabilities, so we need to
1186 # synthesize the results based on the version
1186 # synthesize the results based on the version
1187 capabilities.synthesize(res, optional)
1187 capabilities.synthesize(res, optional)
1188 if "error" in res:
1188 if "error" in res:
1189 raise CommandError(res["error"])
1189 raise CommandError(res["error"])
1190
1190
1191 return res
1191 return res
1192
1192
1193 def setTimeout(self, value):
1193 def setTimeout(self, value):
1194 self.recvConn.setTimeout(value)
1194 self.recvConn.setTimeout(value)
1195 self.sendConn.setTimeout(value)
1195 self.sendConn.setTimeout(value)
General Comments 0
You need to be logged in to leave comments. Login now