##// END OF EJS Templates
watchman: refactor transport connecting to unconfuse pytype...
Matt Harbison -
r50751:40d3ee57 default
parent child Browse files
Show More
@@ -1,1192 +1,1192 b''
1 # Copyright 2014-present Facebook, Inc.
1 # Copyright 2014-present Facebook, Inc.
2 # All rights reserved.
2 # All rights reserved.
3 #
3 #
4 # Redistribution and use in source and binary forms, with or without
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are met:
5 # modification, are permitted provided that the following conditions are met:
6 #
6 #
7 # * Redistributions of source code must retain the above copyright notice,
7 # * Redistributions of source code must retain the above copyright notice,
8 # this list of conditions and the following disclaimer.
8 # this list of conditions and the following disclaimer.
9 #
9 #
10 # * Redistributions in binary form must reproduce the above copyright notice,
10 # * Redistributions in binary form must reproduce the above copyright notice,
11 # this list of conditions and the following disclaimer in the documentation
11 # this list of conditions and the following disclaimer in the documentation
12 # and/or other materials provided with the distribution.
12 # and/or other materials provided with the distribution.
13 #
13 #
14 # * Neither the name Facebook nor the names of its contributors may be used to
14 # * Neither the name Facebook nor the names of its contributors may be used to
15 # endorse or promote products derived from this software without specific
15 # endorse or promote products derived from this software without specific
16 # prior written permission.
16 # prior written permission.
17 #
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
28
29 import inspect
29 import inspect
30 import math
30 import math
31 import os
31 import os
32 import socket
32 import socket
33 import subprocess
33 import subprocess
34 import time
34 import time
35
35
36 from . import capabilities, compat, encoding, load
36 from . import capabilities, compat, encoding, load
37
37
38
38
39 # Sometimes it's really hard to get Python extensions to compile,
39 # Sometimes it's really hard to get Python extensions to compile,
40 # so fall back to a pure Python implementation.
40 # so fall back to a pure Python implementation.
41 try:
41 try:
42 from . import bser
42 from . import bser
43
43
44 # Demandimport causes modules to be loaded lazily. Force the load now
44 # Demandimport causes modules to be loaded lazily. Force the load now
45 # so that we can fall back on pybser if bser doesn't exist
45 # so that we can fall back on pybser if bser doesn't exist
46 bser.pdu_info
46 bser.pdu_info
47 except ImportError:
47 except ImportError:
48 from . import pybser as bser
48 from . import pybser as bser
49
49
50
50
51 if os.name == "nt":
51 if os.name == "nt":
52 import ctypes
52 import ctypes
53 import ctypes.wintypes
53 import ctypes.wintypes
54
54
55 wintypes = ctypes.wintypes
55 wintypes = ctypes.wintypes
56 GENERIC_READ = 0x80000000
56 GENERIC_READ = 0x80000000
57 GENERIC_WRITE = 0x40000000
57 GENERIC_WRITE = 0x40000000
58 FILE_FLAG_OVERLAPPED = 0x40000000
58 FILE_FLAG_OVERLAPPED = 0x40000000
59 OPEN_EXISTING = 3
59 OPEN_EXISTING = 3
60 INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
60 INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
61 FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
61 FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
62 FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
62 FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
63 FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
63 FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
64 WAIT_FAILED = 0xFFFFFFFF
64 WAIT_FAILED = 0xFFFFFFFF
65 WAIT_TIMEOUT = 0x00000102
65 WAIT_TIMEOUT = 0x00000102
66 WAIT_OBJECT_0 = 0x00000000
66 WAIT_OBJECT_0 = 0x00000000
67 WAIT_IO_COMPLETION = 0x000000C0
67 WAIT_IO_COMPLETION = 0x000000C0
68 INFINITE = 0xFFFFFFFF
68 INFINITE = 0xFFFFFFFF
69
69
70 # Overlapped I/O operation is in progress. (997)
70 # Overlapped I/O operation is in progress. (997)
71 ERROR_IO_PENDING = 0x000003E5
71 ERROR_IO_PENDING = 0x000003E5
72
72
73 # The pointer size follows the architecture
73 # The pointer size follows the architecture
74 # We use WPARAM since this type is already conditionally defined
74 # We use WPARAM since this type is already conditionally defined
75 ULONG_PTR = ctypes.wintypes.WPARAM
75 ULONG_PTR = ctypes.wintypes.WPARAM
76
76
77 class OVERLAPPED(ctypes.Structure):
77 class OVERLAPPED(ctypes.Structure):
78 _fields_ = [
78 _fields_ = [
79 ("Internal", ULONG_PTR),
79 ("Internal", ULONG_PTR),
80 ("InternalHigh", ULONG_PTR),
80 ("InternalHigh", ULONG_PTR),
81 ("Offset", wintypes.DWORD),
81 ("Offset", wintypes.DWORD),
82 ("OffsetHigh", wintypes.DWORD),
82 ("OffsetHigh", wintypes.DWORD),
83 ("hEvent", wintypes.HANDLE),
83 ("hEvent", wintypes.HANDLE),
84 ]
84 ]
85
85
86 def __init__(self):
86 def __init__(self):
87 self.Internal = 0
87 self.Internal = 0
88 self.InternalHigh = 0
88 self.InternalHigh = 0
89 self.Offset = 0
89 self.Offset = 0
90 self.OffsetHigh = 0
90 self.OffsetHigh = 0
91 self.hEvent = 0
91 self.hEvent = 0
92
92
93 LPDWORD = ctypes.POINTER(wintypes.DWORD)
93 LPDWORD = ctypes.POINTER(wintypes.DWORD)
94
94
95 _kernel32 = ctypes.windll.kernel32 # pytype: disable=module-attr
95 _kernel32 = ctypes.windll.kernel32 # pytype: disable=module-attr
96
96
97 CreateFile = _kernel32.CreateFileA
97 CreateFile = _kernel32.CreateFileA
98 CreateFile.argtypes = [
98 CreateFile.argtypes = [
99 wintypes.LPSTR,
99 wintypes.LPSTR,
100 wintypes.DWORD,
100 wintypes.DWORD,
101 wintypes.DWORD,
101 wintypes.DWORD,
102 wintypes.LPVOID,
102 wintypes.LPVOID,
103 wintypes.DWORD,
103 wintypes.DWORD,
104 wintypes.DWORD,
104 wintypes.DWORD,
105 wintypes.HANDLE,
105 wintypes.HANDLE,
106 ]
106 ]
107 CreateFile.restype = wintypes.HANDLE
107 CreateFile.restype = wintypes.HANDLE
108
108
109 CloseHandle = _kernel32.CloseHandle
109 CloseHandle = _kernel32.CloseHandle
110 CloseHandle.argtypes = [wintypes.HANDLE]
110 CloseHandle.argtypes = [wintypes.HANDLE]
111 CloseHandle.restype = wintypes.BOOL
111 CloseHandle.restype = wintypes.BOOL
112
112
113 ReadFile = _kernel32.ReadFile
113 ReadFile = _kernel32.ReadFile
114 ReadFile.argtypes = [
114 ReadFile.argtypes = [
115 wintypes.HANDLE,
115 wintypes.HANDLE,
116 wintypes.LPVOID,
116 wintypes.LPVOID,
117 wintypes.DWORD,
117 wintypes.DWORD,
118 LPDWORD,
118 LPDWORD,
119 ctypes.POINTER(OVERLAPPED),
119 ctypes.POINTER(OVERLAPPED),
120 ]
120 ]
121 ReadFile.restype = wintypes.BOOL
121 ReadFile.restype = wintypes.BOOL
122
122
123 WriteFile = _kernel32.WriteFile
123 WriteFile = _kernel32.WriteFile
124 WriteFile.argtypes = [
124 WriteFile.argtypes = [
125 wintypes.HANDLE,
125 wintypes.HANDLE,
126 wintypes.LPVOID,
126 wintypes.LPVOID,
127 wintypes.DWORD,
127 wintypes.DWORD,
128 LPDWORD,
128 LPDWORD,
129 ctypes.POINTER(OVERLAPPED),
129 ctypes.POINTER(OVERLAPPED),
130 ]
130 ]
131 WriteFile.restype = wintypes.BOOL
131 WriteFile.restype = wintypes.BOOL
132
132
133 GetLastError = _kernel32.GetLastError
133 GetLastError = _kernel32.GetLastError
134 GetLastError.argtypes = []
134 GetLastError.argtypes = []
135 GetLastError.restype = wintypes.DWORD
135 GetLastError.restype = wintypes.DWORD
136
136
137 SetLastError = _kernel32.SetLastError
137 SetLastError = _kernel32.SetLastError
138 SetLastError.argtypes = [wintypes.DWORD]
138 SetLastError.argtypes = [wintypes.DWORD]
139 SetLastError.restype = None
139 SetLastError.restype = None
140
140
141 FormatMessage = _kernel32.FormatMessageA
141 FormatMessage = _kernel32.FormatMessageA
142 FormatMessage.argtypes = [
142 FormatMessage.argtypes = [
143 wintypes.DWORD,
143 wintypes.DWORD,
144 wintypes.LPVOID,
144 wintypes.LPVOID,
145 wintypes.DWORD,
145 wintypes.DWORD,
146 wintypes.DWORD,
146 wintypes.DWORD,
147 ctypes.POINTER(wintypes.LPSTR),
147 ctypes.POINTER(wintypes.LPSTR),
148 wintypes.DWORD,
148 wintypes.DWORD,
149 wintypes.LPVOID,
149 wintypes.LPVOID,
150 ]
150 ]
151 FormatMessage.restype = wintypes.DWORD
151 FormatMessage.restype = wintypes.DWORD
152
152
153 LocalFree = _kernel32.LocalFree
153 LocalFree = _kernel32.LocalFree
154
154
155 GetOverlappedResult = _kernel32.GetOverlappedResult
155 GetOverlappedResult = _kernel32.GetOverlappedResult
156 GetOverlappedResult.argtypes = [
156 GetOverlappedResult.argtypes = [
157 wintypes.HANDLE,
157 wintypes.HANDLE,
158 ctypes.POINTER(OVERLAPPED),
158 ctypes.POINTER(OVERLAPPED),
159 LPDWORD,
159 LPDWORD,
160 wintypes.BOOL,
160 wintypes.BOOL,
161 ]
161 ]
162 GetOverlappedResult.restype = wintypes.BOOL
162 GetOverlappedResult.restype = wintypes.BOOL
163
163
164 GetOverlappedResultEx = getattr(_kernel32, "GetOverlappedResultEx", None)
164 GetOverlappedResultEx = getattr(_kernel32, "GetOverlappedResultEx", None)
165 if GetOverlappedResultEx is not None:
165 if GetOverlappedResultEx is not None:
166 GetOverlappedResultEx.argtypes = [
166 GetOverlappedResultEx.argtypes = [
167 wintypes.HANDLE,
167 wintypes.HANDLE,
168 ctypes.POINTER(OVERLAPPED),
168 ctypes.POINTER(OVERLAPPED),
169 LPDWORD,
169 LPDWORD,
170 wintypes.DWORD,
170 wintypes.DWORD,
171 wintypes.BOOL,
171 wintypes.BOOL,
172 ]
172 ]
173 GetOverlappedResultEx.restype = wintypes.BOOL
173 GetOverlappedResultEx.restype = wintypes.BOOL
174
174
175 WaitForSingleObjectEx = _kernel32.WaitForSingleObjectEx
175 WaitForSingleObjectEx = _kernel32.WaitForSingleObjectEx
176 WaitForSingleObjectEx.argtypes = [
176 WaitForSingleObjectEx.argtypes = [
177 wintypes.HANDLE,
177 wintypes.HANDLE,
178 wintypes.DWORD,
178 wintypes.DWORD,
179 wintypes.BOOL,
179 wintypes.BOOL,
180 ]
180 ]
181 WaitForSingleObjectEx.restype = wintypes.DWORD
181 WaitForSingleObjectEx.restype = wintypes.DWORD
182
182
183 CreateEvent = _kernel32.CreateEventA
183 CreateEvent = _kernel32.CreateEventA
184 CreateEvent.argtypes = [
184 CreateEvent.argtypes = [
185 LPDWORD,
185 LPDWORD,
186 wintypes.BOOL,
186 wintypes.BOOL,
187 wintypes.BOOL,
187 wintypes.BOOL,
188 wintypes.LPSTR,
188 wintypes.LPSTR,
189 ]
189 ]
190 CreateEvent.restype = wintypes.HANDLE
190 CreateEvent.restype = wintypes.HANDLE
191
191
192 # Windows Vista is the minimum supported client for CancelIoEx.
192 # Windows Vista is the minimum supported client for CancelIoEx.
193 CancelIoEx = _kernel32.CancelIoEx
193 CancelIoEx = _kernel32.CancelIoEx
194 CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
194 CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
195 CancelIoEx.restype = wintypes.BOOL
195 CancelIoEx.restype = wintypes.BOOL
196
196
197 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
197 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
198 sniff_len = 13
198 sniff_len = 13
199
199
200 # This is a helper for debugging the client.
200 # This is a helper for debugging the client.
201 _debugging = False
201 _debugging = False
202 if _debugging:
202 if _debugging:
203
203
204 def log(fmt, *args):
204 def log(fmt, *args):
205 print(
205 print(
206 "[%s] %s"
206 "[%s] %s"
207 % (
207 % (
208 time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
208 time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
209 fmt % args[:],
209 fmt % args[:],
210 )
210 )
211 )
211 )
212
212
213
213
214 else:
214 else:
215
215
216 def log(fmt, *args):
216 def log(fmt, *args):
217 pass
217 pass
218
218
219
219
220 def _win32_strerror(err):
220 def _win32_strerror(err):
221 """expand a win32 error code into a human readable message"""
221 """expand a win32 error code into a human readable message"""
222
222
223 # FormatMessage will allocate memory and assign it here
223 # FormatMessage will allocate memory and assign it here
224 buf = ctypes.c_char_p()
224 buf = ctypes.c_char_p()
225 FormatMessage(
225 FormatMessage(
226 FORMAT_MESSAGE_FROM_SYSTEM
226 FORMAT_MESSAGE_FROM_SYSTEM
227 | FORMAT_MESSAGE_ALLOCATE_BUFFER
227 | FORMAT_MESSAGE_ALLOCATE_BUFFER
228 | FORMAT_MESSAGE_IGNORE_INSERTS,
228 | FORMAT_MESSAGE_IGNORE_INSERTS,
229 None,
229 None,
230 err,
230 err,
231 0,
231 0,
232 buf,
232 buf,
233 0,
233 0,
234 None,
234 None,
235 )
235 )
236 try:
236 try:
237 return buf.value
237 return buf.value
238 finally:
238 finally:
239 LocalFree(buf)
239 LocalFree(buf)
240
240
241
241
242 class WatchmanError(Exception):
242 class WatchmanError(Exception):
243 def __init__(self, msg=None, cmd=None):
243 def __init__(self, msg=None, cmd=None):
244 self.msg = msg
244 self.msg = msg
245 self.cmd = cmd
245 self.cmd = cmd
246
246
247 def setCommand(self, cmd):
247 def setCommand(self, cmd):
248 self.cmd = cmd
248 self.cmd = cmd
249
249
250 def __str__(self):
250 def __str__(self):
251 if self.cmd:
251 if self.cmd:
252 return "%s, while executing %s" % (self.msg, self.cmd)
252 return "%s, while executing %s" % (self.msg, self.cmd)
253 return self.msg
253 return self.msg
254
254
255
255
256 class BSERv1Unsupported(WatchmanError):
256 class BSERv1Unsupported(WatchmanError):
257 pass
257 pass
258
258
259
259
260 class UseAfterFork(WatchmanError):
260 class UseAfterFork(WatchmanError):
261 pass
261 pass
262
262
263
263
264 class WatchmanEnvironmentError(WatchmanError):
264 class WatchmanEnvironmentError(WatchmanError):
265 def __init__(self, msg, errno, errmsg, cmd=None):
265 def __init__(self, msg, errno, errmsg, cmd=None):
266 super(WatchmanEnvironmentError, self).__init__(
266 super(WatchmanEnvironmentError, self).__init__(
267 "{0}: errno={1} errmsg={2}".format(msg, errno, errmsg), cmd
267 "{0}: errno={1} errmsg={2}".format(msg, errno, errmsg), cmd
268 )
268 )
269
269
270
270
271 class SocketConnectError(WatchmanError):
271 class SocketConnectError(WatchmanError):
272 def __init__(self, sockpath, exc):
272 def __init__(self, sockpath, exc):
273 super(SocketConnectError, self).__init__(
273 super(SocketConnectError, self).__init__(
274 "unable to connect to %s: %s" % (sockpath, exc)
274 "unable to connect to %s: %s" % (sockpath, exc)
275 )
275 )
276 self.sockpath = sockpath
276 self.sockpath = sockpath
277 self.exc = exc
277 self.exc = exc
278
278
279
279
280 class SocketTimeout(WatchmanError):
280 class SocketTimeout(WatchmanError):
281 """A specialized exception raised for socket timeouts during communication to/from watchman.
281 """A specialized exception raised for socket timeouts during communication to/from watchman.
282 This makes it easier to implement non-blocking loops as callers can easily distinguish
282 This makes it easier to implement non-blocking loops as callers can easily distinguish
283 between a routine timeout and an actual error condition.
283 between a routine timeout and an actual error condition.
284
284
285 Note that catching WatchmanError will also catch this as it is a super-class, so backwards
285 Note that catching WatchmanError will also catch this as it is a super-class, so backwards
286 compatibility in exception handling is preserved.
286 compatibility in exception handling is preserved.
287 """
287 """
288
288
289
289
290 class CommandError(WatchmanError):
290 class CommandError(WatchmanError):
291 """error returned by watchman
291 """error returned by watchman
292
292
293 self.msg is the message returned by watchman.
293 self.msg is the message returned by watchman.
294 """
294 """
295
295
296 def __init__(self, msg, cmd=None):
296 def __init__(self, msg, cmd=None):
297 super(CommandError, self).__init__(
297 super(CommandError, self).__init__(
298 "watchman command error: %s" % (msg,), cmd
298 "watchman command error: %s" % (msg,), cmd
299 )
299 )
300
300
301
301
302 class Transport:
302 class Transport:
303 """communication transport to the watchman server"""
303 """communication transport to the watchman server"""
304
304
305 buf = None
305 buf = None
306
306
307 def close(self):
307 def close(self):
308 """tear it down"""
308 """tear it down"""
309 raise NotImplementedError()
309 raise NotImplementedError()
310
310
311 def readBytes(self, size):
311 def readBytes(self, size):
312 """read size bytes"""
312 """read size bytes"""
313 raise NotImplementedError()
313 raise NotImplementedError()
314
314
315 def write(self, buf):
315 def write(self, buf):
316 """write some data"""
316 """write some data"""
317 raise NotImplementedError()
317 raise NotImplementedError()
318
318
319 def setTimeout(self, value):
319 def setTimeout(self, value):
320 pass
320 pass
321
321
322 def readLine(self):
322 def readLine(self):
323 """read a line
323 """read a line
324 Maintains its own buffer, callers of the transport should not mix
324 Maintains its own buffer, callers of the transport should not mix
325 calls to readBytes and readLine.
325 calls to readBytes and readLine.
326 """
326 """
327 if self.buf is None:
327 if self.buf is None:
328 self.buf = []
328 self.buf = []
329
329
330 # Buffer may already have a line if we've received unilateral
330 # Buffer may already have a line if we've received unilateral
331 # response(s) from the server
331 # response(s) from the server
332 if len(self.buf) == 1 and b"\n" in self.buf[0]:
332 if len(self.buf) == 1 and b"\n" in self.buf[0]:
333 (line, b) = self.buf[0].split(b"\n", 1)
333 (line, b) = self.buf[0].split(b"\n", 1)
334 self.buf = [b]
334 self.buf = [b]
335 return line
335 return line
336
336
337 while True:
337 while True:
338 b = self.readBytes(4096)
338 b = self.readBytes(4096)
339 if b"\n" in b:
339 if b"\n" in b:
340 result = b"".join(self.buf)
340 result = b"".join(self.buf)
341 (line, b) = b.split(b"\n", 1)
341 (line, b) = b.split(b"\n", 1)
342 self.buf = [b]
342 self.buf = [b]
343 return result + line
343 return result + line
344 self.buf.append(b)
344 self.buf.append(b)
345
345
346
346
347 class Codec:
347 class Codec:
348 """communication encoding for the watchman server"""
348 """communication encoding for the watchman server"""
349
349
350 transport = None
350 transport = None
351
351
352 def __init__(self, transport):
352 def __init__(self, transport):
353 self.transport = transport
353 self.transport = transport
354
354
355 def receive(self):
355 def receive(self):
356 raise NotImplementedError()
356 raise NotImplementedError()
357
357
358 def send(self, *args):
358 def send(self, *args):
359 raise NotImplementedError()
359 raise NotImplementedError()
360
360
361 def setTimeout(self, value):
361 def setTimeout(self, value):
362 self.transport.setTimeout(value)
362 self.transport.setTimeout(value)
363
363
364
364
365 class UnixSocketTransport(Transport):
365 class UnixSocketTransport(Transport):
366 """local unix domain socket transport"""
366 """local unix domain socket transport"""
367
367
368 sock = None
368 sock = None
369
369
370 def __init__(self, sockpath, timeout):
370 def __init__(self, sockpath, timeout):
371 self.sockpath = sockpath
371 self.sockpath = sockpath
372 self.timeout = timeout
372 self.timeout = timeout
373
373
374 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
374 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
375 try:
375 try:
376 sock.settimeout(self.timeout)
376 sock.settimeout(self.timeout)
377 sock.connect(self.sockpath)
377 sock.connect(self.sockpath)
378 self.sock = sock
378 self.sock = sock
379 except socket.error as e:
379 except socket.error as e:
380 sock.close()
380 sock.close()
381 raise SocketConnectError(self.sockpath, e)
381 raise SocketConnectError(self.sockpath, e)
382
382
383 def close(self):
383 def close(self):
384 if self.sock:
384 if self.sock:
385 self.sock.close()
385 self.sock.close()
386 self.sock = None
386 self.sock = None
387
387
388 def setTimeout(self, value):
388 def setTimeout(self, value):
389 self.timeout = value
389 self.timeout = value
390 self.sock.settimeout(self.timeout)
390 self.sock.settimeout(self.timeout)
391
391
392 def readBytes(self, size):
392 def readBytes(self, size):
393 try:
393 try:
394 buf = [self.sock.recv(size)]
394 buf = [self.sock.recv(size)]
395 if not buf[0]:
395 if not buf[0]:
396 raise WatchmanError("empty watchman response")
396 raise WatchmanError("empty watchman response")
397 return buf[0]
397 return buf[0]
398 except socket.timeout:
398 except socket.timeout:
399 raise SocketTimeout("timed out waiting for response")
399 raise SocketTimeout("timed out waiting for response")
400
400
401 def write(self, data):
401 def write(self, data):
402 try:
402 try:
403 self.sock.sendall(data)
403 self.sock.sendall(data)
404 except socket.timeout:
404 except socket.timeout:
405 raise SocketTimeout("timed out sending query command")
405 raise SocketTimeout("timed out sending query command")
406
406
407
407
408 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
408 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
409 """Windows 7 and earlier does not support GetOverlappedResultEx. The
409 """Windows 7 and earlier does not support GetOverlappedResultEx. The
410 alternative is to use GetOverlappedResult and wait for read or write
410 alternative is to use GetOverlappedResult and wait for read or write
411 operation to complete. This is done be using CreateEvent and
411 operation to complete. This is done be using CreateEvent and
412 WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
412 WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
413 and GetOverlappedResult are all part of Windows API since WindowsXP.
413 and GetOverlappedResult are all part of Windows API since WindowsXP.
414 This is the exact same implementation that can be found in the watchman
414 This is the exact same implementation that can be found in the watchman
415 source code (see get_overlapped_result_ex_impl in stream_win.c). This
415 source code (see get_overlapped_result_ex_impl in stream_win.c). This
416 way, maintenance should be simplified.
416 way, maintenance should be simplified.
417 """
417 """
418 log("Preparing to wait for maximum %dms", millis)
418 log("Preparing to wait for maximum %dms", millis)
419 if millis != 0:
419 if millis != 0:
420 waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
420 waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
421 if waitReturnCode == WAIT_OBJECT_0:
421 if waitReturnCode == WAIT_OBJECT_0:
422 # Event is signaled, overlapped IO operation result should be available.
422 # Event is signaled, overlapped IO operation result should be available.
423 pass
423 pass
424 elif waitReturnCode == WAIT_IO_COMPLETION:
424 elif waitReturnCode == WAIT_IO_COMPLETION:
425 # WaitForSingleObjectEx returnes because the system added an I/O completion
425 # WaitForSingleObjectEx returnes because the system added an I/O completion
426 # routine or an asynchronous procedure call (APC) to the thread queue.
426 # routine or an asynchronous procedure call (APC) to the thread queue.
427 SetLastError(WAIT_IO_COMPLETION)
427 SetLastError(WAIT_IO_COMPLETION)
428 pass
428 pass
429 elif waitReturnCode == WAIT_TIMEOUT:
429 elif waitReturnCode == WAIT_TIMEOUT:
430 # We reached the maximum allowed wait time, the IO operation failed
430 # We reached the maximum allowed wait time, the IO operation failed
431 # to complete in timely fashion.
431 # to complete in timely fashion.
432 SetLastError(WAIT_TIMEOUT)
432 SetLastError(WAIT_TIMEOUT)
433 return False
433 return False
434 elif waitReturnCode == WAIT_FAILED:
434 elif waitReturnCode == WAIT_FAILED:
435 # something went wrong calling WaitForSingleObjectEx
435 # something went wrong calling WaitForSingleObjectEx
436 err = GetLastError()
436 err = GetLastError()
437 log("WaitForSingleObjectEx failed: %s", _win32_strerror(err))
437 log("WaitForSingleObjectEx failed: %s", _win32_strerror(err))
438 return False
438 return False
439 else:
439 else:
440 # unexpected situation deserving investigation.
440 # unexpected situation deserving investigation.
441 err = GetLastError()
441 err = GetLastError()
442 log("Unexpected error: %s", _win32_strerror(err))
442 log("Unexpected error: %s", _win32_strerror(err))
443 return False
443 return False
444
444
445 return GetOverlappedResult(pipe, olap, nbytes, False)
445 return GetOverlappedResult(pipe, olap, nbytes, False)
446
446
447
447
448 class WindowsNamedPipeTransport(Transport):
448 class WindowsNamedPipeTransport(Transport):
449 """connect to a named pipe"""
449 """connect to a named pipe"""
450
450
451 def __init__(self, sockpath, timeout):
451 def __init__(self, sockpath, timeout):
452 self.sockpath = sockpath
452 self.sockpath = sockpath
453 self.timeout = int(math.ceil(timeout * 1000))
453 self.timeout = int(math.ceil(timeout * 1000))
454 self._iobuf = None
454 self._iobuf = None
455
455
456 if compat.PYTHON3:
456 if compat.PYTHON3:
457 sockpath = os.fsencode(sockpath)
457 sockpath = os.fsencode(sockpath)
458 self.pipe = CreateFile(
458 self.pipe = CreateFile(
459 sockpath,
459 sockpath,
460 GENERIC_READ | GENERIC_WRITE,
460 GENERIC_READ | GENERIC_WRITE,
461 0,
461 0,
462 None,
462 None,
463 OPEN_EXISTING,
463 OPEN_EXISTING,
464 FILE_FLAG_OVERLAPPED,
464 FILE_FLAG_OVERLAPPED,
465 None,
465 None,
466 )
466 )
467
467
468 err = GetLastError()
468 err = GetLastError()
469 if self.pipe == INVALID_HANDLE_VALUE or self.pipe == 0:
469 if self.pipe == INVALID_HANDLE_VALUE or self.pipe == 0:
470 self.pipe = None
470 self.pipe = None
471 raise SocketConnectError(self.sockpath, self._make_win_err("", err))
471 raise SocketConnectError(self.sockpath, self._make_win_err("", err))
472
472
473 # event for the overlapped I/O operations
473 # event for the overlapped I/O operations
474 self._waitable = CreateEvent(None, True, False, None)
474 self._waitable = CreateEvent(None, True, False, None)
475 err = GetLastError()
475 err = GetLastError()
476 if self._waitable is None:
476 if self._waitable is None:
477 self._raise_win_err("CreateEvent failed", err)
477 self._raise_win_err("CreateEvent failed", err)
478
478
479 self._get_overlapped_result_ex = GetOverlappedResultEx
479 self._get_overlapped_result_ex = GetOverlappedResultEx
480 if (
480 if (
481 os.getenv("WATCHMAN_WIN7_COMPAT") == "1"
481 os.getenv("WATCHMAN_WIN7_COMPAT") == "1"
482 or self._get_overlapped_result_ex is None
482 or self._get_overlapped_result_ex is None
483 ):
483 ):
484 self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
484 self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
485
485
486 def _raise_win_err(self, msg, err):
486 def _raise_win_err(self, msg, err):
487 raise self._make_win_err(msg, err)
487 raise self._make_win_err(msg, err)
488
488
489 def _make_win_err(self, msg, err):
489 def _make_win_err(self, msg, err):
490 return IOError(
490 return IOError(
491 "%s win32 error code: %d %s" % (msg, err, _win32_strerror(err))
491 "%s win32 error code: %d %s" % (msg, err, _win32_strerror(err))
492 )
492 )
493
493
494 def close(self):
494 def close(self):
495 if self.pipe:
495 if self.pipe:
496 log("Closing pipe")
496 log("Closing pipe")
497 CloseHandle(self.pipe)
497 CloseHandle(self.pipe)
498 self.pipe = None
498 self.pipe = None
499
499
500 if self._waitable is not None:
500 if self._waitable is not None:
501 # We release the handle for the event
501 # We release the handle for the event
502 CloseHandle(self._waitable)
502 CloseHandle(self._waitable)
503 self._waitable = None
503 self._waitable = None
504
504
505 def setTimeout(self, value):
505 def setTimeout(self, value):
506 # convert to milliseconds
506 # convert to milliseconds
507 self.timeout = int(value * 1000)
507 self.timeout = int(value * 1000)
508
508
509 def readBytes(self, size):
509 def readBytes(self, size):
510 """A read can block for an unbounded amount of time, even if the
510 """A read can block for an unbounded amount of time, even if the
511 kernel reports that the pipe handle is signalled, so we need to
511 kernel reports that the pipe handle is signalled, so we need to
512 always perform our reads asynchronously
512 always perform our reads asynchronously
513 """
513 """
514
514
515 # try to satisfy the read from any buffered data
515 # try to satisfy the read from any buffered data
516 if self._iobuf:
516 if self._iobuf:
517 if size >= len(self._iobuf):
517 if size >= len(self._iobuf):
518 res = self._iobuf
518 res = self._iobuf
519 self.buf = None
519 self.buf = None
520 return res
520 return res
521 res = self._iobuf[:size]
521 res = self._iobuf[:size]
522 self._iobuf = self._iobuf[size:]
522 self._iobuf = self._iobuf[size:]
523 return res
523 return res
524
524
525 # We need to initiate a read
525 # We need to initiate a read
526 buf = ctypes.create_string_buffer(size)
526 buf = ctypes.create_string_buffer(size)
527 olap = OVERLAPPED()
527 olap = OVERLAPPED()
528 olap.hEvent = self._waitable
528 olap.hEvent = self._waitable
529
529
530 log("made read buff of size %d", size)
530 log("made read buff of size %d", size)
531
531
532 # ReadFile docs warn against sending in the nread parameter for async
532 # ReadFile docs warn against sending in the nread parameter for async
533 # operations, so we always collect it via GetOverlappedResultEx
533 # operations, so we always collect it via GetOverlappedResultEx
534 immediate = ReadFile(self.pipe, buf, size, None, olap)
534 immediate = ReadFile(self.pipe, buf, size, None, olap)
535
535
536 if not immediate:
536 if not immediate:
537 err = GetLastError()
537 err = GetLastError()
538 if err != ERROR_IO_PENDING:
538 if err != ERROR_IO_PENDING:
539 self._raise_win_err("failed to read %d bytes" % size, err)
539 self._raise_win_err("failed to read %d bytes" % size, err)
540
540
541 nread = wintypes.DWORD()
541 nread = wintypes.DWORD()
542 if not self._get_overlapped_result_ex(
542 if not self._get_overlapped_result_ex(
543 self.pipe, olap, nread, 0 if immediate else self.timeout, True
543 self.pipe, olap, nread, 0 if immediate else self.timeout, True
544 ):
544 ):
545 err = GetLastError()
545 err = GetLastError()
546 CancelIoEx(self.pipe, olap)
546 CancelIoEx(self.pipe, olap)
547
547
548 if err == WAIT_TIMEOUT:
548 if err == WAIT_TIMEOUT:
549 log("GetOverlappedResultEx timedout")
549 log("GetOverlappedResultEx timedout")
550 raise SocketTimeout(
550 raise SocketTimeout(
551 "timed out after waiting %dms for read" % self.timeout
551 "timed out after waiting %dms for read" % self.timeout
552 )
552 )
553
553
554 log("GetOverlappedResultEx reports error %d", err)
554 log("GetOverlappedResultEx reports error %d", err)
555 self._raise_win_err("error while waiting for read", err)
555 self._raise_win_err("error while waiting for read", err)
556
556
557 nread = nread.value
557 nread = nread.value
558 if nread == 0:
558 if nread == 0:
559 # Docs say that named pipes return 0 byte when the other end did
559 # Docs say that named pipes return 0 byte when the other end did
560 # a zero byte write. Since we don't ever do that, the only
560 # a zero byte write. Since we don't ever do that, the only
561 # other way this shows up is if the client has gotten in a weird
561 # other way this shows up is if the client has gotten in a weird
562 # state, so let's bail out
562 # state, so let's bail out
563 CancelIoEx(self.pipe, olap)
563 CancelIoEx(self.pipe, olap)
564 raise IOError("Async read yielded 0 bytes; unpossible!")
564 raise IOError("Async read yielded 0 bytes; unpossible!")
565
565
566 # Holds precisely the bytes that we read from the prior request
566 # Holds precisely the bytes that we read from the prior request
567 buf = buf[:nread]
567 buf = buf[:nread]
568
568
569 returned_size = min(nread, size)
569 returned_size = min(nread, size)
570 if returned_size == nread:
570 if returned_size == nread:
571 return buf
571 return buf
572
572
573 # keep any left-overs around for a later read to consume
573 # keep any left-overs around for a later read to consume
574 self._iobuf = buf[returned_size:]
574 self._iobuf = buf[returned_size:]
575 return buf[:returned_size]
575 return buf[:returned_size]
576
576
577 def write(self, data):
577 def write(self, data):
578 olap = OVERLAPPED()
578 olap = OVERLAPPED()
579 olap.hEvent = self._waitable
579 olap.hEvent = self._waitable
580
580
581 immediate = WriteFile(
581 immediate = WriteFile(
582 self.pipe, ctypes.c_char_p(data), len(data), None, olap
582 self.pipe, ctypes.c_char_p(data), len(data), None, olap
583 )
583 )
584
584
585 if not immediate:
585 if not immediate:
586 err = GetLastError()
586 err = GetLastError()
587 if err != ERROR_IO_PENDING:
587 if err != ERROR_IO_PENDING:
588 self._raise_win_err(
588 self._raise_win_err(
589 "failed to write %d bytes to handle %r"
589 "failed to write %d bytes to handle %r"
590 % (len(data), self.pipe),
590 % (len(data), self.pipe),
591 err,
591 err,
592 )
592 )
593
593
594 # Obtain results, waiting if needed
594 # Obtain results, waiting if needed
595 nwrote = wintypes.DWORD()
595 nwrote = wintypes.DWORD()
596 if self._get_overlapped_result_ex(
596 if self._get_overlapped_result_ex(
597 self.pipe, olap, nwrote, 0 if immediate else self.timeout, True
597 self.pipe, olap, nwrote, 0 if immediate else self.timeout, True
598 ):
598 ):
599 log("made write of %d bytes", nwrote.value)
599 log("made write of %d bytes", nwrote.value)
600 return nwrote.value
600 return nwrote.value
601
601
602 err = GetLastError()
602 err = GetLastError()
603
603
604 # It's potentially unsafe to allow the write to continue after
604 # It's potentially unsafe to allow the write to continue after
605 # we unwind, so let's make a best effort to avoid that happening
605 # we unwind, so let's make a best effort to avoid that happening
606 CancelIoEx(self.pipe, olap)
606 CancelIoEx(self.pipe, olap)
607
607
608 if err == WAIT_TIMEOUT:
608 if err == WAIT_TIMEOUT:
609 raise SocketTimeout(
609 raise SocketTimeout(
610 "timed out after waiting %dms for write" % self.timeout
610 "timed out after waiting %dms for write" % self.timeout
611 )
611 )
612 self._raise_win_err(
612 self._raise_win_err(
613 "error while waiting for write of %d bytes" % len(data), err
613 "error while waiting for write of %d bytes" % len(data), err
614 )
614 )
615
615
616
616
617 def _default_binpath(binpath=None):
617 def _default_binpath(binpath=None):
618 if binpath:
618 if binpath:
619 return binpath
619 return binpath
620 # The test harness sets WATCHMAN_BINARY to the binary under test,
620 # The test harness sets WATCHMAN_BINARY to the binary under test,
621 # so we use that by default, otherwise, allow resolving watchman
621 # so we use that by default, otherwise, allow resolving watchman
622 # from the users PATH.
622 # from the users PATH.
623 return os.environ.get("WATCHMAN_BINARY", "watchman")
623 return os.environ.get("WATCHMAN_BINARY", "watchman")
624
624
625
625
626 class CLIProcessTransport(Transport):
626 class CLIProcessTransport(Transport):
627 """open a pipe to the cli to talk to the service
627 """open a pipe to the cli to talk to the service
628 This intended to be used only in the test harness!
628 This intended to be used only in the test harness!
629
629
630 The CLI is an oddball because we only support JSON input
630 The CLI is an oddball because we only support JSON input
631 and cannot send multiple commands through the same instance,
631 and cannot send multiple commands through the same instance,
632 so we spawn a new process for each command.
632 so we spawn a new process for each command.
633
633
634 We disable server spawning for this implementation, again, because
634 We disable server spawning for this implementation, again, because
635 it is intended to be used only in our test harness. You really
635 it is intended to be used only in our test harness. You really
636 should not need to use the CLI transport for anything real.
636 should not need to use the CLI transport for anything real.
637
637
638 While the CLI can output in BSER, our Transport interface doesn't
638 While the CLI can output in BSER, our Transport interface doesn't
639 support telling this instance that it should do so. That effectively
639 support telling this instance that it should do so. That effectively
640 limits this implementation to JSON input and output only at this time.
640 limits this implementation to JSON input and output only at this time.
641
641
642 It is the responsibility of the caller to set the send and
642 It is the responsibility of the caller to set the send and
643 receive codecs appropriately.
643 receive codecs appropriately.
644 """
644 """
645
645
646 proc = None
646 proc = None
647 closed = True
647 closed = True
648
648
649 def __init__(self, sockpath, timeout, binpath=None):
649 def __init__(self, sockpath, timeout, binpath=None):
650 self.sockpath = sockpath
650 self.sockpath = sockpath
651 self.timeout = timeout
651 self.timeout = timeout
652 self.binpath = _default_binpath(binpath)
652 self.binpath = _default_binpath(binpath)
653
653
654 def close(self):
654 def close(self):
655 if self.proc:
655 if self.proc:
656 if self.proc.pid is not None:
656 if self.proc.pid is not None:
657 self.proc.kill()
657 self.proc.kill()
658 self.proc.stdin.close()
658 self.proc.stdin.close()
659 self.proc.stdout.close()
659 self.proc.stdout.close()
660 self.proc.wait()
660 self.proc.wait()
661 self.proc = None
661 self.proc = None
662
662
663 def _connect(self):
663 def _connect(self):
664 if self.proc:
664 if self.proc:
665 return self.proc
665 return self.proc
666 args = [
666 args = [
667 self.binpath,
667 self.binpath,
668 "--sockname={0}".format(self.sockpath),
668 "--sockname={0}".format(self.sockpath),
669 "--logfile=/BOGUS",
669 "--logfile=/BOGUS",
670 "--statefile=/BOGUS",
670 "--statefile=/BOGUS",
671 "--no-spawn",
671 "--no-spawn",
672 "--no-local",
672 "--no-local",
673 "--no-pretty",
673 "--no-pretty",
674 "-j",
674 "-j",
675 ]
675 ]
676 self.proc = subprocess.Popen(
676 self.proc = subprocess.Popen(
677 args, stdin=subprocess.PIPE, stdout=subprocess.PIPE
677 args, stdin=subprocess.PIPE, stdout=subprocess.PIPE
678 )
678 )
679 return self.proc
679 return self.proc
680
680
681 def readBytes(self, size):
681 def readBytes(self, size):
682 self._connect()
682 self._connect()
683 res = self.proc.stdout.read(size)
683 res = self.proc.stdout.read(size)
684 if not res:
684 if not res:
685 raise WatchmanError("EOF on CLI process transport")
685 raise WatchmanError("EOF on CLI process transport")
686 return res
686 return res
687
687
688 def write(self, data):
688 def write(self, data):
689 if self.closed:
689 if self.closed:
690 self.close()
690 self.close()
691 self.closed = False
691 self.closed = False
692 self._connect()
692 proc = self._connect()
693 res = self.proc.stdin.write(data)
693 res = proc.stdin.write(data)
694 self.proc.stdin.close()
694 proc.stdin.close()
695 self.closed = True
695 self.closed = True
696 return res
696 return res
697
697
698
698
699 class BserCodec(Codec):
699 class BserCodec(Codec):
700 """use the BSER encoding. This is the default, preferred codec"""
700 """use the BSER encoding. This is the default, preferred codec"""
701
701
702 def __init__(self, transport, value_encoding, value_errors):
702 def __init__(self, transport, value_encoding, value_errors):
703 super(BserCodec, self).__init__(transport)
703 super(BserCodec, self).__init__(transport)
704 self._value_encoding = value_encoding
704 self._value_encoding = value_encoding
705 self._value_errors = value_errors
705 self._value_errors = value_errors
706
706
707 def _loads(self, response):
707 def _loads(self, response):
708 return bser.loads(
708 return bser.loads(
709 response,
709 response,
710 value_encoding=self._value_encoding,
710 value_encoding=self._value_encoding,
711 value_errors=self._value_errors,
711 value_errors=self._value_errors,
712 )
712 )
713
713
714 def receive(self):
714 def receive(self):
715 buf = [self.transport.readBytes(sniff_len)]
715 buf = [self.transport.readBytes(sniff_len)]
716 if not buf[0]:
716 if not buf[0]:
717 raise WatchmanError("empty watchman response")
717 raise WatchmanError("empty watchman response")
718
718
719 _1, _2, elen = bser.pdu_info(buf[0])
719 _1, _2, elen = bser.pdu_info(buf[0])
720
720
721 rlen = len(buf[0])
721 rlen = len(buf[0])
722 while elen > rlen:
722 while elen > rlen:
723 buf.append(self.transport.readBytes(elen - rlen))
723 buf.append(self.transport.readBytes(elen - rlen))
724 rlen += len(buf[-1])
724 rlen += len(buf[-1])
725
725
726 response = b"".join(buf)
726 response = b"".join(buf)
727 try:
727 try:
728 res = self._loads(response)
728 res = self._loads(response)
729 return res
729 return res
730 except ValueError as e:
730 except ValueError as e:
731 raise WatchmanError("watchman response decode error: %s" % e)
731 raise WatchmanError("watchman response decode error: %s" % e)
732
732
733 def send(self, *args):
733 def send(self, *args):
734 cmd = bser.dumps(*args) # Defaults to BSER v1
734 cmd = bser.dumps(*args) # Defaults to BSER v1
735 self.transport.write(cmd)
735 self.transport.write(cmd)
736
736
737
737
738 class ImmutableBserCodec(BserCodec):
738 class ImmutableBserCodec(BserCodec):
739 """use the BSER encoding, decoding values using the newer
739 """use the BSER encoding, decoding values using the newer
740 immutable object support"""
740 immutable object support"""
741
741
742 def _loads(self, response):
742 def _loads(self, response):
743 return bser.loads(
743 return bser.loads(
744 response,
744 response,
745 False,
745 False,
746 value_encoding=self._value_encoding,
746 value_encoding=self._value_encoding,
747 value_errors=self._value_errors,
747 value_errors=self._value_errors,
748 )
748 )
749
749
750
750
751 class Bser2WithFallbackCodec(BserCodec):
751 class Bser2WithFallbackCodec(BserCodec):
752 """use BSER v2 encoding"""
752 """use BSER v2 encoding"""
753
753
754 def __init__(self, transport, value_encoding, value_errors):
754 def __init__(self, transport, value_encoding, value_errors):
755 super(Bser2WithFallbackCodec, self).__init__(
755 super(Bser2WithFallbackCodec, self).__init__(
756 transport, value_encoding, value_errors
756 transport, value_encoding, value_errors
757 )
757 )
758 if compat.PYTHON3:
758 if compat.PYTHON3:
759 bserv2_key = "required"
759 bserv2_key = "required"
760 else:
760 else:
761 bserv2_key = "optional"
761 bserv2_key = "optional"
762
762
763 self.send(["version", {bserv2_key: ["bser-v2"]}])
763 self.send(["version", {bserv2_key: ["bser-v2"]}])
764
764
765 capabilities = self.receive()
765 capabilities = self.receive()
766
766
767 if "error" in capabilities:
767 if "error" in capabilities:
768 raise BSERv1Unsupported(
768 raise BSERv1Unsupported(
769 "The watchman server version does not support Python 3. Please "
769 "The watchman server version does not support Python 3. Please "
770 "upgrade your watchman server."
770 "upgrade your watchman server."
771 )
771 )
772
772
773 if capabilities["capabilities"]["bser-v2"]:
773 if capabilities["capabilities"]["bser-v2"]:
774 self.bser_version = 2
774 self.bser_version = 2
775 self.bser_capabilities = 0
775 self.bser_capabilities = 0
776 else:
776 else:
777 self.bser_version = 1
777 self.bser_version = 1
778 self.bser_capabilities = 0
778 self.bser_capabilities = 0
779
779
780 def receive(self):
780 def receive(self):
781 buf = [self.transport.readBytes(sniff_len)]
781 buf = [self.transport.readBytes(sniff_len)]
782 if not buf[0]:
782 if not buf[0]:
783 raise WatchmanError("empty watchman response")
783 raise WatchmanError("empty watchman response")
784
784
785 recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
785 recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
786
786
787 if hasattr(self, "bser_version"):
787 if hasattr(self, "bser_version"):
788 # Readjust BSER version and capabilities if necessary
788 # Readjust BSER version and capabilities if necessary
789 self.bser_version = max(self.bser_version, recv_bser_version)
789 self.bser_version = max(self.bser_version, recv_bser_version)
790 self.capabilities = self.bser_capabilities & recv_bser_capabilities
790 self.capabilities = self.bser_capabilities & recv_bser_capabilities
791
791
792 rlen = len(buf[0])
792 rlen = len(buf[0])
793 while elen > rlen:
793 while elen > rlen:
794 buf.append(self.transport.readBytes(elen - rlen))
794 buf.append(self.transport.readBytes(elen - rlen))
795 rlen += len(buf[-1])
795 rlen += len(buf[-1])
796
796
797 response = b"".join(buf)
797 response = b"".join(buf)
798 try:
798 try:
799 res = self._loads(response)
799 res = self._loads(response)
800 return res
800 return res
801 except ValueError as e:
801 except ValueError as e:
802 raise WatchmanError("watchman response decode error: %s" % e)
802 raise WatchmanError("watchman response decode error: %s" % e)
803
803
804 def send(self, *args):
804 def send(self, *args):
805 if hasattr(self, "bser_version"):
805 if hasattr(self, "bser_version"):
806 cmd = bser.dumps(
806 cmd = bser.dumps(
807 *args,
807 *args,
808 version=self.bser_version,
808 version=self.bser_version,
809 capabilities=self.bser_capabilities
809 capabilities=self.bser_capabilities
810 )
810 )
811 else:
811 else:
812 cmd = bser.dumps(*args)
812 cmd = bser.dumps(*args)
813 self.transport.write(cmd)
813 self.transport.write(cmd)
814
814
815
815
816 class ImmutableBser2Codec(Bser2WithFallbackCodec, ImmutableBserCodec):
816 class ImmutableBser2Codec(Bser2WithFallbackCodec, ImmutableBserCodec):
817 """use the BSER encoding, decoding values using the newer
817 """use the BSER encoding, decoding values using the newer
818 immutable object support"""
818 immutable object support"""
819
819
820 pass
820 pass
821
821
822
822
823 class JsonCodec(Codec):
823 class JsonCodec(Codec):
824 """Use json codec. This is here primarily for testing purposes"""
824 """Use json codec. This is here primarily for testing purposes"""
825
825
826 json = None
826 json = None
827
827
828 def __init__(self, transport):
828 def __init__(self, transport):
829 super(JsonCodec, self).__init__(transport)
829 super(JsonCodec, self).__init__(transport)
830 # optional dep on json, only if JsonCodec is used
830 # optional dep on json, only if JsonCodec is used
831 import json
831 import json
832
832
833 self.json = json
833 self.json = json
834
834
835 def receive(self):
835 def receive(self):
836 line = self.transport.readLine()
836 line = self.transport.readLine()
837 try:
837 try:
838 # In Python 3, json.loads is a transformation from Unicode string to
838 # In Python 3, json.loads is a transformation from Unicode string to
839 # objects possibly containing Unicode strings. We typically expect
839 # objects possibly containing Unicode strings. We typically expect
840 # the JSON blob to be ASCII-only with non-ASCII characters escaped,
840 # the JSON blob to be ASCII-only with non-ASCII characters escaped,
841 # but it's possible we might get non-ASCII bytes that are valid
841 # but it's possible we might get non-ASCII bytes that are valid
842 # UTF-8.
842 # UTF-8.
843 if compat.PYTHON3:
843 if compat.PYTHON3:
844 line = line.decode("utf-8")
844 line = line.decode("utf-8")
845 return self.json.loads(line)
845 return self.json.loads(line)
846 except Exception as e:
846 except Exception as e:
847 print(e, line)
847 print(e, line)
848 raise
848 raise
849
849
850 def send(self, *args):
850 def send(self, *args):
851 cmd = self.json.dumps(*args)
851 cmd = self.json.dumps(*args)
852 # In Python 3, json.dumps is a transformation from objects possibly
852 # In Python 3, json.dumps is a transformation from objects possibly
853 # containing Unicode strings to Unicode string. Even with (the default)
853 # containing Unicode strings to Unicode string. Even with (the default)
854 # ensure_ascii=True, dumps returns a Unicode string.
854 # ensure_ascii=True, dumps returns a Unicode string.
855 if compat.PYTHON3:
855 if compat.PYTHON3:
856 cmd = cmd.encode("ascii")
856 cmd = cmd.encode("ascii")
857 self.transport.write(cmd + b"\n")
857 self.transport.write(cmd + b"\n")
858
858
859
859
860 class client:
860 class client:
861 """Handles the communication with the watchman service"""
861 """Handles the communication with the watchman service"""
862
862
863 sockpath = None
863 sockpath = None
864 transport = None
864 transport = None
865 sendCodec = None
865 sendCodec = None
866 recvCodec = None
866 recvCodec = None
867 sendConn = None
867 sendConn = None
868 recvConn = None
868 recvConn = None
869 subs = {} # Keyed by subscription name
869 subs = {} # Keyed by subscription name
870 sub_by_root = {} # Keyed by root, then by subscription name
870 sub_by_root = {} # Keyed by root, then by subscription name
871 logs = [] # When log level is raised
871 logs = [] # When log level is raised
872 unilateral = ["log", "subscription"]
872 unilateral = ["log", "subscription"]
873 tport = None
873 tport = None
874 useImmutableBser = None
874 useImmutableBser = None
875 pid = None
875 pid = None
876
876
877 def __init__(
877 def __init__(
878 self,
878 self,
879 sockpath=None,
879 sockpath=None,
880 timeout=1.0,
880 timeout=1.0,
881 transport=None,
881 transport=None,
882 sendEncoding=None,
882 sendEncoding=None,
883 recvEncoding=None,
883 recvEncoding=None,
884 useImmutableBser=False,
884 useImmutableBser=False,
885 # use False for these two because None has a special
885 # use False for these two because None has a special
886 # meaning
886 # meaning
887 valueEncoding=False,
887 valueEncoding=False,
888 valueErrors=False,
888 valueErrors=False,
889 binpath=None,
889 binpath=None,
890 ):
890 ):
891 self.sockpath = sockpath
891 self.sockpath = sockpath
892 self.timeout = timeout
892 self.timeout = timeout
893 self.useImmutableBser = useImmutableBser
893 self.useImmutableBser = useImmutableBser
894 self.binpath = _default_binpath(binpath)
894 self.binpath = _default_binpath(binpath)
895
895
896 if inspect.isclass(transport) and issubclass(transport, Transport):
896 if inspect.isclass(transport) and issubclass(transport, Transport):
897 self.transport = transport
897 self.transport = transport
898 else:
898 else:
899 transport = transport or os.getenv("WATCHMAN_TRANSPORT") or "local"
899 transport = transport or os.getenv("WATCHMAN_TRANSPORT") or "local"
900 if transport == "local" and os.name == "nt":
900 if transport == "local" and os.name == "nt":
901 self.transport = WindowsNamedPipeTransport
901 self.transport = WindowsNamedPipeTransport
902 elif transport == "local":
902 elif transport == "local":
903 self.transport = UnixSocketTransport
903 self.transport = UnixSocketTransport
904 elif transport == "cli":
904 elif transport == "cli":
905 self.transport = CLIProcessTransport
905 self.transport = CLIProcessTransport
906 if sendEncoding is None:
906 if sendEncoding is None:
907 sendEncoding = "json"
907 sendEncoding = "json"
908 if recvEncoding is None:
908 if recvEncoding is None:
909 recvEncoding = sendEncoding
909 recvEncoding = sendEncoding
910 else:
910 else:
911 raise WatchmanError("invalid transport %s" % transport)
911 raise WatchmanError("invalid transport %s" % transport)
912
912
913 sendEncoding = str(
913 sendEncoding = str(
914 sendEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
914 sendEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
915 )
915 )
916 recvEncoding = str(
916 recvEncoding = str(
917 recvEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
917 recvEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
918 )
918 )
919
919
920 self.recvCodec = self._parseEncoding(recvEncoding)
920 self.recvCodec = self._parseEncoding(recvEncoding)
921 self.sendCodec = self._parseEncoding(sendEncoding)
921 self.sendCodec = self._parseEncoding(sendEncoding)
922
922
923 # We want to act like the native OS methods as much as possible. This
923 # We want to act like the native OS methods as much as possible. This
924 # means returning bytestrings on Python 2 by default and Unicode
924 # means returning bytestrings on Python 2 by default and Unicode
925 # strings on Python 3. However we take an optional argument that lets
925 # strings on Python 3. However we take an optional argument that lets
926 # users override this.
926 # users override this.
927 if valueEncoding is False:
927 if valueEncoding is False:
928 if compat.PYTHON3:
928 if compat.PYTHON3:
929 self.valueEncoding = encoding.get_local_encoding()
929 self.valueEncoding = encoding.get_local_encoding()
930 self.valueErrors = encoding.default_local_errors
930 self.valueErrors = encoding.default_local_errors
931 else:
931 else:
932 self.valueEncoding = None
932 self.valueEncoding = None
933 self.valueErrors = None
933 self.valueErrors = None
934 else:
934 else:
935 self.valueEncoding = valueEncoding
935 self.valueEncoding = valueEncoding
936 if valueErrors is False:
936 if valueErrors is False:
937 self.valueErrors = encoding.default_local_errors
937 self.valueErrors = encoding.default_local_errors
938 else:
938 else:
939 self.valueErrors = valueErrors
939 self.valueErrors = valueErrors
940
940
941 def _makeBSERCodec(self, codec):
941 def _makeBSERCodec(self, codec):
942 def make_codec(transport):
942 def make_codec(transport):
943 return codec(transport, self.valueEncoding, self.valueErrors)
943 return codec(transport, self.valueEncoding, self.valueErrors)
944
944
945 return make_codec
945 return make_codec
946
946
947 def _parseEncoding(self, enc):
947 def _parseEncoding(self, enc):
948 if enc == "bser":
948 if enc == "bser":
949 if self.useImmutableBser:
949 if self.useImmutableBser:
950 return self._makeBSERCodec(ImmutableBser2Codec)
950 return self._makeBSERCodec(ImmutableBser2Codec)
951 return self._makeBSERCodec(Bser2WithFallbackCodec)
951 return self._makeBSERCodec(Bser2WithFallbackCodec)
952 elif enc == "bser-v1":
952 elif enc == "bser-v1":
953 if compat.PYTHON3:
953 if compat.PYTHON3:
954 raise BSERv1Unsupported(
954 raise BSERv1Unsupported(
955 "Python 3 does not support the BSER v1 encoding: specify "
955 "Python 3 does not support the BSER v1 encoding: specify "
956 '"bser" or omit the sendEncoding and recvEncoding '
956 '"bser" or omit the sendEncoding and recvEncoding '
957 "arguments"
957 "arguments"
958 )
958 )
959 if self.useImmutableBser:
959 if self.useImmutableBser:
960 return self._makeBSERCodec(ImmutableBserCodec)
960 return self._makeBSERCodec(ImmutableBserCodec)
961 return self._makeBSERCodec(BserCodec)
961 return self._makeBSERCodec(BserCodec)
962 elif enc == "json":
962 elif enc == "json":
963 return JsonCodec
963 return JsonCodec
964 else:
964 else:
965 raise WatchmanError("invalid encoding %s" % enc)
965 raise WatchmanError("invalid encoding %s" % enc)
966
966
967 def _hasprop(self, result, name):
967 def _hasprop(self, result, name):
968 if self.useImmutableBser:
968 if self.useImmutableBser:
969 return hasattr(result, name)
969 return hasattr(result, name)
970 return name in result
970 return name in result
971
971
972 def _resolvesockname(self):
972 def _resolvesockname(self):
973 # if invoked via a trigger, watchman will set this env var; we
973 # if invoked via a trigger, watchman will set this env var; we
974 # should use it unless explicitly set otherwise
974 # should use it unless explicitly set otherwise
975 path = os.getenv("WATCHMAN_SOCK")
975 path = os.getenv("WATCHMAN_SOCK")
976 if path:
976 if path:
977 return path
977 return path
978
978
979 cmd = [self.binpath, "--output-encoding=bser", "get-sockname"]
979 cmd = [self.binpath, "--output-encoding=bser", "get-sockname"]
980 try:
980 try:
981 args = dict(
981 args = dict(
982 stdout=subprocess.PIPE, stderr=subprocess.PIPE
982 stdout=subprocess.PIPE, stderr=subprocess.PIPE
983 ) # noqa: C408
983 ) # noqa: C408
984
984
985 if os.name == "nt":
985 if os.name == "nt":
986 # if invoked via an application with graphical user interface,
986 # if invoked via an application with graphical user interface,
987 # this call will cause a brief command window pop-up.
987 # this call will cause a brief command window pop-up.
988 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
988 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
989 startupinfo = subprocess.STARTUPINFO()
989 startupinfo = subprocess.STARTUPINFO()
990 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
990 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
991 args["startupinfo"] = startupinfo
991 args["startupinfo"] = startupinfo
992
992
993 p = subprocess.Popen(cmd, **args)
993 p = subprocess.Popen(cmd, **args)
994
994
995 except OSError as e:
995 except OSError as e:
996 raise WatchmanError('"watchman" executable not in PATH (%s)' % e)
996 raise WatchmanError('"watchman" executable not in PATH (%s)' % e)
997
997
998 stdout, stderr = p.communicate()
998 stdout, stderr = p.communicate()
999 exitcode = p.poll()
999 exitcode = p.poll()
1000
1000
1001 if exitcode:
1001 if exitcode:
1002 raise WatchmanError("watchman exited with code %d" % exitcode)
1002 raise WatchmanError("watchman exited with code %d" % exitcode)
1003
1003
1004 result = bser.loads(stdout)
1004 result = bser.loads(stdout)
1005 if "error" in result:
1005 if "error" in result:
1006 raise WatchmanError("get-sockname error: %s" % result["error"])
1006 raise WatchmanError("get-sockname error: %s" % result["error"])
1007
1007
1008 return result["sockname"]
1008 return result["sockname"]
1009
1009
1010 def _connect(self):
1010 def _connect(self):
1011 """establish transport connection"""
1011 """establish transport connection"""
1012
1012
1013 if self.recvConn:
1013 if self.recvConn:
1014 if self.pid != os.getpid():
1014 if self.pid != os.getpid():
1015 raise UseAfterFork(
1015 raise UseAfterFork(
1016 "do not re-use a connection after fork; open a new client instead"
1016 "do not re-use a connection after fork; open a new client instead"
1017 )
1017 )
1018 return
1018 return
1019
1019
1020 if self.sockpath is None:
1020 if self.sockpath is None:
1021 self.sockpath = self._resolvesockname()
1021 self.sockpath = self._resolvesockname()
1022
1022
1023 kwargs = {}
1023 kwargs = {}
1024 if self.transport == CLIProcessTransport:
1024 if self.transport == CLIProcessTransport:
1025 kwargs["binpath"] = self.binpath
1025 kwargs["binpath"] = self.binpath
1026
1026
1027 self.tport = self.transport(self.sockpath, self.timeout, **kwargs)
1027 self.tport = self.transport(self.sockpath, self.timeout, **kwargs)
1028 self.sendConn = self.sendCodec(self.tport)
1028 self.sendConn = self.sendCodec(self.tport)
1029 self.recvConn = self.recvCodec(self.tport)
1029 self.recvConn = self.recvCodec(self.tport)
1030 self.pid = os.getpid()
1030 self.pid = os.getpid()
1031
1031
1032 def __del__(self):
1032 def __del__(self):
1033 self.close()
1033 self.close()
1034
1034
1035 def __enter__(self):
1035 def __enter__(self):
1036 self._connect()
1036 self._connect()
1037 return self
1037 return self
1038
1038
1039 def __exit__(self, exc_type, exc_value, exc_traceback):
1039 def __exit__(self, exc_type, exc_value, exc_traceback):
1040 self.close()
1040 self.close()
1041
1041
1042 def close(self):
1042 def close(self):
1043 if self.tport:
1043 if self.tport:
1044 self.tport.close()
1044 self.tport.close()
1045 self.tport = None
1045 self.tport = None
1046 self.recvConn = None
1046 self.recvConn = None
1047 self.sendConn = None
1047 self.sendConn = None
1048
1048
1049 def receive(self):
1049 def receive(self):
1050 """receive the next PDU from the watchman service
1050 """receive the next PDU from the watchman service
1051
1051
1052 If the client has activated subscriptions or logs then
1052 If the client has activated subscriptions or logs then
1053 this PDU may be a unilateral PDU sent by the service to
1053 this PDU may be a unilateral PDU sent by the service to
1054 inform the client of a log event or subscription change.
1054 inform the client of a log event or subscription change.
1055
1055
1056 It may also simply be the response portion of a request
1056 It may also simply be the response portion of a request
1057 initiated by query.
1057 initiated by query.
1058
1058
1059 There are clients in production that subscribe and call
1059 There are clients in production that subscribe and call
1060 this in a loop to retrieve all subscription responses,
1060 this in a loop to retrieve all subscription responses,
1061 so care should be taken when making changes here.
1061 so care should be taken when making changes here.
1062 """
1062 """
1063
1063
1064 self._connect()
1064 self._connect()
1065 result = self.recvConn.receive()
1065 result = self.recvConn.receive()
1066 if self._hasprop(result, "error"):
1066 if self._hasprop(result, "error"):
1067 raise CommandError(result["error"])
1067 raise CommandError(result["error"])
1068
1068
1069 if self._hasprop(result, "log"):
1069 if self._hasprop(result, "log"):
1070 self.logs.append(result["log"])
1070 self.logs.append(result["log"])
1071
1071
1072 if self._hasprop(result, "subscription"):
1072 if self._hasprop(result, "subscription"):
1073 sub = result["subscription"]
1073 sub = result["subscription"]
1074 if not (sub in self.subs):
1074 if not (sub in self.subs):
1075 self.subs[sub] = []
1075 self.subs[sub] = []
1076 self.subs[sub].append(result)
1076 self.subs[sub].append(result)
1077
1077
1078 # also accumulate in {root,sub} keyed store
1078 # also accumulate in {root,sub} keyed store
1079 root = os.path.normpath(os.path.normcase(result["root"]))
1079 root = os.path.normpath(os.path.normcase(result["root"]))
1080 if not root in self.sub_by_root:
1080 if not root in self.sub_by_root:
1081 self.sub_by_root[root] = {}
1081 self.sub_by_root[root] = {}
1082 if not sub in self.sub_by_root[root]:
1082 if not sub in self.sub_by_root[root]:
1083 self.sub_by_root[root][sub] = []
1083 self.sub_by_root[root][sub] = []
1084 self.sub_by_root[root][sub].append(result)
1084 self.sub_by_root[root][sub].append(result)
1085
1085
1086 return result
1086 return result
1087
1087
1088 def isUnilateralResponse(self, res):
1088 def isUnilateralResponse(self, res):
1089 if "unilateral" in res and res["unilateral"]:
1089 if "unilateral" in res and res["unilateral"]:
1090 return True
1090 return True
1091 # Fall back to checking for known unilateral responses
1091 # Fall back to checking for known unilateral responses
1092 for k in self.unilateral:
1092 for k in self.unilateral:
1093 if k in res:
1093 if k in res:
1094 return True
1094 return True
1095 return False
1095 return False
1096
1096
1097 def getLog(self, remove=True):
1097 def getLog(self, remove=True):
1098 """Retrieve buffered log data
1098 """Retrieve buffered log data
1099
1099
1100 If remove is true the data will be removed from the buffer.
1100 If remove is true the data will be removed from the buffer.
1101 Otherwise it will be left in the buffer
1101 Otherwise it will be left in the buffer
1102 """
1102 """
1103 res = self.logs
1103 res = self.logs
1104 if remove:
1104 if remove:
1105 self.logs = []
1105 self.logs = []
1106 return res
1106 return res
1107
1107
1108 def getSubscription(self, name, remove=True, root=None):
1108 def getSubscription(self, name, remove=True, root=None):
1109 """Retrieve the data associated with a named subscription
1109 """Retrieve the data associated with a named subscription
1110
1110
1111 If remove is True (the default), the subscription data is removed
1111 If remove is True (the default), the subscription data is removed
1112 from the buffer. Otherwise the data is returned but left in
1112 from the buffer. Otherwise the data is returned but left in
1113 the buffer.
1113 the buffer.
1114
1114
1115 Returns None if there is no data associated with `name`
1115 Returns None if there is no data associated with `name`
1116
1116
1117 If root is not None, then only return the subscription
1117 If root is not None, then only return the subscription
1118 data that matches both root and name. When used in this way,
1118 data that matches both root and name. When used in this way,
1119 remove processing impacts both the unscoped and scoped stores
1119 remove processing impacts both the unscoped and scoped stores
1120 for the subscription data.
1120 for the subscription data.
1121 """
1121 """
1122 if root is not None:
1122 if root is not None:
1123 root = os.path.normpath(os.path.normcase(root))
1123 root = os.path.normpath(os.path.normcase(root))
1124 if root not in self.sub_by_root:
1124 if root not in self.sub_by_root:
1125 return None
1125 return None
1126 if name not in self.sub_by_root[root]:
1126 if name not in self.sub_by_root[root]:
1127 return None
1127 return None
1128 sub = self.sub_by_root[root][name]
1128 sub = self.sub_by_root[root][name]
1129 if remove:
1129 if remove:
1130 del self.sub_by_root[root][name]
1130 del self.sub_by_root[root][name]
1131 # don't let this grow unbounded
1131 # don't let this grow unbounded
1132 if name in self.subs:
1132 if name in self.subs:
1133 del self.subs[name]
1133 del self.subs[name]
1134 return sub
1134 return sub
1135
1135
1136 if name not in self.subs:
1136 if name not in self.subs:
1137 return None
1137 return None
1138 sub = self.subs[name]
1138 sub = self.subs[name]
1139 if remove:
1139 if remove:
1140 del self.subs[name]
1140 del self.subs[name]
1141 return sub
1141 return sub
1142
1142
1143 def query(self, *args):
1143 def query(self, *args):
1144 """Send a query to the watchman service and return the response
1144 """Send a query to the watchman service and return the response
1145
1145
1146 This call will block until the response is returned.
1146 This call will block until the response is returned.
1147 If any unilateral responses are sent by the service in between
1147 If any unilateral responses are sent by the service in between
1148 the request-response they will be buffered up in the client object
1148 the request-response they will be buffered up in the client object
1149 and NOT returned via this method.
1149 and NOT returned via this method.
1150 """
1150 """
1151
1151
1152 log("calling client.query")
1152 log("calling client.query")
1153 self._connect()
1153 self._connect()
1154 try:
1154 try:
1155 self.sendConn.send(args)
1155 self.sendConn.send(args)
1156
1156
1157 res = self.receive()
1157 res = self.receive()
1158 while self.isUnilateralResponse(res):
1158 while self.isUnilateralResponse(res):
1159 res = self.receive()
1159 res = self.receive()
1160
1160
1161 return res
1161 return res
1162 except EnvironmentError as ee:
1162 except EnvironmentError as ee:
1163 # When we can depend on Python 3, we can use PEP 3134
1163 # When we can depend on Python 3, we can use PEP 3134
1164 # exception chaining here.
1164 # exception chaining here.
1165 raise WatchmanEnvironmentError(
1165 raise WatchmanEnvironmentError(
1166 "I/O error communicating with watchman daemon",
1166 "I/O error communicating with watchman daemon",
1167 ee.errno,
1167 ee.errno,
1168 ee.strerror,
1168 ee.strerror,
1169 args,
1169 args,
1170 )
1170 )
1171 except WatchmanError as ex:
1171 except WatchmanError as ex:
1172 ex.setCommand(args)
1172 ex.setCommand(args)
1173 raise
1173 raise
1174
1174
1175 def capabilityCheck(self, optional=None, required=None):
1175 def capabilityCheck(self, optional=None, required=None):
1176 """Perform a server capability check"""
1176 """Perform a server capability check"""
1177 res = self.query(
1177 res = self.query(
1178 "version", {"optional": optional or [], "required": required or []}
1178 "version", {"optional": optional or [], "required": required or []}
1179 )
1179 )
1180
1180
1181 if not self._hasprop(res, "capabilities"):
1181 if not self._hasprop(res, "capabilities"):
1182 # Server doesn't support capabilities, so we need to
1182 # Server doesn't support capabilities, so we need to
1183 # synthesize the results based on the version
1183 # synthesize the results based on the version
1184 capabilities.synthesize(res, optional)
1184 capabilities.synthesize(res, optional)
1185 if "error" in res:
1185 if "error" in res:
1186 raise CommandError(res["error"])
1186 raise CommandError(res["error"])
1187
1187
1188 return res
1188 return res
1189
1189
1190 def setTimeout(self, value):
1190 def setTimeout(self, value):
1191 self.recvConn.setTimeout(value)
1191 self.recvConn.setTimeout(value)
1192 self.sendConn.setTimeout(value)
1192 self.sendConn.setTimeout(value)
General Comments 0
You need to be logged in to leave comments. Login now