##// END OF EJS Templates
typing: add stub functions for `cext/charencoding`...
typing: add stub functions for `cext/charencoding` I'm not sure if it's better to have a separate file, and currently pytype doesn't really know how to handle these, so it's no help in figuring that out. Technically, these methods are part of the `mercurial.cext.parsers` module, so put them into the existing stub until there's a reason to split it out.

File last commit:

r52603:8fe7c0e1 default
r52834:e58f02e2 default
Show More
dummysmtpd.py
161 lines | 4.2 KiB | text/x-python | PythonLexer
run-tests: stop writing a `python3` symlink pointing to python2...
r48294 #!/usr/bin/env python
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
r29332
"""dummy SMTP server for use in tests"""
import optparse
Mads Kiilerich
tests: use simple mock smtp server instead of deprecated asyncore smtpd...
r51625 import os
import socket
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
r29332 import ssl
import sys
from mercurial import (
Augie Fackler
tests: help dummysmtpd work on python 3...
r36584 pycompat,
Yuya Nishihara
server: move cmdutil.service() to new module (API)...
r30506 server,
Gregory Szorc
tests: use sslutil.wrapserversocket()...
r29556 sslutil,
ui as uimod,
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
r29332 )
Augie Fackler
formatting: blacken the codebase...
r43346
Mads Kiilerich
tests: use simple mock smtp server instead of deprecated asyncore smtpd...
# Address family for the listening socket: IPv6 only when the HGIPV6
# environment variable is exactly '1', IPv4 otherwise (including unset).
family = (
    socket.AF_INET6
    if os.environ.get('HGIPV6', '0') == '1'
    else socket.AF_INET
)
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
def log(msg):
    """Write ``msg`` to standard output and flush it straight away.

    Nothing is appended to ``msg``; callers supply their own newline.
    """
    out = sys.stdout
    out.write(msg)
    out.flush()
Augie Fackler
formatting: blacken the codebase...
r43346
Mads Kiilerich
tests: use simple mock smtp server instead of deprecated asyncore smtpd...
def mocksmtpserversession(conn, addr):
    """Serve one scripted SMTP exchange on an accepted connection.

    The expected dialogue is EHLO, MAIL FROM, zero or more RCPT TO, DATA and
    then the message body terminated by ``<CRLF>.<CRLF>``.  Every ``recv()``
    result is treated as one complete command line.  When the exchange
    completes, a summary (peer host, sender, recipients, body) is passed to
    ``log``.  If the client sends anything other than the expected command,
    the problem is logged and the function returns without replying further.

    ``conn`` is a connected socket-like object providing ``send()`` and
    ``recv()``.  ``addr`` is the peer address; only ``addr[0]`` is used.
    The connection is not closed here.
    """
    conn.send(b'220 smtp.example.com ESMTP\r\n')
    try:
        # Newer versions of OpenSSL raise on EOF
        line = conn.recv(1024)
    except ssl.SSLError:
        log('no hello: EOF\n')
        return
    if not line.lower().startswith(b'ehlo '):
        # Older versions of OpenSSL don't raise
        log('no hello: %s\n' % line)
        return
    conn.send(b'250 Hello\r\n')

    line = conn.recv(1024)
    if not line.lower().startswith(b'mail from:'):
        log('no mail from: %s\n' % line)
        return
    # Strip the 10-byte "mail from:" prefix and optional angle brackets.
    mailfrom = line[10:].decode().rstrip()
    if mailfrom.startswith('<') and mailfrom.endswith('>'):
        mailfrom = mailfrom[1:-1]
    conn.send(b'250 Ok\r\n')

    rcpttos = []
    while True:
        line = conn.recv(1024)
        if not line.lower().startswith(b'rcpt to:'):
            # Not a recipient line; it is checked for DATA just below.
            break
        # Strip the 8-byte "rcpt to:" prefix and optional angle brackets.
        rcptto = line[8:].decode().rstrip()
        if rcptto.startswith('<') and rcptto.endswith('>'):
            rcptto = rcptto[1:-1]
        rcpttos.append(rcptto)
        conn.send(b'250 Ok\r\n')
    if line.lower().strip() != b'data':
        # Abort like the EHLO and MAIL FROM checks do, rather than
        # answering 354 to a command that was never DATA.
        log('no rcpt to or data: %s\n' % line)
        return
    conn.send(b'354 Go ahead\r\n')

    data = b''
    while True:
        line = conn.recv(1024)
        if not line:
            # Empty read: the peer stopped sending before the terminator.
            log('connection closed before end of data\n')
            break
        data += line
        if data.endswith(b'\r\n.\r\n'):
            # Drop the 5-byte end-of-data marker from the logged body.
            data = data[:-5]
            break
    conn.send(b'250 Ok\r\n')
    log(
        '%s from=%s to=%s\n%s\n'
        % (addr[0], mailfrom, ', '.join(rcpttos), data.decode())
    )
Matt Harbison
dummysmtpd: don't die on client connection errors...
r35794
Augie Fackler
formatting: blacken the codebase...
r43346
Mads Kiilerich
tests: use simple mock smtp server instead of deprecated asyncore smtpd...
def run(host, port, certificate):
    """Accept SMTP connections on ``host``:``port`` until interrupted.

    Connections are served one at a time by ``mocksmtpserversession``.  When
    ``certificate`` is truthy, each accepted connection is first wrapped by
    ``sslutil.wrapserversocket`` with it as ``certfile``; a connection whose
    wrapping raises ``ssl.SSLError`` is logged, closed and skipped.  Every
    accepted connection is closed once handled, even if the session raises.
    ``KeyboardInterrupt`` ends the loop quietly.
    """
    ui = uimod.ui.load()
    with socket.socket(family, socket.SOCK_STREAM) as s:
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        s.bind((host, port))
        # log('listening at %s:%d\n' % (host, port))
        s.listen(1)
        try:
            while True:
                conn, addr = s.accept()
                try:
                    if certificate:
                        try:
                            conn = sslutil.wrapserversocket(
                                conn, ui, certfile=certificate
                            )
                        except ssl.SSLError as e:
                            log('%s ssl error: %s\n' % (addr[0], e))
                            continue
                    # AF_INET6 peers are 4-tuples (host, port, flowinfo,
                    # scope_id); format only host and port so the message
                    # works for both address families.
                    log("connection from %s:%s\n" % addr[:2])
                    mocksmtpserversession(conn, addr)
                finally:
                    # Runs on normal completion, on the ``continue`` above
                    # and when the session raises.
                    conn.close()
        except KeyboardInterrupt:
            pass
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
r29332
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
tests: help dummysmtpd work on python 3...
def _encodestrsonly(v):
    """Return ``v`` encoded to ASCII bytes if it is a ``str``, else ``v``.

    Values of any other type (bytes, ``None``, numbers, lists, ...) are
    returned unchanged.  A ``str`` containing non-ASCII characters raises
    ``UnicodeEncodeError``.
    """
    if isinstance(v, str):
        return v.encode('ascii')
    return v
Augie Fackler
formatting: blacken the codebase...
r43346
Augie Fackler
tests: help dummysmtpd work on python 3...
def bytesvars(obj):
    """Return ``vars(obj)`` as a new dict keyed by ASCII-encoded bytes.

    ``str`` values are encoded to bytes by ``_encodestrsonly``; other values
    are kept as they are.  When the ``daemon_postexec`` entry is not
    ``None``, its items are encoded the same way and stored back as a list.
    ``obj`` must have a ``daemon_postexec`` attribute, otherwise ``KeyError``
    is raised.
    """
    encoded = {}
    for name, value in vars(obj).items():
        encoded[name.encode('ascii')] = _encodestrsonly(value)
    postexec = encoded[b'daemon_postexec']
    if postexec is not None:
        encoded[b'daemon_postexec'] = list(map(_encodestrsonly, postexec))
    return encoded
Augie Fackler
formatting: blacken the codebase...
r43346
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
def main():
    """Parse the command line and hand the server to ``server.runservice``.

    Options: ``-d/--daemon``, ``--daemon-postexec`` (repeatable),
    ``-p/--port`` (default 8025), ``-a/--address`` (default ``localhost``),
    ``--pid-file``, ``--tls`` (``none`` or ``smtps``, default ``none``),
    ``--certificate`` and ``--logfile``.  The parser reports a usage error
    unless ``--certificate`` is given exactly when ``--tls=smtps`` is chosen.
    """
    parser = optparse.OptionParser()
    parser.add_option('-d', '--daemon', action='store_true')
    parser.add_option('--daemon-postexec', action='append')
    parser.add_option('-p', '--port', type=int, default=8025)
    parser.add_option('-a', '--address', default='localhost')
    parser.add_option('--pid-file', metavar='FILE')
    parser.add_option('--tls', choices=['none', 'smtps'], default='none')
    parser.add_option('--certificate', metavar='FILE')
    parser.add_option('--logfile', metavar='FILE')

    opts, _positional = parser.parse_args()
    wants_tls = opts.tls == 'smtps'
    has_certificate = bool(opts.certificate)
    if wants_tls != has_certificate:
        parser.error('--certificate must be specified with --tls=smtps')

    def serve():
        return run(opts.address, opts.port, opts.certificate)

    # runservice receives the options as a bytes-keyed dict.
    service_opts = bytesvars(opts)
    # Interpreter path, this file, then the arguments this process got.
    command = [
        pycompat.sysexecutable,
        pycompat.fsencode(__file__),
    ] + pycompat.sysargv[1:]
    server.runservice(
        service_opts,
        runfn=serve,
        runargs=command,
        logfile=opts.logfile,
    )
Yuya Nishihara
tests: add dummy SMTP daemon for SSL tests...
r29332
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()