##// END OF EJS Templates
watchman: refactor transport connecting to unconfuse pytype...
Matt Harbison -
r50751:40d3ee57 default
parent child Browse files
Show More
@@ -1,1192 +1,1192 b''
1 1 # Copyright 2014-present Facebook, Inc.
2 2 # All rights reserved.
3 3 #
4 4 # Redistribution and use in source and binary forms, with or without
5 5 # modification, are permitted provided that the following conditions are met:
6 6 #
7 7 # * Redistributions of source code must retain the above copyright notice,
8 8 # this list of conditions and the following disclaimer.
9 9 #
10 10 # * Redistributions in binary form must reproduce the above copyright notice,
11 11 # this list of conditions and the following disclaimer in the documentation
12 12 # and/or other materials provided with the distribution.
13 13 #
14 14 # * Neither the name Facebook nor the names of its contributors may be used to
15 15 # endorse or promote products derived from this software without specific
16 16 # prior written permission.
17 17 #
18 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22 22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24 24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25 25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26 26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 28
29 29 import inspect
30 30 import math
31 31 import os
32 32 import socket
33 33 import subprocess
34 34 import time
35 35
36 36 from . import capabilities, compat, encoding, load
37 37
38 38
39 39 # Sometimes it's really hard to get Python extensions to compile,
40 40 # so fall back to a pure Python implementation.
41 41 try:
42 42 from . import bser
43 43
44 44 # Demandimport causes modules to be loaded lazily. Force the load now
45 45 # so that we can fall back on pybser if bser doesn't exist
46 46 bser.pdu_info
47 47 except ImportError:
48 48 from . import pybser as bser
49 49
50 50
51 51 if os.name == "nt":
52 52 import ctypes
53 53 import ctypes.wintypes
54 54
55 55 wintypes = ctypes.wintypes
56 56 GENERIC_READ = 0x80000000
57 57 GENERIC_WRITE = 0x40000000
58 58 FILE_FLAG_OVERLAPPED = 0x40000000
59 59 OPEN_EXISTING = 3
60 60 INVALID_HANDLE_VALUE = ctypes.c_void_p(-1).value
61 61 FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
62 62 FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
63 63 FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
64 64 WAIT_FAILED = 0xFFFFFFFF
65 65 WAIT_TIMEOUT = 0x00000102
66 66 WAIT_OBJECT_0 = 0x00000000
67 67 WAIT_IO_COMPLETION = 0x000000C0
68 68 INFINITE = 0xFFFFFFFF
69 69
70 70 # Overlapped I/O operation is in progress. (997)
71 71 ERROR_IO_PENDING = 0x000003E5
72 72
73 73 # The pointer size follows the architecture
74 74 # We use WPARAM since this type is already conditionally defined
75 75 ULONG_PTR = ctypes.wintypes.WPARAM
76 76
77 77 class OVERLAPPED(ctypes.Structure):
78 78 _fields_ = [
79 79 ("Internal", ULONG_PTR),
80 80 ("InternalHigh", ULONG_PTR),
81 81 ("Offset", wintypes.DWORD),
82 82 ("OffsetHigh", wintypes.DWORD),
83 83 ("hEvent", wintypes.HANDLE),
84 84 ]
85 85
86 86 def __init__(self):
87 87 self.Internal = 0
88 88 self.InternalHigh = 0
89 89 self.Offset = 0
90 90 self.OffsetHigh = 0
91 91 self.hEvent = 0
92 92
93 93 LPDWORD = ctypes.POINTER(wintypes.DWORD)
94 94
95 95 _kernel32 = ctypes.windll.kernel32 # pytype: disable=module-attr
96 96
97 97 CreateFile = _kernel32.CreateFileA
98 98 CreateFile.argtypes = [
99 99 wintypes.LPSTR,
100 100 wintypes.DWORD,
101 101 wintypes.DWORD,
102 102 wintypes.LPVOID,
103 103 wintypes.DWORD,
104 104 wintypes.DWORD,
105 105 wintypes.HANDLE,
106 106 ]
107 107 CreateFile.restype = wintypes.HANDLE
108 108
109 109 CloseHandle = _kernel32.CloseHandle
110 110 CloseHandle.argtypes = [wintypes.HANDLE]
111 111 CloseHandle.restype = wintypes.BOOL
112 112
113 113 ReadFile = _kernel32.ReadFile
114 114 ReadFile.argtypes = [
115 115 wintypes.HANDLE,
116 116 wintypes.LPVOID,
117 117 wintypes.DWORD,
118 118 LPDWORD,
119 119 ctypes.POINTER(OVERLAPPED),
120 120 ]
121 121 ReadFile.restype = wintypes.BOOL
122 122
123 123 WriteFile = _kernel32.WriteFile
124 124 WriteFile.argtypes = [
125 125 wintypes.HANDLE,
126 126 wintypes.LPVOID,
127 127 wintypes.DWORD,
128 128 LPDWORD,
129 129 ctypes.POINTER(OVERLAPPED),
130 130 ]
131 131 WriteFile.restype = wintypes.BOOL
132 132
133 133 GetLastError = _kernel32.GetLastError
134 134 GetLastError.argtypes = []
135 135 GetLastError.restype = wintypes.DWORD
136 136
137 137 SetLastError = _kernel32.SetLastError
138 138 SetLastError.argtypes = [wintypes.DWORD]
139 139 SetLastError.restype = None
140 140
141 141 FormatMessage = _kernel32.FormatMessageA
142 142 FormatMessage.argtypes = [
143 143 wintypes.DWORD,
144 144 wintypes.LPVOID,
145 145 wintypes.DWORD,
146 146 wintypes.DWORD,
147 147 ctypes.POINTER(wintypes.LPSTR),
148 148 wintypes.DWORD,
149 149 wintypes.LPVOID,
150 150 ]
151 151 FormatMessage.restype = wintypes.DWORD
152 152
153 153 LocalFree = _kernel32.LocalFree
154 154
155 155 GetOverlappedResult = _kernel32.GetOverlappedResult
156 156 GetOverlappedResult.argtypes = [
157 157 wintypes.HANDLE,
158 158 ctypes.POINTER(OVERLAPPED),
159 159 LPDWORD,
160 160 wintypes.BOOL,
161 161 ]
162 162 GetOverlappedResult.restype = wintypes.BOOL
163 163
164 164 GetOverlappedResultEx = getattr(_kernel32, "GetOverlappedResultEx", None)
165 165 if GetOverlappedResultEx is not None:
166 166 GetOverlappedResultEx.argtypes = [
167 167 wintypes.HANDLE,
168 168 ctypes.POINTER(OVERLAPPED),
169 169 LPDWORD,
170 170 wintypes.DWORD,
171 171 wintypes.BOOL,
172 172 ]
173 173 GetOverlappedResultEx.restype = wintypes.BOOL
174 174
175 175 WaitForSingleObjectEx = _kernel32.WaitForSingleObjectEx
176 176 WaitForSingleObjectEx.argtypes = [
177 177 wintypes.HANDLE,
178 178 wintypes.DWORD,
179 179 wintypes.BOOL,
180 180 ]
181 181 WaitForSingleObjectEx.restype = wintypes.DWORD
182 182
183 183 CreateEvent = _kernel32.CreateEventA
184 184 CreateEvent.argtypes = [
185 185 LPDWORD,
186 186 wintypes.BOOL,
187 187 wintypes.BOOL,
188 188 wintypes.LPSTR,
189 189 ]
190 190 CreateEvent.restype = wintypes.HANDLE
191 191
192 192 # Windows Vista is the minimum supported client for CancelIoEx.
193 193 CancelIoEx = _kernel32.CancelIoEx
194 194 CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
195 195 CancelIoEx.restype = wintypes.BOOL
196 196
197 197 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
198 198 sniff_len = 13
199 199
200 200 # This is a helper for debugging the client.
201 201 _debugging = False
202 202 if _debugging:
203 203
204 204 def log(fmt, *args):
205 205 print(
206 206 "[%s] %s"
207 207 % (
208 208 time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
209 209 fmt % args[:],
210 210 )
211 211 )
212 212
213 213
214 214 else:
215 215
216 216 def log(fmt, *args):
217 217 pass
218 218
219 219
220 220 def _win32_strerror(err):
221 221 """expand a win32 error code into a human readable message"""
222 222
223 223 # FormatMessage will allocate memory and assign it here
224 224 buf = ctypes.c_char_p()
225 225 FormatMessage(
226 226 FORMAT_MESSAGE_FROM_SYSTEM
227 227 | FORMAT_MESSAGE_ALLOCATE_BUFFER
228 228 | FORMAT_MESSAGE_IGNORE_INSERTS,
229 229 None,
230 230 err,
231 231 0,
232 232 buf,
233 233 0,
234 234 None,
235 235 )
236 236 try:
237 237 return buf.value
238 238 finally:
239 239 LocalFree(buf)
240 240
241 241
242 242 class WatchmanError(Exception):
243 243 def __init__(self, msg=None, cmd=None):
244 244 self.msg = msg
245 245 self.cmd = cmd
246 246
247 247 def setCommand(self, cmd):
248 248 self.cmd = cmd
249 249
250 250 def __str__(self):
251 251 if self.cmd:
252 252 return "%s, while executing %s" % (self.msg, self.cmd)
253 253 return self.msg
254 254
255 255
256 256 class BSERv1Unsupported(WatchmanError):
257 257 pass
258 258
259 259
260 260 class UseAfterFork(WatchmanError):
261 261 pass
262 262
263 263
264 264 class WatchmanEnvironmentError(WatchmanError):
265 265 def __init__(self, msg, errno, errmsg, cmd=None):
266 266 super(WatchmanEnvironmentError, self).__init__(
267 267 "{0}: errno={1} errmsg={2}".format(msg, errno, errmsg), cmd
268 268 )
269 269
270 270
271 271 class SocketConnectError(WatchmanError):
272 272 def __init__(self, sockpath, exc):
273 273 super(SocketConnectError, self).__init__(
274 274 "unable to connect to %s: %s" % (sockpath, exc)
275 275 )
276 276 self.sockpath = sockpath
277 277 self.exc = exc
278 278
279 279
280 280 class SocketTimeout(WatchmanError):
281 281 """A specialized exception raised for socket timeouts during communication to/from watchman.
282 282 This makes it easier to implement non-blocking loops as callers can easily distinguish
283 283 between a routine timeout and an actual error condition.
284 284
285 285 Note that catching WatchmanError will also catch this as it is a super-class, so backwards
286 286 compatibility in exception handling is preserved.
287 287 """
288 288
289 289
290 290 class CommandError(WatchmanError):
291 291 """error returned by watchman
292 292
293 293 self.msg is the message returned by watchman.
294 294 """
295 295
296 296 def __init__(self, msg, cmd=None):
297 297 super(CommandError, self).__init__(
298 298 "watchman command error: %s" % (msg,), cmd
299 299 )
300 300
301 301
302 302 class Transport:
303 303 """communication transport to the watchman server"""
304 304
305 305 buf = None
306 306
307 307 def close(self):
308 308 """tear it down"""
309 309 raise NotImplementedError()
310 310
311 311 def readBytes(self, size):
312 312 """read size bytes"""
313 313 raise NotImplementedError()
314 314
315 315 def write(self, buf):
316 316 """write some data"""
317 317 raise NotImplementedError()
318 318
319 319 def setTimeout(self, value):
320 320 pass
321 321
322 322 def readLine(self):
323 323 """read a line
324 324 Maintains its own buffer, callers of the transport should not mix
325 325 calls to readBytes and readLine.
326 326 """
327 327 if self.buf is None:
328 328 self.buf = []
329 329
330 330 # Buffer may already have a line if we've received unilateral
331 331 # response(s) from the server
332 332 if len(self.buf) == 1 and b"\n" in self.buf[0]:
333 333 (line, b) = self.buf[0].split(b"\n", 1)
334 334 self.buf = [b]
335 335 return line
336 336
337 337 while True:
338 338 b = self.readBytes(4096)
339 339 if b"\n" in b:
340 340 result = b"".join(self.buf)
341 341 (line, b) = b.split(b"\n", 1)
342 342 self.buf = [b]
343 343 return result + line
344 344 self.buf.append(b)
345 345
346 346
347 347 class Codec:
348 348 """communication encoding for the watchman server"""
349 349
350 350 transport = None
351 351
352 352 def __init__(self, transport):
353 353 self.transport = transport
354 354
355 355 def receive(self):
356 356 raise NotImplementedError()
357 357
358 358 def send(self, *args):
359 359 raise NotImplementedError()
360 360
361 361 def setTimeout(self, value):
362 362 self.transport.setTimeout(value)
363 363
364 364
365 365 class UnixSocketTransport(Transport):
366 366 """local unix domain socket transport"""
367 367
368 368 sock = None
369 369
370 370 def __init__(self, sockpath, timeout):
371 371 self.sockpath = sockpath
372 372 self.timeout = timeout
373 373
374 374 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
375 375 try:
376 376 sock.settimeout(self.timeout)
377 377 sock.connect(self.sockpath)
378 378 self.sock = sock
379 379 except socket.error as e:
380 380 sock.close()
381 381 raise SocketConnectError(self.sockpath, e)
382 382
383 383 def close(self):
384 384 if self.sock:
385 385 self.sock.close()
386 386 self.sock = None
387 387
388 388 def setTimeout(self, value):
389 389 self.timeout = value
390 390 self.sock.settimeout(self.timeout)
391 391
392 392 def readBytes(self, size):
393 393 try:
394 394 buf = [self.sock.recv(size)]
395 395 if not buf[0]:
396 396 raise WatchmanError("empty watchman response")
397 397 return buf[0]
398 398 except socket.timeout:
399 399 raise SocketTimeout("timed out waiting for response")
400 400
401 401 def write(self, data):
402 402 try:
403 403 self.sock.sendall(data)
404 404 except socket.timeout:
405 405 raise SocketTimeout("timed out sending query command")
406 406
407 407
408 408 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
409 409 """Windows 7 and earlier does not support GetOverlappedResultEx. The
410 410 alternative is to use GetOverlappedResult and wait for read or write
411 411 operation to complete. This is done be using CreateEvent and
412 412 WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
413 413 and GetOverlappedResult are all part of Windows API since WindowsXP.
414 414 This is the exact same implementation that can be found in the watchman
415 415 source code (see get_overlapped_result_ex_impl in stream_win.c). This
416 416 way, maintenance should be simplified.
417 417 """
418 418 log("Preparing to wait for maximum %dms", millis)
419 419 if millis != 0:
420 420 waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
421 421 if waitReturnCode == WAIT_OBJECT_0:
422 422 # Event is signaled, overlapped IO operation result should be available.
423 423 pass
424 424 elif waitReturnCode == WAIT_IO_COMPLETION:
425 425 # WaitForSingleObjectEx returnes because the system added an I/O completion
426 426 # routine or an asynchronous procedure call (APC) to the thread queue.
427 427 SetLastError(WAIT_IO_COMPLETION)
428 428 pass
429 429 elif waitReturnCode == WAIT_TIMEOUT:
430 430 # We reached the maximum allowed wait time, the IO operation failed
431 431 # to complete in timely fashion.
432 432 SetLastError(WAIT_TIMEOUT)
433 433 return False
434 434 elif waitReturnCode == WAIT_FAILED:
435 435 # something went wrong calling WaitForSingleObjectEx
436 436 err = GetLastError()
437 437 log("WaitForSingleObjectEx failed: %s", _win32_strerror(err))
438 438 return False
439 439 else:
440 440 # unexpected situation deserving investigation.
441 441 err = GetLastError()
442 442 log("Unexpected error: %s", _win32_strerror(err))
443 443 return False
444 444
445 445 return GetOverlappedResult(pipe, olap, nbytes, False)
446 446
447 447
448 448 class WindowsNamedPipeTransport(Transport):
449 449 """connect to a named pipe"""
450 450
451 451 def __init__(self, sockpath, timeout):
452 452 self.sockpath = sockpath
453 453 self.timeout = int(math.ceil(timeout * 1000))
454 454 self._iobuf = None
455 455
456 456 if compat.PYTHON3:
457 457 sockpath = os.fsencode(sockpath)
458 458 self.pipe = CreateFile(
459 459 sockpath,
460 460 GENERIC_READ | GENERIC_WRITE,
461 461 0,
462 462 None,
463 463 OPEN_EXISTING,
464 464 FILE_FLAG_OVERLAPPED,
465 465 None,
466 466 )
467 467
468 468 err = GetLastError()
469 469 if self.pipe == INVALID_HANDLE_VALUE or self.pipe == 0:
470 470 self.pipe = None
471 471 raise SocketConnectError(self.sockpath, self._make_win_err("", err))
472 472
473 473 # event for the overlapped I/O operations
474 474 self._waitable = CreateEvent(None, True, False, None)
475 475 err = GetLastError()
476 476 if self._waitable is None:
477 477 self._raise_win_err("CreateEvent failed", err)
478 478
479 479 self._get_overlapped_result_ex = GetOverlappedResultEx
480 480 if (
481 481 os.getenv("WATCHMAN_WIN7_COMPAT") == "1"
482 482 or self._get_overlapped_result_ex is None
483 483 ):
484 484 self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
485 485
486 486 def _raise_win_err(self, msg, err):
487 487 raise self._make_win_err(msg, err)
488 488
489 489 def _make_win_err(self, msg, err):
490 490 return IOError(
491 491 "%s win32 error code: %d %s" % (msg, err, _win32_strerror(err))
492 492 )
493 493
494 494 def close(self):
495 495 if self.pipe:
496 496 log("Closing pipe")
497 497 CloseHandle(self.pipe)
498 498 self.pipe = None
499 499
500 500 if self._waitable is not None:
501 501 # We release the handle for the event
502 502 CloseHandle(self._waitable)
503 503 self._waitable = None
504 504
505 505 def setTimeout(self, value):
506 506 # convert to milliseconds
507 507 self.timeout = int(value * 1000)
508 508
509 509 def readBytes(self, size):
510 510 """A read can block for an unbounded amount of time, even if the
511 511 kernel reports that the pipe handle is signalled, so we need to
512 512 always perform our reads asynchronously
513 513 """
514 514
515 515 # try to satisfy the read from any buffered data
516 516 if self._iobuf:
517 517 if size >= len(self._iobuf):
518 518 res = self._iobuf
519 519 self.buf = None
520 520 return res
521 521 res = self._iobuf[:size]
522 522 self._iobuf = self._iobuf[size:]
523 523 return res
524 524
525 525 # We need to initiate a read
526 526 buf = ctypes.create_string_buffer(size)
527 527 olap = OVERLAPPED()
528 528 olap.hEvent = self._waitable
529 529
530 530 log("made read buff of size %d", size)
531 531
532 532 # ReadFile docs warn against sending in the nread parameter for async
533 533 # operations, so we always collect it via GetOverlappedResultEx
534 534 immediate = ReadFile(self.pipe, buf, size, None, olap)
535 535
536 536 if not immediate:
537 537 err = GetLastError()
538 538 if err != ERROR_IO_PENDING:
539 539 self._raise_win_err("failed to read %d bytes" % size, err)
540 540
541 541 nread = wintypes.DWORD()
542 542 if not self._get_overlapped_result_ex(
543 543 self.pipe, olap, nread, 0 if immediate else self.timeout, True
544 544 ):
545 545 err = GetLastError()
546 546 CancelIoEx(self.pipe, olap)
547 547
548 548 if err == WAIT_TIMEOUT:
549 549 log("GetOverlappedResultEx timedout")
550 550 raise SocketTimeout(
551 551 "timed out after waiting %dms for read" % self.timeout
552 552 )
553 553
554 554 log("GetOverlappedResultEx reports error %d", err)
555 555 self._raise_win_err("error while waiting for read", err)
556 556
557 557 nread = nread.value
558 558 if nread == 0:
559 559 # Docs say that named pipes return 0 byte when the other end did
560 560 # a zero byte write. Since we don't ever do that, the only
561 561 # other way this shows up is if the client has gotten in a weird
562 562 # state, so let's bail out
563 563 CancelIoEx(self.pipe, olap)
564 564 raise IOError("Async read yielded 0 bytes; unpossible!")
565 565
566 566 # Holds precisely the bytes that we read from the prior request
567 567 buf = buf[:nread]
568 568
569 569 returned_size = min(nread, size)
570 570 if returned_size == nread:
571 571 return buf
572 572
573 573 # keep any left-overs around for a later read to consume
574 574 self._iobuf = buf[returned_size:]
575 575 return buf[:returned_size]
576 576
577 577 def write(self, data):
578 578 olap = OVERLAPPED()
579 579 olap.hEvent = self._waitable
580 580
581 581 immediate = WriteFile(
582 582 self.pipe, ctypes.c_char_p(data), len(data), None, olap
583 583 )
584 584
585 585 if not immediate:
586 586 err = GetLastError()
587 587 if err != ERROR_IO_PENDING:
588 588 self._raise_win_err(
589 589 "failed to write %d bytes to handle %r"
590 590 % (len(data), self.pipe),
591 591 err,
592 592 )
593 593
594 594 # Obtain results, waiting if needed
595 595 nwrote = wintypes.DWORD()
596 596 if self._get_overlapped_result_ex(
597 597 self.pipe, olap, nwrote, 0 if immediate else self.timeout, True
598 598 ):
599 599 log("made write of %d bytes", nwrote.value)
600 600 return nwrote.value
601 601
602 602 err = GetLastError()
603 603
604 604 # It's potentially unsafe to allow the write to continue after
605 605 # we unwind, so let's make a best effort to avoid that happening
606 606 CancelIoEx(self.pipe, olap)
607 607
608 608 if err == WAIT_TIMEOUT:
609 609 raise SocketTimeout(
610 610 "timed out after waiting %dms for write" % self.timeout
611 611 )
612 612 self._raise_win_err(
613 613 "error while waiting for write of %d bytes" % len(data), err
614 614 )
615 615
616 616
617 617 def _default_binpath(binpath=None):
618 618 if binpath:
619 619 return binpath
620 620 # The test harness sets WATCHMAN_BINARY to the binary under test,
621 621 # so we use that by default, otherwise, allow resolving watchman
622 622 # from the users PATH.
623 623 return os.environ.get("WATCHMAN_BINARY", "watchman")
624 624
625 625
626 626 class CLIProcessTransport(Transport):
627 627 """open a pipe to the cli to talk to the service
628 628 This intended to be used only in the test harness!
629 629
630 630 The CLI is an oddball because we only support JSON input
631 631 and cannot send multiple commands through the same instance,
632 632 so we spawn a new process for each command.
633 633
634 634 We disable server spawning for this implementation, again, because
635 635 it is intended to be used only in our test harness. You really
636 636 should not need to use the CLI transport for anything real.
637 637
638 638 While the CLI can output in BSER, our Transport interface doesn't
639 639 support telling this instance that it should do so. That effectively
640 640 limits this implementation to JSON input and output only at this time.
641 641
642 642 It is the responsibility of the caller to set the send and
643 643 receive codecs appropriately.
644 644 """
645 645
646 646 proc = None
647 647 closed = True
648 648
649 649 def __init__(self, sockpath, timeout, binpath=None):
650 650 self.sockpath = sockpath
651 651 self.timeout = timeout
652 652 self.binpath = _default_binpath(binpath)
653 653
654 654 def close(self):
655 655 if self.proc:
656 656 if self.proc.pid is not None:
657 657 self.proc.kill()
658 658 self.proc.stdin.close()
659 659 self.proc.stdout.close()
660 660 self.proc.wait()
661 661 self.proc = None
662 662
663 663 def _connect(self):
664 664 if self.proc:
665 665 return self.proc
666 666 args = [
667 667 self.binpath,
668 668 "--sockname={0}".format(self.sockpath),
669 669 "--logfile=/BOGUS",
670 670 "--statefile=/BOGUS",
671 671 "--no-spawn",
672 672 "--no-local",
673 673 "--no-pretty",
674 674 "-j",
675 675 ]
676 676 self.proc = subprocess.Popen(
677 677 args, stdin=subprocess.PIPE, stdout=subprocess.PIPE
678 678 )
679 679 return self.proc
680 680
681 681 def readBytes(self, size):
682 682 self._connect()
683 683 res = self.proc.stdout.read(size)
684 684 if not res:
685 685 raise WatchmanError("EOF on CLI process transport")
686 686 return res
687 687
688 688 def write(self, data):
689 689 if self.closed:
690 690 self.close()
691 691 self.closed = False
692 self._connect()
693 res = self.proc.stdin.write(data)
694 self.proc.stdin.close()
692 proc = self._connect()
693 res = proc.stdin.write(data)
694 proc.stdin.close()
695 695 self.closed = True
696 696 return res
697 697
698 698
699 699 class BserCodec(Codec):
700 700 """use the BSER encoding. This is the default, preferred codec"""
701 701
702 702 def __init__(self, transport, value_encoding, value_errors):
703 703 super(BserCodec, self).__init__(transport)
704 704 self._value_encoding = value_encoding
705 705 self._value_errors = value_errors
706 706
707 707 def _loads(self, response):
708 708 return bser.loads(
709 709 response,
710 710 value_encoding=self._value_encoding,
711 711 value_errors=self._value_errors,
712 712 )
713 713
714 714 def receive(self):
715 715 buf = [self.transport.readBytes(sniff_len)]
716 716 if not buf[0]:
717 717 raise WatchmanError("empty watchman response")
718 718
719 719 _1, _2, elen = bser.pdu_info(buf[0])
720 720
721 721 rlen = len(buf[0])
722 722 while elen > rlen:
723 723 buf.append(self.transport.readBytes(elen - rlen))
724 724 rlen += len(buf[-1])
725 725
726 726 response = b"".join(buf)
727 727 try:
728 728 res = self._loads(response)
729 729 return res
730 730 except ValueError as e:
731 731 raise WatchmanError("watchman response decode error: %s" % e)
732 732
733 733 def send(self, *args):
734 734 cmd = bser.dumps(*args) # Defaults to BSER v1
735 735 self.transport.write(cmd)
736 736
737 737
738 738 class ImmutableBserCodec(BserCodec):
739 739 """use the BSER encoding, decoding values using the newer
740 740 immutable object support"""
741 741
742 742 def _loads(self, response):
743 743 return bser.loads(
744 744 response,
745 745 False,
746 746 value_encoding=self._value_encoding,
747 747 value_errors=self._value_errors,
748 748 )
749 749
750 750
751 751 class Bser2WithFallbackCodec(BserCodec):
752 752 """use BSER v2 encoding"""
753 753
754 754 def __init__(self, transport, value_encoding, value_errors):
755 755 super(Bser2WithFallbackCodec, self).__init__(
756 756 transport, value_encoding, value_errors
757 757 )
758 758 if compat.PYTHON3:
759 759 bserv2_key = "required"
760 760 else:
761 761 bserv2_key = "optional"
762 762
763 763 self.send(["version", {bserv2_key: ["bser-v2"]}])
764 764
765 765 capabilities = self.receive()
766 766
767 767 if "error" in capabilities:
768 768 raise BSERv1Unsupported(
769 769 "The watchman server version does not support Python 3. Please "
770 770 "upgrade your watchman server."
771 771 )
772 772
773 773 if capabilities["capabilities"]["bser-v2"]:
774 774 self.bser_version = 2
775 775 self.bser_capabilities = 0
776 776 else:
777 777 self.bser_version = 1
778 778 self.bser_capabilities = 0
779 779
780 780 def receive(self):
781 781 buf = [self.transport.readBytes(sniff_len)]
782 782 if not buf[0]:
783 783 raise WatchmanError("empty watchman response")
784 784
785 785 recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
786 786
787 787 if hasattr(self, "bser_version"):
788 788 # Readjust BSER version and capabilities if necessary
789 789 self.bser_version = max(self.bser_version, recv_bser_version)
790 790 self.capabilities = self.bser_capabilities & recv_bser_capabilities
791 791
792 792 rlen = len(buf[0])
793 793 while elen > rlen:
794 794 buf.append(self.transport.readBytes(elen - rlen))
795 795 rlen += len(buf[-1])
796 796
797 797 response = b"".join(buf)
798 798 try:
799 799 res = self._loads(response)
800 800 return res
801 801 except ValueError as e:
802 802 raise WatchmanError("watchman response decode error: %s" % e)
803 803
804 804 def send(self, *args):
805 805 if hasattr(self, "bser_version"):
806 806 cmd = bser.dumps(
807 807 *args,
808 808 version=self.bser_version,
809 809 capabilities=self.bser_capabilities
810 810 )
811 811 else:
812 812 cmd = bser.dumps(*args)
813 813 self.transport.write(cmd)
814 814
815 815
816 816 class ImmutableBser2Codec(Bser2WithFallbackCodec, ImmutableBserCodec):
817 817 """use the BSER encoding, decoding values using the newer
818 818 immutable object support"""
819 819
820 820 pass
821 821
822 822
823 823 class JsonCodec(Codec):
824 824 """Use json codec. This is here primarily for testing purposes"""
825 825
826 826 json = None
827 827
828 828 def __init__(self, transport):
829 829 super(JsonCodec, self).__init__(transport)
830 830 # optional dep on json, only if JsonCodec is used
831 831 import json
832 832
833 833 self.json = json
834 834
835 835 def receive(self):
836 836 line = self.transport.readLine()
837 837 try:
838 838 # In Python 3, json.loads is a transformation from Unicode string to
839 839 # objects possibly containing Unicode strings. We typically expect
840 840 # the JSON blob to be ASCII-only with non-ASCII characters escaped,
841 841 # but it's possible we might get non-ASCII bytes that are valid
842 842 # UTF-8.
843 843 if compat.PYTHON3:
844 844 line = line.decode("utf-8")
845 845 return self.json.loads(line)
846 846 except Exception as e:
847 847 print(e, line)
848 848 raise
849 849
850 850 def send(self, *args):
851 851 cmd = self.json.dumps(*args)
852 852 # In Python 3, json.dumps is a transformation from objects possibly
853 853 # containing Unicode strings to Unicode string. Even with (the default)
854 854 # ensure_ascii=True, dumps returns a Unicode string.
855 855 if compat.PYTHON3:
856 856 cmd = cmd.encode("ascii")
857 857 self.transport.write(cmd + b"\n")
858 858
859 859
860 860 class client:
861 861 """Handles the communication with the watchman service"""
862 862
863 863 sockpath = None
864 864 transport = None
865 865 sendCodec = None
866 866 recvCodec = None
867 867 sendConn = None
868 868 recvConn = None
869 869 subs = {} # Keyed by subscription name
870 870 sub_by_root = {} # Keyed by root, then by subscription name
871 871 logs = [] # When log level is raised
872 872 unilateral = ["log", "subscription"]
873 873 tport = None
874 874 useImmutableBser = None
875 875 pid = None
876 876
877 877 def __init__(
878 878 self,
879 879 sockpath=None,
880 880 timeout=1.0,
881 881 transport=None,
882 882 sendEncoding=None,
883 883 recvEncoding=None,
884 884 useImmutableBser=False,
885 885 # use False for these two because None has a special
886 886 # meaning
887 887 valueEncoding=False,
888 888 valueErrors=False,
889 889 binpath=None,
890 890 ):
891 891 self.sockpath = sockpath
892 892 self.timeout = timeout
893 893 self.useImmutableBser = useImmutableBser
894 894 self.binpath = _default_binpath(binpath)
895 895
896 896 if inspect.isclass(transport) and issubclass(transport, Transport):
897 897 self.transport = transport
898 898 else:
899 899 transport = transport or os.getenv("WATCHMAN_TRANSPORT") or "local"
900 900 if transport == "local" and os.name == "nt":
901 901 self.transport = WindowsNamedPipeTransport
902 902 elif transport == "local":
903 903 self.transport = UnixSocketTransport
904 904 elif transport == "cli":
905 905 self.transport = CLIProcessTransport
906 906 if sendEncoding is None:
907 907 sendEncoding = "json"
908 908 if recvEncoding is None:
909 909 recvEncoding = sendEncoding
910 910 else:
911 911 raise WatchmanError("invalid transport %s" % transport)
912 912
913 913 sendEncoding = str(
914 914 sendEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
915 915 )
916 916 recvEncoding = str(
917 917 recvEncoding or os.getenv("WATCHMAN_ENCODING") or "bser"
918 918 )
919 919
920 920 self.recvCodec = self._parseEncoding(recvEncoding)
921 921 self.sendCodec = self._parseEncoding(sendEncoding)
922 922
923 923 # We want to act like the native OS methods as much as possible. This
924 924 # means returning bytestrings on Python 2 by default and Unicode
925 925 # strings on Python 3. However we take an optional argument that lets
926 926 # users override this.
927 927 if valueEncoding is False:
928 928 if compat.PYTHON3:
929 929 self.valueEncoding = encoding.get_local_encoding()
930 930 self.valueErrors = encoding.default_local_errors
931 931 else:
932 932 self.valueEncoding = None
933 933 self.valueErrors = None
934 934 else:
935 935 self.valueEncoding = valueEncoding
936 936 if valueErrors is False:
937 937 self.valueErrors = encoding.default_local_errors
938 938 else:
939 939 self.valueErrors = valueErrors
940 940
941 941 def _makeBSERCodec(self, codec):
942 942 def make_codec(transport):
943 943 return codec(transport, self.valueEncoding, self.valueErrors)
944 944
945 945 return make_codec
946 946
947 947 def _parseEncoding(self, enc):
948 948 if enc == "bser":
949 949 if self.useImmutableBser:
950 950 return self._makeBSERCodec(ImmutableBser2Codec)
951 951 return self._makeBSERCodec(Bser2WithFallbackCodec)
952 952 elif enc == "bser-v1":
953 953 if compat.PYTHON3:
954 954 raise BSERv1Unsupported(
955 955 "Python 3 does not support the BSER v1 encoding: specify "
956 956 '"bser" or omit the sendEncoding and recvEncoding '
957 957 "arguments"
958 958 )
959 959 if self.useImmutableBser:
960 960 return self._makeBSERCodec(ImmutableBserCodec)
961 961 return self._makeBSERCodec(BserCodec)
962 962 elif enc == "json":
963 963 return JsonCodec
964 964 else:
965 965 raise WatchmanError("invalid encoding %s" % enc)
966 966
967 967 def _hasprop(self, result, name):
968 968 if self.useImmutableBser:
969 969 return hasattr(result, name)
970 970 return name in result
971 971
972 972 def _resolvesockname(self):
973 973 # if invoked via a trigger, watchman will set this env var; we
974 974 # should use it unless explicitly set otherwise
975 975 path = os.getenv("WATCHMAN_SOCK")
976 976 if path:
977 977 return path
978 978
979 979 cmd = [self.binpath, "--output-encoding=bser", "get-sockname"]
980 980 try:
981 981 args = dict(
982 982 stdout=subprocess.PIPE, stderr=subprocess.PIPE
983 983 ) # noqa: C408
984 984
985 985 if os.name == "nt":
986 986 # if invoked via an application with graphical user interface,
987 987 # this call will cause a brief command window pop-up.
988 988 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
989 989 startupinfo = subprocess.STARTUPINFO()
990 990 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
991 991 args["startupinfo"] = startupinfo
992 992
993 993 p = subprocess.Popen(cmd, **args)
994 994
995 995 except OSError as e:
996 996 raise WatchmanError('"watchman" executable not in PATH (%s)' % e)
997 997
998 998 stdout, stderr = p.communicate()
999 999 exitcode = p.poll()
1000 1000
1001 1001 if exitcode:
1002 1002 raise WatchmanError("watchman exited with code %d" % exitcode)
1003 1003
1004 1004 result = bser.loads(stdout)
1005 1005 if "error" in result:
1006 1006 raise WatchmanError("get-sockname error: %s" % result["error"])
1007 1007
1008 1008 return result["sockname"]
1009 1009
1010 1010 def _connect(self):
1011 1011 """establish transport connection"""
1012 1012
1013 1013 if self.recvConn:
1014 1014 if self.pid != os.getpid():
1015 1015 raise UseAfterFork(
1016 1016 "do not re-use a connection after fork; open a new client instead"
1017 1017 )
1018 1018 return
1019 1019
1020 1020 if self.sockpath is None:
1021 1021 self.sockpath = self._resolvesockname()
1022 1022
1023 1023 kwargs = {}
1024 1024 if self.transport == CLIProcessTransport:
1025 1025 kwargs["binpath"] = self.binpath
1026 1026
1027 1027 self.tport = self.transport(self.sockpath, self.timeout, **kwargs)
1028 1028 self.sendConn = self.sendCodec(self.tport)
1029 1029 self.recvConn = self.recvCodec(self.tport)
1030 1030 self.pid = os.getpid()
1031 1031
1032 1032 def __del__(self):
1033 1033 self.close()
1034 1034
1035 1035 def __enter__(self):
1036 1036 self._connect()
1037 1037 return self
1038 1038
1039 1039 def __exit__(self, exc_type, exc_value, exc_traceback):
1040 1040 self.close()
1041 1041
1042 1042 def close(self):
1043 1043 if self.tport:
1044 1044 self.tport.close()
1045 1045 self.tport = None
1046 1046 self.recvConn = None
1047 1047 self.sendConn = None
1048 1048
1049 1049 def receive(self):
1050 1050 """receive the next PDU from the watchman service
1051 1051
1052 1052 If the client has activated subscriptions or logs then
1053 1053 this PDU may be a unilateral PDU sent by the service to
1054 1054 inform the client of a log event or subscription change.
1055 1055
1056 1056 It may also simply be the response portion of a request
1057 1057 initiated by query.
1058 1058
1059 1059 There are clients in production that subscribe and call
1060 1060 this in a loop to retrieve all subscription responses,
1061 1061 so care should be taken when making changes here.
1062 1062 """
1063 1063
1064 1064 self._connect()
1065 1065 result = self.recvConn.receive()
1066 1066 if self._hasprop(result, "error"):
1067 1067 raise CommandError(result["error"])
1068 1068
1069 1069 if self._hasprop(result, "log"):
1070 1070 self.logs.append(result["log"])
1071 1071
1072 1072 if self._hasprop(result, "subscription"):
1073 1073 sub = result["subscription"]
1074 1074 if not (sub in self.subs):
1075 1075 self.subs[sub] = []
1076 1076 self.subs[sub].append(result)
1077 1077
1078 1078 # also accumulate in {root,sub} keyed store
1079 1079 root = os.path.normpath(os.path.normcase(result["root"]))
1080 1080 if not root in self.sub_by_root:
1081 1081 self.sub_by_root[root] = {}
1082 1082 if not sub in self.sub_by_root[root]:
1083 1083 self.sub_by_root[root][sub] = []
1084 1084 self.sub_by_root[root][sub].append(result)
1085 1085
1086 1086 return result
1087 1087
1088 1088 def isUnilateralResponse(self, res):
1089 1089 if "unilateral" in res and res["unilateral"]:
1090 1090 return True
1091 1091 # Fall back to checking for known unilateral responses
1092 1092 for k in self.unilateral:
1093 1093 if k in res:
1094 1094 return True
1095 1095 return False
1096 1096
1097 1097 def getLog(self, remove=True):
1098 1098 """Retrieve buffered log data
1099 1099
1100 1100 If remove is true the data will be removed from the buffer.
1101 1101 Otherwise it will be left in the buffer
1102 1102 """
1103 1103 res = self.logs
1104 1104 if remove:
1105 1105 self.logs = []
1106 1106 return res
1107 1107
1108 1108 def getSubscription(self, name, remove=True, root=None):
1109 1109 """Retrieve the data associated with a named subscription
1110 1110
1111 1111 If remove is True (the default), the subscription data is removed
1112 1112 from the buffer. Otherwise the data is returned but left in
1113 1113 the buffer.
1114 1114
1115 1115 Returns None if there is no data associated with `name`
1116 1116
1117 1117 If root is not None, then only return the subscription
1118 1118 data that matches both root and name. When used in this way,
1119 1119 remove processing impacts both the unscoped and scoped stores
1120 1120 for the subscription data.
1121 1121 """
1122 1122 if root is not None:
1123 1123 root = os.path.normpath(os.path.normcase(root))
1124 1124 if root not in self.sub_by_root:
1125 1125 return None
1126 1126 if name not in self.sub_by_root[root]:
1127 1127 return None
1128 1128 sub = self.sub_by_root[root][name]
1129 1129 if remove:
1130 1130 del self.sub_by_root[root][name]
1131 1131 # don't let this grow unbounded
1132 1132 if name in self.subs:
1133 1133 del self.subs[name]
1134 1134 return sub
1135 1135
1136 1136 if name not in self.subs:
1137 1137 return None
1138 1138 sub = self.subs[name]
1139 1139 if remove:
1140 1140 del self.subs[name]
1141 1141 return sub
1142 1142
1143 1143 def query(self, *args):
1144 1144 """Send a query to the watchman service and return the response
1145 1145
1146 1146 This call will block until the response is returned.
1147 1147 If any unilateral responses are sent by the service in between
1148 1148 the request-response they will be buffered up in the client object
1149 1149 and NOT returned via this method.
1150 1150 """
1151 1151
1152 1152 log("calling client.query")
1153 1153 self._connect()
1154 1154 try:
1155 1155 self.sendConn.send(args)
1156 1156
1157 1157 res = self.receive()
1158 1158 while self.isUnilateralResponse(res):
1159 1159 res = self.receive()
1160 1160
1161 1161 return res
1162 1162 except EnvironmentError as ee:
1163 1163 # When we can depend on Python 3, we can use PEP 3134
1164 1164 # exception chaining here.
1165 1165 raise WatchmanEnvironmentError(
1166 1166 "I/O error communicating with watchman daemon",
1167 1167 ee.errno,
1168 1168 ee.strerror,
1169 1169 args,
1170 1170 )
1171 1171 except WatchmanError as ex:
1172 1172 ex.setCommand(args)
1173 1173 raise
1174 1174
1175 1175 def capabilityCheck(self, optional=None, required=None):
1176 1176 """Perform a server capability check"""
1177 1177 res = self.query(
1178 1178 "version", {"optional": optional or [], "required": required or []}
1179 1179 )
1180 1180
1181 1181 if not self._hasprop(res, "capabilities"):
1182 1182 # Server doesn't support capabilities, so we need to
1183 1183 # synthesize the results based on the version
1184 1184 capabilities.synthesize(res, optional)
1185 1185 if "error" in res:
1186 1186 raise CommandError(res["error"])
1187 1187
1188 1188 return res
1189 1189
1190 1190 def setTimeout(self, value):
1191 1191 self.recvConn.setTimeout(value)
1192 1192 self.sendConn.setTimeout(value)
General Comments 0
You need to be logged in to leave comments. Login now