##// END OF EJS Templates
tests: fix dummysmtpd argument check
Mads Kiilerich -
r51624:8823e4d4 stable
parent child Browse files
Show More
@@ -1,125 +1,125 b''
1 1 #!/usr/bin/env python
2 2
3 3 """dummy SMTP server for use in tests"""
4 4
5 5
6 6 import asyncore
7 7 import optparse
8 8 import smtpd
9 9 import ssl
10 10 import sys
11 11 import traceback
12 12
13 13 from mercurial import (
14 14 pycompat,
15 15 server,
16 16 sslutil,
17 17 ui as uimod,
18 18 )
19 19
20 20
21 21 def log(msg):
22 22 sys.stdout.write(msg)
23 23 sys.stdout.flush()
24 24
25 25
26 26 class dummysmtpserver(smtpd.SMTPServer):
27 27 def __init__(self, localaddr):
28 28 smtpd.SMTPServer.__init__(self, localaddr, remoteaddr=None)
29 29
30 30 def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
31 31 log(
32 32 '%s from=%s to=%s\n%s\n'
33 33 % (peer[0], mailfrom, ', '.join(rcpttos), data.decode())
34 34 )
35 35
36 36 def handle_error(self):
37 37 # On Windows, a bad SSL connection sometimes generates a WSAECONNRESET.
38 38 # The default handler will shutdown this server, and then both the
39 39 # current connection and subsequent ones fail on the client side with
40 40 # "No connection could be made because the target machine actively
41 41 # refused it". If we eat the error, then the client properly aborts in
42 42 # the expected way, and the server is available for subsequent requests.
43 43 traceback.print_exc()
44 44
45 45
46 46 class dummysmtpsecureserver(dummysmtpserver):
47 47 def __init__(self, localaddr, certfile):
48 48 dummysmtpserver.__init__(self, localaddr)
49 49 self._certfile = certfile
50 50
51 51 def handle_accept(self):
52 52 pair = self.accept()
53 53 if not pair:
54 54 return
55 55 conn, addr = pair
56 56 ui = uimod.ui.load()
57 57 try:
58 58 # wrap_socket() would block, but we don't care
59 59 conn = sslutil.wrapserversocket(conn, ui, certfile=self._certfile)
60 60 except ssl.SSLError as e:
61 61 log('%s ssl error: %s\n' % (addr[0], e))
62 62 conn.close()
63 63 return
64 64 smtpd.SMTPChannel(self, conn, addr)
65 65
66 66
67 67 def run():
68 68 try:
69 69 asyncore.loop()
70 70 except KeyboardInterrupt:
71 71 pass
72 72
73 73
74 74 def _encodestrsonly(v):
75 75 if isinstance(v, type(u'')):
76 76 return v.encode('ascii')
77 77 return v
78 78
79 79
80 80 def bytesvars(obj):
81 81 unidict = vars(obj)
82 82 bd = {k.encode('ascii'): _encodestrsonly(v) for k, v in unidict.items()}
83 83 if bd[b'daemon_postexec'] is not None:
84 84 bd[b'daemon_postexec'] = [
85 85 _encodestrsonly(v) for v in bd[b'daemon_postexec']
86 86 ]
87 87 return bd
88 88
89 89
90 90 def main():
91 91 op = optparse.OptionParser()
92 92 op.add_option('-d', '--daemon', action='store_true')
93 93 op.add_option('--daemon-postexec', action='append')
94 94 op.add_option('-p', '--port', type=int, default=8025)
95 95 op.add_option('-a', '--address', default='localhost')
96 96 op.add_option('--pid-file', metavar='FILE')
97 97 op.add_option('--tls', choices=['none', 'smtps'], default='none')
98 98 op.add_option('--certificate', metavar='FILE')
99 99 op.add_option('--logfile', metavar='FILE')
100 100
101 101 opts, args = op.parse_args()
102 if opts.tls == 'smtps' and not opts.certificate:
103 op.error('--certificate must be specified')
102 if (opts.tls == 'smtps') != bool(opts.certificate):
103 op.error('--certificate must be specified with --tls=smtps')
104 104
105 105 addr = (opts.address, opts.port)
106 106
107 107 def init():
108 108 if opts.tls == 'none':
109 109 dummysmtpserver(addr)
110 110 else:
111 111 dummysmtpsecureserver(addr, opts.certificate)
112 112 log('listening at %s:%d\n' % addr)
113 113
114 114 server.runservice(
115 115 bytesvars(opts),
116 116 initfn=init,
117 117 runfn=run,
118 118 runargs=[pycompat.sysexecutable, pycompat.fsencode(__file__)]
119 119 + pycompat.sysargv[1:],
120 120 logfile=opts.logfile,
121 121 )
122 122
123 123
124 124 if __name__ == '__main__':
125 125 main()
General Comments 0
You need to be logged in to leave comments. Login now