##// END OF EJS Templates
py3: remove retry on EINTR errno...
Manuel Jacob -
r50197:ee4537e3 default
parent child Browse files
Show More
@@ -1,756 +1,744 b''
1 1 # commandserver.py - communicate with Mercurial's API over a pipe
2 2 #
3 3 # Copyright Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import gc
11 11 import os
12 12 import random
13 13 import selectors
14 14 import signal
15 15 import socket
16 16 import struct
17 17 import traceback
18 18
19 19 from .i18n import _
20 20 from .pycompat import getattr
21 21 from . import (
22 22 encoding,
23 23 error,
24 24 loggingutil,
25 25 pycompat,
26 26 repocache,
27 27 util,
28 28 vfs as vfsmod,
29 29 )
30 30 from .utils import (
31 31 cborutil,
32 32 procutil,
33 33 )
34 34
35 35
36 36 class channeledoutput:
37 37 """
38 38 Write data to out in the following format:
39 39
40 40 data length (unsigned int),
41 41 data
42 42 """
43 43
44 44 def __init__(self, out, channel):
45 45 self.out = out
46 46 self.channel = channel
47 47
48 48 @property
49 49 def name(self):
50 50 return b'<%c-channel>' % self.channel
51 51
52 52 def write(self, data):
53 53 if not data:
54 54 return
55 55 # single write() to guarantee the same atomicity as the underlying file
56 56 self.out.write(struct.pack(b'>cI', self.channel, len(data)) + data)
57 57 self.out.flush()
58 58
59 59 def __getattr__(self, attr):
60 60 if attr in ('isatty', 'fileno', 'tell', 'seek'):
61 61 raise AttributeError(attr)
62 62 return getattr(self.out, attr)
63 63
64 64
65 65 class channeledmessage:
66 66 """
67 67 Write encoded message and metadata to out in the following format:
68 68
69 69 data length (unsigned int),
70 70 encoded message and metadata, as a flat key-value dict.
71 71
72 72 Each message should have 'type' attribute. Messages of unknown type
73 73 should be ignored.
74 74 """
75 75
76 76 # teach ui that write() can take **opts
77 77 structured = True
78 78
79 79 def __init__(self, out, channel, encodename, encodefn):
80 80 self._cout = channeledoutput(out, channel)
81 81 self.encoding = encodename
82 82 self._encodefn = encodefn
83 83
84 84 def write(self, data, **opts):
85 85 opts = pycompat.byteskwargs(opts)
86 86 if data is not None:
87 87 opts[b'data'] = data
88 88 self._cout.write(self._encodefn(opts))
89 89
90 90 def __getattr__(self, attr):
91 91 return getattr(self._cout, attr)
92 92
93 93
94 94 class channeledinput:
95 95 """
96 96 Read data from in_.
97 97
98 98 Requests for input are written to out in the following format:
99 99 channel identifier - 'I' for plain input, 'L' line based (1 byte)
100 100 how many bytes to send at most (unsigned int),
101 101
102 102 The client replies with:
103 103 data length (unsigned int), 0 meaning EOF
104 104 data
105 105 """
106 106
107 107 maxchunksize = 4 * 1024
108 108
109 109 def __init__(self, in_, out, channel):
110 110 self.in_ = in_
111 111 self.out = out
112 112 self.channel = channel
113 113
114 114 @property
115 115 def name(self):
116 116 return b'<%c-channel>' % self.channel
117 117
118 118 def read(self, size=-1):
119 119 if size < 0:
120 120 # if we need to consume all the clients input, ask for 4k chunks
121 121 # so the pipe doesn't fill up risking a deadlock
122 122 size = self.maxchunksize
123 123 s = self._read(size, self.channel)
124 124 buf = s
125 125 while s:
126 126 s = self._read(size, self.channel)
127 127 buf += s
128 128
129 129 return buf
130 130 else:
131 131 return self._read(size, self.channel)
132 132
133 133 def _read(self, size, channel):
134 134 if not size:
135 135 return b''
136 136 assert size > 0
137 137
138 138 # tell the client we need at most size bytes
139 139 self.out.write(struct.pack(b'>cI', channel, size))
140 140 self.out.flush()
141 141
142 142 length = self.in_.read(4)
143 143 length = struct.unpack(b'>I', length)[0]
144 144 if not length:
145 145 return b''
146 146 else:
147 147 return self.in_.read(length)
148 148
149 149 def readline(self, size=-1):
150 150 if size < 0:
151 151 size = self.maxchunksize
152 152 s = self._read(size, b'L')
153 153 buf = s
154 154 # keep asking for more until there's either no more or
155 155 # we got a full line
156 156 while s and not s.endswith(b'\n'):
157 157 s = self._read(size, b'L')
158 158 buf += s
159 159
160 160 return buf
161 161 else:
162 162 return self._read(size, b'L')
163 163
164 164 def __iter__(self):
165 165 return self
166 166
167 167 def next(self):
168 168 l = self.readline()
169 169 if not l:
170 170 raise StopIteration
171 171 return l
172 172
173 173 __next__ = next
174 174
175 175 def __getattr__(self, attr):
176 176 if attr in ('isatty', 'fileno', 'tell', 'seek'):
177 177 raise AttributeError(attr)
178 178 return getattr(self.in_, attr)
179 179
180 180
181 181 _messageencoders = {
182 182 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
183 183 }
184 184
185 185
186 186 def _selectmessageencoder(ui):
187 187 encnames = ui.configlist(b'cmdserver', b'message-encodings')
188 188 for n in encnames:
189 189 f = _messageencoders.get(n)
190 190 if f:
191 191 return n, f
192 192 raise error.Abort(
193 193 b'no supported message encodings: %s' % b' '.join(encnames)
194 194 )
195 195
196 196
197 197 class server:
198 198 """
199 199 Listens for commands on fin, runs them and writes the output on a channel
200 200 based stream to fout.
201 201 """
202 202
203 203 def __init__(self, ui, repo, fin, fout, prereposetups=None):
204 204 self.cwd = encoding.getcwd()
205 205
206 206 if repo:
207 207 # the ui here is really the repo ui so take its baseui so we don't
208 208 # end up with its local configuration
209 209 self.ui = repo.baseui
210 210 self.repo = repo
211 211 self.repoui = repo.ui
212 212 else:
213 213 self.ui = ui
214 214 self.repo = self.repoui = None
215 215 self._prereposetups = prereposetups
216 216
217 217 self.cdebug = channeledoutput(fout, b'd')
218 218 self.cerr = channeledoutput(fout, b'e')
219 219 self.cout = channeledoutput(fout, b'o')
220 220 self.cin = channeledinput(fin, fout, b'I')
221 221 self.cresult = channeledoutput(fout, b'r')
222 222
223 223 if self.ui.config(b'cmdserver', b'log') == b'-':
224 224 # switch log stream of server's ui to the 'd' (debug) channel
225 225 # (don't touch repo.ui as its lifetime is longer than the server)
226 226 self.ui = self.ui.copy()
227 227 setuplogging(self.ui, repo=None, fp=self.cdebug)
228 228
229 229 self.cmsg = None
230 230 if ui.config(b'ui', b'message-output') == b'channel':
231 231 encname, encfn = _selectmessageencoder(ui)
232 232 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
233 233
234 234 self.client = fin
235 235
236 236 # If shutdown-on-interrupt is off, the default SIGINT handler is
237 237 # removed so that client-server communication wouldn't be interrupted.
238 238 # For example, 'runcommand' handler will issue three short read()s.
239 239 # If one of the first two read()s were interrupted, the communication
240 240 # channel would be left at dirty state and the subsequent request
241 241 # wouldn't be parsed. So catching KeyboardInterrupt isn't enough.
242 242 self._shutdown_on_interrupt = ui.configbool(
243 243 b'cmdserver', b'shutdown-on-interrupt'
244 244 )
245 245 self._old_inthandler = None
246 246 if not self._shutdown_on_interrupt:
247 247 self._old_inthandler = signal.signal(signal.SIGINT, signal.SIG_IGN)
248 248
249 249 def cleanup(self):
250 250 """release and restore resources taken during server session"""
251 251 if not self._shutdown_on_interrupt:
252 252 signal.signal(signal.SIGINT, self._old_inthandler)
253 253
254 254 def _read(self, size):
255 255 if not size:
256 256 return b''
257 257
258 258 data = self.client.read(size)
259 259
260 260 # is the other end closed?
261 261 if not data:
262 262 raise EOFError
263 263
264 264 return data
265 265
266 266 def _readstr(self):
267 267 """read a string from the channel
268 268
269 269 format:
270 270 data length (uint32), data
271 271 """
272 272 length = struct.unpack(b'>I', self._read(4))[0]
273 273 if not length:
274 274 return b''
275 275 return self._read(length)
276 276
277 277 def _readlist(self):
278 278 """read a list of NULL separated strings from the channel"""
279 279 s = self._readstr()
280 280 if s:
281 281 return s.split(b'\0')
282 282 else:
283 283 return []
284 284
285 285 def _dispatchcommand(self, req):
286 286 from . import dispatch # avoid cycle
287 287
288 288 if self._shutdown_on_interrupt:
289 289 # no need to restore SIGINT handler as it is unmodified.
290 290 return dispatch.dispatch(req)
291 291
292 292 try:
293 293 signal.signal(signal.SIGINT, self._old_inthandler)
294 294 return dispatch.dispatch(req)
295 295 except error.SignalInterrupt:
296 296 # propagate SIGBREAK, SIGHUP, or SIGTERM.
297 297 raise
298 298 except KeyboardInterrupt:
299 299 # SIGINT may be received out of the try-except block of dispatch(),
300 300 # so catch it as last ditch. Another KeyboardInterrupt may be
301 301 # raised while handling exceptions here, but there's no way to
302 302 # avoid that except for doing everything in C.
303 303 pass
304 304 finally:
305 305 signal.signal(signal.SIGINT, signal.SIG_IGN)
306 306 # On KeyboardInterrupt, print error message and exit *after* SIGINT
307 307 # handler removed.
308 308 req.ui.error(_(b'interrupted!\n'))
309 309 return -1
310 310
311 311 def runcommand(self):
312 312 """reads a list of \0 terminated arguments, executes
313 313 and writes the return code to the result channel"""
314 314 from . import dispatch # avoid cycle
315 315
316 316 args = self._readlist()
317 317
318 318 # copy the uis so changes (e.g. --config or --verbose) don't
319 319 # persist between requests
320 320 copiedui = self.ui.copy()
321 321 uis = [copiedui]
322 322 if self.repo:
323 323 self.repo.baseui = copiedui
324 324 # clone ui without using ui.copy because this is protected
325 325 repoui = self.repoui.__class__(self.repoui)
326 326 repoui.copy = copiedui.copy # redo copy protection
327 327 uis.append(repoui)
328 328 self.repo.ui = self.repo.dirstate._ui = repoui
329 329 self.repo.invalidateall()
330 330
331 331 for ui in uis:
332 332 ui.resetstate()
333 333 # any kind of interaction must use server channels, but chg may
334 334 # replace channels by fully functional tty files. so nontty is
335 335 # enforced only if cin is a channel.
336 336 if not util.safehasattr(self.cin, b'fileno'):
337 337 ui.setconfig(b'ui', b'nontty', b'true', b'commandserver')
338 338
339 339 req = dispatch.request(
340 340 args[:],
341 341 copiedui,
342 342 self.repo,
343 343 self.cin,
344 344 self.cout,
345 345 self.cerr,
346 346 self.cmsg,
347 347 prereposetups=self._prereposetups,
348 348 )
349 349
350 350 try:
351 351 ret = self._dispatchcommand(req) & 255
352 352 # If shutdown-on-interrupt is off, it's important to write the
353 353 # result code *after* SIGINT handler removed. If the result code
354 354 # were lost, the client wouldn't be able to continue processing.
355 355 self.cresult.write(struct.pack(b'>i', int(ret)))
356 356 finally:
357 357 # restore old cwd
358 358 if b'--cwd' in args:
359 359 os.chdir(self.cwd)
360 360
361 361 def getencoding(self):
362 362 """writes the current encoding to the result channel"""
363 363 self.cresult.write(encoding.encoding)
364 364
365 365 def serveone(self):
366 366 cmd = self.client.readline()[:-1]
367 367 if cmd:
368 368 handler = self.capabilities.get(cmd)
369 369 if handler:
370 370 handler(self)
371 371 else:
372 372 # clients are expected to check what commands are supported by
373 373 # looking at the servers capabilities
374 374 raise error.Abort(_(b'unknown command %s') % cmd)
375 375
376 376 return cmd != b''
377 377
378 378 capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}
379 379
380 380 def serve(self):
381 381 hellomsg = b'capabilities: ' + b' '.join(sorted(self.capabilities))
382 382 hellomsg += b'\n'
383 383 hellomsg += b'encoding: ' + encoding.encoding
384 384 hellomsg += b'\n'
385 385 if self.cmsg:
386 386 hellomsg += b'message-encoding: %s\n' % self.cmsg.encoding
387 387 hellomsg += b'pid: %d' % procutil.getpid()
388 388 if util.safehasattr(os, b'getpgid'):
389 389 hellomsg += b'\n'
390 390 hellomsg += b'pgid: %d' % os.getpgid(0)
391 391
392 392 # write the hello msg in -one- chunk
393 393 self.cout.write(hellomsg)
394 394
395 395 try:
396 396 while self.serveone():
397 397 pass
398 398 except EOFError:
399 399 # we'll get here if the client disconnected while we were reading
400 400 # its request
401 401 return 1
402 402
403 403 return 0
404 404
405 405
406 406 def setuplogging(ui, repo=None, fp=None):
407 407 """Set up server logging facility
408 408
409 409 If cmdserver.log is '-', log messages will be sent to the given fp.
410 410 It should be the 'd' channel while a client is connected, and otherwise
411 411 is the stderr of the server process.
412 412 """
413 413 # developer config: cmdserver.log
414 414 logpath = ui.config(b'cmdserver', b'log')
415 415 if not logpath:
416 416 return
417 417 # developer config: cmdserver.track-log
418 418 tracked = set(ui.configlist(b'cmdserver', b'track-log'))
419 419
420 420 if logpath == b'-' and fp:
421 421 logger = loggingutil.fileobjectlogger(fp, tracked)
422 422 elif logpath == b'-':
423 423 logger = loggingutil.fileobjectlogger(ui.ferr, tracked)
424 424 else:
425 425 logpath = util.abspath(util.expandpath(logpath))
426 426 # developer config: cmdserver.max-log-files
427 427 maxfiles = ui.configint(b'cmdserver', b'max-log-files')
428 428 # developer config: cmdserver.max-log-size
429 429 maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
430 430 vfs = vfsmod.vfs(os.path.dirname(logpath))
431 431 logger = loggingutil.filelogger(
432 432 vfs,
433 433 os.path.basename(logpath),
434 434 tracked,
435 435 maxfiles=maxfiles,
436 436 maxsize=maxsize,
437 437 )
438 438
439 439 targetuis = {ui}
440 440 if repo:
441 441 targetuis.add(repo.baseui)
442 442 targetuis.add(repo.ui)
443 443 for u in targetuis:
444 444 u.setlogger(b'cmdserver', logger)
445 445
446 446
447 447 class pipeservice:
448 448 def __init__(self, ui, repo, opts):
449 449 self.ui = ui
450 450 self.repo = repo
451 451
452 452 def init(self):
453 453 pass
454 454
455 455 def run(self):
456 456 ui = self.ui
457 457 # redirect stdio to null device so that broken extensions or in-process
458 458 # hooks will never cause corruption of channel protocol.
459 459 with ui.protectedfinout() as (fin, fout):
460 460 sv = server(ui, self.repo, fin, fout)
461 461 try:
462 462 return sv.serve()
463 463 finally:
464 464 sv.cleanup()
465 465
466 466
467 467 def _initworkerprocess():
468 468 # use a different process group from the master process, in order to:
469 469 # 1. make the current process group no longer "orphaned" (because the
470 470 # parent of this process is in a different process group while
471 471 # remains in a same session)
472 472 # according to POSIX 2.2.2.52, orphaned process group will ignore
473 473 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
474 474 # cause trouble for things like ncurses.
475 475 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
476 476 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
477 477 # processes like ssh will be killed properly, without affecting
478 478 # unrelated processes.
479 479 os.setpgid(0, 0)
480 480 # change random state otherwise forked request handlers would have a
481 481 # same state inherited from parent.
482 482 random.seed()
483 483
484 484
485 485 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
486 486 fin = conn.makefile('rb')
487 487 fout = conn.makefile('wb')
488 488 sv = None
489 489 try:
490 490 sv = createcmdserver(repo, conn, fin, fout, prereposetups)
491 491 try:
492 492 sv.serve()
493 493 # handle exceptions that may be raised by command server. most of
494 494 # known exceptions are caught by dispatch.
495 495 except error.Abort as inst:
496 496 ui.error(_(b'abort: %s\n') % inst.message)
497 497 except IOError as inst:
498 498 if inst.errno != errno.EPIPE:
499 499 raise
500 500 except KeyboardInterrupt:
501 501 pass
502 502 finally:
503 503 sv.cleanup()
504 504 except: # re-raises
505 505 # also write traceback to error channel. otherwise client cannot
506 506 # see it because it is written to server's stderr by default.
507 507 if sv:
508 508 cerr = sv.cerr
509 509 else:
510 510 cerr = channeledoutput(fout, b'e')
511 511 cerr.write(encoding.strtolocal(traceback.format_exc()))
512 512 raise
513 513 finally:
514 514 fin.close()
515 515 try:
516 516 fout.close() # implicit flush() may cause another EPIPE
517 517 except IOError as inst:
518 518 if inst.errno != errno.EPIPE:
519 519 raise
520 520
521 521
522 522 class unixservicehandler:
523 523 """Set of pluggable operations for unix-mode services
524 524
525 525 Almost all methods except for createcmdserver() are called in the main
526 526 process. You can't pass mutable resource back from createcmdserver().
527 527 """
528 528
529 529 pollinterval = None
530 530
531 531 def __init__(self, ui):
532 532 self.ui = ui
533 533
534 534 def bindsocket(self, sock, address):
535 535 util.bindunixsocket(sock, address)
536 536 sock.listen(socket.SOMAXCONN)
537 537 self.ui.status(_(b'listening at %s\n') % address)
538 538 self.ui.flush() # avoid buffering of status message
539 539
540 540 def unlinksocket(self, address):
541 541 os.unlink(address)
542 542
543 543 def shouldexit(self):
544 544 """True if server should shut down; checked per pollinterval"""
545 545 return False
546 546
547 547 def newconnection(self):
548 548 """Called when main process notices new connection"""
549 549
550 550 def createcmdserver(self, repo, conn, fin, fout, prereposetups):
551 551 """Create new command server instance; called in the process that
552 552 serves for the current connection"""
553 553 return server(self.ui, repo, fin, fout, prereposetups)
554 554
555 555
556 556 class unixforkingservice:
557 557 """
558 558 Listens on unix domain socket and forks server per connection
559 559 """
560 560
561 561 def __init__(self, ui, repo, opts, handler=None):
562 562 self.ui = ui
563 563 self.repo = repo
564 564 self.address = opts[b'address']
565 565 if not util.safehasattr(socket, b'AF_UNIX'):
566 566 raise error.Abort(_(b'unsupported platform'))
567 567 if not self.address:
568 568 raise error.Abort(_(b'no socket path specified with --address'))
569 569 self._servicehandler = handler or unixservicehandler(ui)
570 570 self._sock = None
571 571 self._mainipc = None
572 572 self._workeripc = None
573 573 self._oldsigchldhandler = None
574 574 self._workerpids = set() # updated by signal handler; do not iterate
575 575 self._socketunlinked = None
576 576 # experimental config: cmdserver.max-repo-cache
577 577 maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
578 578 if maxlen < 0:
579 579 raise error.Abort(_(b'negative max-repo-cache size not allowed'))
580 580 self._repoloader = repocache.repoloader(ui, maxlen)
581 581 # attempt to avoid crash in CoreFoundation when using chg after fix in
582 582 # a89381e04c58
583 583 if pycompat.isdarwin:
584 584 procutil.gui()
585 585
586 586 def init(self):
587 587 self._sock = socket.socket(socket.AF_UNIX)
588 588 # IPC channel from many workers to one main process; this is actually
589 589 # a uni-directional pipe, but is backed by a DGRAM socket so each
590 590 # message can be easily separated.
591 591 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
592 592 self._mainipc, self._workeripc = o
593 593 self._servicehandler.bindsocket(self._sock, self.address)
594 594 if util.safehasattr(procutil, b'unblocksignal'):
595 595 procutil.unblocksignal(signal.SIGCHLD)
596 596 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
597 597 self._oldsigchldhandler = o
598 598 self._socketunlinked = False
599 599 self._repoloader.start()
600 600
601 601 def _unlinksocket(self):
602 602 if not self._socketunlinked:
603 603 self._servicehandler.unlinksocket(self.address)
604 604 self._socketunlinked = True
605 605
606 606 def _cleanup(self):
607 607 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
608 608 self._sock.close()
609 609 self._mainipc.close()
610 610 self._workeripc.close()
611 611 self._unlinksocket()
612 612 self._repoloader.stop()
613 613 # don't kill child processes as they have active clients, just wait
614 614 self._reapworkers(0)
615 615
616 616 def run(self):
617 617 try:
618 618 self._mainloop()
619 619 finally:
620 620 self._cleanup()
621 621
622 622 def _mainloop(self):
623 623 exiting = False
624 624 h = self._servicehandler
625 625 selector = selectors.DefaultSelector()
626 626 selector.register(
627 627 self._sock, selectors.EVENT_READ, self._acceptnewconnection
628 628 )
629 629 selector.register(
630 630 self._mainipc, selectors.EVENT_READ, self._handlemainipc
631 631 )
632 632 while True:
633 633 if not exiting and h.shouldexit():
634 634 # clients can no longer connect() to the domain socket, so
635 635 # we stop queuing new requests.
636 636 # for requests that are queued (connect()-ed, but haven't been
637 637 # accept()-ed), handle them before exit. otherwise, clients
638 638 # waiting for recv() will receive ECONNRESET.
639 639 self._unlinksocket()
640 640 exiting = True
641 641 events = selector.select(timeout=h.pollinterval)
642 642 if not events:
643 643 # only exit if we completed all queued requests
644 644 if exiting:
645 645 break
646 646 continue
647 647 for key, _mask in events:
648 648 key.data(key.fileobj, selector)
649 649 selector.close()
650 650
651 651 def _acceptnewconnection(self, sock, selector):
652 652 h = self._servicehandler
653 try:
654 conn, _addr = sock.accept()
655 except socket.error as inst:
656 if inst.args[0] == errno.EINTR:
657 return
658 raise
653 conn, _addr = sock.accept()
659 654
660 655 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
661 656 # to prevent COW memory from being touched by GC.
662 657 # https://instagram-engineering.com/
663 658 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
664 659 pid = os.fork()
665 660 if pid:
666 661 try:
667 662 self.ui.log(
668 663 b'cmdserver', b'forked worker process (pid=%d)\n', pid
669 664 )
670 665 self._workerpids.add(pid)
671 666 h.newconnection()
672 667 finally:
673 668 conn.close() # release handle in parent process
674 669 else:
675 670 try:
676 671 selector.close()
677 672 sock.close()
678 673 self._mainipc.close()
679 674 self._runworker(conn)
680 675 conn.close()
681 676 self._workeripc.close()
682 677 os._exit(0)
683 678 except: # never return, hence no re-raises
684 679 try:
685 680 self.ui.traceback(force=True)
686 681 finally:
687 682 os._exit(255)
688 683
689 684 def _handlemainipc(self, sock, selector):
690 685 """Process messages sent from a worker"""
691 try:
692 path = sock.recv(32768) # large enough to receive path
693 except socket.error as inst:
694 if inst.args[0] == errno.EINTR:
695 return
696 raise
686 path = sock.recv(32768) # large enough to receive path
697 687 self._repoloader.load(path)
698 688
699 689 def _sigchldhandler(self, signal, frame):
700 690 self._reapworkers(os.WNOHANG)
701 691
702 692 def _reapworkers(self, options):
703 693 while self._workerpids:
704 694 try:
705 695 pid, _status = os.waitpid(-1, options)
706 696 except OSError as inst:
707 if inst.errno == errno.EINTR:
708 continue
709 697 if inst.errno != errno.ECHILD:
710 698 raise
711 699 # no child processes at all (reaped by other waitpid()?)
712 700 self._workerpids.clear()
713 701 return
714 702 if pid == 0:
715 703 # no waitable child processes
716 704 return
717 705 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
718 706 self._workerpids.discard(pid)
719 707
720 708 def _runworker(self, conn):
721 709 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
722 710 _initworkerprocess()
723 711 h = self._servicehandler
724 712 try:
725 713 _serverequest(
726 714 self.ui,
727 715 self.repo,
728 716 conn,
729 717 h.createcmdserver,
730 718 prereposetups=[self._reposetup],
731 719 )
732 720 finally:
733 721 gc.collect() # trigger __del__ since worker process uses os._exit
734 722
735 723 def _reposetup(self, ui, repo):
736 724 if not repo.local():
737 725 return
738 726
739 727 class unixcmdserverrepo(repo.__class__):
740 728 def close(self):
741 729 super(unixcmdserverrepo, self).close()
742 730 try:
743 731 self._cmdserveripc.send(self.root)
744 732 except socket.error:
745 733 self.ui.log(
746 734 b'cmdserver', b'failed to send repo root to master\n'
747 735 )
748 736
749 737 repo.__class__ = unixcmdserverrepo
750 738 repo._cmdserveripc = self._workeripc
751 739
752 740 cachedrepo = self._repoloader.get(repo.root)
753 741 if cachedrepo is None:
754 742 return
755 743 repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
756 744 repocache.copycache(cachedrepo, repo)
@@ -1,774 +1,767 b''
1 1 # posix.py - Posix utility function implementations for Mercurial
2 2 #
3 3 # Copyright 2005-2009 Olivia Mackall <olivia@selenic.com> and others
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import fcntl
11 11 import getpass
12 12 import grp
13 13 import os
14 14 import pwd
15 15 import re
16 16 import select
17 17 import stat
18 18 import sys
19 19 import tempfile
20 20 import unicodedata
21 21
22 22 from .i18n import _
23 23 from .pycompat import (
24 24 getattr,
25 25 open,
26 26 )
27 27 from . import (
28 28 encoding,
29 29 error,
30 30 policy,
31 31 pycompat,
32 32 )
33 33
34 34 osutil = policy.importmod('osutil')
35 35
36 36 normpath = os.path.normpath
37 37 samestat = os.path.samestat
38 38 abspath = os.path.abspath # re-exports
39 39
40 40 try:
41 41 oslink = os.link
42 42 except AttributeError:
43 43 # Some platforms build Python without os.link on systems that are
44 44 # vaguely unix-like but don't have hardlink support. For those
45 45 # poor souls, just say we tried and that it failed so we fall back
46 46 # to copies.
47 47 def oslink(src, dst):
48 48 raise OSError(
49 49 errno.EINVAL, b'hardlinks not supported: %s to %s' % (src, dst)
50 50 )
51 51
52 52
53 53 readlink = os.readlink
54 54 unlink = os.unlink
55 55 rename = os.rename
56 56 removedirs = os.removedirs
57 57 expandglobs = False
58 58
59 59 umask = os.umask(0)
60 60 os.umask(umask)
61 61
62 62 posixfile = open
63 63
64 64
65 65 def split(p):
66 66 """Same as posixpath.split, but faster
67 67
68 68 >>> import posixpath
69 69 >>> for f in [b'/absolute/path/to/file',
70 70 ... b'relative/path/to/file',
71 71 ... b'file_alone',
72 72 ... b'path/to/directory/',
73 73 ... b'/multiple/path//separators',
74 74 ... b'/file_at_root',
75 75 ... b'///multiple_leading_separators_at_root',
76 76 ... b'']:
77 77 ... assert split(f) == posixpath.split(f), f
78 78 """
79 79 ht = p.rsplit(b'/', 1)
80 80 if len(ht) == 1:
81 81 return b'', p
82 82 nh = ht[0].rstrip(b'/')
83 83 if nh:
84 84 return nh, ht[1]
85 85 return ht[0] + b'/', ht[1]
86 86
87 87
88 88 def openhardlinks():
89 89 '''return true if it is safe to hold open file handles to hardlinks'''
90 90 return True
91 91
92 92
93 93 def nlinks(name):
94 94 '''return number of hardlinks for the given file'''
95 95 return os.lstat(name).st_nlink
96 96
97 97
98 98 def parsepatchoutput(output_line):
99 99 """parses the output produced by patch and returns the filename"""
100 100 pf = output_line[14:]
101 101 if pycompat.sysplatform == b'OpenVMS':
102 102 if pf[0] == b'`':
103 103 pf = pf[1:-1] # Remove the quotes
104 104 else:
105 105 if pf.startswith(b"'") and pf.endswith(b"'") and b" " in pf:
106 106 pf = pf[1:-1] # Remove the quotes
107 107 return pf
108 108
109 109
110 110 def sshargs(sshcmd, host, user, port):
111 111 '''Build argument list for ssh'''
112 112 args = user and (b"%s@%s" % (user, host)) or host
113 113 if b'-' in args[:1]:
114 114 raise error.Abort(
115 115 _(b'illegal ssh hostname or username starting with -: %s') % args
116 116 )
117 117 args = shellquote(args)
118 118 if port:
119 119 args = b'-p %s %s' % (shellquote(port), args)
120 120 return args
121 121
122 122
123 123 def isexec(f):
124 124 """check whether a file is executable"""
125 125 return os.lstat(f).st_mode & 0o100 != 0
126 126
127 127
128 128 def setflags(f, l, x):
129 129 st = os.lstat(f)
130 130 s = st.st_mode
131 131 if l:
132 132 if not stat.S_ISLNK(s):
133 133 # switch file to link
134 134 with open(f, b'rb') as fp:
135 135 data = fp.read()
136 136 unlink(f)
137 137 try:
138 138 os.symlink(data, f)
139 139 except OSError:
140 140 # failed to make a link, rewrite file
141 141 with open(f, b"wb") as fp:
142 142 fp.write(data)
143 143
144 144 # no chmod needed at this point
145 145 return
146 146 if stat.S_ISLNK(s):
147 147 # switch link to file
148 148 data = os.readlink(f)
149 149 unlink(f)
150 150 with open(f, b"wb") as fp:
151 151 fp.write(data)
152 152 s = 0o666 & ~umask # avoid restatting for chmod
153 153
154 154 sx = s & 0o100
155 155 if st.st_nlink > 1 and bool(x) != bool(sx):
156 156 # the file is a hardlink, break it
157 157 with open(f, b"rb") as fp:
158 158 data = fp.read()
159 159 unlink(f)
160 160 with open(f, b"wb") as fp:
161 161 fp.write(data)
162 162
163 163 if x and not sx:
164 164 # Turn on +x for every +r bit when making a file executable
165 165 # and obey umask.
166 166 os.chmod(f, s | (s & 0o444) >> 2 & ~umask)
167 167 elif not x and sx:
168 168 # Turn off all +x bits
169 169 os.chmod(f, s & 0o666)
170 170
171 171
172 172 def copymode(src, dst, mode=None, enforcewritable=False):
173 173 """Copy the file mode from the file at path src to dst.
174 174 If src doesn't exist, we're using mode instead. If mode is None, we're
175 175 using umask."""
176 176 try:
177 177 st_mode = os.lstat(src).st_mode & 0o777
178 178 except OSError as inst:
179 179 if inst.errno != errno.ENOENT:
180 180 raise
181 181 st_mode = mode
182 182 if st_mode is None:
183 183 st_mode = ~umask
184 184 st_mode &= 0o666
185 185
186 186 new_mode = st_mode
187 187
188 188 if enforcewritable:
189 189 new_mode |= stat.S_IWUSR
190 190
191 191 os.chmod(dst, new_mode)
192 192
193 193
194 194 def checkexec(path):
195 195 """
196 196 Check whether the given path is on a filesystem with UNIX-like exec flags
197 197
198 198 Requires a directory (like /foo/.hg)
199 199 """
200 200
201 201 # VFAT on some Linux versions can flip mode but it doesn't persist
202 202 # a FS remount. Frequently we can detect it if files are created
203 203 # with exec bit on.
204 204
205 205 try:
206 206 EXECFLAGS = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
207 207 basedir = os.path.join(path, b'.hg')
208 208 cachedir = os.path.join(basedir, b'wcache')
209 209 storedir = os.path.join(basedir, b'store')
210 210 if not os.path.exists(cachedir):
211 211 try:
212 212 # we want to create the 'cache' directory, not the '.hg' one.
213 213 # Automatically creating '.hg' directory could silently spawn
214 214 # invalid Mercurial repositories. That seems like a bad idea.
215 215 os.mkdir(cachedir)
216 216 if os.path.exists(storedir):
217 217 copymode(storedir, cachedir)
218 218 else:
219 219 copymode(basedir, cachedir)
220 220 except (IOError, OSError):
221 221 # so that other fallback logic triggers
222 222 pass
223 223 if os.path.isdir(cachedir):
224 224 checkisexec = os.path.join(cachedir, b'checkisexec')
225 225 checknoexec = os.path.join(cachedir, b'checknoexec')
226 226
227 227 try:
228 228 m = os.stat(checkisexec).st_mode
229 229 except OSError as e:
230 230 if e.errno != errno.ENOENT:
231 231 raise
232 232 # checkisexec does not exist - fall through ...
233 233 else:
234 234 # checkisexec exists, check if it actually is exec
235 235 if m & EXECFLAGS != 0:
236 236 # ensure checkisexec exists, check it isn't exec
237 237 try:
238 238 m = os.stat(checknoexec).st_mode
239 239 except OSError as e:
240 240 if e.errno != errno.ENOENT:
241 241 raise
242 242 open(checknoexec, b'w').close() # might fail
243 243 m = os.stat(checknoexec).st_mode
244 244 if m & EXECFLAGS == 0:
245 245 # check-exec is exec and check-no-exec is not exec
246 246 return True
247 247 # checknoexec exists but is exec - delete it
248 248 unlink(checknoexec)
249 249 # checkisexec exists but is not exec - delete it
250 250 unlink(checkisexec)
251 251
252 252 # check using one file, leave it as checkisexec
253 253 checkdir = cachedir
254 254 else:
255 255 # check directly in path and don't leave checkisexec behind
256 256 checkdir = path
257 257 checkisexec = None
258 258 fh, fn = pycompat.mkstemp(dir=checkdir, prefix=b'hg-checkexec-')
259 259 try:
260 260 os.close(fh)
261 261 m = os.stat(fn).st_mode
262 262 if m & EXECFLAGS == 0:
263 263 os.chmod(fn, m & 0o777 | EXECFLAGS)
264 264 if os.stat(fn).st_mode & EXECFLAGS != 0:
265 265 if checkisexec is not None:
266 266 os.rename(fn, checkisexec)
267 267 fn = None
268 268 return True
269 269 finally:
270 270 if fn is not None:
271 271 unlink(fn)
272 272 except (IOError, OSError):
273 273 # we don't care, the user probably won't be able to commit anyway
274 274 return False
275 275
276 276
def checklink(path):
    """check whether the given path is on a symlink-capable filesystem

    Tries to create a symlink (cached in .hg/wcache/checklink when
    possible) and returns True on success, False when the filesystem or
    platform refuses.  Loops only to retry after a benign EEXIST race.
    """
    # mktemp is not racy because symlink creation will fail if the
    # file already exists
    while True:
        cachedir = os.path.join(path, b'.hg', b'wcache')
        checklink = os.path.join(cachedir, b'checklink')
        # try fast path, read only
        if os.path.islink(checklink):
            return True
        if os.path.isdir(cachedir):
            checkdir = cachedir
        else:
            checkdir = path
            cachedir = None  # no cache dir: don't leave probe files behind
        name = tempfile.mktemp(
            dir=pycompat.fsdecode(checkdir), prefix=r'checklink-'
        )
        name = pycompat.fsencode(name)
        try:
            fd = None
            if cachedir is None:
                # temp target file, cleaned up automatically on close
                fd = pycompat.namedtempfile(
                    dir=checkdir, prefix=b'hg-checklink-'
                )
                target = os.path.basename(fd.name)
            else:
                # create a fixed file to link to; doesn't matter if it
                # already exists.
                target = b'checklink-target'
                try:
                    fullpath = os.path.join(cachedir, target)
                    open(fullpath, b'w').close()
                except IOError as inst:
                    # pytype: disable=unsupported-operands
                    if inst[0] == errno.EACCES:
                        # pytype: enable=unsupported-operands

                        # If we can't write to cachedir, just pretend
                        # that the fs is readonly and by association
                        # that the fs won't support symlinks. This
                        # seems like the least dangerous way to avoid
                        # data loss.
                        return False
                    raise
            try:
                os.symlink(target, name)
                if cachedir is None:
                    unlink(name)
                else:
                    try:
                        # promote the probe link to the cached fast-path entry
                        os.rename(name, checklink)
                    except OSError:
                        unlink(name)
                return True
            except OSError as inst:
                # link creation might race, try again
                if inst.errno == errno.EEXIST:
                    continue
                raise
            finally:
                if fd is not None:
                    fd.close()
        except AttributeError:
            # os.symlink missing entirely (platform without symlinks)
            return False
        except OSError as inst:
            # sshfs might report failure while successfully creating the link
            if inst.errno == errno.EIO and os.path.exists(name):
                unlink(name)
            return False
347 347
348 348
def checkosfilename(path):
    """Check that the base-relative path is a valid filename on this platform.

    Returns None if the path is ok, or a UI string describing the problem.
    POSIX imposes no extra restrictions, so every path is acceptable here.
    """
    return None
353 353
354 354
def getfsmountpoint(dirpath):
    """Get the filesystem mount point from a directory (best-effort)

    Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc.
    """
    impl = getattr(osutil, 'getfsmountpoint', None)
    if impl is None:
        # platform helper not compiled in: we simply don't know
        return None
    return impl(dirpath)
361 361
362 362
def getfstype(dirpath):
    """Get the filesystem type name from a directory (best-effort)

    Returns None if we are unsure. Raises OSError on ENOENT, EPERM, etc.
    """
    impl = getattr(osutil, 'getfstype', None)
    if impl is None:
        # platform helper not compiled in: we simply don't know
        return None
    return impl(dirpath)
369 369
370 370
def get_password():
    """Prompt for a password without echoing; return it as local-encoded bytes."""
    entered = getpass.getpass('')
    return encoding.strtolocal(entered)
373 373
374 374
def setbinary(fd):
    """No-op on POSIX: streams have no text/binary mode distinction here."""
377 377
378 378
def pconvert(path):
    """Convert an OS-native path to the '/'-separated internal form.

    Identity on POSIX, where the native separator already is '/'.
    """
    return path
381 381
382 382
def localpath(path):
    """Convert an internal '/'-separated path to OS-native form.

    Identity on POSIX.
    """
    return path
385 385
386 386
def samefile(fpath1, fpath2):
    """Returns whether path1 and path2 refer to the same file. This is only
    guaranteed to work for files, not directories."""
    st1 = os.stat(fpath1)
    st2 = os.stat(fpath2)
    # same (device, inode) pair means the same underlying file
    return os.path.samestat(st1, st2)
391 391
392 392
def samedevice(fpath1, fpath2):
    """Returns whether fpath1 and fpath2 are on the same device. This is only
    guaranteed to work for files, not directories."""
    dev1 = os.lstat(fpath1).st_dev
    dev2 = os.lstat(fpath2).st_dev
    return dev1 == dev2
399 399
400 400
# os.path.normcase is a no-op, which doesn't help us on non-native filesystems
def normcase(path):
    """Fold *path* (bytes) to lower case for case-insensitive comparison."""
    return path.lower()
404 404
405 405
# what normcase does to ASCII strings
normcasespec = encoding.normcasespecs.lower
# fallback normcase function for non-ASCII strings
normcasefallback = normcase

if pycompat.isdarwin:

    def normcase(path):
        """
        Normalize a filename for OS X-compatible comparison:
        - escape-encode invalid characters
        - decompose to NFD
        - lowercase
        - omit ignored characters [200c-200f, 202a-202e, 206a-206f,feff]

        >>> normcase(b'UPPER')
        'upper'
        >>> normcase(b'Caf\\xc3\\xa9')
        'cafe\\xcc\\x81'
        >>> normcase(b'\\xc3\\x89')
        'e\\xcc\\x81'
        >>> normcase(b'\\xb8\\xca\\xc3\\xca\\xbe\\xc8.JPG') # issue3918
        '%b8%ca%c3\\xca\\xbe%c8.jpg'
        """

        try:
            # pure-ASCII fast path; raises for non-ASCII input
            return encoding.asciilower(path)  # exception for non-ASCII
        except UnicodeDecodeError:
            return normcasefallback(path)

    normcasespec = encoding.normcasespecs.lower

    def normcasefallback(path):
        # slow path: handle non-ASCII (and invalid-UTF-8) names the way
        # HFS+ does
        try:
            u = path.decode('utf-8')
        except UnicodeDecodeError:
            # OS X percent-encodes any bytes that aren't valid utf-8
            s = b''
            pos = 0
            l = len(path)
            while pos < l:
                try:
                    c = encoding.getutf8char(path, pos)
                    pos += len(c)
                except ValueError:
                    # invalid byte: percent-encode it, as OS X would
                    c = b'%%%02X' % ord(path[pos : pos + 1])
                    pos += 1
                s += c

            u = s.decode('utf-8')

        # Decompose then lowercase (HFS+ technote specifies lower)
        enc = unicodedata.normalize('NFD', u).lower().encode('utf-8')
        # drop HFS+ ignored characters
        return encoding.hfsignoreclean(enc)
461 461
462 462
if pycompat.sysplatform == b'cygwin':
    # workaround for cygwin, in which mount point part of path is
    # treated as case sensitive, even though underlying NTFS is case
    # insensitive.

    # default mount points; sorted longest-first so the most specific
    # prefix wins when matching below
    cygwinmountpoints = sorted(
        [
            b"/usr/bin",
            b"/usr/lib",
            b"/cygdrive",
        ],
        reverse=True,
    )

    # use upper-ing as normcase as same as NTFS workaround
    def normcase(path):
        pathlen = len(path)
        if (pathlen == 0) or (path[0] != pycompat.ossep):
            # treat as relative
            return encoding.upper(path)

        # to preserve case of mountpoint part
        for mp in cygwinmountpoints:
            if not path.startswith(mp):
                continue

            mplen = len(mp)
            if mplen == pathlen:  # mount point itself
                return mp
            if path[mplen] == pycompat.ossep:
                # keep the mount-point prefix as-is, upper-case the rest
                return mp + encoding.upper(path[mplen:])

        return encoding.upper(path)

    normcasespec = encoding.normcasespecs.other
    normcasefallback = normcase

    # Cygwin translates native ACLs to POSIX permissions,
    # but these translations are not supported by native
    # tools, so the exec bit tends to be set erroneously.
    # Therefore, disable executable bit access on Cygwin.
    def checkexec(path):
        return False

    # Similarly, Cygwin's symlink emulation is likely to create
    # problems when Mercurial is used from both Cygwin and native
    # Windows, with other native tools, or on shared volumes
    def checklink(path):
        return False
513 513
514 514
# lazily-compiled matcher for bytes that force shell quoting
_needsshellquote = None


def shellquote(s):
    """Quote *s* (bytes) so a POSIX shell treats it as a single word."""
    if pycompat.sysplatform == b'OpenVMS':
        return b'"%s"' % s
    global _needsshellquote
    if _needsshellquote is None:
        # anything outside this safe set requires quoting
        _needsshellquote = re.compile(br'[^a-zA-Z0-9._/+-]').search
    if s and not _needsshellquote(s):
        # "s" shouldn't have to be quoted
        return s
    # single-quote, escaping any embedded single quotes the POSIX way
    return b"'%s'" % s.replace(b"'", b"'\\''")
529 529
530 530
def shellsplit(s):
    """Parse a command string in POSIX shell way (best-effort)"""
    split = pycompat.shlexsplit
    return split(s, posix=True)
534 534
535 535
def testpid(pid):
    '''return False if pid dead, True if running or not sure'''
    if pycompat.sysplatform == b'OpenVMS':
        return True
    try:
        # signal 0 probes for existence without delivering anything
        os.kill(pid, 0)
    except OSError as inst:
        # EPERM etc. still means the process exists
        return inst.errno != errno.ESRCH
    return True
545 545
546 546
def isowner(st):
    """Return True if the stat object st is from the current user."""
    return os.getuid() == st.st_uid
550 550
551 551
def findexe(command):
    """Find executable for command searching like which does.
    If command is a basename then PATH is searched for command.
    PATH isn't searched if command is an absolute or relative path.
    If command isn't found None is returned."""
    if pycompat.sysplatform == b'OpenVMS':
        return command

    def findexisting(executable):
        """Return executable if it is an existing, executable file."""
        # NOTE: the original had b'...' here, a bytes literal that is NOT
        # a docstring in Python 3 (docstrings must be str) — just a dead
        # constant expression; replaced with a real docstring.
        if os.path.isfile(executable) and os.access(executable, os.X_OK):
            return executable
        return None

    # an explicit path (contains a separator) is not searched on PATH
    if pycompat.ossep in command:
        return findexisting(command)

    if pycompat.sysplatform == b'plan9':
        return findexisting(os.path.join(b'/bin', command))

    for path in encoding.environ.get(b'PATH', b'').split(pycompat.ospathsep):
        executable = findexisting(os.path.join(path, command))
        if executable is not None:
            return executable
    return None
577 577
578 578
def setsignalhandler():
    """Install platform-specific signal handlers; nothing to do on POSIX."""
581 581
582 582
# file kinds worth reporting: regular files and symlinks
_wantedkinds = {stat.S_IFREG, stat.S_IFLNK}


def statfiles(files):
    """Stat each file in files. Yield each stat, or None if a file does not
    exist or has a type we don't care about."""
    for nf in files:
        try:
            res = os.lstat(nf)
        except OSError as err:
            # missing entry, or a path component that isn't a directory
            if err.errno not in (errno.ENOENT, errno.ENOTDIR):
                raise
            res = None
        else:
            if stat.S_IFMT(res.st_mode) not in _wantedkinds:
                res = None
        yield res
601 601
602 602
def getuser():
    '''return name of current user'''
    name = getpass.getuser()
    return pycompat.fsencode(name)
606 606
607 607
def username(uid=None):
    """Return the name of the user with the given uid.

    If uid is None, return the name of the current user."""

    if uid is None:
        uid = os.getuid()
    try:
        entry = pwd.getpwuid(uid)
    except KeyError:
        # unknown uid: fall back to its decimal representation
        return b'%d' % uid
    return pycompat.fsencode(entry.pw_name)
619 619
620 620
def groupname(gid=None):
    """Return the name of the group with the given gid.

    If gid is None, return the name of the current group."""

    if gid is None:
        gid = os.getgid()
    try:
        entry = grp.getgrgid(gid)
    except KeyError:
        # unknown gid: fall back to its decimal representation
        return pycompat.bytestr(gid)
    return pycompat.fsencode(entry.gr_name)
632 632
633 633
def groupmembers(name):
    """Return the list of members of the group with the given
    name, KeyError if the group does not exist.
    """
    members = list(grp.getgrnam(pycompat.fsdecode(name)).gr_mem)
    return pycompat.rapply(pycompat.fsencode, members)
640 640
641 641
def spawndetached(args):
    """Spawn args as a detached child process; return its pid."""
    # P_DETACH exists only on some platforms; 0 is a harmless no-op flag
    flags = os.P_NOWAIT | getattr(os, 'P_DETACH', 0)
    return os.spawnvp(flags, args[0], args)
644 644
645 645
def gethgcmd():
    """Return the command (argv[0] only) used to launch this process."""
    return sys.argv[:1]
648 648
649 649
def makedir(path, notindexed):
    """Create a directory; notindexed is a Windows concept, ignored here."""
    os.mkdir(path)
652 652
653 653
def lookupreg(key, name=None, scope=None):
    """Windows-registry lookup; there is no registry on POSIX, so None."""
    return None
656 656
657 657
def hidewindow():
    """Hide current shell window.

    Used to hide the window opened when starting asynchronous
    child process under Windows, unneeded on other systems.
    """
665 665
666 666
class cachestat:
    """Snapshot of os.stat() output used for cheap change detection."""

    def __init__(self, path):
        self.stat = os.stat(path)

    def cacheable(self):
        """A stat is only usable for caching when it carries a real inode."""
        return bool(self.stat.st_ino)

    __hash__ = object.__hash__

    def __eq__(self, other):
        # Only dev, ino, size, mtime and atime are likely to change. Out
        # of these, we shouldn't compare atime but should compare the
        # rest. However, one of the other fields changing indicates
        # something fishy going on, so return False if anything but atime
        # changes.
        try:
            a = self.stat
            b = other.stat
            for field in (
                'st_mode',
                'st_ino',
                'st_dev',
                'st_nlink',
                'st_uid',
                'st_gid',
                'st_size',
            ):
                if getattr(a, field) != getattr(b, field):
                    return False
            return (
                a[stat.ST_MTIME] == b[stat.ST_MTIME]
                and a[stat.ST_CTIME] == b[stat.ST_CTIME]
            )
        except AttributeError:
            # "other" has no .stat attribute: definitely not equal
            return False

    def __ne__(self, other):
        return not self == other
699 699
700 700
def statislink(st):
    '''check whether a stat result is a symlink'''
    if st:
        return stat.S_ISLNK(st.st_mode)
    # preserve falsy input (e.g. None) unchanged, as callers expect
    return st
704 704
705 705
def statisexec(st):
    '''check whether a stat result is an executable file'''
    if st:
        # only the owner-executable bit matters here
        return (st.st_mode & 0o100) != 0
    # preserve falsy input (e.g. None) unchanged, as callers expect
    return st
709 709
710 710
def poll(fds):
    """block until something happens on any file descriptor

    This is a generic helper that will check for any activity
    (read, write. exception) and return the list of touched files.

    In unsupported cases, it will raise a NotImplementedError"""
    try:
        # watch every fd for readability, writability and exceptions alike
        rlist, wlist, xlist = select.select(fds, fds, fds)
    except ValueError:  # out of range file descriptor
        raise NotImplementedError()
    touched = set(rlist)
    touched.update(wlist)
    touched.update(xlist)
    return sorted(touched)
730 723
731 724
def readpipe(pipe):
    """Read all available data from a pipe.

    The pipe is temporarily switched to non-blocking mode so that read()
    returns whatever is currently buffered instead of blocking; the
    original flags are restored before returning.
    """
    # We can't fstat() a pipe because Linux will always report 0.
    # So, we set the pipe to non-blocking mode and read everything
    # that's available.
    #
    # BUG FIX: the previous code saved the return value of the F_SETFL
    # call as "oldflags" — but F_SETFL returns 0 on success, not the old
    # flags, so the finally clause clobbered the pipe's original flags
    # (clearing O_NONBLOCK and friends).  Save F_GETFL's result instead.
    oldflags = fcntl.fcntl(pipe, fcntl.F_GETFL)
    fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)

    try:
        chunks = []
        while True:
            try:
                s = pipe.read()
                if not s:
                    break
                chunks.append(s)
            except IOError:
                # EAGAIN/EWOULDBLOCK: nothing more is buffered right now
                break

        return b''.join(chunks)
    finally:
        # restore the pipe's original flags
        fcntl.fcntl(pipe, fcntl.F_SETFL, oldflags)
755 748
756 749
def bindunixsocket(sock, path):
    """Bind the UNIX domain socket to the specified path.

    Temporarily chdirs into the socket's directory so bind() only sees the
    basename; AF_UNIX paths have a very small length limit (107 chars) on
    common platforms (see sys/un.h).  The previous working directory is
    always restored.
    """
    dirname, basename = os.path.split(path)
    bakwdfd = None

    try:
        if dirname:
            # remember where we are so we can come back afterwards
            bakwdfd = os.open(b'.', os.O_DIRECTORY)
            os.chdir(dirname)
        sock.bind(basename)
        # BUG FIX: compare against None rather than truthiness — a valid
        # file descriptor can legitimately be 0, which the old
        # "if bakwdfd:" check treated as "nothing saved".
        if bakwdfd is not None:
            os.fchdir(bakwdfd)
    finally:
        if bakwdfd is not None:
            os.close(bakwdfd)
@@ -1,315 +1,295 b''
1 1 # progress.py progress bars related code
2 2 #
3 3 # Copyright (C) 2010 Augie Fackler <durin42@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 import errno
10 9 import threading
11 10 import time
12 11
13 12 from .i18n import _
14 13 from . import encoding
15 14
16 15
def spacejoin(*args):
    """Join the non-empty byte-string arguments with single spaces."""
    return b' '.join(filter(None, args))
19 18
20 19
def shouldprint(ui):
    """True when a progress bar may be drawn on ui's stderr.

    Suppressed in quiet/plain mode; otherwise requires a tty (or the
    progress.assume-tty override).
    """
    if ui.quiet or ui.plain(b'progress'):
        return False
    return ui._isatty(ui.ferr) or ui.configbool(b'progress', b'assume-tty')
25 24
26 25
def fmtremaining(seconds):
    """format a number of remaining seconds in human readable way

    This will properly display seconds, minutes, hours, days if needed"""
    if seconds < 60:
        # i18n: format XX seconds as "XXs"
        return _(b"%02ds") % seconds
    minutes, seconds = divmod(seconds, 60)
    if minutes < 60:
        # i18n: format X minutes and YY seconds as "XmYYs"
        return _(b"%dm%02ds") % (minutes, seconds)
    # from here on, seconds are ignored and minutes rounded up
    minutes += 1
    hours, minutes = divmod(minutes, 60)
    if hours < 30:
        # i18n: format X hours and YY minutes as "XhYYm"
        return _(b"%dh%02dm") % (hours, minutes)
    # minutes are now ignored and hours rounded up
    hours += 1
    days, hours = divmod(hours, 24)
    if days < 15:
        # i18n: format X days and YY hours as "XdYYh"
        return _(b"%dd%02dh") % (days, hours)
    # hours are now ignored and days rounded up
    days += 1
    weeks, days = divmod(days, 7)
    if weeks < 55:
        # i18n: format X weeks and YY days as "XwYYd"
        return _(b"%dw%02dd") % (weeks, days)
    # days are now ignored; treat a year as 52 weeks
    weeks += 1
    years, weeks = divmod(weeks, 52)
    # i18n: format X years and YY weeks as "XyYYw"
    return _(b"%dy%02dw") % (years, weeks)
66 65
67 66
68 # file_write() and file_flush() of Python 2 do not restart on EINTR if
69 # the file is attached to a "slow" device (e.g. a terminal) and raise
70 # IOError. We cannot know how many bytes would be written by file_write(),
71 # but a progress text is known to be short enough to be written by a
72 # single write() syscall, so we can just retry file_write() with the whole
73 # text. (issue5532)
74 #
75 # This should be a short-term workaround. We'll need to fix every occurrence
76 # of write() to a terminal or pipe.
77 def _eintrretry(func, *args):
78 while True:
79 try:
80 return func(*args)
81 except IOError as err:
82 if err.errno == errno.EINTR:
83 continue
84 raise
85
86
class progbar:
    """Render a terminal progress bar on ui.ferr.

    Tracks a stack of nested "topics" (one per concurrent operation),
    throttles repaints via progress.refresh/changedelay, and keeps
    per-topic start times/values so ETA and speed can be estimated.
    Thread-safe: state updates are serialized by an internal lock.
    """

    def __init__(self, ui):
        self.ui = ui
        self._refreshlock = threading.Lock()
        self.resetstate()

    def resetstate(self):
        """Reset all per-run state and re-read the progress config."""
        self.topics = []
        self.topicstates = {}
        self.starttimes = {}
        self.startvals = {}
        self.printed = False
        # don't paint anything before the configured initial delay
        self.lastprint = time.time() + float(
            self.ui.config(b'progress', b'delay')
        )
        self.curtopic = None
        self.lasttopic = None
        self.indetcount = 0
        self.refresh = float(self.ui.config(b'progress', b'refresh'))
        self.changedelay = max(
            3 * self.refresh, float(self.ui.config(b'progress', b'changedelay'))
        )
        self.order = self.ui.configlist(b'progress', b'format')
        self.estimateinterval = self.ui.configwith(
            float, b'progress', b'estimateinterval'
        )

    def show(self, now, topic, pos, item, unit, total):
        """Format one progress line per progress.format and paint it."""
        if not shouldprint(self.ui):
            return
        termwidth = self.width()
        self.printed = True
        head = b''
        needprogress = False
        tail = b''
        # assemble the indicators in configured order; everything before
        # the 'bar' indicator goes to head, everything after to tail
        for indicator in self.order:
            add = b''
            if indicator == b'topic':
                add = topic
            elif indicator == b'number':
                if total:
                    # pad pos to the width of total for a stable column
                    add = b'%*d/%d' % (len(str(total)), pos, total)
                else:
                    add = b'%d' % pos
            elif indicator.startswith(b'item') and item:
                # 'item-N' keeps the last N columns, 'item+N' the first N
                slice = b'end'
                if b'-' in indicator:
                    wid = int(indicator.split(b'-')[1])
                elif b'+' in indicator:
                    slice = b'beginning'
                    wid = int(indicator.split(b'+')[1])
                else:
                    wid = 20
                if slice == b'end':
                    add = encoding.trim(item, wid, leftside=True)
                else:
                    add = encoding.trim(item, wid)
                # pad to a fixed display width
                add += (wid - encoding.colwidth(add)) * b' '
            elif indicator == b'bar':
                add = b''
                needprogress = True
            elif indicator == b'unit' and unit:
                add = unit
            elif indicator == b'estimate':
                add = self.estimate(topic, pos, total, now)
            elif indicator == b'speed':
                add = self.speed(topic, pos, unit, now)
            if not needprogress:
                head = spacejoin(head, add)
            else:
                tail = spacejoin(tail, add)
        if needprogress:
            used = 0
            if head:
                used += encoding.colwidth(head) + 1
            if tail:
                used += encoding.colwidth(tail) + 1
            progwidth = termwidth - used - 3
            if total and pos <= total:
                # determinate bar: '====>   ' proportional to pos/total
                amt = pos * progwidth // total
                bar = b'=' * (amt - 1)
                if amt > 0:
                    bar += b'>'
                bar += b' ' * (progwidth - amt)
            else:
                # indeterminate bar: bounce a '<=>' marker back and forth
                progwidth -= 3
                self.indetcount += 1
                # mod the count by twice the width so we can make the
                # cursor bounce between the right and left sides
                amt = self.indetcount % (2 * progwidth)
                amt -= progwidth
                bar = (
                    b' ' * int(progwidth - abs(amt))
                    + b'<=>'
                    + b' ' * int(abs(amt))
                )
            prog = b''.join((b'[', bar, b']'))
            out = spacejoin(head, prog, tail)
        else:
            out = spacejoin(head, tail)
        # '\r' repaints in place on the same terminal line
        self._writeerr(b'\r' + encoding.trim(out, termwidth))
        self.lasttopic = topic
        self._flusherr()

    def clear(self):
        """Blank out the current progress line, if any was painted."""
        if not self.printed or not self.lastprint or not shouldprint(self.ui):
            return
        self._writeerr(b'\r%s\r' % (b' ' * self.width()))
        self._flusherr()
        if self.printed:
            # force immediate re-paint of progress bar
            self.lastprint = 0

    def complete(self):
        """Finish the bar: clear it or terminate the line, per config."""
        if not shouldprint(self.ui):
            return
        if self.ui.configbool(b'progress', b'clear-complete'):
            self.clear()
        else:
            self._writeerr(b'\n')
        self._flusherr()

    def _flusherr(self):
        # flush the progress text to the terminal right away
        self.ui.ferr.flush()

    def _writeerr(self, msg):
        # all progress output goes to stderr, never stdout
        self.ui.ferr.write(msg)

    def width(self):
        """Usable output width: configured width capped by the terminal."""
        tw = self.ui.termwidth()
        return min(int(self.ui.config(b'progress', b'width', default=tw)), tw)

    def estimate(self, topic, pos, total, now):
        """Return formatted remaining-time estimate, or b'' if unknown."""
        if total is None:
            return b''
        initialpos = self.startvals[topic]
        target = total - initialpos
        delta = pos - initialpos
        if delta > 0:
            elapsed = now - self.starttimes[topic]
            # linear extrapolation from progress so far; +1 avoids "00s"
            seconds = (elapsed * (target - delta)) // delta + 1
            return fmtremaining(seconds)
        return b''

    def speed(self, topic, pos, unit, now):
        """Return formatted throughput (units/sec), or b'' if unknown."""
        initialpos = self.startvals[topic]
        delta = pos - initialpos
        elapsed = now - self.starttimes[topic]
        if elapsed > 0:
            return _(b'%d %s/sec') % (delta / elapsed, unit)
        return b''

    def _oktoprint(self, now):
        '''Check if conditions are met to print - e.g. changedelay elapsed'''
        if (
            self.lasttopic is None  # first time we printed
            # not a topic change
            or self.curtopic == self.lasttopic
            # it's been long enough we should print anyway
            or now - self.lastprint >= self.changedelay
        ):
            return True
        else:
            return False

    def _calibrateestimate(self, topic, now, pos):
        """Adjust starttimes and startvals for topic so ETA works better

        If progress is non-linear (ex. get much slower in the last minute),
        it's more friendly to only use a recent time span for ETA and speed
        calculation.

        [======================================>       ]
                                             ^^^^^^^
                           estimateinterval, only use this for estimation
        """
        interval = self.estimateinterval
        if interval <= 0:
            return
        elapsed = now - self.starttimes[topic]
        if elapsed > interval:
            delta = pos - self.startvals[topic]
            newdelta = delta * interval / elapsed
            # If a stall happens temporarily, ETA could change dramatically
            # frequently. This is to avoid such dramatical change and make ETA
            # smoother.
            if newdelta < 0.1:
                return
            self.startvals[topic] = pos - newdelta
            self.starttimes[topic] = now - interval

    def progress(self, topic, pos, item=b'', unit=b'', total=None):
        """Record progress for topic; repaint when throttling allows.

        A pos of None closes the topic instead.
        """
        if pos is None:
            self.closetopic(topic)
            return
        now = time.time()
        with self._refreshlock:
            if topic not in self.topics:
                # first report for this topic: remember its baseline
                self.starttimes[topic] = now
                self.startvals[topic] = pos
                self.topics.append(topic)
            self.topicstates[topic] = pos, item, unit, total
            self.curtopic = topic
            self._calibrateestimate(topic, now, pos)
            if now - self.lastprint >= self.refresh and self.topics:
                if self._oktoprint(now):
                    self.lastprint = now
                    self.show(now, topic, *self.topicstates[topic])

    def closetopic(self, topic):
        """Drop topic's state; finish the bar if it was the outermost one."""
        with self._refreshlock:
            self.starttimes.pop(topic, None)
            self.startvals.pop(topic, None)
            self.topicstates.pop(topic, None)
            # reset the progress bar if this is the outermost topic
            if self.topics and self.topics[0] == topic and self.printed:
                self.complete()
                self.resetstate()
            # truncate the list of topics assuming all topics within
            # this one are also closed
            if topic in self.topics:
                self.topics = self.topics[: self.topics.index(topic)]
                # reset the last topic to the one we just unwound to,
                # so that higher-level topics will be stickier than
                # lower-level topics
                if self.topics:
                    self.lasttopic = self.topics[-1]
                else:
                    self.lasttopic = None
@@ -1,469 +1,460 b''
1 1 # worker.py - master-slave parallelism support
2 2 #
3 3 # Copyright 2013 Facebook, Inc.
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import errno
10 10 import os
11 11 import pickle
12 12 import selectors
13 13 import signal
14 14 import sys
15 15 import threading
16 16 import time
17 17
18 18 from .i18n import _
19 19 from . import (
20 20 encoding,
21 21 error,
22 22 pycompat,
23 23 scmutil,
24 24 )
25 25
26 26
def countcpus():
    '''try to count the number of CPUs on the system'''

    # posix: sysconf is absent on some platforms (AttributeError) and may
    # return a non-numeric value (ValueError)
    try:
        ncpus = int(os.sysconf('SC_NPROCESSORS_ONLN'))
        if ncpus > 0:
            return ncpus
    except (AttributeError, ValueError):
        pass

    # windows: the environment advertises the processor count
    try:
        ncpus = int(encoding.environ[b'NUMBER_OF_PROCESSORS'])
        if ncpus > 0:
            return ncpus
    except (KeyError, ValueError):
        pass

    # last resort: assume a single CPU
    return 1
47 47
48 48
49 49 def _numworkers(ui):
50 50 s = ui.config(b'worker', b'numcpus')
51 51 if s:
52 52 try:
53 53 n = int(s)
54 54 if n >= 1:
55 55 return n
56 56 except ValueError:
57 57 raise error.Abort(_(b'number of cpus must be an integer'))
58 58 return min(max(countcpus(), 4), 32)
59 59
60 60
def ismainthread():
    """True when called from the interpreter's main thread."""
    # Thread equality is identity, so compare the objects directly.
    return threading.main_thread() is threading.current_thread()
63 63
64 64
65 65 class _blockingreader:
66 66 """Wrap unbuffered stream such that pickle.load() works with it.
67 67
68 68 pickle.load() expects that calls to read() and readinto() read as many
69 69 bytes as requested. On EOF, it is fine to read fewer bytes. In this case,
70 70 pickle.load() raises an EOFError.
71 71 """
72 72
73 73 def __init__(self, wrapped):
74 74 self._wrapped = wrapped
75 75
76 76 def readline(self):
77 77 return self._wrapped.readline()
78 78
79 79 def readinto(self, buf):
80 80 pos = 0
81 81 size = len(buf)
82 82
83 83 with memoryview(buf) as view:
84 84 while pos < size:
85 85 with view[pos:] as subview:
86 86 ret = self._wrapped.readinto(subview)
87 87 if not ret:
88 88 break
89 89 pos += ret
90 90
91 91 return pos
92 92
93 93 # issue multiple reads until size is fulfilled (or EOF is encountered)
94 94 def read(self, size=-1):
95 95 if size < 0:
96 96 return self._wrapped.readall()
97 97
98 98 buf = bytearray(size)
99 99 n_read = self.readinto(buf)
100 100 del buf[n_read:]
101 101 return bytes(buf)
102 102
103 103
# Tunables consumed by worthwhile(): the estimated cost of starting one
# worker, and whether thread-unsafe tasks may still use the worker pool.
if pycompat.isposix or pycompat.iswindows:
    _STARTUP_COST = 0.01
    # The Windows worker is thread based. If tasks are CPU bound, threads
    # in the presence of the GIL result in excessive context switching and
    # this overhead can slow down execution.
    _DISALLOW_THREAD_UNSAFE = pycompat.iswindows
else:
    # unsupported platform: make startup look prohibitively expensive so
    # worthwhile() never picks the parallel path
    _STARTUP_COST = 1e30
    _DISALLOW_THREAD_UNSAFE = False
113 113
114 114
def worthwhile(ui, costperop, nops, threadsafe=True):
    """try to determine whether the benefit of multiple processes can
    outweigh the cost of starting them"""

    if not threadsafe and _DISALLOW_THREAD_UNSAFE:
        return False

    # compare the sequential cost against the parallel cost (startup
    # overhead plus the work divided among the workers)
    sequential = costperop * nops
    nworkers = _numworkers(ui)
    parallel = _STARTUP_COST * nworkers + sequential / nworkers
    return sequential - parallel >= 0.15
126 126
127 127
def worker(
    ui, costperarg, func, staticargs, args, hasretval=False, threadsafe=True
):
    """run a function, possibly in parallel in multiple worker
    processes.

    returns a progress iterator

    costperarg - cost of a single task

    func - function to run. It is expected to return a progress iterator.

    staticargs - arguments to pass to every invocation of the function

    args - arguments to split into chunks, to pass to individual
    workers

    hasretval - when True, func and the current function return an progress
    iterator then a dict (encoded as an iterator that yield many (False, ..)
    then a (True, dict)). The dicts are joined in some arbitrary order, so
    overlapping keys are a bad idea.

    threadsafe - whether work items are thread safe and can be executed using
    a thread-based worker. Should be disabled for CPU heavy tasks that don't
    release the GIL.
    """
    parallel = ui.configbool(b'worker', b'enabled')
    if parallel and _platformworker is _posixworker and not ismainthread():
        # The POSIX worker has to install a handler for SIGCHLD.
        # Python up to 3.9 only allows this in the main thread.
        parallel = False

    if parallel and worthwhile(
        ui, costperarg, len(args), threadsafe=threadsafe
    ):
        return _platformworker(ui, func, staticargs, args, hasretval)
    return func(*staticargs + (args,))
163 163
164 164
def _posixworker(ui, func, staticargs, args, hasretval):
    """fork()-based implementation of worker() for POSIX platforms.

    Forks one child per work slice (up to the configured worker count);
    each child runs ``func`` over its slice and pickles every result to
    a dedicated pipe. The parent multiplexes the pipes with a selector,
    re-yields results, and (when ``hasretval``) finally yields
    ``(True, retval)`` with the merged dict.
    """
    workers = _numworkers(ui)
    oldhandler = signal.getsignal(signal.SIGINT)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    pids, problem = set(), [0]

    def killworkers():
        # unregister SIGCHLD handler as all children will be killed. This
        # function shouldn't be interrupted by another SIGCHLD; otherwise pids
        # could be updated while iterating, which would cause inconsistency.
        signal.signal(signal.SIGCHLD, oldchldhandler)
        # if one worker bails, there's no good reason to wait for the rest
        for p in pids:
            try:
                os.kill(p, signal.SIGTERM)
            except OSError as err:
                if err.errno != errno.ESRCH:
                    raise

    def waitforworkers(blocking=True):
        for pid in pids.copy():
            p = st = 0
            try:
                # no retry-on-EINTR loop is needed: since Python 3.5
                # (PEP 475) os.waitpid() restarts itself automatically
                # when interrupted by a signal
                p, st = os.waitpid(pid, (0 if blocking else os.WNOHANG))
            except OSError as e:
                if e.errno == errno.ECHILD:
                    # child would already be reaped, but pids yet been
                    # updated (maybe interrupted just after waitpid)
                    pids.discard(pid)
                else:
                    raise
            if not p:
                # skip subsequent steps, because child process should
                # be still running in this case
                continue
            pids.discard(p)
            st = _exitstatus(st)
            if st and not problem[0]:
                problem[0] = st

    def sigchldhandler(signum, frame):
        waitforworkers(blocking=False)
        if problem[0]:
            killworkers()

    oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
    ui.flush()
    parentpid = os.getpid()
    pipes = []
    retval = {}
    for pargs in partition(args, min(workers, len(args))):
        # Every worker gets its own pipe to send results on, so we don't have to
        # implement atomic writes larger than PIPE_BUF. Each forked process has
        # its own pipe's descriptors in the local variables, and the parent
        # process has the full list of pipe descriptors (and it doesn't really
        # care what order they're in).
        rfd, wfd = os.pipe()
        pipes.append((rfd, wfd))
        # make sure we use os._exit in all worker code paths. otherwise the
        # worker may do some clean-ups which could cause surprises like
        # deadlock. see sshpeer.cleanup for example.
        # override error handling *before* fork. this is necessary because
        # exception (signal) may arrive after fork, before "pid =" assignment
        # completes, and other exception handler (dispatch.py) can lead to
        # unexpected code path without os._exit.
        ret = -1
        try:
            pid = os.fork()
            if pid == 0:
                signal.signal(signal.SIGINT, oldhandler)
                signal.signal(signal.SIGCHLD, oldchldhandler)

                def workerfunc():
                    # close all pipes except our own write end
                    for r, w in pipes[:-1]:
                        os.close(r)
                        os.close(w)
                    os.close(rfd)
                    with os.fdopen(wfd, 'wb') as wf:
                        for result in func(*(staticargs + (pargs,))):
                            pickle.dump(result, wf)
                            wf.flush()
                    return 0

                ret = scmutil.callcatch(ui, workerfunc)
        except:  # parent re-raises, child never returns
            if os.getpid() == parentpid:
                raise
            exctype = sys.exc_info()[0]
            force = not issubclass(exctype, KeyboardInterrupt)
            ui.traceback(force=force)
        finally:
            if os.getpid() != parentpid:
                try:
                    ui.flush()
                except:  # never returns, no re-raises
                    pass
                finally:
                    os._exit(ret & 255)
        pids.add(pid)
    selector = selectors.DefaultSelector()
    for rfd, wfd in pipes:
        os.close(wfd)
        # The stream has to be unbuffered. Otherwise, if all data is read from
        # the raw file into the buffer, the selector thinks that the FD is not
        # ready to read while pickle.load() could read from the buffer. This
        # would delay the processing of readable items.
        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)

    def cleanup():
        # restore handlers, reap all children, and report the first problem
        signal.signal(signal.SIGINT, oldhandler)
        waitforworkers()
        signal.signal(signal.SIGCHLD, oldchldhandler)
        selector.close()
        return problem[0]

    try:
        openpipes = len(pipes)
        while openpipes > 0:
            for key, events in selector.select():
                try:
                    # The pytype error likely goes away on a modern version of
                    # pytype having a modern typeshed snapshot.
                    # pytype: disable=wrong-arg-types
                    res = pickle.load(_blockingreader(key.fileobj))
                    # pytype: enable=wrong-arg-types
                    if hasretval and res[0]:
                        retval.update(res[1])
                    else:
                        yield res
                except EOFError:
                    selector.unregister(key.fileobj)
                    # pytype: disable=attribute-error
                    key.fileobj.close()
                    # pytype: enable=attribute-error
                    openpipes -= 1
    except:  # re-raises
        killworkers()
        cleanup()
        raise
    status = cleanup()
    if status:
        if status < 0:
            os.kill(os.getpid(), -status)
        raise error.WorkerError(status)
    if hasretval:
        yield True, retval
320 311
321 312
322 313 def _posixexitstatus(code):
323 314 """convert a posix exit status into the same form returned by
324 315 os.spawnv
325 316
326 317 returns None if the process was stopped instead of exiting"""
327 318 if os.WIFEXITED(code):
328 319 return os.WEXITSTATUS(code)
329 320 elif os.WIFSIGNALED(code):
330 321 return -(os.WTERMSIG(code))
331 322
332 323
def _windowsworker(ui, func, staticargs, args, hasretval):
    """Thread-based implementation of worker() (used on Windows).

    Spawns daemon threads that pull work slices from a shared task
    queue and push results onto a result queue, which this generator
    drains and re-yields; when ``hasretval`` the merged dict is yielded
    last as ``(True, retval)``.
    """

    class Worker(threading.Thread):
        # Daemon thread running func over task-queue slices; any exception
        # is stashed on self.exception for the main thread to re-raise.
        def __init__(
            self, taskqueue, resultqueue, func, staticargs, *args, **kwargs
        ):
            threading.Thread.__init__(self, *args, **kwargs)
            self._taskqueue = taskqueue
            self._resultqueue = resultqueue
            self._func = func
            self._staticargs = staticargs
            self._interrupted = False
            self.daemon = True
            self.exception = None

        def interrupt(self):
            # cooperative cancellation flag checked between results
            self._interrupted = True

        def run(self):
            try:
                while not self._taskqueue.empty():
                    try:
                        args = self._taskqueue.get_nowait()
                        for res in self._func(*self._staticargs + (args,)):
                            self._resultqueue.put(res)
                            # threading doesn't provide a native way to
                            # interrupt execution. handle it manually at every
                            # iteration.
                            if self._interrupted:
                                return
                    except pycompat.queue.Empty:
                        break
            except Exception as e:
                # store the exception such that the main thread can resurface
                # it as if the func was running without workers.
                self.exception = e
                raise

    threads = []

    def trykillworkers():
        # Allow up to 1 second to clean worker threads nicely
        cleanupend = time.time() + 1
        for t in threads:
            t.interrupt()
        for t in threads:
            remainingtime = cleanupend - time.time()
            t.join(remainingtime)
            if t.is_alive():
                # pass over the workers joining failure. it is more
                # important to surface the initial exception than the
                # fact that one of workers may be processing a large
                # task and does not get to handle the interruption.
                ui.warn(
                    _(
                        b"failed to kill worker threads while "
                        b"handling an exception\n"
                    )
                )
                return

    workers = _numworkers(ui)
    resultqueue = pycompat.queue.Queue()
    taskqueue = pycompat.queue.Queue()
    retval = {}
    # partition work to more pieces than workers to minimize the chance
    # of uneven distribution of large tasks between the workers
    for pargs in partition(args, workers * 20):
        taskqueue.put(pargs)
    for _i in range(workers):
        t = Worker(taskqueue, resultqueue, func, staticargs)
        threads.append(t)
        t.start()
    try:
        while len(threads) > 0:
            # drain whatever results are ready, then poll thread liveness
            while not resultqueue.empty():
                res = resultqueue.get()
                if hasretval and res[0]:
                    retval.update(res[1])
                else:
                    yield res
            threads[0].join(0.05)
            finishedthreads = [_t for _t in threads if not _t.is_alive()]
            for t in finishedthreads:
                if t.exception is not None:
                    raise t.exception
                threads.remove(t)
    except (Exception, KeyboardInterrupt):  # re-raises
        trykillworkers()
        raise
    # all threads finished: drain any results still queued
    while not resultqueue.empty():
        res = resultqueue.get()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    if hasretval:
        yield True, retval
430 421
431 422
# Select the platform-appropriate worker implementation: threads on
# Windows (no fork()), forked processes everywhere else.
if pycompat.iswindows:
    _platformworker = _windowsworker
else:
    _platformworker = _posixworker
    _exitstatus = _posixexitstatus
437 428
438 429
def partition(lst, nslices):
    """partition a list into N slices of roughly equal size

    The current strategy takes every Nth element from the input. If
    we ever write workers that need to preserve grouping in input
    we should consider allowing callers to specify a partition strategy.

    olivia is not a fan of this partitioning strategy when files are involved.
    In his words:

        Single-threaded Mercurial makes a point of creating and visiting
        files in a fixed order (alphabetical). When creating files in order,
        a typical filesystem is likely to allocate them on nearby regions on
        disk. Thus, when revisiting in the same order, locality is maximized
        and various forms of OS and disk-level caching and read-ahead get a
        chance to work.

        This effect can be quite significant on spinning disks. I discovered it
        circa Mercurial v0.4 when revlogs were named by hashes of filenames.
        Tarring a repo and copying it to another disk effectively randomized
        the revlog ordering on disk by sorting the revlogs by hash and suddenly
        performance of my kernel checkout benchmark dropped by ~10x because the
        "working set" of sectors visited no longer fit in the drive's cache and
        the workload switched from streaming to random I/O.

        What we should really be doing is have workers read filenames from a
        ordered queue. This preserves locality and also keeps any worker from
        getting more than one file out of balance.
    """
    # slice i gets elements i, i+N, i+2N, ... of the input
    offset = 0
    while offset < nslices:
        yield lst[offset::nslices]
        offset += 1
General Comments 0
You need to be logged in to leave comments. Login now