__init__.py
779 lines
| 25.0 KiB
| text/x-python
|
PythonLexer
Martijn Pieters
|
r28432 | # Copyright 2014-present Facebook, Inc. | ||
# All rights reserved. | ||||
# | ||||
# Redistribution and use in source and binary forms, with or without | ||||
# modification, are permitted provided that the following conditions are met: | ||||
# | ||||
# * Redistributions of source code must retain the above copyright notice, | ||||
# this list of conditions and the following disclaimer. | ||||
# | ||||
# * Redistributions in binary form must reproduce the above copyright notice, | ||||
# this list of conditions and the following disclaimer in the documentation | ||||
# and/or other materials provided with the distribution. | ||||
# | ||||
# * Neither the name Facebook nor the names of its contributors may be used to | ||||
# endorse or promote products derived from this software without specific | ||||
# prior written permission. | ||||
# | ||||
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | ||||
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | ||||
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | ||||
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE | ||||
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | ||||
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | ||||
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | ||||
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, | ||||
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE | ||||
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | ||||
import os | ||||
import errno | ||||
import math | ||||
import socket | ||||
import subprocess | ||||
import time | ||||
# Sometimes it's really hard to get Python extensions to compile, | ||||
# so fall back to a pure Python implementation. | ||||
try: | ||||
import bser | ||||
except ImportError: | ||||
import pybser as bser | ||||
import capabilities | ||||
if os.name == 'nt': | ||||
import ctypes | ||||
import ctypes.wintypes | ||||
wintypes = ctypes.wintypes | ||||
GENERIC_READ = 0x80000000 | ||||
GENERIC_WRITE = 0x40000000 | ||||
FILE_FLAG_OVERLAPPED = 0x40000000 | ||||
OPEN_EXISTING = 3 | ||||
INVALID_HANDLE_VALUE = -1 | ||||
FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000 | ||||
FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100 | ||||
FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200 | ||||
WAIT_TIMEOUT = 0x00000102 | ||||
WAIT_OBJECT_0 = 0x00000000 | ||||
ERROR_IO_PENDING = 997 | ||||
class OVERLAPPED(ctypes.Structure): | ||||
_fields_ = [ | ||||
("Internal", wintypes.ULONG), ("InternalHigh", wintypes.ULONG), | ||||
("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD), | ||||
("hEvent", wintypes.HANDLE) | ||||
] | ||||
def __init__(self): | ||||
self.Offset = 0 | ||||
self.OffsetHigh = 0 | ||||
self.hEvent = 0 | ||||
LPDWORD = ctypes.POINTER(wintypes.DWORD) | ||||
CreateFile = ctypes.windll.kernel32.CreateFileA | ||||
CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD, | ||||
wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD, | ||||
wintypes.HANDLE] | ||||
CreateFile.restype = wintypes.HANDLE | ||||
CloseHandle = ctypes.windll.kernel32.CloseHandle | ||||
CloseHandle.argtypes = [wintypes.HANDLE] | ||||
CloseHandle.restype = wintypes.BOOL | ||||
ReadFile = ctypes.windll.kernel32.ReadFile | ||||
ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD, | ||||
LPDWORD, ctypes.POINTER(OVERLAPPED)] | ||||
ReadFile.restype = wintypes.BOOL | ||||
WriteFile = ctypes.windll.kernel32.WriteFile | ||||
WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD, | ||||
LPDWORD, ctypes.POINTER(OVERLAPPED)] | ||||
WriteFile.restype = wintypes.BOOL | ||||
GetLastError = ctypes.windll.kernel32.GetLastError | ||||
GetLastError.argtypes = [] | ||||
GetLastError.restype = wintypes.DWORD | ||||
FormatMessage = ctypes.windll.kernel32.FormatMessageA | ||||
FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD, | ||||
wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR), | ||||
wintypes.DWORD, wintypes.LPVOID] | ||||
FormatMessage.restype = wintypes.DWORD | ||||
LocalFree = ctypes.windll.kernel32.LocalFree | ||||
GetOverlappedResultEx = ctypes.windll.kernel32.GetOverlappedResultEx | ||||
GetOverlappedResultEx.argtypes = [wintypes.HANDLE, | ||||
ctypes.POINTER(OVERLAPPED), LPDWORD, | ||||
wintypes.DWORD, wintypes.BOOL] | ||||
GetOverlappedResultEx.restype = wintypes.BOOL | ||||
CancelIoEx = ctypes.windll.kernel32.CancelIoEx | ||||
CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)] | ||||
CancelIoEx.restype = wintypes.BOOL | ||||
# 2 bytes marker, 1 byte int size, 8 bytes int64 value | ||||
sniff_len = 13 | ||||
# This is a helper for debugging the client. | ||||
_debugging = False | ||||
if _debugging: | ||||
def log(fmt, *args): | ||||
print('[%s] %s' % | ||||
(time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()), | ||||
fmt % args[:])) | ||||
else: | ||||
def log(fmt, *args): | ||||
pass | ||||
class WatchmanError(Exception): | ||||
pass | ||||
class SocketTimeout(WatchmanError): | ||||
"""A specialized exception raised for socket timeouts during communication to/from watchman. | ||||
This makes it easier to implement non-blocking loops as callers can easily distinguish | ||||
between a routine timeout and an actual error condition. | ||||
Note that catching WatchmanError will also catch this as it is a super-class, so backwards | ||||
compatibility in exception handling is preserved. | ||||
""" | ||||
class CommandError(WatchmanError): | ||||
"""error returned by watchman | ||||
self.msg is the message returned by watchman. | ||||
""" | ||||
def __init__(self, msg, cmd=None): | ||||
self.msg = msg | ||||
self.cmd = cmd | ||||
super(CommandError, self).__init__('watchman command error: %s' % msg) | ||||
def setCommand(self, cmd): | ||||
self.cmd = cmd | ||||
def __str__(self): | ||||
if self.cmd: | ||||
return '%s, while executing %s' % (self.msg, self.cmd) | ||||
return self.msg | ||||
class Transport(object): | ||||
""" communication transport to the watchman server """ | ||||
buf = None | ||||
def close(self): | ||||
""" tear it down """ | ||||
raise NotImplementedError() | ||||
def readBytes(self, size): | ||||
""" read size bytes """ | ||||
raise NotImplementedError() | ||||
def write(self, buf): | ||||
""" write some data """ | ||||
raise NotImplementedError() | ||||
def setTimeout(self, value): | ||||
pass | ||||
def readLine(self): | ||||
""" read a line | ||||
Maintains its own buffer, callers of the transport should not mix | ||||
calls to readBytes and readLine. | ||||
""" | ||||
if self.buf is None: | ||||
self.buf = [] | ||||
# Buffer may already have a line if we've received unilateral | ||||
# response(s) from the server | ||||
if len(self.buf) == 1 and "\n" in self.buf[0]: | ||||
(line, b) = self.buf[0].split("\n", 1) | ||||
self.buf = [b] | ||||
return line | ||||
while True: | ||||
b = self.readBytes(4096) | ||||
if "\n" in b: | ||||
result = ''.join(self.buf) | ||||
(line, b) = b.split("\n", 1) | ||||
self.buf = [b] | ||||
return result + line | ||||
self.buf.append(b) | ||||
class Codec(object): | ||||
""" communication encoding for the watchman server """ | ||||
transport = None | ||||
def __init__(self, transport): | ||||
self.transport = transport | ||||
def receive(self): | ||||
raise NotImplementedError() | ||||
def send(self, *args): | ||||
raise NotImplementedError() | ||||
def setTimeout(self, value): | ||||
self.transport.setTimeout(value) | ||||
class UnixSocketTransport(Transport): | ||||
""" local unix domain socket transport """ | ||||
sock = None | ||||
def __init__(self, sockpath, timeout): | ||||
self.sockpath = sockpath | ||||
self.timeout = timeout | ||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | ||||
try: | ||||
sock.settimeout(self.timeout) | ||||
sock.connect(self.sockpath) | ||||
self.sock = sock | ||||
except socket.error as e: | ||||
raise WatchmanError('unable to connect to %s: %s' % | ||||
(self.sockpath, e)) | ||||
def close(self): | ||||
self.sock.close() | ||||
self.sock = None | ||||
def setTimeout(self, value): | ||||
self.timeout = value | ||||
self.sock.settimeout(self.timeout) | ||||
def readBytes(self, size): | ||||
try: | ||||
buf = [self.sock.recv(size)] | ||||
if not buf[0]: | ||||
raise WatchmanError('empty watchman response') | ||||
return buf[0] | ||||
except socket.timeout: | ||||
raise SocketTimeout('timed out waiting for response') | ||||
def write(self, data): | ||||
try: | ||||
self.sock.sendall(data) | ||||
except socket.timeout: | ||||
raise SocketTimeout('timed out sending query command') | ||||
class WindowsNamedPipeTransport(Transport): | ||||
""" connect to a named pipe """ | ||||
def __init__(self, sockpath, timeout): | ||||
self.sockpath = sockpath | ||||
self.timeout = int(math.ceil(timeout * 1000)) | ||||
self._iobuf = None | ||||
self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None, | ||||
OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None) | ||||
if self.pipe == INVALID_HANDLE_VALUE: | ||||
self.pipe = None | ||||
self._raise_win_err('failed to open pipe %s' % sockpath, | ||||
GetLastError()) | ||||
def _win32_strerror(self, err): | ||||
""" expand a win32 error code into a human readable message """ | ||||
# FormatMessage will allocate memory and assign it here | ||||
buf = ctypes.c_char_p() | ||||
FormatMessage( | ||||
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER | ||||
| FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None) | ||||
try: | ||||
return buf.value | ||||
finally: | ||||
LocalFree(buf) | ||||
def _raise_win_err(self, msg, err): | ||||
raise IOError('%s win32 error code: %d %s' % | ||||
(msg, err, self._win32_strerror(err))) | ||||
def close(self): | ||||
if self.pipe: | ||||
CloseHandle(self.pipe) | ||||
self.pipe = None | ||||
def readBytes(self, size): | ||||
""" A read can block for an unbounded amount of time, even if the | ||||
kernel reports that the pipe handle is signalled, so we need to | ||||
always perform our reads asynchronously | ||||
""" | ||||
# try to satisfy the read from any buffered data | ||||
if self._iobuf: | ||||
if size >= len(self._iobuf): | ||||
res = self._iobuf | ||||
self.buf = None | ||||
return res | ||||
res = self._iobuf[:size] | ||||
self._iobuf = self._iobuf[size:] | ||||
return res | ||||
# We need to initiate a read | ||||
buf = ctypes.create_string_buffer(size) | ||||
olap = OVERLAPPED() | ||||
log('made read buff of size %d', size) | ||||
# ReadFile docs warn against sending in the nread parameter for async | ||||
# operations, so we always collect it via GetOverlappedResultEx | ||||
immediate = ReadFile(self.pipe, buf, size, None, olap) | ||||
if not immediate: | ||||
err = GetLastError() | ||||
if err != ERROR_IO_PENDING: | ||||
self._raise_win_err('failed to read %d bytes' % size, | ||||
GetLastError()) | ||||
nread = wintypes.DWORD() | ||||
if not GetOverlappedResultEx(self.pipe, olap, nread, | ||||
0 if immediate else self.timeout, True): | ||||
err = GetLastError() | ||||
CancelIoEx(self.pipe, olap) | ||||
if err == WAIT_TIMEOUT: | ||||
log('GetOverlappedResultEx timedout') | ||||
raise SocketTimeout('timed out after waiting %dms for read' % | ||||
self.timeout) | ||||
log('GetOverlappedResultEx reports error %d', err) | ||||
self._raise_win_err('error while waiting for read', err) | ||||
nread = nread.value | ||||
if nread == 0: | ||||
# Docs say that named pipes return 0 byte when the other end did | ||||
# a zero byte write. Since we don't ever do that, the only | ||||
# other way this shows up is if the client has gotten in a weird | ||||
# state, so let's bail out | ||||
CancelIoEx(self.pipe, olap) | ||||
raise IOError('Async read yielded 0 bytes; unpossible!') | ||||
# Holds precisely the bytes that we read from the prior request | ||||
buf = buf[:nread] | ||||
returned_size = min(nread, size) | ||||
if returned_size == nread: | ||||
return buf | ||||
# keep any left-overs around for a later read to consume | ||||
self._iobuf = buf[returned_size:] | ||||
return buf[:returned_size] | ||||
def write(self, data): | ||||
olap = OVERLAPPED() | ||||
immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data), | ||||
None, olap) | ||||
if not immediate: | ||||
err = GetLastError() | ||||
if err != ERROR_IO_PENDING: | ||||
self._raise_win_err('failed to write %d bytes' % len(data), | ||||
GetLastError()) | ||||
# Obtain results, waiting if needed | ||||
nwrote = wintypes.DWORD() | ||||
if GetOverlappedResultEx(self.pipe, olap, nwrote, 0 if immediate else | ||||
self.timeout, True): | ||||
return nwrote.value | ||||
err = GetLastError() | ||||
# It's potentially unsafe to allow the write to continue after | ||||
# we unwind, so let's make a best effort to avoid that happening | ||||
CancelIoEx(self.pipe, olap) | ||||
if err == WAIT_TIMEOUT: | ||||
raise SocketTimeout('timed out after waiting %dms for write' % | ||||
self.timeout) | ||||
self._raise_win_err('error while waiting for write of %d bytes' % | ||||
len(data), err) | ||||
class CLIProcessTransport(Transport): | ||||
""" open a pipe to the cli to talk to the service | ||||
This intended to be used only in the test harness! | ||||
The CLI is an oddball because we only support JSON input | ||||
and cannot send multiple commands through the same instance, | ||||
so we spawn a new process for each command. | ||||
We disable server spawning for this implementation, again, because | ||||
it is intended to be used only in our test harness. You really | ||||
should not need to use the CLI transport for anything real. | ||||
While the CLI can output in BSER, our Transport interface doesn't | ||||
support telling this instance that it should do so. That effectively | ||||
limits this implementation to JSON input and output only at this time. | ||||
It is the responsibility of the caller to set the send and | ||||
receive codecs appropriately. | ||||
""" | ||||
proc = None | ||||
closed = True | ||||
def __init__(self, sockpath, timeout): | ||||
self.sockpath = sockpath | ||||
self.timeout = timeout | ||||
def close(self): | ||||
if self.proc: | ||||
self.proc.kill() | ||||
self.proc = None | ||||
def _connect(self): | ||||
if self.proc: | ||||
return self.proc | ||||
args = [ | ||||
'watchman', | ||||
'--sockname={}'.format(self.sockpath), | ||||
'--logfile=/BOGUS', | ||||
'--statefile=/BOGUS', | ||||
'--no-spawn', | ||||
'--no-local', | ||||
'--no-pretty', | ||||
'-j', | ||||
] | ||||
self.proc = subprocess.Popen(args, | ||||
stdin=subprocess.PIPE, | ||||
stdout=subprocess.PIPE) | ||||
return self.proc | ||||
def readBytes(self, size): | ||||
self._connect() | ||||
res = self.proc.stdout.read(size) | ||||
if res == '': | ||||
raise WatchmanError('EOF on CLI process transport') | ||||
return res | ||||
def write(self, data): | ||||
if self.closed: | ||||
self.closed = False | ||||
self.proc = None | ||||
self._connect() | ||||
res = self.proc.stdin.write(data) | ||||
self.proc.stdin.close() | ||||
self.closed = True | ||||
return res | ||||
class BserCodec(Codec): | ||||
""" use the BSER encoding. This is the default, preferred codec """ | ||||
def _loads(self, response): | ||||
return bser.loads(response) | ||||
def receive(self): | ||||
buf = [self.transport.readBytes(sniff_len)] | ||||
if not buf[0]: | ||||
raise WatchmanError('empty watchman response') | ||||
elen = bser.pdu_len(buf[0]) | ||||
rlen = len(buf[0]) | ||||
while elen > rlen: | ||||
buf.append(self.transport.readBytes(elen - rlen)) | ||||
rlen += len(buf[-1]) | ||||
response = ''.join(buf) | ||||
try: | ||||
res = self._loads(response) | ||||
return res | ||||
except ValueError as e: | ||||
raise WatchmanError('watchman response decode error: %s' % e) | ||||
def send(self, *args): | ||||
cmd = bser.dumps(*args) | ||||
self.transport.write(cmd) | ||||
class ImmutableBserCodec(BserCodec): | ||||
""" use the BSER encoding, decoding values using the newer | ||||
immutable object support """ | ||||
def _loads(self, response): | ||||
return bser.loads(response, False) | ||||
class JsonCodec(Codec): | ||||
""" Use json codec. This is here primarily for testing purposes """ | ||||
json = None | ||||
def __init__(self, transport): | ||||
super(JsonCodec, self).__init__(transport) | ||||
# optional dep on json, only if JsonCodec is used | ||||
import json | ||||
self.json = json | ||||
def receive(self): | ||||
line = self.transport.readLine() | ||||
try: | ||||
return self.json.loads(line) | ||||
except Exception as e: | ||||
print(e, line) | ||||
raise | ||||
def send(self, *args): | ||||
cmd = self.json.dumps(*args) | ||||
self.transport.write(cmd + "\n") | ||||
class client(object): | ||||
""" Handles the communication with the watchman service """ | ||||
sockpath = None | ||||
transport = None | ||||
sendCodec = None | ||||
recvCodec = None | ||||
sendConn = None | ||||
recvConn = None | ||||
subs = {} # Keyed by subscription name | ||||
sub_by_root = {} # Keyed by root, then by subscription name | ||||
logs = [] # When log level is raised | ||||
unilateral = ['log', 'subscription'] | ||||
tport = None | ||||
useImmutableBser = None | ||||
def __init__(self, | ||||
sockpath=None, | ||||
timeout=1.0, | ||||
transport=None, | ||||
sendEncoding=None, | ||||
recvEncoding=None, | ||||
useImmutableBser=False): | ||||
self.sockpath = sockpath | ||||
self.timeout = timeout | ||||
self.useImmutableBser = useImmutableBser | ||||
transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local' | ||||
if transport == 'local' and os.name == 'nt': | ||||
self.transport = WindowsNamedPipeTransport | ||||
elif transport == 'local': | ||||
self.transport = UnixSocketTransport | ||||
elif transport == 'cli': | ||||
self.transport = CLIProcessTransport | ||||
if sendEncoding is None: | ||||
sendEncoding = 'json' | ||||
if recvEncoding is None: | ||||
recvEncoding = sendEncoding | ||||
else: | ||||
raise WatchmanError('invalid transport %s' % transport) | ||||
sendEncoding = sendEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser' | ||||
recvEncoding = recvEncoding or os.getenv('WATCHMAN_ENCODING') or 'bser' | ||||
self.recvCodec = self._parseEncoding(recvEncoding) | ||||
self.sendCodec = self._parseEncoding(sendEncoding) | ||||
def _parseEncoding(self, enc): | ||||
if enc == 'bser': | ||||
if self.useImmutableBser: | ||||
return ImmutableBserCodec | ||||
return BserCodec | ||||
elif enc == 'json': | ||||
return JsonCodec | ||||
else: | ||||
raise WatchmanError('invalid encoding %s' % enc) | ||||
def _hasprop(self, result, name): | ||||
if self.useImmutableBser: | ||||
return hasattr(result, name) | ||||
return name in result | ||||
def _resolvesockname(self): | ||||
# if invoked via a trigger, watchman will set this env var; we | ||||
# should use it unless explicitly set otherwise | ||||
path = os.getenv('WATCHMAN_SOCK') | ||||
if path: | ||||
return path | ||||
cmd = ['watchman', '--output-encoding=bser', 'get-sockname'] | ||||
try: | ||||
p = subprocess.Popen(cmd, | ||||
stdout=subprocess.PIPE, | ||||
stderr=subprocess.PIPE, | ||||
close_fds=os.name != 'nt') | ||||
except OSError as e: | ||||
raise WatchmanError('"watchman" executable not in PATH (%s)', e) | ||||
stdout, stderr = p.communicate() | ||||
exitcode = p.poll() | ||||
if exitcode: | ||||
raise WatchmanError("watchman exited with code %d" % exitcode) | ||||
result = bser.loads(stdout) | ||||
if 'error' in result: | ||||
raise WatchmanError('get-sockname error: %s' % result['error']) | ||||
return result['sockname'] | ||||
def _connect(self): | ||||
""" establish transport connection """ | ||||
if self.recvConn: | ||||
return | ||||
if self.sockpath is None: | ||||
self.sockpath = self._resolvesockname() | ||||
self.tport = self.transport(self.sockpath, self.timeout) | ||||
self.sendConn = self.sendCodec(self.tport) | ||||
self.recvConn = self.recvCodec(self.tport) | ||||
def __del__(self): | ||||
self.close() | ||||
def close(self): | ||||
if self.tport: | ||||
self.tport.close() | ||||
self.tport = None | ||||
self.recvConn = None | ||||
self.sendConn = None | ||||
def receive(self): | ||||
""" receive the next PDU from the watchman service | ||||
If the client has activated subscriptions or logs then | ||||
this PDU may be a unilateral PDU sent by the service to | ||||
inform the client of a log event or subscription change. | ||||
It may also simply be the response portion of a request | ||||
initiated by query. | ||||
There are clients in production that subscribe and call | ||||
this in a loop to retrieve all subscription responses, | ||||
so care should be taken when making changes here. | ||||
""" | ||||
self._connect() | ||||
result = self.recvConn.receive() | ||||
if self._hasprop(result, 'error'): | ||||
raise CommandError(result['error']) | ||||
if self._hasprop(result, 'log'): | ||||
self.logs.append(result['log']) | ||||
if self._hasprop(result, 'subscription'): | ||||
sub = result['subscription'] | ||||
if not (sub in self.subs): | ||||
self.subs[sub] = [] | ||||
self.subs[sub].append(result) | ||||
# also accumulate in {root,sub} keyed store | ||||
root = os.path.normcase(result['root']) | ||||
if not root in self.sub_by_root: | ||||
self.sub_by_root[root] = {} | ||||
if not sub in self.sub_by_root[root]: | ||||
self.sub_by_root[root][sub] = [] | ||||
self.sub_by_root[root][sub].append(result) | ||||
return result | ||||
def isUnilateralResponse(self, res): | ||||
for k in self.unilateral: | ||||
if k in res: | ||||
return True | ||||
return False | ||||
def getLog(self, remove=True): | ||||
""" Retrieve buffered log data | ||||
If remove is true the data will be removed from the buffer. | ||||
Otherwise it will be left in the buffer | ||||
""" | ||||
res = self.logs | ||||
if remove: | ||||
self.logs = [] | ||||
return res | ||||
def getSubscription(self, name, remove=True, root=None): | ||||
""" Retrieve the data associated with a named subscription | ||||
If remove is True (the default), the subscription data is removed | ||||
from the buffer. Otherwise the data is returned but left in | ||||
the buffer. | ||||
Returns None if there is no data associated with `name` | ||||
If root is not None, then only return the subscription | ||||
data that matches both root and name. When used in this way, | ||||
remove processing impacts both the unscoped and scoped stores | ||||
for the subscription data. | ||||
""" | ||||
if root is not None: | ||||
if not root in self.sub_by_root: | ||||
return None | ||||
if not name in self.sub_by_root[root]: | ||||
return None | ||||
sub = self.sub_by_root[root][name] | ||||
if remove: | ||||
del self.sub_by_root[root][name] | ||||
# don't let this grow unbounded | ||||
if name in self.subs: | ||||
del self.subs[name] | ||||
return sub | ||||
if not (name in self.subs): | ||||
return None | ||||
sub = self.subs[name] | ||||
if remove: | ||||
del self.subs[name] | ||||
return sub | ||||
def query(self, *args): | ||||
""" Send a query to the watchman service and return the response | ||||
This call will block until the response is returned. | ||||
If any unilateral responses are sent by the service in between | ||||
the request-response they will be buffered up in the client object | ||||
and NOT returned via this method. | ||||
""" | ||||
log('calling client.query') | ||||
self._connect() | ||||
try: | ||||
self.sendConn.send(args) | ||||
res = self.receive() | ||||
while self.isUnilateralResponse(res): | ||||
res = self.receive() | ||||
return res | ||||
except CommandError as ex: | ||||
ex.setCommand(args) | ||||
raise ex | ||||
def capabilityCheck(self, optional=None, required=None): | ||||
""" Perform a server capability check """ | ||||
res = self.query('version', { | ||||
'optional': optional or [], | ||||
'required': required or [] | ||||
}) | ||||
if not self._hasprop(res, 'capabilities'): | ||||
# Server doesn't support capabilities, so we need to | ||||
# synthesize the results based on the version | ||||
capabilities.synthesize(res, optional) | ||||
if 'error' in res: | ||||
raise CommandError(res['error']) | ||||
return res | ||||
def setTimeout(self, value): | ||||
self.recvConn.setTimeout(value) | ||||
self.sendConn.setTimeout(value) | ||||
# no-check-code -- this is a 3rd party library | ||||