##// END OF EJS Templates
safehasattr: pass attribute name as string instead of bytes...
marmoute -
r51451:ef5435e7 default
parent child Browse files
Show More
@@ -1,739 +1,739 b''
1 1 # commandserver.py - communicate with Mercurial's API over a pipe
2 2 #
3 3 # Copyright Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import gc
10 10 import os
11 11 import random
12 12 import selectors
13 13 import signal
14 14 import socket
15 15 import struct
16 16 import traceback
17 17
18 18 from .i18n import _
19 19 from .pycompat import getattr
20 20 from . import (
21 21 encoding,
22 22 error,
23 23 loggingutil,
24 24 pycompat,
25 25 repocache,
26 26 util,
27 27 vfs as vfsmod,
28 28 )
29 29 from .utils import (
30 30 cborutil,
31 31 procutil,
32 32 )
33 33
34 34
35 35 class channeledoutput:
36 36 """
37 37 Write data to out in the following format:
38 38
39 39 data length (unsigned int),
40 40 data
41 41 """
42 42
43 43 def __init__(self, out, channel):
44 44 self.out = out
45 45 self.channel = channel
46 46
47 47 @property
48 48 def name(self):
49 49 return b'<%c-channel>' % self.channel
50 50
51 51 def write(self, data):
52 52 if not data:
53 53 return
54 54 # single write() to guarantee the same atomicity as the underlying file
55 55 self.out.write(struct.pack(b'>cI', self.channel, len(data)) + data)
56 56 self.out.flush()
57 57
58 58 def __getattr__(self, attr):
59 59 if attr in ('isatty', 'fileno', 'tell', 'seek'):
60 60 raise AttributeError(attr)
61 61 return getattr(self.out, attr)
62 62
63 63
64 64 class channeledmessage:
65 65 """
66 66 Write encoded message and metadata to out in the following format:
67 67
68 68 data length (unsigned int),
69 69 encoded message and metadata, as a flat key-value dict.
70 70
71 71 Each message should have 'type' attribute. Messages of unknown type
72 72 should be ignored.
73 73 """
74 74
75 75 # teach ui that write() can take **opts
76 76 structured = True
77 77
78 78 def __init__(self, out, channel, encodename, encodefn):
79 79 self._cout = channeledoutput(out, channel)
80 80 self.encoding = encodename
81 81 self._encodefn = encodefn
82 82
83 83 def write(self, data, **opts):
84 84 opts = pycompat.byteskwargs(opts)
85 85 if data is not None:
86 86 opts[b'data'] = data
87 87 self._cout.write(self._encodefn(opts))
88 88
89 89 def __getattr__(self, attr):
90 90 return getattr(self._cout, attr)
91 91
92 92
93 93 class channeledinput:
94 94 """
95 95 Read data from in_.
96 96
97 97 Requests for input are written to out in the following format:
98 98 channel identifier - 'I' for plain input, 'L' line based (1 byte)
99 99 how many bytes to send at most (unsigned int),
100 100
101 101 The client replies with:
102 102 data length (unsigned int), 0 meaning EOF
103 103 data
104 104 """
105 105
106 106 maxchunksize = 4 * 1024
107 107
108 108 def __init__(self, in_, out, channel):
109 109 self.in_ = in_
110 110 self.out = out
111 111 self.channel = channel
112 112
113 113 @property
114 114 def name(self):
115 115 return b'<%c-channel>' % self.channel
116 116
117 117 def read(self, size=-1):
118 118 if size < 0:
119 119 # if we need to consume all the clients input, ask for 4k chunks
120 120 # so the pipe doesn't fill up risking a deadlock
121 121 size = self.maxchunksize
122 122 s = self._read(size, self.channel)
123 123 buf = s
124 124 while s:
125 125 s = self._read(size, self.channel)
126 126 buf += s
127 127
128 128 return buf
129 129 else:
130 130 return self._read(size, self.channel)
131 131
132 132 def _read(self, size, channel):
133 133 if not size:
134 134 return b''
135 135 assert size > 0
136 136
137 137 # tell the client we need at most size bytes
138 138 self.out.write(struct.pack(b'>cI', channel, size))
139 139 self.out.flush()
140 140
141 141 length = self.in_.read(4)
142 142 length = struct.unpack(b'>I', length)[0]
143 143 if not length:
144 144 return b''
145 145 else:
146 146 return self.in_.read(length)
147 147
148 148 def readline(self, size=-1):
149 149 if size < 0:
150 150 size = self.maxchunksize
151 151 s = self._read(size, b'L')
152 152 buf = s
153 153 # keep asking for more until there's either no more or
154 154 # we got a full line
155 155 while s and not s.endswith(b'\n'):
156 156 s = self._read(size, b'L')
157 157 buf += s
158 158
159 159 return buf
160 160 else:
161 161 return self._read(size, b'L')
162 162
163 163 def __iter__(self):
164 164 return self
165 165
166 166 def next(self):
167 167 l = self.readline()
168 168 if not l:
169 169 raise StopIteration
170 170 return l
171 171
172 172 __next__ = next
173 173
174 174 def __getattr__(self, attr):
175 175 if attr in ('isatty', 'fileno', 'tell', 'seek'):
176 176 raise AttributeError(attr)
177 177 return getattr(self.in_, attr)
178 178
179 179
180 180 _messageencoders = {
181 181 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
182 182 }
183 183
184 184
185 185 def _selectmessageencoder(ui):
186 186 encnames = ui.configlist(b'cmdserver', b'message-encodings')
187 187 for n in encnames:
188 188 f = _messageencoders.get(n)
189 189 if f:
190 190 return n, f
191 191 raise error.Abort(
192 192 b'no supported message encodings: %s' % b' '.join(encnames)
193 193 )
194 194
195 195
196 196 class server:
197 197 """
198 198 Listens for commands on fin, runs them and writes the output on a channel
199 199 based stream to fout.
200 200 """
201 201
202 202 def __init__(self, ui, repo, fin, fout, prereposetups=None):
203 203 self.cwd = encoding.getcwd()
204 204
205 205 if repo:
206 206 # the ui here is really the repo ui so take its baseui so we don't
207 207 # end up with its local configuration
208 208 self.ui = repo.baseui
209 209 self.repo = repo
210 210 self.repoui = repo.ui
211 211 else:
212 212 self.ui = ui
213 213 self.repo = self.repoui = None
214 214 self._prereposetups = prereposetups
215 215
216 216 self.cdebug = channeledoutput(fout, b'd')
217 217 self.cerr = channeledoutput(fout, b'e')
218 218 self.cout = channeledoutput(fout, b'o')
219 219 self.cin = channeledinput(fin, fout, b'I')
220 220 self.cresult = channeledoutput(fout, b'r')
221 221
222 222 if self.ui.config(b'cmdserver', b'log') == b'-':
223 223 # switch log stream of server's ui to the 'd' (debug) channel
224 224 # (don't touch repo.ui as its lifetime is longer than the server)
225 225 self.ui = self.ui.copy()
226 226 setuplogging(self.ui, repo=None, fp=self.cdebug)
227 227
228 228 self.cmsg = None
229 229 if ui.config(b'ui', b'message-output') == b'channel':
230 230 encname, encfn = _selectmessageencoder(ui)
231 231 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
232 232
233 233 self.client = fin
234 234
235 235 # If shutdown-on-interrupt is off, the default SIGINT handler is
236 236 # removed so that client-server communication wouldn't be interrupted.
237 237 # For example, 'runcommand' handler will issue three short read()s.
238 238 # If one of the first two read()s were interrupted, the communication
239 239 # channel would be left at dirty state and the subsequent request
240 240 # wouldn't be parsed. So catching KeyboardInterrupt isn't enough.
241 241 self._shutdown_on_interrupt = ui.configbool(
242 242 b'cmdserver', b'shutdown-on-interrupt'
243 243 )
244 244 self._old_inthandler = None
245 245 if not self._shutdown_on_interrupt:
246 246 self._old_inthandler = signal.signal(signal.SIGINT, signal.SIG_IGN)
247 247
248 248 def cleanup(self):
249 249 """release and restore resources taken during server session"""
250 250 if not self._shutdown_on_interrupt:
251 251 signal.signal(signal.SIGINT, self._old_inthandler)
252 252
253 253 def _read(self, size):
254 254 if not size:
255 255 return b''
256 256
257 257 data = self.client.read(size)
258 258
259 259 # is the other end closed?
260 260 if not data:
261 261 raise EOFError
262 262
263 263 return data
264 264
265 265 def _readstr(self):
266 266 """read a string from the channel
267 267
268 268 format:
269 269 data length (uint32), data
270 270 """
271 271 length = struct.unpack(b'>I', self._read(4))[0]
272 272 if not length:
273 273 return b''
274 274 return self._read(length)
275 275
276 276 def _readlist(self):
277 277 """read a list of NULL separated strings from the channel"""
278 278 s = self._readstr()
279 279 if s:
280 280 return s.split(b'\0')
281 281 else:
282 282 return []
283 283
284 284 def _dispatchcommand(self, req):
285 285 from . import dispatch # avoid cycle
286 286
287 287 if self._shutdown_on_interrupt:
288 288 # no need to restore SIGINT handler as it is unmodified.
289 289 return dispatch.dispatch(req)
290 290
291 291 try:
292 292 signal.signal(signal.SIGINT, self._old_inthandler)
293 293 return dispatch.dispatch(req)
294 294 except error.SignalInterrupt:
295 295 # propagate SIGBREAK, SIGHUP, or SIGTERM.
296 296 raise
297 297 except KeyboardInterrupt:
298 298 # SIGINT may be received out of the try-except block of dispatch(),
299 299 # so catch it as last ditch. Another KeyboardInterrupt may be
300 300 # raised while handling exceptions here, but there's no way to
301 301 # avoid that except for doing everything in C.
302 302 pass
303 303 finally:
304 304 signal.signal(signal.SIGINT, signal.SIG_IGN)
305 305 # On KeyboardInterrupt, print error message and exit *after* SIGINT
306 306 # handler removed.
307 307 req.ui.error(_(b'interrupted!\n'))
308 308 return -1
309 309
310 310 def runcommand(self):
311 311 """reads a list of \0 terminated arguments, executes
312 312 and writes the return code to the result channel"""
313 313 from . import dispatch # avoid cycle
314 314
315 315 args = self._readlist()
316 316
317 317 # copy the uis so changes (e.g. --config or --verbose) don't
318 318 # persist between requests
319 319 copiedui = self.ui.copy()
320 320 uis = [copiedui]
321 321 if self.repo:
322 322 self.repo.baseui = copiedui
323 323 # clone ui without using ui.copy because this is protected
324 324 repoui = self.repoui.__class__(self.repoui)
325 325 repoui.copy = copiedui.copy # redo copy protection
326 326 uis.append(repoui)
327 327 self.repo.ui = self.repo.dirstate._ui = repoui
328 328 self.repo.invalidateall()
329 329
330 330 for ui in uis:
331 331 ui.resetstate()
332 332 # any kind of interaction must use server channels, but chg may
333 333 # replace channels by fully functional tty files. so nontty is
334 334 # enforced only if cin is a channel.
335 335 if not util.safehasattr(self.cin, 'fileno'):
336 336 ui.setconfig(b'ui', b'nontty', b'true', b'commandserver')
337 337
338 338 req = dispatch.request(
339 339 args[:],
340 340 copiedui,
341 341 self.repo,
342 342 self.cin,
343 343 self.cout,
344 344 self.cerr,
345 345 self.cmsg,
346 346 prereposetups=self._prereposetups,
347 347 )
348 348
349 349 try:
350 350 ret = self._dispatchcommand(req) & 255
351 351 # If shutdown-on-interrupt is off, it's important to write the
352 352 # result code *after* SIGINT handler removed. If the result code
353 353 # were lost, the client wouldn't be able to continue processing.
354 354 self.cresult.write(struct.pack(b'>i', int(ret)))
355 355 finally:
356 356 # restore old cwd
357 357 if b'--cwd' in args:
358 358 os.chdir(self.cwd)
359 359
360 360 def getencoding(self):
361 361 """writes the current encoding to the result channel"""
362 362 self.cresult.write(encoding.encoding)
363 363
364 364 def serveone(self):
365 365 cmd = self.client.readline()[:-1]
366 366 if cmd:
367 367 handler = self.capabilities.get(cmd)
368 368 if handler:
369 369 handler(self)
370 370 else:
371 371 # clients are expected to check what commands are supported by
372 372 # looking at the servers capabilities
373 373 raise error.Abort(_(b'unknown command %s') % cmd)
374 374
375 375 return cmd != b''
376 376
377 377 capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}
378 378
379 379 def serve(self):
380 380 hellomsg = b'capabilities: ' + b' '.join(sorted(self.capabilities))
381 381 hellomsg += b'\n'
382 382 hellomsg += b'encoding: ' + encoding.encoding
383 383 hellomsg += b'\n'
384 384 if self.cmsg:
385 385 hellomsg += b'message-encoding: %s\n' % self.cmsg.encoding
386 386 hellomsg += b'pid: %d' % procutil.getpid()
387 if util.safehasattr(os, b'getpgid'):
387 if util.safehasattr(os, 'getpgid'):
388 388 hellomsg += b'\n'
389 389 hellomsg += b'pgid: %d' % os.getpgid(0)
390 390
391 391 # write the hello msg in -one- chunk
392 392 self.cout.write(hellomsg)
393 393
394 394 try:
395 395 while self.serveone():
396 396 pass
397 397 except EOFError:
398 398 # we'll get here if the client disconnected while we were reading
399 399 # its request
400 400 return 1
401 401
402 402 return 0
403 403
404 404
405 405 def setuplogging(ui, repo=None, fp=None):
406 406 """Set up server logging facility
407 407
408 408 If cmdserver.log is '-', log messages will be sent to the given fp.
409 409 It should be the 'd' channel while a client is connected, and otherwise
410 410 is the stderr of the server process.
411 411 """
412 412 # developer config: cmdserver.log
413 413 logpath = ui.config(b'cmdserver', b'log')
414 414 if not logpath:
415 415 return
416 416 # developer config: cmdserver.track-log
417 417 tracked = set(ui.configlist(b'cmdserver', b'track-log'))
418 418
419 419 if logpath == b'-' and fp:
420 420 logger = loggingutil.fileobjectlogger(fp, tracked)
421 421 elif logpath == b'-':
422 422 logger = loggingutil.fileobjectlogger(ui.ferr, tracked)
423 423 else:
424 424 logpath = util.abspath(util.expandpath(logpath))
425 425 # developer config: cmdserver.max-log-files
426 426 maxfiles = ui.configint(b'cmdserver', b'max-log-files')
427 427 # developer config: cmdserver.max-log-size
428 428 maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
429 429 vfs = vfsmod.vfs(os.path.dirname(logpath))
430 430 logger = loggingutil.filelogger(
431 431 vfs,
432 432 os.path.basename(logpath),
433 433 tracked,
434 434 maxfiles=maxfiles,
435 435 maxsize=maxsize,
436 436 )
437 437
438 438 targetuis = {ui}
439 439 if repo:
440 440 targetuis.add(repo.baseui)
441 441 targetuis.add(repo.ui)
442 442 for u in targetuis:
443 443 u.setlogger(b'cmdserver', logger)
444 444
445 445
446 446 class pipeservice:
447 447 def __init__(self, ui, repo, opts):
448 448 self.ui = ui
449 449 self.repo = repo
450 450
451 451 def init(self):
452 452 pass
453 453
454 454 def run(self):
455 455 ui = self.ui
456 456 # redirect stdio to null device so that broken extensions or in-process
457 457 # hooks will never cause corruption of channel protocol.
458 458 with ui.protectedfinout() as (fin, fout):
459 459 sv = server(ui, self.repo, fin, fout)
460 460 try:
461 461 return sv.serve()
462 462 finally:
463 463 sv.cleanup()
464 464
465 465
466 466 def _initworkerprocess():
467 467 # use a different process group from the master process, in order to:
468 468 # 1. make the current process group no longer "orphaned" (because the
469 469 # parent of this process is in a different process group while
470 470 # remains in a same session)
471 471 # according to POSIX 2.2.2.52, orphaned process group will ignore
472 472 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
473 473 # cause trouble for things like ncurses.
474 474 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
475 475 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
476 476 # processes like ssh will be killed properly, without affecting
477 477 # unrelated processes.
478 478 os.setpgid(0, 0)
479 479 # change random state otherwise forked request handlers would have a
480 480 # same state inherited from parent.
481 481 random.seed()
482 482
483 483
484 484 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
485 485 fin = conn.makefile('rb')
486 486 fout = conn.makefile('wb')
487 487 sv = None
488 488 try:
489 489 sv = createcmdserver(repo, conn, fin, fout, prereposetups)
490 490 try:
491 491 sv.serve()
492 492 # handle exceptions that may be raised by command server. most of
493 493 # known exceptions are caught by dispatch.
494 494 except error.Abort as inst:
495 495 ui.error(_(b'abort: %s\n') % inst.message)
496 496 except BrokenPipeError:
497 497 pass
498 498 except KeyboardInterrupt:
499 499 pass
500 500 finally:
501 501 sv.cleanup()
502 502 except: # re-raises
503 503 # also write traceback to error channel. otherwise client cannot
504 504 # see it because it is written to server's stderr by default.
505 505 if sv:
506 506 cerr = sv.cerr
507 507 else:
508 508 cerr = channeledoutput(fout, b'e')
509 509 cerr.write(encoding.strtolocal(traceback.format_exc()))
510 510 raise
511 511 finally:
512 512 fin.close()
513 513 try:
514 514 fout.close() # implicit flush() may cause another EPIPE
515 515 except BrokenPipeError:
516 516 pass
517 517
518 518
519 519 class unixservicehandler:
520 520 """Set of pluggable operations for unix-mode services
521 521
522 522 Almost all methods except for createcmdserver() are called in the main
523 523 process. You can't pass mutable resource back from createcmdserver().
524 524 """
525 525
526 526 pollinterval = None
527 527
528 528 def __init__(self, ui):
529 529 self.ui = ui
530 530
531 531 def bindsocket(self, sock, address):
532 532 util.bindunixsocket(sock, address)
533 533 sock.listen(socket.SOMAXCONN)
534 534 self.ui.status(_(b'listening at %s\n') % address)
535 535 self.ui.flush() # avoid buffering of status message
536 536
537 537 def unlinksocket(self, address):
538 538 os.unlink(address)
539 539
540 540 def shouldexit(self):
541 541 """True if server should shut down; checked per pollinterval"""
542 542 return False
543 543
544 544 def newconnection(self):
545 545 """Called when main process notices new connection"""
546 546
547 547 def createcmdserver(self, repo, conn, fin, fout, prereposetups):
548 548 """Create new command server instance; called in the process that
549 549 serves for the current connection"""
550 550 return server(self.ui, repo, fin, fout, prereposetups)
551 551
552 552
553 553 class unixforkingservice:
554 554 """
555 555 Listens on unix domain socket and forks server per connection
556 556 """
557 557
558 558 def __init__(self, ui, repo, opts, handler=None):
559 559 self.ui = ui
560 560 self.repo = repo
561 561 self.address = opts[b'address']
562 562 if not util.safehasattr(socket, b'AF_UNIX'):
563 563 raise error.Abort(_(b'unsupported platform'))
564 564 if not self.address:
565 565 raise error.Abort(_(b'no socket path specified with --address'))
566 566 self._servicehandler = handler or unixservicehandler(ui)
567 567 self._sock = None
568 568 self._mainipc = None
569 569 self._workeripc = None
570 570 self._oldsigchldhandler = None
571 571 self._workerpids = set() # updated by signal handler; do not iterate
572 572 self._socketunlinked = None
573 573 # experimental config: cmdserver.max-repo-cache
574 574 maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
575 575 if maxlen < 0:
576 576 raise error.Abort(_(b'negative max-repo-cache size not allowed'))
577 577 self._repoloader = repocache.repoloader(ui, maxlen)
578 578 # attempt to avoid crash in CoreFoundation when using chg after fix in
579 579 # a89381e04c58
580 580 if pycompat.isdarwin:
581 581 procutil.gui()
582 582
583 583 def init(self):
584 584 self._sock = socket.socket(socket.AF_UNIX)
585 585 # IPC channel from many workers to one main process; this is actually
586 586 # a uni-directional pipe, but is backed by a DGRAM socket so each
587 587 # message can be easily separated.
588 588 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
589 589 self._mainipc, self._workeripc = o
590 590 self._servicehandler.bindsocket(self._sock, self.address)
591 591 if util.safehasattr(procutil, b'unblocksignal'):
592 592 procutil.unblocksignal(signal.SIGCHLD)
593 593 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
594 594 self._oldsigchldhandler = o
595 595 self._socketunlinked = False
596 596 self._repoloader.start()
597 597
598 598 def _unlinksocket(self):
599 599 if not self._socketunlinked:
600 600 self._servicehandler.unlinksocket(self.address)
601 601 self._socketunlinked = True
602 602
603 603 def _cleanup(self):
604 604 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
605 605 self._sock.close()
606 606 self._mainipc.close()
607 607 self._workeripc.close()
608 608 self._unlinksocket()
609 609 self._repoloader.stop()
610 610 # don't kill child processes as they have active clients, just wait
611 611 self._reapworkers(0)
612 612
613 613 def run(self):
614 614 try:
615 615 self._mainloop()
616 616 finally:
617 617 self._cleanup()
618 618
619 619 def _mainloop(self):
620 620 exiting = False
621 621 h = self._servicehandler
622 622 selector = selectors.DefaultSelector()
623 623 selector.register(
624 624 self._sock, selectors.EVENT_READ, self._acceptnewconnection
625 625 )
626 626 selector.register(
627 627 self._mainipc, selectors.EVENT_READ, self._handlemainipc
628 628 )
629 629 while True:
630 630 if not exiting and h.shouldexit():
631 631 # clients can no longer connect() to the domain socket, so
632 632 # we stop queuing new requests.
633 633 # for requests that are queued (connect()-ed, but haven't been
634 634 # accept()-ed), handle them before exit. otherwise, clients
635 635 # waiting for recv() will receive ECONNRESET.
636 636 self._unlinksocket()
637 637 exiting = True
638 638 events = selector.select(timeout=h.pollinterval)
639 639 if not events:
640 640 # only exit if we completed all queued requests
641 641 if exiting:
642 642 break
643 643 continue
644 644 for key, _mask in events:
645 645 key.data(key.fileobj, selector)
646 646 selector.close()
647 647
648 648 def _acceptnewconnection(self, sock, selector):
649 649 h = self._servicehandler
650 650 conn, _addr = sock.accept()
651 651
652 652 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
653 653 # to prevent COW memory from being touched by GC.
654 654 # https://instagram-engineering.com/
655 655 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
656 656 pid = os.fork()
657 657 if pid:
658 658 try:
659 659 self.ui.log(
660 660 b'cmdserver', b'forked worker process (pid=%d)\n', pid
661 661 )
662 662 self._workerpids.add(pid)
663 663 h.newconnection()
664 664 finally:
665 665 conn.close() # release handle in parent process
666 666 else:
667 667 try:
668 668 selector.close()
669 669 sock.close()
670 670 self._mainipc.close()
671 671 self._runworker(conn)
672 672 conn.close()
673 673 self._workeripc.close()
674 674 os._exit(0)
675 675 except: # never return, hence no re-raises
676 676 try:
677 677 self.ui.traceback(force=True)
678 678 finally:
679 679 os._exit(255)
680 680
681 681 def _handlemainipc(self, sock, selector):
682 682 """Process messages sent from a worker"""
683 683 path = sock.recv(32768) # large enough to receive path
684 684 self._repoloader.load(path)
685 685
686 686 def _sigchldhandler(self, signal, frame):
687 687 self._reapworkers(os.WNOHANG)
688 688
689 689 def _reapworkers(self, options):
690 690 while self._workerpids:
691 691 try:
692 692 pid, _status = os.waitpid(-1, options)
693 693 except ChildProcessError:
694 694 # no child processes at all (reaped by other waitpid()?)
695 695 self._workerpids.clear()
696 696 return
697 697 if pid == 0:
698 698 # no waitable child processes
699 699 return
700 700 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
701 701 self._workerpids.discard(pid)
702 702
703 703 def _runworker(self, conn):
704 704 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
705 705 _initworkerprocess()
706 706 h = self._servicehandler
707 707 try:
708 708 _serverequest(
709 709 self.ui,
710 710 self.repo,
711 711 conn,
712 712 h.createcmdserver,
713 713 prereposetups=[self._reposetup],
714 714 )
715 715 finally:
716 716 gc.collect() # trigger __del__ since worker process uses os._exit
717 717
718 718 def _reposetup(self, ui, repo):
719 719 if not repo.local():
720 720 return
721 721
722 722 class unixcmdserverrepo(repo.__class__):
723 723 def close(self):
724 724 super(unixcmdserverrepo, self).close()
725 725 try:
726 726 self._cmdserveripc.send(self.root)
727 727 except socket.error:
728 728 self.ui.log(
729 729 b'cmdserver', b'failed to send repo root to master\n'
730 730 )
731 731
732 732 repo.__class__ = unixcmdserverrepo
733 733 repo._cmdserveripc = self._workeripc
734 734
735 735 cachedrepo = self._repoloader.get(repo.root)
736 736 if cachedrepo is None:
737 737 return
738 738 repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
739 739 repocache.copycache(cachedrepo, repo)
General Comments 0
You need to be logged in to leave comments. Login now