##// END OF EJS Templates
fsmonitor: correct an error message...
Jun Wu -
r33764:dd35abc4 default
parent child Browse files
Show More
@@ -1,1021 +1,1021 b''
1 1 # Copyright 2014-present Facebook, Inc.
2 2 # All rights reserved.
3 3 #
4 4 # Redistribution and use in source and binary forms, with or without
5 5 # modification, are permitted provided that the following conditions are met:
6 6 #
7 7 # * Redistributions of source code must retain the above copyright notice,
8 8 # this list of conditions and the following disclaimer.
9 9 #
10 10 # * Redistributions in binary form must reproduce the above copyright notice,
11 11 # this list of conditions and the following disclaimer in the documentation
12 12 # and/or other materials provided with the distribution.
13 13 #
14 14 # * Neither the name Facebook nor the names of its contributors may be used to
15 15 # endorse or promote products derived from this software without specific
16 16 # prior written permission.
17 17 #
18 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19 19 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20 20 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 21 # DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
22 22 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
23 23 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
24 24 # SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
25 25 # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
26 26 # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 28
29 29 from __future__ import absolute_import
30 30 from __future__ import division
31 31 from __future__ import print_function
32 32 # no unicode literals
33 33
34 34 import inspect
35 35 import math
36 36 import os
37 37 import socket
38 38 import subprocess
39 39 import time
40 40
41 41 # Sometimes it's really hard to get Python extensions to compile,
42 42 # so fall back to a pure Python implementation.
43 43 try:
44 44 from . import bser
45 45 # Demandimport causes modules to be loaded lazily. Force the load now
46 46 # so that we can fall back on pybser if bser doesn't exist
47 47 bser.pdu_info
48 48 except ImportError:
49 49 from . import pybser as bser
50 50
51 51 from . import (
52 52 capabilities,
53 53 compat,
54 54 encoding,
55 55 load,
56 56 )
57 57
58 58
59 59 if os.name == 'nt':
60 60 import ctypes
61 61 import ctypes.wintypes
62 62
63 63 wintypes = ctypes.wintypes
64 64 GENERIC_READ = 0x80000000
65 65 GENERIC_WRITE = 0x40000000
66 66 FILE_FLAG_OVERLAPPED = 0x40000000
67 67 OPEN_EXISTING = 3
68 68 INVALID_HANDLE_VALUE = -1
69 69 FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
70 70 FORMAT_MESSAGE_ALLOCATE_BUFFER = 0x00000100
71 71 FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200
72 72 WAIT_FAILED = 0xFFFFFFFF
73 73 WAIT_TIMEOUT = 0x00000102
74 74 WAIT_OBJECT_0 = 0x00000000
75 75 WAIT_IO_COMPLETION = 0x000000C0
76 76 INFINITE = 0xFFFFFFFF
77 77
78 78 # Overlapped I/O operation is in progress. (997)
79 79 ERROR_IO_PENDING = 0x000003E5
80 80
81 81 # The pointer size follows the architecture
82 82 # We use WPARAM since this type is already conditionally defined
83 83 ULONG_PTR = ctypes.wintypes.WPARAM
84 84
85 85 class OVERLAPPED(ctypes.Structure):
86 86 _fields_ = [
87 87 ("Internal", ULONG_PTR), ("InternalHigh", ULONG_PTR),
88 88 ("Offset", wintypes.DWORD), ("OffsetHigh", wintypes.DWORD),
89 89 ("hEvent", wintypes.HANDLE)
90 90 ]
91 91
92 92 def __init__(self):
93 93 self.Internal = 0
94 94 self.InternalHigh = 0
95 95 self.Offset = 0
96 96 self.OffsetHigh = 0
97 97 self.hEvent = 0
98 98
99 99 LPDWORD = ctypes.POINTER(wintypes.DWORD)
100 100
101 101 CreateFile = ctypes.windll.kernel32.CreateFileA
102 102 CreateFile.argtypes = [wintypes.LPSTR, wintypes.DWORD, wintypes.DWORD,
103 103 wintypes.LPVOID, wintypes.DWORD, wintypes.DWORD,
104 104 wintypes.HANDLE]
105 105 CreateFile.restype = wintypes.HANDLE
106 106
107 107 CloseHandle = ctypes.windll.kernel32.CloseHandle
108 108 CloseHandle.argtypes = [wintypes.HANDLE]
109 109 CloseHandle.restype = wintypes.BOOL
110 110
111 111 ReadFile = ctypes.windll.kernel32.ReadFile
112 112 ReadFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
113 113 LPDWORD, ctypes.POINTER(OVERLAPPED)]
114 114 ReadFile.restype = wintypes.BOOL
115 115
116 116 WriteFile = ctypes.windll.kernel32.WriteFile
117 117 WriteFile.argtypes = [wintypes.HANDLE, wintypes.LPVOID, wintypes.DWORD,
118 118 LPDWORD, ctypes.POINTER(OVERLAPPED)]
119 119 WriteFile.restype = wintypes.BOOL
120 120
121 121 GetLastError = ctypes.windll.kernel32.GetLastError
122 122 GetLastError.argtypes = []
123 123 GetLastError.restype = wintypes.DWORD
124 124
125 125 SetLastError = ctypes.windll.kernel32.SetLastError
126 126 SetLastError.argtypes = [wintypes.DWORD]
127 127 SetLastError.restype = None
128 128
129 129 FormatMessage = ctypes.windll.kernel32.FormatMessageA
130 130 FormatMessage.argtypes = [wintypes.DWORD, wintypes.LPVOID, wintypes.DWORD,
131 131 wintypes.DWORD, ctypes.POINTER(wintypes.LPSTR),
132 132 wintypes.DWORD, wintypes.LPVOID]
133 133 FormatMessage.restype = wintypes.DWORD
134 134
135 135 LocalFree = ctypes.windll.kernel32.LocalFree
136 136
137 137 GetOverlappedResult = ctypes.windll.kernel32.GetOverlappedResult
138 138 GetOverlappedResult.argtypes = [wintypes.HANDLE,
139 139 ctypes.POINTER(OVERLAPPED), LPDWORD,
140 140 wintypes.BOOL]
141 141 GetOverlappedResult.restype = wintypes.BOOL
142 142
143 143 GetOverlappedResultEx = getattr(ctypes.windll.kernel32,
144 144 'GetOverlappedResultEx', None)
145 145 if GetOverlappedResultEx is not None:
146 146 GetOverlappedResultEx.argtypes = [wintypes.HANDLE,
147 147 ctypes.POINTER(OVERLAPPED), LPDWORD,
148 148 wintypes.DWORD, wintypes.BOOL]
149 149 GetOverlappedResultEx.restype = wintypes.BOOL
150 150
151 151 WaitForSingleObjectEx = ctypes.windll.kernel32.WaitForSingleObjectEx
152 152 WaitForSingleObjectEx.argtypes = [wintypes.HANDLE, wintypes.DWORD, wintypes.BOOL]
153 153 WaitForSingleObjectEx.restype = wintypes.DWORD
154 154
155 155 CreateEvent = ctypes.windll.kernel32.CreateEventA
156 156 CreateEvent.argtypes = [LPDWORD, wintypes.BOOL, wintypes.BOOL,
157 157 wintypes.LPSTR]
158 158 CreateEvent.restype = wintypes.HANDLE
159 159
160 160 # Windows Vista is the minimum supported client for CancelIoEx.
161 161 CancelIoEx = ctypes.windll.kernel32.CancelIoEx
162 162 CancelIoEx.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED)]
163 163 CancelIoEx.restype = wintypes.BOOL
164 164
165 165 # 2 bytes marker, 1 byte int size, 8 bytes int64 value
166 166 sniff_len = 13
167 167
168 168 # This is a helper for debugging the client.
169 169 _debugging = False
170 170 if _debugging:
171 171
172 172 def log(fmt, *args):
173 173 print('[%s] %s' %
174 174 (time.strftime("%a, %d %b %Y %H:%M:%S", time.gmtime()),
175 175 fmt % args[:]))
176 176 else:
177 177
178 178 def log(fmt, *args):
179 179 pass
180 180
181 181
182 182 def _win32_strerror(err):
183 183 """ expand a win32 error code into a human readable message """
184 184
185 185 # FormatMessage will allocate memory and assign it here
186 186 buf = ctypes.c_char_p()
187 187 FormatMessage(
188 188 FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_ALLOCATE_BUFFER
189 189 | FORMAT_MESSAGE_IGNORE_INSERTS, None, err, 0, buf, 0, None)
190 190 try:
191 191 return buf.value
192 192 finally:
193 193 LocalFree(buf)
194 194
195 195
196 196 class WatchmanError(Exception):
197 197 def __init__(self, msg=None, cmd=None):
198 198 self.msg = msg
199 199 self.cmd = cmd
200 200
201 201 def setCommand(self, cmd):
202 202 self.cmd = cmd
203 203
204 204 def __str__(self):
205 205 if self.cmd:
206 206 return '%s, while executing %s' % (self.msg, self.cmd)
207 207 return self.msg
208 208
209 209
210 210 class WatchmanEnvironmentError(WatchmanError):
211 211 def __init__(self, msg, errno, errmsg, cmd=None):
212 212 super(WatchmanEnvironmentError, self).__init__(
213 213 '{0}: errno={1} errmsg={2}'.format(msg, errno, errmsg),
214 214 cmd)
215 215
216 216
217 217 class SocketConnectError(WatchmanError):
218 218 def __init__(self, sockpath, exc):
219 219 super(SocketConnectError, self).__init__(
220 220 'unable to connect to %s: %s' % (sockpath, exc))
221 221 self.sockpath = sockpath
222 222 self.exc = exc
223 223
224 224
225 225 class SocketTimeout(WatchmanError):
226 226 """A specialized exception raised for socket timeouts during communication to/from watchman.
227 227 This makes it easier to implement non-blocking loops as callers can easily distinguish
228 228 between a routine timeout and an actual error condition.
229 229
230 230 Note that catching WatchmanError will also catch this as it is a super-class, so backwards
231 231 compatibility in exception handling is preserved.
232 232 """
233 233
234 234
235 235 class CommandError(WatchmanError):
236 236 """error returned by watchman
237 237
238 238 self.msg is the message returned by watchman.
239 239 """
240 240 def __init__(self, msg, cmd=None):
241 241 super(CommandError, self).__init__(
242 242 'watchman command error: %s' % (msg, ),
243 243 cmd,
244 244 )
245 245
246 246
247 247 class Transport(object):
248 248 """ communication transport to the watchman server """
249 249 buf = None
250 250
251 251 def close(self):
252 252 """ tear it down """
253 253 raise NotImplementedError()
254 254
255 255 def readBytes(self, size):
256 256 """ read size bytes """
257 257 raise NotImplementedError()
258 258
259 259 def write(self, buf):
260 260 """ write some data """
261 261 raise NotImplementedError()
262 262
263 263 def setTimeout(self, value):
264 264 pass
265 265
266 266 def readLine(self):
267 267 """ read a line
268 268 Maintains its own buffer, callers of the transport should not mix
269 269 calls to readBytes and readLine.
270 270 """
271 271 if self.buf is None:
272 272 self.buf = []
273 273
274 274 # Buffer may already have a line if we've received unilateral
275 275 # response(s) from the server
276 276 if len(self.buf) == 1 and b"\n" in self.buf[0]:
277 277 (line, b) = self.buf[0].split(b"\n", 1)
278 278 self.buf = [b]
279 279 return line
280 280
281 281 while True:
282 282 b = self.readBytes(4096)
283 283 if b"\n" in b:
284 284 result = b''.join(self.buf)
285 285 (line, b) = b.split(b"\n", 1)
286 286 self.buf = [b]
287 287 return result + line
288 288 self.buf.append(b)
289 289
290 290
291 291 class Codec(object):
292 292 """ communication encoding for the watchman server """
293 293 transport = None
294 294
295 295 def __init__(self, transport):
296 296 self.transport = transport
297 297
298 298 def receive(self):
299 299 raise NotImplementedError()
300 300
301 301 def send(self, *args):
302 302 raise NotImplementedError()
303 303
304 304 def setTimeout(self, value):
305 305 self.transport.setTimeout(value)
306 306
307 307
308 308 class UnixSocketTransport(Transport):
309 309 """ local unix domain socket transport """
310 310 sock = None
311 311
312 312 def __init__(self, sockpath, timeout):
313 313 self.sockpath = sockpath
314 314 self.timeout = timeout
315 315
316 316 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
317 317 try:
318 318 sock.settimeout(self.timeout)
319 319 sock.connect(self.sockpath)
320 320 self.sock = sock
321 321 except socket.error as e:
322 322 sock.close()
323 323 raise SocketConnectError(self.sockpath, e)
324 324
325 325 def close(self):
326 326 self.sock.close()
327 327 self.sock = None
328 328
329 329 def setTimeout(self, value):
330 330 self.timeout = value
331 331 self.sock.settimeout(self.timeout)
332 332
333 333 def readBytes(self, size):
334 334 try:
335 335 buf = [self.sock.recv(size)]
336 336 if not buf[0]:
337 337 raise WatchmanError('empty watchman response')
338 338 return buf[0]
339 339 except socket.timeout:
340 340 raise SocketTimeout('timed out waiting for response')
341 341
342 342 def write(self, data):
343 343 try:
344 344 self.sock.sendall(data)
345 345 except socket.timeout:
346 346 raise SocketTimeout('timed out sending query command')
347 347
348 348
349 349 def _get_overlapped_result_ex_impl(pipe, olap, nbytes, millis, alertable):
350 350 """ Windows 7 and earlier does not support GetOverlappedResultEx. The
351 351 alternative is to use GetOverlappedResult and wait for read or write
352 352 operation to complete. This is done be using CreateEvent and
353 353 WaitForSingleObjectEx. CreateEvent, WaitForSingleObjectEx
354 354 and GetOverlappedResult are all part of Windows API since WindowsXP.
355 355 This is the exact same implementation that can be found in the watchman
356 356 source code (see get_overlapped_result_ex_impl in stream_win.c). This
357 357 way, maintenance should be simplified.
358 358 """
359 359 log('Preparing to wait for maximum %dms', millis )
360 360 if millis != 0:
361 361 waitReturnCode = WaitForSingleObjectEx(olap.hEvent, millis, alertable)
362 362 if waitReturnCode == WAIT_OBJECT_0:
363 363 # Event is signaled, overlapped IO operation result should be available.
364 364 pass
365 365 elif waitReturnCode == WAIT_IO_COMPLETION:
366 366 # WaitForSingleObjectEx returnes because the system added an I/O completion
367 367 # routine or an asynchronous procedure call (APC) to the thread queue.
368 368 SetLastError(WAIT_IO_COMPLETION)
369 369 pass
370 370 elif waitReturnCode == WAIT_TIMEOUT:
371 371 # We reached the maximum allowed wait time, the IO operation failed
372 372 # to complete in timely fashion.
373 373 SetLastError(WAIT_TIMEOUT)
374 374 return False
375 375 elif waitReturnCode == WAIT_FAILED:
376 376 # something went wrong calling WaitForSingleObjectEx
377 377 err = GetLastError()
378 378 log('WaitForSingleObjectEx failed: %s', _win32_strerror(err))
379 379 return False
380 380 else:
381 381 # unexpected situation deserving investigation.
382 382 err = GetLastError()
383 383 log('Unexpected error: %s', _win32_strerror(err))
384 384 return False
385 385
386 386 return GetOverlappedResult(pipe, olap, nbytes, False)
387 387
388 388
389 389 class WindowsNamedPipeTransport(Transport):
390 390 """ connect to a named pipe """
391 391
392 392 def __init__(self, sockpath, timeout):
393 393 self.sockpath = sockpath
394 394 self.timeout = int(math.ceil(timeout * 1000))
395 395 self._iobuf = None
396 396
397 397 self.pipe = CreateFile(sockpath, GENERIC_READ | GENERIC_WRITE, 0, None,
398 398 OPEN_EXISTING, FILE_FLAG_OVERLAPPED, None)
399 399
400 400 if self.pipe == INVALID_HANDLE_VALUE:
401 401 self.pipe = None
402 402 self._raise_win_err('failed to open pipe %s' % sockpath,
403 403 GetLastError())
404 404
405 405 # event for the overlapped I/O operations
406 406 self._waitable = CreateEvent(None, True, False, None)
407 407 if self._waitable is None:
408 408 self._raise_win_err('CreateEvent failed', GetLastError())
409 409
410 410 self._get_overlapped_result_ex = GetOverlappedResultEx
411 411 if (os.getenv('WATCHMAN_WIN7_COMPAT') == '1' or
412 412 self._get_overlapped_result_ex is None):
413 413 self._get_overlapped_result_ex = _get_overlapped_result_ex_impl
414 414
415 415 def _raise_win_err(self, msg, err):
416 416 raise IOError('%s win32 error code: %d %s' %
417 417 (msg, err, _win32_strerror(err)))
418 418
419 419 def close(self):
420 420 if self.pipe:
421 421 log('Closing pipe')
422 422 CloseHandle(self.pipe)
423 423 self.pipe = None
424 424
425 425 if self._waitable is not None:
426 426 # We release the handle for the event
427 427 CloseHandle(self._waitable)
428 428 self._waitable = None
429 429
430 430 def setTimeout(self, value):
431 431 # convert to milliseconds
432 432 self.timeout = int(value * 1000)
433 433
434 434 def readBytes(self, size):
435 435 """ A read can block for an unbounded amount of time, even if the
436 436 kernel reports that the pipe handle is signalled, so we need to
437 437 always perform our reads asynchronously
438 438 """
439 439
440 440 # try to satisfy the read from any buffered data
441 441 if self._iobuf:
442 442 if size >= len(self._iobuf):
443 443 res = self._iobuf
444 444 self.buf = None
445 445 return res
446 446 res = self._iobuf[:size]
447 447 self._iobuf = self._iobuf[size:]
448 448 return res
449 449
450 450 # We need to initiate a read
451 451 buf = ctypes.create_string_buffer(size)
452 452 olap = OVERLAPPED()
453 453 olap.hEvent = self._waitable
454 454
455 455 log('made read buff of size %d', size)
456 456
457 457 # ReadFile docs warn against sending in the nread parameter for async
458 458 # operations, so we always collect it via GetOverlappedResultEx
459 459 immediate = ReadFile(self.pipe, buf, size, None, olap)
460 460
461 461 if not immediate:
462 462 err = GetLastError()
463 463 if err != ERROR_IO_PENDING:
464 464 self._raise_win_err('failed to read %d bytes' % size,
465 465 GetLastError())
466 466
467 467 nread = wintypes.DWORD()
468 468 if not self._get_overlapped_result_ex(self.pipe, olap, nread,
469 469 0 if immediate else self.timeout,
470 470 True):
471 471 err = GetLastError()
472 472 CancelIoEx(self.pipe, olap)
473 473
474 474 if err == WAIT_TIMEOUT:
475 475 log('GetOverlappedResultEx timedout')
476 476 raise SocketTimeout('timed out after waiting %dms for read' %
477 477 self.timeout)
478 478
479 479 log('GetOverlappedResultEx reports error %d', err)
480 480 self._raise_win_err('error while waiting for read', err)
481 481
482 482 nread = nread.value
483 483 if nread == 0:
484 484 # Docs say that named pipes return 0 byte when the other end did
485 485 # a zero byte write. Since we don't ever do that, the only
486 486 # other way this shows up is if the client has gotten in a weird
487 487 # state, so let's bail out
488 488 CancelIoEx(self.pipe, olap)
489 489 raise IOError('Async read yielded 0 bytes; unpossible!')
490 490
491 491 # Holds precisely the bytes that we read from the prior request
492 492 buf = buf[:nread]
493 493
494 494 returned_size = min(nread, size)
495 495 if returned_size == nread:
496 496 return buf
497 497
498 498 # keep any left-overs around for a later read to consume
499 499 self._iobuf = buf[returned_size:]
500 500 return buf[:returned_size]
501 501
502 502 def write(self, data):
503 503 olap = OVERLAPPED()
504 504 olap.hEvent = self._waitable
505 505
506 506 immediate = WriteFile(self.pipe, ctypes.c_char_p(data), len(data),
507 507 None, olap)
508 508
509 509 if not immediate:
510 510 err = GetLastError()
511 511 if err != ERROR_IO_PENDING:
512 512 self._raise_win_err('failed to write %d bytes' % len(data),
513 513 GetLastError())
514 514
515 515 # Obtain results, waiting if needed
516 516 nwrote = wintypes.DWORD()
517 517 if self._get_overlapped_result_ex(self.pipe, olap, nwrote,
518 518 0 if immediate else self.timeout,
519 519 True):
520 520 log('made write of %d bytes', nwrote.value)
521 521 return nwrote.value
522 522
523 523 err = GetLastError()
524 524
525 525 # It's potentially unsafe to allow the write to continue after
526 526 # we unwind, so let's make a best effort to avoid that happening
527 527 CancelIoEx(self.pipe, olap)
528 528
529 529 if err == WAIT_TIMEOUT:
530 530 raise SocketTimeout('timed out after waiting %dms for write' %
531 531 self.timeout)
532 532 self._raise_win_err('error while waiting for write of %d bytes' %
533 533 len(data), err)
534 534
535 535
536 536 class CLIProcessTransport(Transport):
537 537 """ open a pipe to the cli to talk to the service
538 538 This intended to be used only in the test harness!
539 539
540 540 The CLI is an oddball because we only support JSON input
541 541 and cannot send multiple commands through the same instance,
542 542 so we spawn a new process for each command.
543 543
544 544 We disable server spawning for this implementation, again, because
545 545 it is intended to be used only in our test harness. You really
546 546 should not need to use the CLI transport for anything real.
547 547
548 548 While the CLI can output in BSER, our Transport interface doesn't
549 549 support telling this instance that it should do so. That effectively
550 550 limits this implementation to JSON input and output only at this time.
551 551
552 552 It is the responsibility of the caller to set the send and
553 553 receive codecs appropriately.
554 554 """
555 555 proc = None
556 556 closed = True
557 557
558 558 def __init__(self, sockpath, timeout):
559 559 self.sockpath = sockpath
560 560 self.timeout = timeout
561 561
562 562 def close(self):
563 563 if self.proc:
564 564 if self.proc.pid is not None:
565 565 self.proc.kill()
566 566 self.proc.stdin.close()
567 567 self.proc.stdout.close()
568 568 self.proc = None
569 569
570 570 def _connect(self):
571 571 if self.proc:
572 572 return self.proc
573 573 args = [
574 574 'watchman',
575 575 '--sockname={0}'.format(self.sockpath),
576 576 '--logfile=/BOGUS',
577 577 '--statefile=/BOGUS',
578 578 '--no-spawn',
579 579 '--no-local',
580 580 '--no-pretty',
581 581 '-j',
582 582 ]
583 583 self.proc = subprocess.Popen(args,
584 584 stdin=subprocess.PIPE,
585 585 stdout=subprocess.PIPE)
586 586 return self.proc
587 587
588 588 def readBytes(self, size):
589 589 self._connect()
590 590 res = self.proc.stdout.read(size)
591 591 if res == '':
592 592 raise WatchmanError('EOF on CLI process transport')
593 593 return res
594 594
595 595 def write(self, data):
596 596 if self.closed:
597 597 self.close()
598 598 self.closed = False
599 599 self._connect()
600 600 res = self.proc.stdin.write(data)
601 601 self.proc.stdin.close()
602 602 self.closed = True
603 603 return res
604 604
605 605
606 606 class BserCodec(Codec):
607 607 """ use the BSER encoding. This is the default, preferred codec """
608 608
609 609 def _loads(self, response):
610 610 return bser.loads(response) # Defaults to BSER v1
611 611
612 612 def receive(self):
613 613 buf = [self.transport.readBytes(sniff_len)]
614 614 if not buf[0]:
615 615 raise WatchmanError('empty watchman response')
616 616
617 617 _1, _2, elen = bser.pdu_info(buf[0])
618 618
619 619 rlen = len(buf[0])
620 620 while elen > rlen:
621 621 buf.append(self.transport.readBytes(elen - rlen))
622 622 rlen += len(buf[-1])
623 623
624 624 response = b''.join(buf)
625 625 try:
626 626 res = self._loads(response)
627 627 return res
628 628 except ValueError as e:
629 629 raise WatchmanError('watchman response decode error: %s' % e)
630 630
631 631 def send(self, *args):
632 632 cmd = bser.dumps(*args) # Defaults to BSER v1
633 633 self.transport.write(cmd)
634 634
635 635
636 636 class ImmutableBserCodec(BserCodec):
637 637 """ use the BSER encoding, decoding values using the newer
638 638 immutable object support """
639 639
640 640 def _loads(self, response):
641 641 return bser.loads(response, False) # Defaults to BSER v1
642 642
643 643
644 644 class Bser2WithFallbackCodec(BserCodec):
645 645 """ use BSER v2 encoding """
646 646
647 647 def __init__(self, transport):
648 648 super(Bser2WithFallbackCodec, self).__init__(transport)
649 649 # Once the server advertises support for bser-v2 we should switch this
650 650 # to 'required' on Python 3.
651 651 self.send(["version", {"optional": ["bser-v2"]}])
652 652
653 653 capabilities = self.receive()
654 654
655 655 if 'error' in capabilities:
656 656 raise Exception('Unsupported BSER version')
657 657
658 658 if capabilities['capabilities']['bser-v2']:
659 659 self.bser_version = 2
660 660 self.bser_capabilities = 0
661 661 else:
662 662 self.bser_version = 1
663 663 self.bser_capabilities = 0
664 664
665 665 def _loads(self, response):
666 666 return bser.loads(response)
667 667
668 668 def receive(self):
669 669 buf = [self.transport.readBytes(sniff_len)]
670 670 if not buf[0]:
671 671 raise WatchmanError('empty watchman response')
672 672
673 673 recv_bser_version, recv_bser_capabilities, elen = bser.pdu_info(buf[0])
674 674
675 675 if hasattr(self, 'bser_version'):
676 676 # Readjust BSER version and capabilities if necessary
677 677 self.bser_version = max(self.bser_version, recv_bser_version)
678 678 self.capabilities = self.bser_capabilities & recv_bser_capabilities
679 679
680 680 rlen = len(buf[0])
681 681 while elen > rlen:
682 682 buf.append(self.transport.readBytes(elen - rlen))
683 683 rlen += len(buf[-1])
684 684
685 685 response = b''.join(buf)
686 686 try:
687 687 res = self._loads(response)
688 688 return res
689 689 except ValueError as e:
690 690 raise WatchmanError('watchman response decode error: %s' % e)
691 691
692 692 def send(self, *args):
693 693 if hasattr(self, 'bser_version'):
694 694 cmd = bser.dumps(*args, version=self.bser_version,
695 695 capabilities=self.bser_capabilities)
696 696 else:
697 697 cmd = bser.dumps(*args)
698 698 self.transport.write(cmd)
699 699
700 700
701 701 class JsonCodec(Codec):
702 702 """ Use json codec. This is here primarily for testing purposes """
703 703 json = None
704 704
705 705 def __init__(self, transport):
706 706 super(JsonCodec, self).__init__(transport)
707 707 # optional dep on json, only if JsonCodec is used
708 708 import json
709 709 self.json = json
710 710
711 711 def receive(self):
712 712 line = self.transport.readLine()
713 713 try:
714 714 # In Python 3, json.loads is a transformation from Unicode string to
715 715 # objects possibly containing Unicode strings. We typically expect
716 716 # the JSON blob to be ASCII-only with non-ASCII characters escaped,
717 717 # but it's possible we might get non-ASCII bytes that are valid
718 718 # UTF-8.
719 719 if compat.PYTHON3:
720 720 line = line.decode('utf-8')
721 721 return self.json.loads(line)
722 722 except Exception as e:
723 723 print(e, line)
724 724 raise
725 725
726 726 def send(self, *args):
727 727 cmd = self.json.dumps(*args)
728 728 # In Python 3, json.dumps is a transformation from objects possibly
729 729 # containing Unicode strings to Unicode string. Even with (the default)
730 730 # ensure_ascii=True, dumps returns a Unicode string.
731 731 if compat.PYTHON3:
732 732 cmd = cmd.encode('ascii')
733 733 self.transport.write(cmd + b"\n")
734 734
735 735
736 736 class client(object):
737 737 """ Handles the communication with the watchman service """
738 738 sockpath = None
739 739 transport = None
740 740 sendCodec = None
741 741 recvCodec = None
742 742 sendConn = None
743 743 recvConn = None
744 744 subs = {} # Keyed by subscription name
745 745 sub_by_root = {} # Keyed by root, then by subscription name
746 746 logs = [] # When log level is raised
747 747 unilateral = ['log', 'subscription']
748 748 tport = None
749 749 useImmutableBser = None
750 750
751 751 def __init__(self,
752 752 sockpath=None,
753 753 timeout=1.0,
754 754 transport=None,
755 755 sendEncoding=None,
756 756 recvEncoding=None,
757 757 useImmutableBser=False):
758 758 self.sockpath = sockpath
759 759 self.timeout = timeout
760 760 self.useImmutableBser = useImmutableBser
761 761
762 762 if inspect.isclass(transport) and issubclass(transport, Transport):
763 763 self.transport = transport
764 764 else:
765 765 transport = transport or os.getenv('WATCHMAN_TRANSPORT') or 'local'
766 766 if transport == 'local' and os.name == 'nt':
767 767 self.transport = WindowsNamedPipeTransport
768 768 elif transport == 'local':
769 769 self.transport = UnixSocketTransport
770 770 elif transport == 'cli':
771 771 self.transport = CLIProcessTransport
772 772 if sendEncoding is None:
773 773 sendEncoding = 'json'
774 774 if recvEncoding is None:
775 775 recvEncoding = sendEncoding
776 776 else:
777 777 raise WatchmanError('invalid transport %s' % transport)
778 778
779 779 sendEncoding = str(sendEncoding or os.getenv('WATCHMAN_ENCODING') or
780 780 'bser')
781 781 recvEncoding = str(recvEncoding or os.getenv('WATCHMAN_ENCODING') or
782 782 'bser')
783 783
784 784 self.recvCodec = self._parseEncoding(recvEncoding)
785 785 self.sendCodec = self._parseEncoding(sendEncoding)
786 786
787 787 def _parseEncoding(self, enc):
788 788 if enc == 'bser':
789 789 if self.useImmutableBser:
790 790 return ImmutableBserCodec
791 791 return BserCodec
792 792 elif enc == 'experimental-bser-v2':
793 793 return Bser2WithFallbackCodec
794 794 elif enc == 'json':
795 795 return JsonCodec
796 796 else:
797 797 raise WatchmanError('invalid encoding %s' % enc)
798 798
799 799 def _hasprop(self, result, name):
800 800 if self.useImmutableBser:
801 801 return hasattr(result, name)
802 802 return name in result
803 803
804 804 def _resolvesockname(self):
805 805 # if invoked via a trigger, watchman will set this env var; we
806 806 # should use it unless explicitly set otherwise
807 807 path = os.getenv('WATCHMAN_SOCK')
808 808 if path:
809 809 return path
810 810
811 811 cmd = ['watchman', '--output-encoding=bser', 'get-sockname']
812 812 try:
813 813 args = dict(stdout=subprocess.PIPE,
814 814 stderr=subprocess.PIPE,
815 815 close_fds=os.name != 'nt')
816 816
817 817 if os.name == 'nt':
818 818 # if invoked via an application with graphical user interface,
819 819 # this call will cause a brief command window pop-up.
820 820 # Using the flag STARTF_USESHOWWINDOW to avoid this behavior.
821 821 startupinfo = subprocess.STARTUPINFO()
822 822 startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
823 823 args['startupinfo'] = startupinfo
824 824
825 825 p = subprocess.Popen(cmd, **args)
826 826
827 827 except OSError as e:
828 raise WatchmanError('"watchman" executable not in PATH (%s)', e)
828 raise WatchmanError('"watchman" executable not in PATH (%s)' % e)
829 829
830 830 stdout, stderr = p.communicate()
831 831 exitcode = p.poll()
832 832
833 833 if exitcode:
834 834 raise WatchmanError("watchman exited with code %d" % exitcode)
835 835
836 836 result = bser.loads(stdout)
837 837 if b'error' in result:
838 838 raise WatchmanError('get-sockname error: %s' % result['error'])
839 839
840 840 return result[b'sockname']
841 841
842 842 def _connect(self):
843 843 """ establish transport connection """
844 844
845 845 if self.recvConn:
846 846 return
847 847
848 848 if self.sockpath is None:
849 849 self.sockpath = self._resolvesockname()
850 850
851 851 self.tport = self.transport(self.sockpath, self.timeout)
852 852 self.sendConn = self.sendCodec(self.tport)
853 853 self.recvConn = self.recvCodec(self.tport)
854 854
855 855 def __del__(self):
856 856 self.close()
857 857
858 858 def close(self):
859 859 if self.tport:
860 860 self.tport.close()
861 861 self.tport = None
862 862 self.recvConn = None
863 863 self.sendConn = None
864 864
865 865 def receive(self):
866 866 """ receive the next PDU from the watchman service
867 867
868 868 If the client has activated subscriptions or logs then
869 869 this PDU may be a unilateral PDU sent by the service to
870 870 inform the client of a log event or subscription change.
871 871
872 872 It may also simply be the response portion of a request
873 873 initiated by query.
874 874
875 875 There are clients in production that subscribe and call
876 876 this in a loop to retrieve all subscription responses,
877 877 so care should be taken when making changes here.
878 878 """
879 879
880 880 self._connect()
881 881 result = self.recvConn.receive()
882 882 if self._hasprop(result, 'error'):
883 883 error = result['error']
884 884 if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
885 885 error = result['error'].decode('utf-8', 'surrogateescape')
886 886 raise CommandError(error)
887 887
888 888 if self._hasprop(result, 'log'):
889 889 log = result['log']
890 890 if compat.PYTHON3 and isinstance(self.recvConn, BserCodec):
891 891 log = log.decode('utf-8', 'surrogateescape')
892 892 self.logs.append(log)
893 893
894 894 if self._hasprop(result, 'subscription'):
895 895 sub = result['subscription']
896 896 if not (sub in self.subs):
897 897 self.subs[sub] = []
898 898 self.subs[sub].append(result)
899 899
900 900 # also accumulate in {root,sub} keyed store
901 901 root = os.path.normcase(result['root'])
902 902 if not root in self.sub_by_root:
903 903 self.sub_by_root[root] = {}
904 904 if not sub in self.sub_by_root[root]:
905 905 self.sub_by_root[root][sub] = []
906 906 self.sub_by_root[root][sub].append(result)
907 907
908 908 return result
909 909
910 910 def isUnilateralResponse(self, res):
911 911 if 'unilateral' in res and res['unilateral']:
912 912 return True
913 913 # Fall back to checking for known unilateral responses
914 914 for k in self.unilateral:
915 915 if k in res:
916 916 return True
917 917 return False
918 918
919 919 def getLog(self, remove=True):
920 920 """ Retrieve buffered log data
921 921
922 922 If remove is true the data will be removed from the buffer.
923 923 Otherwise it will be left in the buffer
924 924 """
925 925 res = self.logs
926 926 if remove:
927 927 self.logs = []
928 928 return res
929 929
930 930 def getSubscription(self, name, remove=True, root=None):
931 931 """ Retrieve the data associated with a named subscription
932 932
933 933 If remove is True (the default), the subscription data is removed
934 934 from the buffer. Otherwise the data is returned but left in
935 935 the buffer.
936 936
937 937 Returns None if there is no data associated with `name`
938 938
939 939 If root is not None, then only return the subscription
940 940 data that matches both root and name. When used in this way,
941 941 remove processing impacts both the unscoped and scoped stores
942 942 for the subscription data.
943 943 """
944 944 if compat.PYTHON3 and issubclass(self.recvCodec, BserCodec):
945 945 # People may pass in Unicode strings here -- but currently BSER only
946 946 # returns bytestrings. Deal with that.
947 947 if isinstance(root, str):
948 948 root = encoding.encode_local(root)
949 949 if isinstance(name, str):
950 950 name = name.encode('utf-8')
951 951
952 952 if root is not None:
953 953 if not root in self.sub_by_root:
954 954 return None
955 955 if not name in self.sub_by_root[root]:
956 956 return None
957 957 sub = self.sub_by_root[root][name]
958 958 if remove:
959 959 del self.sub_by_root[root][name]
960 960 # don't let this grow unbounded
961 961 if name in self.subs:
962 962 del self.subs[name]
963 963 return sub
964 964
965 965 if not (name in self.subs):
966 966 return None
967 967 sub = self.subs[name]
968 968 if remove:
969 969 del self.subs[name]
970 970 return sub
971 971
972 972 def query(self, *args):
973 973 """ Send a query to the watchman service and return the response
974 974
975 975 This call will block until the response is returned.
976 976 If any unilateral responses are sent by the service in between
977 977 the request-response they will be buffered up in the client object
978 978 and NOT returned via this method.
979 979 """
980 980
981 981 log('calling client.query')
982 982 self._connect()
983 983 try:
984 984 self.sendConn.send(args)
985 985
986 986 res = self.receive()
987 987 while self.isUnilateralResponse(res):
988 988 res = self.receive()
989 989
990 990 return res
991 991 except EnvironmentError as ee:
992 992 # When we can depend on Python 3, we can use PEP 3134
993 993 # exception chaining here.
994 994 raise WatchmanEnvironmentError(
995 995 'I/O error communicating with watchman daemon',
996 996 ee.errno,
997 997 ee.strerror,
998 998 args)
999 999 except WatchmanError as ex:
1000 1000 ex.setCommand(args)
1001 1001 raise
1002 1002
1003 1003 def capabilityCheck(self, optional=None, required=None):
1004 1004 """ Perform a server capability check """
1005 1005 res = self.query('version', {
1006 1006 'optional': optional or [],
1007 1007 'required': required or []
1008 1008 })
1009 1009
1010 1010 if not self._hasprop(res, 'capabilities'):
1011 1011 # Server doesn't support capabilities, so we need to
1012 1012 # synthesize the results based on the version
1013 1013 capabilities.synthesize(res, optional)
1014 1014 if 'error' in res:
1015 1015 raise CommandError(res['error'])
1016 1016
1017 1017 return res
1018 1018
1019 1019 def setTimeout(self, value):
1020 1020 self.recvConn.setTimeout(value)
1021 1021 self.sendConn.setTimeout(value)
General Comments 0
You need to be logged in to leave comments. Login now