__init__.py
1031 lines
| 34.8 KiB
| text/x-python
|
PythonLexer
Martijn Pieters
|
r28432 | # Copyright 2014-present Facebook, Inc. | ||
# All rights reserved. | ||||
# | ||||
# Redistribution and use in source and binary forms, with or without | ||||
# modification, are permitted provided that the following conditions are met: | ||||
# | ||||
# * Redistributions of source code must retain the above copyright notice, | ||||
# this list of conditions and the following disclaimer. | ||||
# | ||||
# * Redistributions in binary form must reproduce the above copyright notice, | ||||
# this list of conditions and the following disclaimer in the documentation | ||||
# and/or other materials provided with the distribution. | ||||
# | ||||
# * Neither the name Facebook nor the names of its contributors may be used to | ||||
# endorse or promote products derived from this software without specific | ||||
# prior written permission. | ||||
# | ||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | ||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | ||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | ||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | ||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | ||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | ||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | ||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | ||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||||
Zack Hricz
|
r30656 | from __future__ import absolute_import | ||
from __future__ import division | ||||
from __future__ import print_function | ||||
# no unicode literals | ||||
import inspect | ||||
import math | ||||
Martijn Pieters
|
r28432 | import os | ||
import socket | ||||
import subprocess | ||||
import time | ||||
# Sometimes it's really hard to get Python extensions to compile, | ||||
# so fall back to a pure Python implementation. | ||||
try: | ||||
Zack Hricz
|
r30656 | from . import bser | ||
# Demandimport causes modules to be loaded lazily. Force the load now | ||||
# so that we can fall back on pybser if bser doesn't exist | ||||
bser.pdu_info | ||||
Martijn Pieters
|
r28432 | except ImportError: | ||
Zack Hricz
|
r30656 | from . import pybser as bser | ||
Martijn Pieters
|
r28432 | |||
Matt Harbison
|
r39851 | from mercurial.utils import ( | ||
procutil, | ||||
) | ||||
from mercurial import ( | ||||
pycompat, | ||||
) | ||||
Zack Hricz
|
r30656 | from . import ( | ||
capabilities, | ||||
compat, | ||||
encoding, | ||||
load, | ||||
) | ||||
Martijn Pieters
|
r28432 | |||
if os.name == 'nt': | ||||
import ctypes | ||||
import ctypes.wintypes | ||||
wintypes = ctypes.wintypes | ||||
GENERIC_READ = 0x80000000 | ||||
GENERIC_WRITE = 0x40000000 | ||||
FILE_FLAG_OVERLAPPED = 0x40000000 | ||||
OPEN_EXISTING = 3 | ||||
INVALID_HANDLE_VALUE = -1 | ||||
FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000 | ||||
FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100 | ||||
FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200 | ||||
Zack Hricz
|
r30656 | WAIT_FAILED = 0xFFFFFFFF | ||
Martijn Pieters
|
r28432 | WAIT_TIMEOUT = 0x00000102 | ||
WAIT_OBJECT_0 = 0x00000000 | ||||
Zack Hricz
|
r30656 | WAIT_IO_COMPLETION = 0x000000C0 | ||
INFINITE = 0xFFFFFFFF | ||||
# Overlapped I/O operation is in progress. (997) | ||||
ERROR_IO_PENDING = 0x000003E5 | ||||
# The pointer size follows the architecture | ||||
# We use WPARAM since this type is already conditionally defined | ||||
ULONG_PTR = ctypes.wintypes.WPARAM | ||||
Martijn Pieters
|
r28432 | |||
class OVERLAPPED(ctypes.Structure): | ||||
_fields_ = [ | ||||
Zack Hricz
|
r30656 | ("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR), | ||
Martijn Pieters
|
r28432 | ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD), | ||
("hEvent", wintypes.HANDLE) | ||||
] | ||||
def __init__(self): | ||||
Zack Hricz
|
r30656 | self.Internal = 0 | ||
self.InternalHigh = 0 | ||||
Martijn Pieters
|
r28432 | self.Offset = 0 | ||
self.OffsetHigh = 0 | ||||
self.hEvent = 0 | ||||
LPDWORD = ctypes.POINTER(wintypes.DWORD) | ||||
CreateFile = ctypes.windll.kernel32.CreateFileA | ||||
CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD, | ||||
wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD, | ||||
wintypes.HANDLE] | ||||
CreateFile.restype = wintypes.HANDLE | ||||
CloseHandle = ctypes.windll.kernel32.CloseHandle | ||||
CloseHandle.argtypes = [wintypes.HANDLE] | ||||
CloseHandle.restype = wintypes.BOOL | ||||
ReadFile = ctypes.windll.kernel32.ReadFile | ||||
ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD, | ||||
LPDWORD, ctypes.POINTER(OVERLAPPED)] | ||||
ReadFile.restype = wintypes.BOOL | ||||
WriteFile = ctypes.windll.kernel32.WriteFile | ||||
WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD, | ||||
LPDWORD, ctypes.POINTER(OVERLAPPED)] | ||||
WriteFile.restype = wintypes.BOOL | ||||
GetLastError = ctypes.windll.kernel32.GetLastError | ||||
GetLastError.argtypes = [] | ||||
GetLastError.restype = wintypes.DWORD | ||||
Zack Hricz
|
r30656 | SetLastError = ctypes.windll.kernel32.SetLastError | ||
SetLastError.argtypes = [wintypes.DWORD] | ||||
SetLastError.restype = None | ||||
Martijn Pieters
|
r28432 | FormatMessage = ctypes.windll.kernel32.FormatMessageA | ||
FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD, | ||||
wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR), | ||||
wintypes.DWORD, wintypes.LPVOID] | ||||
FormatMessage.restype = wintypes.DWORD | ||||
LocalFree = ctypes.windll.kernel32.LocalFree | ||||
Zack Hricz
|
r30656 | GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult | ||
GetOverlappedResult.argtypes = [wintypes.HANDLE, | ||||
ctypes.POINTER(OVERLAPPED), LPDWORD, | ||||
wintypes.BOOL] | ||||
GetOverlappedResult.restype = wintypes.BOOL | ||||
Martijn Pieters
|
r28432 | |||
Zack Hricz
|
r30656 | GetOverlappedResultEx = getattr(ctypes.windll.kernel32, | ||
'GetOverlappedResultEx', None) | ||||
if GetOverlappedResultEx is not None: | ||||
GetOverlappedResultEx.argtypes = [wintypes.HANDLE, | ||||
ctypes.POINTER(OVERLAPPED), LPDWORD, | ||||
wintypes.DWORD, wintypes.BOOL] | ||||
GetOverlappedResultEx.restype = wintypes.BOOL | ||||
WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx | ||||
WaitForSingleObjectEx.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.BOOL] | ||||
WaitForSingleObjectEx.restype = wintypes.DWORD | ||||
CreateEvent = ctypes.windll.kernel32.CreateEventA | ||||
CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL, | ||||
wintypes.LPSTR] | ||||
CreateEvent.restype = wintypes.HANDLE | ||||
# Windows Vista is the minimum supported client for CancelIoEx. | ||||
Martijn Pieters
|
r28432 | CancelIoEx = ctypes.windll.kernel32.CancelIoEx | ||
CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)] | ||||
CancelIoEx.restype = wintypes.BOOL | ||||
# 2 bytes marker, 1 byte int size, 8 bytes int64 value | ||||
sniff_len = 13 | ||||
# This is a helper for debugging the client. | ||||
_debugging = False | ||||
if _debugging: | ||||
def log(fmt, *args): | ||||
print('[%s] %s' % | ||||
(time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()), | ||||
fmt % args[:])) | ||||
else: | ||||
def log(fmt, *args): | ||||
pass | ||||
Zack Hricz
|
r30656 | def _win32_strerror(err): | ||
""" expand a win32 error code into a human readable message """ | ||||
# FormatMessage will allocate memory and assign it here | ||||
buf = ctypes.c_char_p() | ||||
FormatMessage( | ||||
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER | ||||
| FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None) | ||||
try: | ||||
return buf.value | ||||
finally: | ||||
LocalFree(buf) | ||||
Martijn Pieters
|
r28432 | class WatchmanError(Exception): | ||
Zack Hricz
|
r30656 | def __init__(self, msg=None, cmd=None): | ||
self.msg = msg | ||||
self.cmd = cmd | ||||
def setCommand(self, cmd): | ||||
self.cmd = cmd | ||||
def __str__(self): | ||||
if self.cmd: | ||||
return '%s, while executing %s' % (self.msg, self.cmd) | ||||
return self.msg | ||||
class WatchmanEnvironmentError(WatchmanError): | ||||
def __init__(self, msg, errno, errmsg, cmd=None): | ||||
super(WatchmanEnvironmentError, self).__init__( | ||||
'{0}: errno={1} errmsg={2}'.format(msg, errno, errmsg), | ||||
cmd) | ||||
class SocketConnectError(WatchmanError): | ||||
def __init__(self, sockpath, exc): | ||||
super(SocketConnectError, self).__init__( | ||||
'unable to connect to %s: %s' % (sockpath, exc)) | ||||
self.sockpath = sockpath | ||||
self.exc = exc | ||||
Martijn Pieters
|
r28432 | |||
class SocketTimeout(WatchmanError): | ||||
"""A specialized exception raised for socket timeouts during communication to/from watchman. | ||||
This makes it easier to implement non-blocking loops as callers can easily distinguish | ||||
between a routine timeout and an actual error condition. | ||||
Note that catching WatchmanError will also catch this as it is a super-class, so backwards | ||||
compatibility in exception handling is preserved. | ||||
""" | ||||
class CommandError(WatchmanError): | ||||
"""error returned by watchman | ||||
self.msg is the message returned by watchman. | ||||
""" | ||||
def __init__(self, msg, cmd=None): | ||||
Zack Hricz
|
r30656 | super(CommandError, self).__init__( | ||
'watchman command error: %s' % (msg, ), | ||||
cmd, | ||||
) | ||||
Martijn Pieters
|
r28432 | |||
class Transport(object): | ||||
""" communication transport to the watchman server """ | ||||
buf = None | ||||
def close(self): | ||||
""" tear it down """ | ||||
raise NotImplementedError() | ||||
def readBytes(self, size): | ||||
""" read size bytes """ | ||||
raise NotImplementedError() | ||||
def write(self, buf): | ||||
""" write some data """ | ||||
raise NotImplementedError() | ||||
def setTimeout(self, value): | ||||
pass | ||||
def readLine(self): | ||||
""" read a line | ||||
Maintains its own buffer, callers of the transport should not mix | ||||
calls to readBytes and readLine. | ||||
""" | ||||
if self.buf is None: | ||||
self.buf = [] | ||||
# Buffer may already have a line if we've received unilateral | ||||
# response(s) from the server | ||||
Zack Hricz
|
r30656 | if len(self.buf) == 1 and b"\n" in self.buf[0]: | ||
(line, b) = self.buf[0].split(b"\n", 1) | ||||
Martijn Pieters
|
r28432 | self.buf = [b] | ||
return line | ||||
while True: | ||||
b = self.readBytes(4096) | ||||
Zack Hricz
|
r30656 | if b"\n" in b: | ||
result = b''.join(self.buf) | ||||
(line, b) = b.split(b"\n", 1) | ||||
Martijn Pieters
|
r28432 | self.buf = [b] | ||
return result + line | ||||
self.buf.append(b) | ||||
class Codec(object): | ||||
""" communication encoding for the watchman server """ | ||||
transport = None | ||||
def __init__(self, transport): | ||||
self.transport = transport | ||||
def receive(self): | ||||
raise NotImplementedError() | ||||
def send(self, *args): | ||||
raise NotImplementedError() | ||||
def setTimeout(self, value): | ||||
self.transport.setTimeout(value) | ||||
class UnixSocketTransport(Transport): | ||||
""" local unix domain socket transport """ | ||||
sock = None | ||||
def __init__(self, sockpath, timeout): | ||||
self.sockpath = sockpath | ||||
self.timeout = timeout | ||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||||
try: | ||||
sock.settimeout(self.timeout) | ||||
sock.connect(self.sockpath) | ||||
self.sock = sock | ||||
except socket.error as e: | ||||
Zack Hricz
|
r30656 | sock.close() | ||
raise SocketConnectError(self.sockpath, e) | ||||
Martijn Pieters
|
r28432 | |||
def close(self): | ||||
self.sock.close() | ||||
self.sock = None | ||||
def setTimeout(self, value): | ||||
self.timeout = value | ||||
self.sock.settimeout(self.timeout) | ||||
def readBytes(self, size): | ||||
try: | ||||
buf = [self.sock.recv(size)] | ||||
if not buf[0]: | ||||
raise WatchmanError('empty watchman response') | ||||
return buf[0] | ||||
except socket.timeout: | ||||
raise SocketTimeout('timed out waiting for response') | ||||
def write(self, data): | ||||
try: | ||||
self.sock.sendall(data) | ||||
except socket.timeout: | ||||
raise SocketTimeout('timed out sending query command') | ||||
Zack Hricz
|
r30656 | def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable): | ||
""" Windows 7 and earlier does not support GetOverlappedResultEx. The | ||||
alternative is to use GetOverlappedResult and wait for read or write | ||||
operation to complete. This is done be using CreateEvent and | ||||
WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx | ||||
and GetOverlappedResult are all part of Windows API since WindowsXP. | ||||
This is the exact same implementation that can be found in the watchman | ||||
source code (see get_overlapped_result_ex_impl in stream_win.c). This | ||||
way, maintenance should be simplified. | ||||
""" | ||||
log('Preparing to wait for maximum %dms', millis ) | ||||
if millis != 0: | ||||
waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable) | ||||
if waitReturnCode == WAIT_OBJECT_0: | ||||
# Event is signaled, overlapped IO operation result should be available. | ||||
pass | ||||
elif waitReturnCode == WAIT_IO_COMPLETION: | ||||
# WaitForSingleObjectEx returnes because the system added an I/O completion | ||||
# routine or an asynchronous procedure call (APC) to the thread queue. | ||||
SetLastError(WAIT_IO_COMPLETION) | ||||
pass | ||||
elif waitReturnCode == WAIT_TIMEOUT: | ||||
# We reached the maximum allowed wait time, the IO operation failed | ||||
# to complete in timely fashion. | ||||
SetLastError(WAIT_TIMEOUT) | ||||
return False | ||||
elif waitReturnCode == WAIT_FAILED: | ||||
# something went wrong calling WaitForSingleObjectEx | ||||
err = GetLastError() | ||||
log('WaitForSingleObjectEx failed: %s', _win32_strerror(err)) | ||||
return False | ||||
else: | ||||
# unexpected situation deserving investigation. | ||||
err = GetLastError() | ||||
log('Unexpected error: %s', _win32_strerror(err)) | ||||
return False | ||||
return GetOverlappedResult(pipe, olap, nbytes, False) | ||||
Martijn Pieters
|
r28432 | class WindowsNamedPipeTransport(Transport): | ||
""" connect to a named pipe """ | ||||
def __init__(self, sockpath, timeout): | ||||
self.sockpath = sockpath | ||||
self.timeout = int(math.ceil(timeout * 1000)) | ||||
self._iobuf = None | ||||
self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None, | ||||
OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None) | ||||
if self.pipe == INVALID_HANDLE_VALUE: | ||||
self.pipe = None | ||||
self._raise_win_err('failed to open pipe %s' % sockpath, | ||||
GetLastError()) | ||||
Zack Hricz
|
r30656 | # event for the overlapped I/O operations | ||
self._waitable = CreateEvent(None, True, False, None) | ||||
if self._waitable is None: | ||||
self._raise_win_err('CreateEvent failed', GetLastError()) | ||||
Martijn Pieters
|
r28432 | |||
Zack Hricz
|
r30656 | self._get_overlapped_result_ex = GetOverlappedResultEx | ||
if (os.getenv('WATCHMAN_WIN7_COMPAT') == '1' or | ||||
self._get_overlapped_result_ex is None): | ||||
self._get_overlapped_result_ex = _get_overlapped_result_ex_impl | ||||
Martijn Pieters
|
r28432 | |||
def _raise_win_err(self, msg, err): | ||||
raise IOError('%s win32 error code: %d %s' % | ||||
Zack Hricz
|
r30656 | (msg, err, _win32_strerror(err))) | ||
Martijn Pieters
|
r28432 | |||
def close(self): | ||||
if self.pipe: | ||||
Zack Hricz
|
r30656 | log('Closing pipe') | ||
Martijn Pieters
|
r28432 | CloseHandle(self.pipe) | ||
self.pipe = None | ||||
Zack Hricz
|
r30656 | if self._waitable is not None: | ||
# We release the handle for the event | ||||
CloseHandle(self._waitable) | ||||
self._waitable = None | ||||
def setTimeout(self, value): | ||||
# convert to milliseconds | ||||
self.timeout = int(value * 1000) | ||||
Martijn Pieters
|
r28432 | def readBytes(self, size): | ||
""" A read can block for an unbounded amount of time, even if the | ||||
kernel reports that the pipe handle is signalled, so we need to | ||||
always perform our reads asynchronously | ||||
""" | ||||
# try to satisfy the read from any buffered data | ||||
if self._iobuf: | ||||
if size >= len(self._iobuf): | ||||
res = self._iobuf | ||||
self.buf = None | ||||
return res | ||||
res = self._iobuf[:size] | ||||
self._iobuf = self._iobuf[size:] | ||||
return res | ||||
# We need to initiate a read | ||||
buf = ctypes.create_string_buffer(size) | ||||
olap = OVERLAPPED() | ||||
Zack Hricz
|
r30656 | olap.hEvent = self._waitable | ||
Martijn Pieters
|
r28432 | |||
log('made read buff of size %d', size) | ||||
# ReadFile docs warn against sending in the nread parameter for async | ||||
# operations, so we always collect it via GetOverlappedResultEx | ||||
immediate = ReadFile(self.pipe, buf, size, None, olap) | ||||
if not immediate: | ||||
err = GetLastError() | ||||
if err != ERROR_IO_PENDING: | ||||
self._raise_win_err('failed to read %d bytes' % size, | ||||
GetLastError()) | ||||
nread = wintypes.DWORD() | ||||
Zack Hricz
|
r30656 | if not self._get_overlapped_result_ex(self.pipe, olap, nread, | ||
0 if immediate else self.timeout, | ||||
True): | ||||
Martijn Pieters
|
r28432 | err = GetLastError() | ||
CancelIoEx(self.pipe, olap) | ||||
if err == WAIT_TIMEOUT: | ||||
log('GetOverlappedResultEx timedout') | ||||
raise SocketTimeout('timed out after waiting %dms for read' % | ||||
self.timeout) | ||||
log('GetOverlappedResultEx reports error %d', err) | ||||
self._raise_win_err('error while waiting for read', err) | ||||
nread = nread.value | ||||
if nread == 0: | ||||
# Docs say that named pipes return 0 byte when the other end did | ||||
# a zero byte write. Since we don't ever do that, the only | ||||
# other way this shows up is if the client has gotten in a weird | ||||
# state, so let's bail out | ||||
CancelIoEx(self.pipe, olap) | ||||
raise IOError('Async read yielded 0 bytes; unpossible!') | ||||
# Holds precisely the bytes that we read from the prior request | ||||
buf = buf[:nread] | ||||
returned_size = min(nread, size) | ||||
if returned_size == nread: | ||||
return buf | ||||
# keep any left-overs around for a later read to consume | ||||
self._iobuf = buf[returned_size:] | ||||
return buf[:returned_size] | ||||
def write(self, data): | ||||
olap = OVERLAPPED() | ||||
Zack Hricz
|
r30656 | olap.hEvent = self._waitable | ||
Martijn Pieters
|
r28432 | immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data), | ||
None, olap) | ||||
if not immediate: | ||||
err = GetLastError() | ||||
if err != ERROR_IO_PENDING: | ||||
self._raise_win_err('failed to write %d bytes' % len(data), | ||||
GetLastError()) | ||||
# Obtain results, waiting if needed | ||||
nwrote = wintypes.DWORD() | ||||
Zack Hricz
|
r30656 | if self._get_overlapped_result_ex(self.pipe, olap, nwrote, | ||
0 if immediate else self.timeout, | ||||
True): | ||||
log('made write of %d bytes', nwrote.value) | ||||
Martijn Pieters
|
r28432 | return nwrote.value | ||
err = GetLastError() | ||||
# It's potentially unsafe to allow the write to continue after | ||||
# we unwind, so let's make a best effort to avoid that happening | ||||
CancelIoEx(self.pipe, olap) | ||||
if err == WAIT_TIMEOUT: | ||||
raise SocketTimeout('timed out after waiting %dms for write' % | ||||
self.timeout) | ||||
self._raise_win_err('error while waiting for write of %d bytes' % | ||||
len(data), err) | ||||
class CLIProcessTransport(Transport): | ||||
""" open a pipe to the cli to talk to the service | ||||
This intended to be used only in the test harness! | ||||
The CLI is an oddball because we only support JSON input | ||||
and cannot send multiple commands through the same instance, | ||||
so we spawn a new process for each command. | ||||
We disable server spawning for this implementation, again, because | ||||
it is intended to be used only in our test harness. You really | ||||
should not need to use the CLI transport for anything real. | ||||
While the CLI can output in BSER, our Transport interface doesn't | ||||
support telling this instance that it should do so. That effectively | ||||
limits this implementation to JSON input and output only at this time. | ||||
It is the responsibility of the caller to set the send and | ||||
receive codecs appropriately. | ||||
""" | ||||
proc = None | ||||
closed = True | ||||
def __init__(self, sockpath, timeout): | ||||
self.sockpath = sockpath | ||||
self.timeout = timeout | ||||
def close(self): | ||||
if self.proc: | ||||
Zack Hricz
|
r30656 | if self.proc.pid is not None: | ||
self.proc.kill() | ||||
self.proc.stdin.close() | ||||
self.proc.stdout.close() | ||||
Martijn Pieters
|
r28432 | self.proc = None | ||
def _connect(self): | ||||
if self.proc: | ||||
return self.proc | ||||
args = [ | ||||
'watchman', | ||||
Zack Hricz
|
r30656 | '--sockname={0}'.format(self.sockpath), | ||
Martijn Pieters
|
r28432 | '--logfile=/BOGUS', | ||
'--statefile=/BOGUS', | ||||
'--no-spawn', | ||||
'--no-local', | ||||
'--no-pretty', | ||||
'-j', | ||||
] | ||||
Matt Harbison
|
r39851 | self.proc = subprocess.Popen(pycompat.rapply(procutil.tonativestr, | ||
args), | ||||
Martijn Pieters
|
r28432 | stdin=subprocess.PIPE, | ||
stdout=subprocess.PIPE) | ||||
return self.proc | ||||
def readBytes(self, size): | ||||
self._connect() | ||||
res = self.proc.stdout.read(size) | ||||
if res == '': | ||||
raise WatchmanError('EOF on CLI process transport') | ||||
return res | ||||
def write(self, data): | ||||
if self.closed: | ||||
Zack Hricz
|
r30656 | self.close() | ||
Martijn Pieters
|
r28432 | self.closed = False | ||
self._connect() | ||||
res = self.proc.stdin.write(data) | ||||
self.proc.stdin.close() | ||||
self.closed = True | ||||
return res | ||||
class BserCodec(Codec): | ||||
""" use the BSER encoding. This is the default, preferred codec """ | ||||
def _loads(self, response): | ||||
Zack Hricz
|
r30656 | return bser.loads(response) # Defaults to BSER v1 | ||
Martijn Pieters
|
r28432 | |||
def receive(self): | ||||
buf = [self.transport.readBytes(sniff_len)] | ||||
if not buf[0]: | ||||
raise WatchmanError('empty watchman response') | ||||
Zack Hricz
|
r30656 | _1, _2, elen = bser.pdu_info(buf[0]) | ||
Martijn Pieters
|
r28432 | |||
rlen = len(buf[0]) | ||||
while elen > rlen: | ||||
buf.append(self.transport.readBytes(elen - rlen)) | ||||
rlen += len(buf[-1]) | ||||
Zack Hricz
|
r30656 | response = b''.join(buf) | ||
Martijn Pieters
|
r28432 | try: | ||
res = self._loads(response) | ||||
return res | ||||
except ValueError as e: | ||||
raise WatchmanError('watchman response decode error: %s' % e) | ||||
def send(self, *args): | ||||
Zack Hricz
|
r30656 | cmd = bser.dumps(*args) # Defaults to BSER v1 | ||
Martijn Pieters
|
r28432 | self.transport.write(cmd) | ||
class ImmutableBserCodec(BserCodec): | ||||
""" use the BSER encoding, decoding values using the newer | ||||
immutable object support """ | ||||
def _loads(self, response): | ||||
Zack Hricz
|
r30656 | return bser.loads(response, False) # Defaults to BSER v1 | ||
class Bser2WithFallbackCodec(BserCodec): | ||||
""" use BSER v2 encoding """ | ||||
def __init__(self, transport): | ||||
super(Bser2WithFallbackCodec, self).__init__(transport) | ||||
# Once the server advertises support for bser-v2 we should switch this | ||||
# to 'required' on Python 3. | ||||
self.send(["version", {"optional": ["bser-v2"]}]) | ||||
capabilities = self.receive() | ||||
if 'error' in capabilities: | ||||
raise Exception('Unsupported BSER version') | ||||
if capabilities['capabilities']['bser-v2']: | ||||
self.bser_version = 2 | ||||
self.bser_capabilities = 0 | ||||
else: | ||||
self.bser_version = 1 | ||||
self.bser_capabilities = 0 | ||||
def _loads(self, response): | ||||
return bser.loads(response) | ||||
def receive(self): | ||||
buf = [self.transport.readBytes(sniff_len)] | ||||
if not buf[0]: | ||||
raise WatchmanError('empty watchman response') | ||||
recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0]) | ||||
if hasattr(self, 'bser_version'): | ||||
# Readjust BSER version and capabilities if necessary | ||||
self.bser_version = max(self.bser_version, recv_bser_version) | ||||
self.capabilities = self.bser_capabilities & recv_bser_capabilities | ||||
rlen = len(buf[0]) | ||||
while elen > rlen: | ||||
buf.append(self.transport.readBytes(elen - rlen)) | ||||
rlen += len(buf[-1]) | ||||
response = b''.join(buf) | ||||
try: | ||||
res = self._loads(response) | ||||
return res | ||||
except ValueError as e: | ||||
raise WatchmanError('watchman response decode error: %s' % e) | ||||
def send(self, *args): | ||||
if hasattr(self, 'bser_version'): | ||||
cmd = bser.dumps(*args, version=self.bser_version, | ||||
capabilities=self.bser_capabilities) | ||||
else: | ||||
cmd = bser.dumps(*args) | ||||
self.transport.write(cmd) | ||||
Martijn Pieters
|
r28432 | |||
class JsonCodec(Codec): | ||||
""" Use json codec. This is here primarily for testing purposes """ | ||||
json = None | ||||
def __init__(self, transport): | ||||
super(JsonCodec, self).__init__(transport) | ||||
# optional dep on json, only if JsonCodec is used | ||||
import json | ||||
self.json = json | ||||
def receive(self): | ||||
line = self.transport.readLine() | ||||
try: | ||||
Zack Hricz
|
r30656 | # In Python 3, json.loads is a transformation from Unicode string to | ||
# objects possibly containing Unicode strings. We typically expect | ||||
# the JSON blob to be ASCII-only with non-ASCII characters escaped, | ||||
# but it's possible we might get non-ASCII bytes that are valid | ||||
# UTF-8. | ||||
if compat.PYTHON3: | ||||
line = line.decode('utf-8') | ||||
Martijn Pieters
|
r28432 | return self.json.loads(line) | ||
except Exception as e: | ||||
print(e, line) | ||||
raise | ||||
def send(self, *args): | ||||
cmd = self.json.dumps(*args) | ||||
Zack Hricz
|
r30656 | # In Python 3, json.dumps is a transformation from objects possibly | ||
# containing Unicode strings to Unicode string. Even with (the default) | ||||
# ensure_ascii=True, dumps returns a Unicode string. | ||||
if compat.PYTHON3: | ||||
cmd = cmd.encode('ascii') | ||||
self.transport.write(cmd + b"\n") | ||||
Martijn Pieters
|
r28432 | |||
class client(object): | ||||
""" Handles the communication with the watchman service """ | ||||
sockpath = None | ||||
transport = None | ||||
sendCodec = None | ||||
recvCodec = None | ||||
sendConn = None | ||||
recvConn = None | ||||
subs = {} # Keyed by subscription name | ||||
sub_by_root = {} # Keyed by root, then by subscription name | ||||
logs = [] # When log level is raised | ||||
unilateral = ['log', 'subscription'] | ||||
tport = None | ||||
useImmutableBser = None | ||||
def __init__(self, | ||||
sockpath=None, | ||||
timeout=1.0, | ||||
transport=None, | ||||
sendEncoding=None, | ||||
recvEncoding=None, | ||||
useImmutableBser=False): | ||||
self.sockpath = sockpath | ||||
self.timeout = timeout | ||||
self.useImmutableBser = useImmutableBser | ||||
Zack Hricz
|
r30656 | if inspect.isclass(transport) and issubclass(transport, Transport): | ||
self.transport = transport | ||||
Martijn Pieters
|
r28432 | else: | ||
Zack Hricz
|
r30656 | transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local' | ||
if transport == 'local' and os.name == 'nt': | ||||
self.transport = WindowsNamedPipeTransport | ||||
elif transport == 'local': | ||||
self.transport = UnixSocketTransport | ||||
elif transport == 'cli': | ||||
self.transport = CLIProcessTransport | ||||
if sendEncoding is None: | ||||
sendEncoding = 'json' | ||||
if recvEncoding is None: | ||||
recvEncoding = sendEncoding | ||||
else: | ||||
raise WatchmanError('invalid transport %s' % transport) | ||||
Martijn Pieters
|
r28432 | |||
Zack Hricz
|
r30656 | sendEncoding = str(sendEncoding or os.getenv('WATCHMAN_ENCODING') or | ||
'bser') | ||||
recvEncoding = str(recvEncoding or os.getenv('WATCHMAN_ENCODING') or | ||||
'bser') | ||||
Martijn Pieters
|
r28432 | |||
self.recvCodec = self._parseEncoding(recvEncoding) | ||||
self.sendCodec = self._parseEncoding(sendEncoding) | ||||
def _parseEncoding(self, enc): | ||||
if enc == 'bser': | ||||
if self.useImmutableBser: | ||||
return ImmutableBserCodec | ||||
return BserCodec | ||||
Zack Hricz
|
r30656 | elif enc == 'experimental-bser-v2': | ||
return Bser2WithFallbackCodec | ||||
Martijn Pieters
|
r28432 | elif enc == 'json': | ||
return JsonCodec | ||||
else: | ||||
raise WatchmanError('invalid encoding %s' % enc) | ||||
def _hasprop(self, result, name): | ||||
if self.useImmutableBser: | ||||
return hasattr(result, name) | ||||
return name in result | ||||
def _resolvesockname(self): | ||||
# if invoked via a trigger, watchman will set this env var; we | ||||
# should use it unless explicitly set otherwise | ||||
path = os.getenv('WATCHMAN_SOCK') | ||||
if path: | ||||
return path | ||||
cmd = ['watchman', '--output-encoding=bser', 'get-sockname'] | ||||
try: | ||||
Zack Hricz
|
r30656 | args = dict(stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||||
close_fds=os.name != 'nt') | ||||
if os.name == 'nt': | ||||
# if invoked via an application with graphical user interface, | ||||
# this call will cause a brief command window pop-up. | ||||
# Using the flag STARTF_USESHOWWINDOW to avoid this behavior. | ||||
startupinfo = subprocess.STARTUPINFO() | ||||
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW | ||||
args['startupinfo'] = startupinfo | ||||
Matt Harbison
|
r39851 | p = subprocess.Popen(pycompat.rapply(procutil.tonativestr, cmd), | ||
**args) | ||||
Zack Hricz
|
r30656 | |||
Martijn Pieters
|
r28432 | except OSError as e: | ||
Jun Wu
|
r33764 | raise WatchmanError('"watchman" executable not in PATH (%s)' % e) | ||
Martijn Pieters
|
r28432 | |||
stdout, stderr = p.communicate() | ||||
exitcode = p.poll() | ||||
if exitcode: | ||||
raise WatchmanError("watchman exited with code %d" % exitcode) | ||||
result = bser.loads(stdout) | ||||
Zack Hricz
|
r30656 | if b'error' in result: | ||
Martijn Pieters
|
r28432 | raise WatchmanError('get-sockname error: %s' % result['error']) | ||
Zack Hricz
|
r30656 | return result[b'sockname'] | ||
Martijn Pieters
|
r28432 | |||
def _connect(self): | ||||
""" establish transport connection """ | ||||
if self.recvConn: | ||||
return | ||||
if self.sockpath is None: | ||||
self.sockpath = self._resolvesockname() | ||||
self.tport = self.transport(self.sockpath, self.timeout) | ||||
self.sendConn = self.sendCodec(self.tport) | ||||
self.recvConn = self.recvCodec(self.tport) | ||||
def __del__(self): | ||||
self.close() | ||||
def close(self): | ||||
if self.tport: | ||||
self.tport.close() | ||||
self.tport = None | ||||
self.recvConn = None | ||||
self.sendConn = None | ||||
def receive(self): | ||||
""" receive the next PDU from the watchman service | ||||
If the client has activated subscriptions or logs then | ||||
this PDU may be a unilateral PDU sent by the service to | ||||
inform the client of a log event or subscription change. | ||||
It may also simply be the response portion of a request | ||||
initiated by query. | ||||
There are clients in production that subscribe and call | ||||
this in a loop to retrieve all subscription responses, | ||||
so care should be taken when making changes here. | ||||
""" | ||||
self._connect() | ||||
result = self.recvConn.receive() | ||||
if self._hasprop(result, 'error'): | ||||
Zack Hricz
|
r30656 | error = result['error'] | ||
if compat.PYTHON3 and isinstance(self.recvConn, BserCodec): | ||||
error = result['error'].decode('utf-8', 'surrogateescape') | ||||
raise CommandError(error) | ||||
Martijn Pieters
|
r28432 | |||
if self._hasprop(result, 'log'): | ||||
Zack Hricz
|
r30656 | log = result['log'] | ||
if compat.PYTHON3 and isinstance(self.recvConn, BserCodec): | ||||
log = log.decode('utf-8', 'surrogateescape') | ||||
self.logs.append(log) | ||||
Martijn Pieters
|
r28432 | |||
if self._hasprop(result, 'subscription'): | ||||
sub = result['subscription'] | ||||
if not (sub in self.subs): | ||||
self.subs[sub] = [] | ||||
self.subs[sub].append(result) | ||||
# also accumulate in {root,sub} keyed store | ||||
root = os.path.normcase(result['root']) | ||||
if not root in self.sub_by_root: | ||||
self.sub_by_root[root] = {} | ||||
if not sub in self.sub_by_root[root]: | ||||
self.sub_by_root[root][sub] = [] | ||||
self.sub_by_root[root][sub].append(result) | ||||
return result | ||||
def isUnilateralResponse(self, res): | ||||
Zack Hricz
|
r30656 | if 'unilateral' in res and res['unilateral']: | ||
return True | ||||
# Fall back to checking for known unilateral responses | ||||
Martijn Pieters
|
r28432 | for k in self.unilateral: | ||
if k in res: | ||||
return True | ||||
return False | ||||
def getLog(self, remove=True): | ||||
""" Retrieve buffered log data | ||||
If remove is true the data will be removed from the buffer. | ||||
Otherwise it will be left in the buffer | ||||
""" | ||||
res = self.logs | ||||
if remove: | ||||
self.logs = [] | ||||
return res | ||||
def getSubscription(self, name, remove=True, root=None): | ||||
""" Retrieve the data associated with a named subscription | ||||
If remove is True (the default), the subscription data is removed | ||||
from the buffer. Otherwise the data is returned but left in | ||||
the buffer. | ||||
Returns None if there is no data associated with `name` | ||||
If root is not None, then only return the subscription | ||||
data that matches both root and name. When used in this way, | ||||
remove processing impacts both the unscoped and scoped stores | ||||
for the subscription data. | ||||
""" | ||||
Zack Hricz
|
r30656 | if compat.PYTHON3 and issubclass(self.recvCodec, BserCodec): | ||
# People may pass in Unicode strings here -- but currently BSER only | ||||
# returns bytestrings. Deal with that. | ||||
if isinstance(root, str): | ||||
root = encoding.encode_local(root) | ||||
if isinstance(name, str): | ||||
name = name.encode('utf-8') | ||||
Martijn Pieters
|
r28432 | |||
if root is not None: | ||||
if not root in self.sub_by_root: | ||||
return None | ||||
if not name in self.sub_by_root[root]: | ||||
return None | ||||
sub = self.sub_by_root[root][name] | ||||
if remove: | ||||
del self.sub_by_root[root][name] | ||||
# don't let this grow unbounded | ||||
if name in self.subs: | ||||
del self.subs[name] | ||||
return sub | ||||
if not (name in self.subs): | ||||
return None | ||||
sub = self.subs[name] | ||||
if remove: | ||||
del self.subs[name] | ||||
return sub | ||||
def query(self, *args): | ||||
""" Send a query to the watchman service and return the response | ||||
This call will block until the response is returned. | ||||
If any unilateral responses are sent by the service in between | ||||
the request-response they will be buffered up in the client object | ||||
and NOT returned via this method. | ||||
""" | ||||
log('calling client.query') | ||||
self._connect() | ||||
try: | ||||
self.sendConn.send(args) | ||||
res = self.receive() | ||||
while self.isUnilateralResponse(res): | ||||
res = self.receive() | ||||
return res | ||||
Zack Hricz
|
r30656 | except EnvironmentError as ee: | ||
# When we can depend on Python 3, we can use PEP 3134 | ||||
# exception chaining here. | ||||
raise WatchmanEnvironmentError( | ||||
'I/O error communicating with watchman daemon', | ||||
ee.errno, | ||||
ee.strerror, | ||||
args) | ||||
except WatchmanError as ex: | ||||
Martijn Pieters
|
r28432 | ex.setCommand(args) | ||
Zack Hricz
|
r30656 | raise | ||
Martijn Pieters
|
r28432 | |||
def capabilityCheck(self, optional=None, required=None): | ||||
""" Perform a server capability check """ | ||||
res = self.query('version', { | ||||
'optional': optional or [], | ||||
'required': required or [] | ||||
}) | ||||
if not self._hasprop(res, 'capabilities'): | ||||
# Server doesn't support capabilities, so we need to | ||||
# synthesize the results based on the version | ||||
capabilities.synthesize(res, optional) | ||||
if 'error' in res: | ||||
raise CommandError(res['error']) | ||||
return res | ||||
def setTimeout(self, value): | ||||
self.recvConn.setTimeout(value) | ||||
self.sendConn.setTimeout(value) | ||||