##// END OF EJS Templates
windows: use abspath in commandserver...
marmoute -
r48424:27e75b8b default
parent child Browse files
Show More
@@ -1,771 +1,771 b''
1 1 # commandserver.py - communicate with Mercurial's API over a pipe
2 2 #
3 3 # Copyright Olivia Mackall <olivia@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import errno
11 11 import gc
12 12 import os
13 13 import random
14 14 import signal
15 15 import socket
16 16 import struct
17 17 import traceback
18 18
19 19 try:
20 20 import selectors
21 21
22 22 selectors.BaseSelector
23 23 except ImportError:
24 24 from .thirdparty import selectors2 as selectors
25 25
26 26 from .i18n import _
27 27 from .pycompat import getattr
28 28 from . import (
29 29 encoding,
30 30 error,
31 31 loggingutil,
32 32 pycompat,
33 33 repocache,
34 34 util,
35 35 vfs as vfsmod,
36 36 )
37 37 from .utils import (
38 38 cborutil,
39 39 procutil,
40 40 )
41 41
42 42
43 43 class channeledoutput(object):
44 44 """
45 45 Write data to out in the following format:
46 46
47 47 data length (unsigned int),
48 48 data
49 49 """
50 50
51 51 def __init__(self, out, channel):
52 52 self.out = out
53 53 self.channel = channel
54 54
55 55 @property
56 56 def name(self):
57 57 return b'<%c-channel>' % self.channel
58 58
59 59 def write(self, data):
60 60 if not data:
61 61 return
62 62 # single write() to guarantee the same atomicity as the underlying file
63 63 self.out.write(struct.pack(b'>cI', self.channel, len(data)) + data)
64 64 self.out.flush()
65 65
66 66 def __getattr__(self, attr):
67 67 if attr in ('isatty', 'fileno', 'tell', 'seek'):
68 68 raise AttributeError(attr)
69 69 return getattr(self.out, attr)
70 70
71 71
72 72 class channeledmessage(object):
73 73 """
74 74 Write encoded message and metadata to out in the following format:
75 75
76 76 data length (unsigned int),
77 77 encoded message and metadata, as a flat key-value dict.
78 78
79 79 Each message should have 'type' attribute. Messages of unknown type
80 80 should be ignored.
81 81 """
82 82
83 83 # teach ui that write() can take **opts
84 84 structured = True
85 85
86 86 def __init__(self, out, channel, encodename, encodefn):
87 87 self._cout = channeledoutput(out, channel)
88 88 self.encoding = encodename
89 89 self._encodefn = encodefn
90 90
91 91 def write(self, data, **opts):
92 92 opts = pycompat.byteskwargs(opts)
93 93 if data is not None:
94 94 opts[b'data'] = data
95 95 self._cout.write(self._encodefn(opts))
96 96
97 97 def __getattr__(self, attr):
98 98 return getattr(self._cout, attr)
99 99
100 100
101 101 class channeledinput(object):
102 102 """
103 103 Read data from in_.
104 104
105 105 Requests for input are written to out in the following format:
106 106 channel identifier - 'I' for plain input, 'L' line based (1 byte)
107 107 how many bytes to send at most (unsigned int),
108 108
109 109 The client replies with:
110 110 data length (unsigned int), 0 meaning EOF
111 111 data
112 112 """
113 113
114 114 maxchunksize = 4 * 1024
115 115
116 116 def __init__(self, in_, out, channel):
117 117 self.in_ = in_
118 118 self.out = out
119 119 self.channel = channel
120 120
121 121 @property
122 122 def name(self):
123 123 return b'<%c-channel>' % self.channel
124 124
125 125 def read(self, size=-1):
126 126 if size < 0:
127 127 # if we need to consume all the clients input, ask for 4k chunks
128 128 # so the pipe doesn't fill up risking a deadlock
129 129 size = self.maxchunksize
130 130 s = self._read(size, self.channel)
131 131 buf = s
132 132 while s:
133 133 s = self._read(size, self.channel)
134 134 buf += s
135 135
136 136 return buf
137 137 else:
138 138 return self._read(size, self.channel)
139 139
140 140 def _read(self, size, channel):
141 141 if not size:
142 142 return b''
143 143 assert size > 0
144 144
145 145 # tell the client we need at most size bytes
146 146 self.out.write(struct.pack(b'>cI', channel, size))
147 147 self.out.flush()
148 148
149 149 length = self.in_.read(4)
150 150 length = struct.unpack(b'>I', length)[0]
151 151 if not length:
152 152 return b''
153 153 else:
154 154 return self.in_.read(length)
155 155
156 156 def readline(self, size=-1):
157 157 if size < 0:
158 158 size = self.maxchunksize
159 159 s = self._read(size, b'L')
160 160 buf = s
161 161 # keep asking for more until there's either no more or
162 162 # we got a full line
163 163 while s and not s.endswith(b'\n'):
164 164 s = self._read(size, b'L')
165 165 buf += s
166 166
167 167 return buf
168 168 else:
169 169 return self._read(size, b'L')
170 170
171 171 def __iter__(self):
172 172 return self
173 173
174 174 def next(self):
175 175 l = self.readline()
176 176 if not l:
177 177 raise StopIteration
178 178 return l
179 179
180 180 __next__ = next
181 181
182 182 def __getattr__(self, attr):
183 183 if attr in ('isatty', 'fileno', 'tell', 'seek'):
184 184 raise AttributeError(attr)
185 185 return getattr(self.in_, attr)
186 186
187 187
188 188 _messageencoders = {
189 189 b'cbor': lambda v: b''.join(cborutil.streamencode(v)),
190 190 }
191 191
192 192
193 193 def _selectmessageencoder(ui):
194 194 encnames = ui.configlist(b'cmdserver', b'message-encodings')
195 195 for n in encnames:
196 196 f = _messageencoders.get(n)
197 197 if f:
198 198 return n, f
199 199 raise error.Abort(
200 200 b'no supported message encodings: %s' % b' '.join(encnames)
201 201 )
202 202
203 203
204 204 class server(object):
205 205 """
206 206 Listens for commands on fin, runs them and writes the output on a channel
207 207 based stream to fout.
208 208 """
209 209
210 210 def __init__(self, ui, repo, fin, fout, prereposetups=None):
211 211 self.cwd = encoding.getcwd()
212 212
213 213 if repo:
214 214 # the ui here is really the repo ui so take its baseui so we don't
215 215 # end up with its local configuration
216 216 self.ui = repo.baseui
217 217 self.repo = repo
218 218 self.repoui = repo.ui
219 219 else:
220 220 self.ui = ui
221 221 self.repo = self.repoui = None
222 222 self._prereposetups = prereposetups
223 223
224 224 self.cdebug = channeledoutput(fout, b'd')
225 225 self.cerr = channeledoutput(fout, b'e')
226 226 self.cout = channeledoutput(fout, b'o')
227 227 self.cin = channeledinput(fin, fout, b'I')
228 228 self.cresult = channeledoutput(fout, b'r')
229 229
230 230 if self.ui.config(b'cmdserver', b'log') == b'-':
231 231 # switch log stream of server's ui to the 'd' (debug) channel
232 232 # (don't touch repo.ui as its lifetime is longer than the server)
233 233 self.ui = self.ui.copy()
234 234 setuplogging(self.ui, repo=None, fp=self.cdebug)
235 235
236 236 self.cmsg = None
237 237 if ui.config(b'ui', b'message-output') == b'channel':
238 238 encname, encfn = _selectmessageencoder(ui)
239 239 self.cmsg = channeledmessage(fout, b'm', encname, encfn)
240 240
241 241 self.client = fin
242 242
243 243 # If shutdown-on-interrupt is off, the default SIGINT handler is
244 244 # removed so that client-server communication wouldn't be interrupted.
245 245 # For example, 'runcommand' handler will issue three short read()s.
246 246 # If one of the first two read()s were interrupted, the communication
247 247 # channel would be left at dirty state and the subsequent request
248 248 # wouldn't be parsed. So catching KeyboardInterrupt isn't enough.
249 249 self._shutdown_on_interrupt = ui.configbool(
250 250 b'cmdserver', b'shutdown-on-interrupt'
251 251 )
252 252 self._old_inthandler = None
253 253 if not self._shutdown_on_interrupt:
254 254 self._old_inthandler = signal.signal(signal.SIGINT, signal.SIG_IGN)
255 255
256 256 def cleanup(self):
257 257 """release and restore resources taken during server session"""
258 258 if not self._shutdown_on_interrupt:
259 259 signal.signal(signal.SIGINT, self._old_inthandler)
260 260
261 261 def _read(self, size):
262 262 if not size:
263 263 return b''
264 264
265 265 data = self.client.read(size)
266 266
267 267 # is the other end closed?
268 268 if not data:
269 269 raise EOFError
270 270
271 271 return data
272 272
273 273 def _readstr(self):
274 274 """read a string from the channel
275 275
276 276 format:
277 277 data length (uint32), data
278 278 """
279 279 length = struct.unpack(b'>I', self._read(4))[0]
280 280 if not length:
281 281 return b''
282 282 return self._read(length)
283 283
284 284 def _readlist(self):
285 285 """read a list of NULL separated strings from the channel"""
286 286 s = self._readstr()
287 287 if s:
288 288 return s.split(b'\0')
289 289 else:
290 290 return []
291 291
292 292 def _dispatchcommand(self, req):
293 293 from . import dispatch # avoid cycle
294 294
295 295 if self._shutdown_on_interrupt:
296 296 # no need to restore SIGINT handler as it is unmodified.
297 297 return dispatch.dispatch(req)
298 298
299 299 try:
300 300 signal.signal(signal.SIGINT, self._old_inthandler)
301 301 return dispatch.dispatch(req)
302 302 except error.SignalInterrupt:
303 303 # propagate SIGBREAK, SIGHUP, or SIGTERM.
304 304 raise
305 305 except KeyboardInterrupt:
306 306 # SIGINT may be received out of the try-except block of dispatch(),
307 307 # so catch it as last ditch. Another KeyboardInterrupt may be
308 308 # raised while handling exceptions here, but there's no way to
309 309 # avoid that except for doing everything in C.
310 310 pass
311 311 finally:
312 312 signal.signal(signal.SIGINT, signal.SIG_IGN)
313 313 # On KeyboardInterrupt, print error message and exit *after* SIGINT
314 314 # handler removed.
315 315 req.ui.error(_(b'interrupted!\n'))
316 316 return -1
317 317
318 318 def runcommand(self):
319 319 """reads a list of \0 terminated arguments, executes
320 320 and writes the return code to the result channel"""
321 321 from . import dispatch # avoid cycle
322 322
323 323 args = self._readlist()
324 324
325 325 # copy the uis so changes (e.g. --config or --verbose) don't
326 326 # persist between requests
327 327 copiedui = self.ui.copy()
328 328 uis = [copiedui]
329 329 if self.repo:
330 330 self.repo.baseui = copiedui
331 331 # clone ui without using ui.copy because this is protected
332 332 repoui = self.repoui.__class__(self.repoui)
333 333 repoui.copy = copiedui.copy # redo copy protection
334 334 uis.append(repoui)
335 335 self.repo.ui = self.repo.dirstate._ui = repoui
336 336 self.repo.invalidateall()
337 337
338 338 for ui in uis:
339 339 ui.resetstate()
340 340 # any kind of interaction must use server channels, but chg may
341 341 # replace channels by fully functional tty files. so nontty is
342 342 # enforced only if cin is a channel.
343 343 if not util.safehasattr(self.cin, b'fileno'):
344 344 ui.setconfig(b'ui', b'nontty', b'true', b'commandserver')
345 345
346 346 req = dispatch.request(
347 347 args[:],
348 348 copiedui,
349 349 self.repo,
350 350 self.cin,
351 351 self.cout,
352 352 self.cerr,
353 353 self.cmsg,
354 354 prereposetups=self._prereposetups,
355 355 )
356 356
357 357 try:
358 358 ret = self._dispatchcommand(req) & 255
359 359 # If shutdown-on-interrupt is off, it's important to write the
360 360 # result code *after* SIGINT handler removed. If the result code
361 361 # were lost, the client wouldn't be able to continue processing.
362 362 self.cresult.write(struct.pack(b'>i', int(ret)))
363 363 finally:
364 364 # restore old cwd
365 365 if b'--cwd' in args:
366 366 os.chdir(self.cwd)
367 367
368 368 def getencoding(self):
369 369 """writes the current encoding to the result channel"""
370 370 self.cresult.write(encoding.encoding)
371 371
372 372 def serveone(self):
373 373 cmd = self.client.readline()[:-1]
374 374 if cmd:
375 375 handler = self.capabilities.get(cmd)
376 376 if handler:
377 377 handler(self)
378 378 else:
379 379 # clients are expected to check what commands are supported by
380 380 # looking at the servers capabilities
381 381 raise error.Abort(_(b'unknown command %s') % cmd)
382 382
383 383 return cmd != b''
384 384
385 385 capabilities = {b'runcommand': runcommand, b'getencoding': getencoding}
386 386
387 387 def serve(self):
388 388 hellomsg = b'capabilities: ' + b' '.join(sorted(self.capabilities))
389 389 hellomsg += b'\n'
390 390 hellomsg += b'encoding: ' + encoding.encoding
391 391 hellomsg += b'\n'
392 392 if self.cmsg:
393 393 hellomsg += b'message-encoding: %s\n' % self.cmsg.encoding
394 394 hellomsg += b'pid: %d' % procutil.getpid()
395 395 if util.safehasattr(os, b'getpgid'):
396 396 hellomsg += b'\n'
397 397 hellomsg += b'pgid: %d' % os.getpgid(0)
398 398
399 399 # write the hello msg in -one- chunk
400 400 self.cout.write(hellomsg)
401 401
402 402 try:
403 403 while self.serveone():
404 404 pass
405 405 except EOFError:
406 406 # we'll get here if the client disconnected while we were reading
407 407 # its request
408 408 return 1
409 409
410 410 return 0
411 411
412 412
413 413 def setuplogging(ui, repo=None, fp=None):
414 414 """Set up server logging facility
415 415
416 416 If cmdserver.log is '-', log messages will be sent to the given fp.
417 417 It should be the 'd' channel while a client is connected, and otherwise
418 418 is the stderr of the server process.
419 419 """
420 420 # developer config: cmdserver.log
421 421 logpath = ui.config(b'cmdserver', b'log')
422 422 if not logpath:
423 423 return
424 424 # developer config: cmdserver.track-log
425 425 tracked = set(ui.configlist(b'cmdserver', b'track-log'))
426 426
427 427 if logpath == b'-' and fp:
428 428 logger = loggingutil.fileobjectlogger(fp, tracked)
429 429 elif logpath == b'-':
430 430 logger = loggingutil.fileobjectlogger(ui.ferr, tracked)
431 431 else:
432 logpath = os.path.abspath(util.expandpath(logpath))
432 logpath = util.abspath(util.expandpath(logpath))
433 433 # developer config: cmdserver.max-log-files
434 434 maxfiles = ui.configint(b'cmdserver', b'max-log-files')
435 435 # developer config: cmdserver.max-log-size
436 436 maxsize = ui.configbytes(b'cmdserver', b'max-log-size')
437 437 vfs = vfsmod.vfs(os.path.dirname(logpath))
438 438 logger = loggingutil.filelogger(
439 439 vfs,
440 440 os.path.basename(logpath),
441 441 tracked,
442 442 maxfiles=maxfiles,
443 443 maxsize=maxsize,
444 444 )
445 445
446 446 targetuis = {ui}
447 447 if repo:
448 448 targetuis.add(repo.baseui)
449 449 targetuis.add(repo.ui)
450 450 for u in targetuis:
451 451 u.setlogger(b'cmdserver', logger)
452 452
453 453
454 454 class pipeservice(object):
455 455 def __init__(self, ui, repo, opts):
456 456 self.ui = ui
457 457 self.repo = repo
458 458
459 459 def init(self):
460 460 pass
461 461
462 462 def run(self):
463 463 ui = self.ui
464 464 # redirect stdio to null device so that broken extensions or in-process
465 465 # hooks will never cause corruption of channel protocol.
466 466 with ui.protectedfinout() as (fin, fout):
467 467 sv = server(ui, self.repo, fin, fout)
468 468 try:
469 469 return sv.serve()
470 470 finally:
471 471 sv.cleanup()
472 472
473 473
474 474 def _initworkerprocess():
475 475 # use a different process group from the master process, in order to:
476 476 # 1. make the current process group no longer "orphaned" (because the
477 477 # parent of this process is in a different process group while
478 478 # remains in a same session)
479 479 # according to POSIX 2.2.2.52, orphaned process group will ignore
480 480 # terminal-generated stop signals like SIGTSTP (Ctrl+Z), which will
481 481 # cause trouble for things like ncurses.
482 482 # 2. the client can use kill(-pgid, sig) to simulate terminal-generated
483 483 # SIGINT (Ctrl+C) and process-exit-generated SIGHUP. our child
484 484 # processes like ssh will be killed properly, without affecting
485 485 # unrelated processes.
486 486 os.setpgid(0, 0)
487 487 # change random state otherwise forked request handlers would have a
488 488 # same state inherited from parent.
489 489 random.seed()
490 490
491 491
492 492 def _serverequest(ui, repo, conn, createcmdserver, prereposetups):
493 493 fin = conn.makefile('rb')
494 494 fout = conn.makefile('wb')
495 495 sv = None
496 496 try:
497 497 sv = createcmdserver(repo, conn, fin, fout, prereposetups)
498 498 try:
499 499 sv.serve()
500 500 # handle exceptions that may be raised by command server. most of
501 501 # known exceptions are caught by dispatch.
502 502 except error.Abort as inst:
503 503 ui.error(_(b'abort: %s\n') % inst.message)
504 504 except IOError as inst:
505 505 if inst.errno != errno.EPIPE:
506 506 raise
507 507 except KeyboardInterrupt:
508 508 pass
509 509 finally:
510 510 sv.cleanup()
511 511 except: # re-raises
512 512 # also write traceback to error channel. otherwise client cannot
513 513 # see it because it is written to server's stderr by default.
514 514 if sv:
515 515 cerr = sv.cerr
516 516 else:
517 517 cerr = channeledoutput(fout, b'e')
518 518 cerr.write(encoding.strtolocal(traceback.format_exc()))
519 519 raise
520 520 finally:
521 521 fin.close()
522 522 try:
523 523 fout.close() # implicit flush() may cause another EPIPE
524 524 except IOError as inst:
525 525 if inst.errno != errno.EPIPE:
526 526 raise
527 527
528 528
529 529 class unixservicehandler(object):
530 530 """Set of pluggable operations for unix-mode services
531 531
532 532 Almost all methods except for createcmdserver() are called in the main
533 533 process. You can't pass mutable resource back from createcmdserver().
534 534 """
535 535
536 536 pollinterval = None
537 537
538 538 def __init__(self, ui):
539 539 self.ui = ui
540 540
541 541 def bindsocket(self, sock, address):
542 542 util.bindunixsocket(sock, address)
543 543 sock.listen(socket.SOMAXCONN)
544 544 self.ui.status(_(b'listening at %s\n') % address)
545 545 self.ui.flush() # avoid buffering of status message
546 546
547 547 def unlinksocket(self, address):
548 548 os.unlink(address)
549 549
550 550 def shouldexit(self):
551 551 """True if server should shut down; checked per pollinterval"""
552 552 return False
553 553
554 554 def newconnection(self):
555 555 """Called when main process notices new connection"""
556 556
557 557 def createcmdserver(self, repo, conn, fin, fout, prereposetups):
558 558 """Create new command server instance; called in the process that
559 559 serves for the current connection"""
560 560 return server(self.ui, repo, fin, fout, prereposetups)
561 561
562 562
563 563 class unixforkingservice(object):
564 564 """
565 565 Listens on unix domain socket and forks server per connection
566 566 """
567 567
568 568 def __init__(self, ui, repo, opts, handler=None):
569 569 self.ui = ui
570 570 self.repo = repo
571 571 self.address = opts[b'address']
572 572 if not util.safehasattr(socket, b'AF_UNIX'):
573 573 raise error.Abort(_(b'unsupported platform'))
574 574 if not self.address:
575 575 raise error.Abort(_(b'no socket path specified with --address'))
576 576 self._servicehandler = handler or unixservicehandler(ui)
577 577 self._sock = None
578 578 self._mainipc = None
579 579 self._workeripc = None
580 580 self._oldsigchldhandler = None
581 581 self._workerpids = set() # updated by signal handler; do not iterate
582 582 self._socketunlinked = None
583 583 # experimental config: cmdserver.max-repo-cache
584 584 maxlen = ui.configint(b'cmdserver', b'max-repo-cache')
585 585 if maxlen < 0:
586 586 raise error.Abort(_(b'negative max-repo-cache size not allowed'))
587 587 self._repoloader = repocache.repoloader(ui, maxlen)
588 588 # attempt to avoid crash in CoreFoundation when using chg after fix in
589 589 # a89381e04c58
590 590 if pycompat.isdarwin:
591 591 procutil.gui()
592 592
593 593 def init(self):
594 594 self._sock = socket.socket(socket.AF_UNIX)
595 595 # IPC channel from many workers to one main process; this is actually
596 596 # a uni-directional pipe, but is backed by a DGRAM socket so each
597 597 # message can be easily separated.
598 598 o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
599 599 self._mainipc, self._workeripc = o
600 600 self._servicehandler.bindsocket(self._sock, self.address)
601 601 if util.safehasattr(procutil, b'unblocksignal'):
602 602 procutil.unblocksignal(signal.SIGCHLD)
603 603 o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
604 604 self._oldsigchldhandler = o
605 605 self._socketunlinked = False
606 606 self._repoloader.start()
607 607
608 608 def _unlinksocket(self):
609 609 if not self._socketunlinked:
610 610 self._servicehandler.unlinksocket(self.address)
611 611 self._socketunlinked = True
612 612
613 613 def _cleanup(self):
614 614 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
615 615 self._sock.close()
616 616 self._mainipc.close()
617 617 self._workeripc.close()
618 618 self._unlinksocket()
619 619 self._repoloader.stop()
620 620 # don't kill child processes as they have active clients, just wait
621 621 self._reapworkers(0)
622 622
623 623 def run(self):
624 624 try:
625 625 self._mainloop()
626 626 finally:
627 627 self._cleanup()
628 628
629 629 def _mainloop(self):
630 630 exiting = False
631 631 h = self._servicehandler
632 632 selector = selectors.DefaultSelector()
633 633 selector.register(
634 634 self._sock, selectors.EVENT_READ, self._acceptnewconnection
635 635 )
636 636 selector.register(
637 637 self._mainipc, selectors.EVENT_READ, self._handlemainipc
638 638 )
639 639 while True:
640 640 if not exiting and h.shouldexit():
641 641 # clients can no longer connect() to the domain socket, so
642 642 # we stop queuing new requests.
643 643 # for requests that are queued (connect()-ed, but haven't been
644 644 # accept()-ed), handle them before exit. otherwise, clients
645 645 # waiting for recv() will receive ECONNRESET.
646 646 self._unlinksocket()
647 647 exiting = True
648 648 try:
649 649 events = selector.select(timeout=h.pollinterval)
650 650 except OSError as inst:
651 651 # selectors2 raises ETIMEDOUT if timeout exceeded while
652 652 # handling signal interrupt. That's probably wrong, but
653 653 # we can easily get around it.
654 654 if inst.errno != errno.ETIMEDOUT:
655 655 raise
656 656 events = []
657 657 if not events:
658 658 # only exit if we completed all queued requests
659 659 if exiting:
660 660 break
661 661 continue
662 662 for key, _mask in events:
663 663 key.data(key.fileobj, selector)
664 664 selector.close()
665 665
666 666 def _acceptnewconnection(self, sock, selector):
667 667 h = self._servicehandler
668 668 try:
669 669 conn, _addr = sock.accept()
670 670 except socket.error as inst:
671 671 if inst.args[0] == errno.EINTR:
672 672 return
673 673 raise
674 674
675 675 # Future improvement: On Python 3.7, maybe gc.freeze() can be used
676 676 # to prevent COW memory from being touched by GC.
677 677 # https://instagram-engineering.com/
678 678 # copy-on-write-friendly-python-garbage-collection-ad6ed5233ddf
679 679 pid = os.fork()
680 680 if pid:
681 681 try:
682 682 self.ui.log(
683 683 b'cmdserver', b'forked worker process (pid=%d)\n', pid
684 684 )
685 685 self._workerpids.add(pid)
686 686 h.newconnection()
687 687 finally:
688 688 conn.close() # release handle in parent process
689 689 else:
690 690 try:
691 691 selector.close()
692 692 sock.close()
693 693 self._mainipc.close()
694 694 self._runworker(conn)
695 695 conn.close()
696 696 self._workeripc.close()
697 697 os._exit(0)
698 698 except: # never return, hence no re-raises
699 699 try:
700 700 self.ui.traceback(force=True)
701 701 finally:
702 702 os._exit(255)
703 703
704 704 def _handlemainipc(self, sock, selector):
705 705 """Process messages sent from a worker"""
706 706 try:
707 707 path = sock.recv(32768) # large enough to receive path
708 708 except socket.error as inst:
709 709 if inst.args[0] == errno.EINTR:
710 710 return
711 711 raise
712 712 self._repoloader.load(path)
713 713
714 714 def _sigchldhandler(self, signal, frame):
715 715 self._reapworkers(os.WNOHANG)
716 716
717 717 def _reapworkers(self, options):
718 718 while self._workerpids:
719 719 try:
720 720 pid, _status = os.waitpid(-1, options)
721 721 except OSError as inst:
722 722 if inst.errno == errno.EINTR:
723 723 continue
724 724 if inst.errno != errno.ECHILD:
725 725 raise
726 726 # no child processes at all (reaped by other waitpid()?)
727 727 self._workerpids.clear()
728 728 return
729 729 if pid == 0:
730 730 # no waitable child processes
731 731 return
732 732 self.ui.log(b'cmdserver', b'worker process exited (pid=%d)\n', pid)
733 733 self._workerpids.discard(pid)
734 734
735 735 def _runworker(self, conn):
736 736 signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
737 737 _initworkerprocess()
738 738 h = self._servicehandler
739 739 try:
740 740 _serverequest(
741 741 self.ui,
742 742 self.repo,
743 743 conn,
744 744 h.createcmdserver,
745 745 prereposetups=[self._reposetup],
746 746 )
747 747 finally:
748 748 gc.collect() # trigger __del__ since worker process uses os._exit
749 749
750 750 def _reposetup(self, ui, repo):
751 751 if not repo.local():
752 752 return
753 753
754 754 class unixcmdserverrepo(repo.__class__):
755 755 def close(self):
756 756 super(unixcmdserverrepo, self).close()
757 757 try:
758 758 self._cmdserveripc.send(self.root)
759 759 except socket.error:
760 760 self.ui.log(
761 761 b'cmdserver', b'failed to send repo root to master\n'
762 762 )
763 763
764 764 repo.__class__ = unixcmdserverrepo
765 765 repo._cmdserveripc = self._workeripc
766 766
767 767 cachedrepo = self._repoloader.get(repo.root)
768 768 if cachedrepo is None:
769 769 return
770 770 repo.ui.log(b'repocache', b'repo from cache: %s\n', repo.root)
771 771 repocache.copycache(cachedrepo, repo)
General Comments 0
You need to be logged in to leave comments. Login now