##// END OF EJS Templates
narrow: fix flaky behavior described in issue6150...
narrow: fix flaky behavior described in issue6150 This has been plaguing the CI for a good while, and it doesn't appear to have an easy fix proposed yet. The solution in this change is to always do an unambiguous (but expensive) lookup in case of comparison. This should always be correct, albeit suboptimal. Differential Revision: https://phab.mercurial-scm.org/D10034

File last commit:

r46434:c102b704 default
r47280:b994db7c stable
Show More
dummysmtpd.py
121 lines | 3.3 KiB | text/x-python | PythonLexer
Gregory Szorc
global: use python3 in shebangs...
r46434 #!/usr/bin/env python3
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
r29332
"""dummy SMTP server for use in tests"""
from __future__ import absolute_import
import asyncore
import optparse
import smtpd
import ssl
import sys
Matt Harbison
dummysmtpd: don't die on client connection errors...
r35794 import traceback
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
r29332
from mercurial import (
Augie Fackler
tests: help dummysmtpd work on python 3...
r36584 pycompat,
Yuya Nishihara
server: move cmdutil.service() to new module (API)...
r30506 server,
Gregory Szorc
tests: use sslutil.wrapserversocket()...
r29556 sslutil,
ui as uimod,
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
r29332 )
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
def log(msg):
    """Write ``msg`` to standard output and flush it immediately."""
    out = sys.stdout
    out.write(msg)
    # Flush right away so the text is not left sitting in the stdout buffer.
    out.flush()
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
class dummysmtpserver(smtpd.SMTPServer):
    """SMTP server for tests that logs the envelope of each message."""

    def __init__(self, localaddr):
        """Start listening on ``localaddr``; no remote address is set."""
        smtpd.SMTPServer.__init__(self, localaddr, remoteaddr=None)

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Log the peer host, the sender and the recipients of a message.

        ``data`` is ignored. ``**kwargs`` is accepted because the stdlib
        smtpd may pass additional keyword arguments.
        """
        host = peer[0]
        recipients = ', '.join(rcpttos)
        log('%s from=%s to=%s\n' % (host, mailfrom, recipients))

    def handle_error(self):
        """Print the current traceback instead of shutting the server down."""
        # On Windows a bad SSL connection sometimes produces a
        # WSAECONNRESET. The default handler would shut this server down,
        # after which the current and all later connections fail on the
        # client side with "No connection could be made because the target
        # machine actively refused it". Eating the error lets the client
        # abort in the expected way and keeps the server available for
        # subsequent requests.
        traceback.print_exc()
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
class dummysmtpsecureserver(dummysmtpserver):
    """Variant of dummysmtpserver that wraps accepted sockets with TLS."""

    def __init__(self, localaddr, certfile):
        """Listen on ``localaddr`` and remember ``certfile`` for TLS wrapping."""
        dummysmtpserver.__init__(self, localaddr)
        self._certfile = certfile

    def handle_accept(self):
        """Accept one connection, wrap it with TLS and start an SMTP channel.

        If the TLS wrap raises ``ssl.SSLError`` the error is logged, the
        socket is closed and no channel is created.
        """
        accepted = self.accept()
        if not accepted:
            return
        sock, peeraddr = accepted

        ui = uimod.ui.load()
        try:
            # wrapping the socket would block, but that does not matter here
            sock = sslutil.wrapserversocket(
                sock, ui, certfile=self._certfile
            )
        except ssl.SSLError:
            log('%s ssl error\n' % peeraddr[0])
            sock.close()
            return

        smtpd.SMTPChannel(self, sock, peeraddr)
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
def run():
    """Run the asyncore event loop; return quietly on KeyboardInterrupt."""
    try:
        asyncore.loop()
    except KeyboardInterrupt:
        # An interrupt is treated as a normal way to stop the loop.
        return
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
tests: help dummysmtpd work on python 3...
def _encodestrsonly(v):
    """Return ``v`` encoded as ASCII bytes if it is a text string.

    Any value that is not a text (unicode) string is returned unchanged.
    Raises UnicodeEncodeError for text that is not pure ASCII.
    """
    if isinstance(v, type(u'')):
        return v.encode('ascii')
    return v


def bytesvars(obj):
    """Return ``vars(obj)`` as a new dict with bytes keys and bytes strings.

    Attribute names are ASCII-encoded to bytes, and text values are encoded
    the same way; other values are kept as they are. If a
    ``daemon_postexec`` attribute is present and not None, it is treated as
    a list and each text element in it is encoded as well.

    Objects without a ``daemon_postexec`` attribute are accepted; only the
    plain conversion is applied to them.
    """
    bd = {
        name.encode('ascii'): _encodestrsonly(value)
        for name, value in vars(obj).items()
    }
    # Use .get() so an attribute bag without daemon_postexec does not raise
    # KeyError.
    postexec = bd.get(b'daemon_postexec')
    if postexec is not None:
        bd[b'daemon_postexec'] = [_encodestrsonly(v) for v in postexec]
    return bd
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
def main():
    """Parse the command line and run the dummy SMTP server as a service.

    Exits through the option parser's error() when ``--tls smtps`` is
    requested without ``--certificate``.
    """
    parser = optparse.OptionParser()
    parser.add_option('-d', '--daemon', action='store_true')
    parser.add_option('--daemon-postexec', action='append')
    parser.add_option('-p', '--port', type=int, default=8025)
    parser.add_option('-a', '--address', default='localhost')
    parser.add_option('--pid-file', metavar='FILE')
    parser.add_option('--tls', choices=['none', 'smtps'], default='none')
    parser.add_option('--certificate', metavar='FILE')
    opts, _args = parser.parse_args()

    # A TLS server cannot be created without a certificate file.
    if opts.tls == 'smtps' and not opts.certificate:
        parser.error('--certificate must be specified')

    listenaddr = (opts.address, opts.port)

    def init():
        """Create the requested server and log the listening address."""
        if opts.tls != 'none':
            dummysmtpsecureserver(listenaddr, opts.certificate)
        else:
            dummysmtpserver(listenaddr)
        log('listening at %s:%d\n' % listenaddr)

    serviceopts = bytesvars(opts)
    # Command line made of the interpreter, this script and the original
    # arguments, handed to runservice as runargs.
    relaunch = [pycompat.sysexecutable, pycompat.fsencode(__file__)]
    relaunch = relaunch + pycompat.sysargv[1:]
    server.runservice(
        serviceopts,
        initfn=init,
        runfn=run,
        runargs=relaunch,
    )
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
r29332
# Start the daemon only when this file is executed as a script.
if __name__ == '__main__':
    main()