streamclone: move _allowstream() from wireproto...
Gregory Szorc
r26444:62374301 default
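As the summary says, this is a code move with a rename rather than a behavior change: the 'server.uncompressed' policy check leaves wireproto's private _allowstream() helper and becomes the public streamclone.allowservergeneration(), and the two wireproto.py call sites are updated to match. Schematically (excerpted from the diff below):

# before: wireproto.py defined and used a private helper
def _allowstream(ui):
    return ui.configbool('server', 'uncompressed', True, untrusted=True)

if not _allowstream(repo.ui):
    return '1\n'

# after: the check is public API in streamclone, and wireproto calls it
if not streamclone.allowservergeneration(repo.ui):
    return '1\n'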
@@ -1,221 +1,225 @@
1 # streamclone.py - producing and consuming streaming repository data
1 # streamclone.py - producing and consuming streaming repository data
2 #
2 #
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import time
10 import time
11
11
12 from .i18n import _
12 from .i18n import _
13 from . import (
13 from . import (
14 branchmap,
14 branchmap,
15 error,
15 error,
16 store,
16 store,
17 util,
17 util,
18 )
18 )
19
19
20 def allowservergeneration(ui):
21 """Whether streaming clones are allowed from the server."""
22 return ui.configbool('server', 'uncompressed', True, untrusted=True)
23
20 # This is it's own function so extensions can override it.
24 # This is it's own function so extensions can override it.
21 def _walkstreamfiles(repo):
25 def _walkstreamfiles(repo):
22 return repo.store.walk()
26 return repo.store.walk()
23
27
24 def generatev1(repo):
28 def generatev1(repo):
25 """Emit content for version 1 of a streaming clone.
29 """Emit content for version 1 of a streaming clone.
26
30
27 This is a generator of raw chunks that constitute a streaming clone.
31 This is a generator of raw chunks that constitute a streaming clone.
28
32
29 The stream begins with a line of 2 space-delimited integers containing the
33 The stream begins with a line of 2 space-delimited integers containing the
30 number of entries and total bytes size.
34 number of entries and total bytes size.
31
35
32 Next, are N entries for each file being transferred. Each file entry starts
36 Next, are N entries for each file being transferred. Each file entry starts
33 as a line with the file name and integer size delimited by a null byte.
37 as a line with the file name and integer size delimited by a null byte.
34 The raw file data follows. Following the raw file data is the next file
38 The raw file data follows. Following the raw file data is the next file
35 entry, or EOF.
39 entry, or EOF.
36
40
37 When used on the wire protocol, an additional line indicating protocol
41 When used on the wire protocol, an additional line indicating protocol
38 success will be prepended to the stream. This function is not responsible
42 success will be prepended to the stream. This function is not responsible
39 for adding it.
43 for adding it.
40
44
41 This function will obtain a repository lock to ensure a consistent view of
45 This function will obtain a repository lock to ensure a consistent view of
42 the store is captured. It therefore may raise LockError.
46 the store is captured. It therefore may raise LockError.
43 """
47 """
44 entries = []
48 entries = []
45 total_bytes = 0
49 total_bytes = 0
46 # Get consistent snapshot of repo, lock during scan.
50 # Get consistent snapshot of repo, lock during scan.
47 lock = repo.lock()
51 lock = repo.lock()
48 try:
52 try:
49 repo.ui.debug('scanning\n')
53 repo.ui.debug('scanning\n')
50 for name, ename, size in _walkstreamfiles(repo):
54 for name, ename, size in _walkstreamfiles(repo):
51 if size:
55 if size:
52 entries.append((name, size))
56 entries.append((name, size))
53 total_bytes += size
57 total_bytes += size
54 finally:
58 finally:
55 lock.release()
59 lock.release()
56
60
57 repo.ui.debug('%d files, %d bytes to transfer\n' %
61 repo.ui.debug('%d files, %d bytes to transfer\n' %
58 (len(entries), total_bytes))
62 (len(entries), total_bytes))
59 yield '%d %d\n' % (len(entries), total_bytes)
63 yield '%d %d\n' % (len(entries), total_bytes)
60
64
61 svfs = repo.svfs
65 svfs = repo.svfs
62 oldaudit = svfs.mustaudit
66 oldaudit = svfs.mustaudit
63 debugflag = repo.ui.debugflag
67 debugflag = repo.ui.debugflag
64 svfs.mustaudit = False
68 svfs.mustaudit = False
65
69
66 try:
70 try:
67 for name, size in entries:
71 for name, size in entries:
68 if debugflag:
72 if debugflag:
69 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
73 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
70 # partially encode name over the wire for backwards compat
74 # partially encode name over the wire for backwards compat
71 yield '%s\0%d\n' % (store.encodedir(name), size)
75 yield '%s\0%d\n' % (store.encodedir(name), size)
72 if size <= 65536:
76 if size <= 65536:
73 fp = svfs(name)
77 fp = svfs(name)
74 try:
78 try:
75 data = fp.read(size)
79 data = fp.read(size)
76 finally:
80 finally:
77 fp.close()
81 fp.close()
78 yield data
82 yield data
79 else:
83 else:
80 for chunk in util.filechunkiter(svfs(name), limit=size):
84 for chunk in util.filechunkiter(svfs(name), limit=size):
81 yield chunk
85 yield chunk
82 finally:
86 finally:
83 svfs.mustaudit = oldaudit
87 svfs.mustaudit = oldaudit
84
88
85 def consumev1(repo, fp):
89 def consumev1(repo, fp):
86 """Apply the contents from version 1 of a streaming clone file handle.
90 """Apply the contents from version 1 of a streaming clone file handle.
87
91
88 This takes the output from "streamout" and applies it to the specified
92 This takes the output from "streamout" and applies it to the specified
89 repository.
93 repository.
90
94
91 Like "streamout," the status line added by the wire protocol is not handled
95 Like "streamout," the status line added by the wire protocol is not handled
92 by this function.
96 by this function.
93 """
97 """
94 lock = repo.lock()
98 lock = repo.lock()
95 try:
99 try:
96 repo.ui.status(_('streaming all changes\n'))
100 repo.ui.status(_('streaming all changes\n'))
97 l = fp.readline()
101 l = fp.readline()
98 try:
102 try:
99 total_files, total_bytes = map(int, l.split(' ', 1))
103 total_files, total_bytes = map(int, l.split(' ', 1))
100 except (ValueError, TypeError):
104 except (ValueError, TypeError):
101 raise error.ResponseError(
105 raise error.ResponseError(
102 _('unexpected response from remote server:'), l)
106 _('unexpected response from remote server:'), l)
103 repo.ui.status(_('%d files to transfer, %s of data\n') %
107 repo.ui.status(_('%d files to transfer, %s of data\n') %
104 (total_files, util.bytecount(total_bytes)))
108 (total_files, util.bytecount(total_bytes)))
105 handled_bytes = 0
109 handled_bytes = 0
106 repo.ui.progress(_('clone'), 0, total=total_bytes)
110 repo.ui.progress(_('clone'), 0, total=total_bytes)
107 start = time.time()
111 start = time.time()
108
112
109 tr = repo.transaction(_('clone'))
113 tr = repo.transaction(_('clone'))
110 try:
114 try:
111 for i in xrange(total_files):
115 for i in xrange(total_files):
112 # XXX doesn't support '\n' or '\r' in filenames
116 # XXX doesn't support '\n' or '\r' in filenames
113 l = fp.readline()
117 l = fp.readline()
114 try:
118 try:
115 name, size = l.split('\0', 1)
119 name, size = l.split('\0', 1)
116 size = int(size)
120 size = int(size)
117 except (ValueError, TypeError):
121 except (ValueError, TypeError):
118 raise error.ResponseError(
122 raise error.ResponseError(
119 _('unexpected response from remote server:'), l)
123 _('unexpected response from remote server:'), l)
120 if repo.ui.debugflag:
124 if repo.ui.debugflag:
121 repo.ui.debug('adding %s (%s)\n' %
125 repo.ui.debug('adding %s (%s)\n' %
122 (name, util.bytecount(size)))
126 (name, util.bytecount(size)))
123 # for backwards compat, name was partially encoded
127 # for backwards compat, name was partially encoded
124 ofp = repo.svfs(store.decodedir(name), 'w')
128 ofp = repo.svfs(store.decodedir(name), 'w')
125 for chunk in util.filechunkiter(fp, limit=size):
129 for chunk in util.filechunkiter(fp, limit=size):
126 handled_bytes += len(chunk)
130 handled_bytes += len(chunk)
127 repo.ui.progress(_('clone'), handled_bytes,
131 repo.ui.progress(_('clone'), handled_bytes,
128 total=total_bytes)
132 total=total_bytes)
129 ofp.write(chunk)
133 ofp.write(chunk)
130 ofp.close()
134 ofp.close()
131 tr.close()
135 tr.close()
132 finally:
136 finally:
133 tr.release()
137 tr.release()
134
138
135 # Writing straight to files circumvented the inmemory caches
139 # Writing straight to files circumvented the inmemory caches
136 repo.invalidate()
140 repo.invalidate()
137
141
138 elapsed = time.time() - start
142 elapsed = time.time() - start
139 if elapsed <= 0:
143 if elapsed <= 0:
140 elapsed = 0.001
144 elapsed = 0.001
141 repo.ui.progress(_('clone'), None)
145 repo.ui.progress(_('clone'), None)
142 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
146 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
143 (util.bytecount(total_bytes), elapsed,
147 (util.bytecount(total_bytes), elapsed,
144 util.bytecount(total_bytes / elapsed)))
148 util.bytecount(total_bytes / elapsed)))
145 finally:
149 finally:
146 lock.release()
150 lock.release()
147
151
148 def streamin(repo, remote, remotereqs):
152 def streamin(repo, remote, remotereqs):
149 # Save remote branchmap. We will use it later
153 # Save remote branchmap. We will use it later
150 # to speed up branchcache creation
154 # to speed up branchcache creation
151 rbranchmap = None
155 rbranchmap = None
152 if remote.capable("branchmap"):
156 if remote.capable("branchmap"):
153 rbranchmap = remote.branchmap()
157 rbranchmap = remote.branchmap()
154
158
155 fp = remote.stream_out()
159 fp = remote.stream_out()
156 l = fp.readline()
160 l = fp.readline()
157 try:
161 try:
158 resp = int(l)
162 resp = int(l)
159 except ValueError:
163 except ValueError:
160 raise error.ResponseError(
164 raise error.ResponseError(
161 _('unexpected response from remote server:'), l)
165 _('unexpected response from remote server:'), l)
162 if resp == 1:
166 if resp == 1:
163 raise util.Abort(_('operation forbidden by server'))
167 raise util.Abort(_('operation forbidden by server'))
164 elif resp == 2:
168 elif resp == 2:
165 raise util.Abort(_('locking the remote repository failed'))
169 raise util.Abort(_('locking the remote repository failed'))
166 elif resp != 0:
170 elif resp != 0:
167 raise util.Abort(_('the server sent an unknown error code'))
171 raise util.Abort(_('the server sent an unknown error code'))
168
172
169 applyremotedata(repo, remotereqs, rbranchmap, fp)
173 applyremotedata(repo, remotereqs, rbranchmap, fp)
170 return len(repo.heads()) + 1
174 return len(repo.heads()) + 1
171
175
172 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
176 def applyremotedata(repo, remotereqs, remotebranchmap, fp):
173 """Apply stream clone data to a repository.
177 """Apply stream clone data to a repository.
174
178
175 "remotereqs" is a set of requirements to handle the incoming data.
179 "remotereqs" is a set of requirements to handle the incoming data.
176 "remotebranchmap" is the result of a branchmap lookup on the remote. It
180 "remotebranchmap" is the result of a branchmap lookup on the remote. It
177 can be None.
181 can be None.
178 "fp" is a file object containing the raw stream data, suitable for
182 "fp" is a file object containing the raw stream data, suitable for
179 feeding into consumev1().
183 feeding into consumev1().
180 """
184 """
181 lock = repo.lock()
185 lock = repo.lock()
182 try:
186 try:
183 consumev1(repo, fp)
187 consumev1(repo, fp)
184
188
185 # new requirements = old non-format requirements +
189 # new requirements = old non-format requirements +
186 # new format-related remote requirements
190 # new format-related remote requirements
187 # requirements from the streamed-in repository
191 # requirements from the streamed-in repository
188 repo.requirements = remotereqs | (
192 repo.requirements = remotereqs | (
189 repo.requirements - repo.supportedformats)
193 repo.requirements - repo.supportedformats)
190 repo._applyopenerreqs()
194 repo._applyopenerreqs()
191 repo._writerequirements()
195 repo._writerequirements()
192
196
193 if remotebranchmap:
197 if remotebranchmap:
194 rbheads = []
198 rbheads = []
195 closed = []
199 closed = []
196 for bheads in remotebranchmap.itervalues():
200 for bheads in remotebranchmap.itervalues():
197 rbheads.extend(bheads)
201 rbheads.extend(bheads)
198 for h in bheads:
202 for h in bheads:
199 r = repo.changelog.rev(h)
203 r = repo.changelog.rev(h)
200 b, c = repo.changelog.branchinfo(r)
204 b, c = repo.changelog.branchinfo(r)
201 if c:
205 if c:
202 closed.append(h)
206 closed.append(h)
203
207
204 if rbheads:
208 if rbheads:
205 rtiprev = max((int(repo.changelog.rev(node))
209 rtiprev = max((int(repo.changelog.rev(node))
206 for node in rbheads))
210 for node in rbheads))
207 cache = branchmap.branchcache(remotebranchmap,
211 cache = branchmap.branchcache(remotebranchmap,
208 repo[rtiprev].node(),
212 repo[rtiprev].node(),
209 rtiprev,
213 rtiprev,
210 closednodes=closed)
214 closednodes=closed)
211 # Try to stick it as low as possible
215 # Try to stick it as low as possible
212 # filter above served are unlikely to be fetch from a clone
216 # filter above served are unlikely to be fetch from a clone
213 for candidate in ('base', 'immutable', 'served'):
217 for candidate in ('base', 'immutable', 'served'):
214 rview = repo.filtered(candidate)
218 rview = repo.filtered(candidate)
215 if cache.validfor(rview):
219 if cache.validfor(rview):
216 repo._branchcaches[candidate] = cache
220 repo._branchcaches[candidate] = cache
217 cache.write(rview)
221 cache.write(rview)
218 break
222 break
219 repo.invalidate()
223 repo.invalidate()
220 finally:
224 finally:
221 lock.release()
225 lock.release()
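The docstrings for generatev1() and consumev1() above describe the version-1 stream clone format: a header line with the entry count and total byte size, then one "<name>\0<size>\n" line plus <size> bytes of raw store data per file. The standalone sketch below parses such a stream into memory. It is an illustrative reimplementation under those assumptions, not Mercurial code; the read_exact() helper and the returned dict are inventions of the example.

# Minimal, standalone sketch of reading a version-1 stream clone payload.
# Assumes fp is a binary file object positioned after any wire protocol
# status line (which, per the docstring, is not part of this format).
def read_exact(fp, size):
    # Helper for the sketch: read exactly `size` bytes or fail loudly.
    data = fp.read(size)
    if len(data) != size:
        raise EOFError('stream clone data truncated')
    return data

def parse_streamv1(fp):
    # Header line: "<number of entries> <total bytes>\n"
    total_files, total_bytes = map(int, fp.readline().split(b' ', 1))
    files = {}
    for _ in range(total_files):
        # Per-file entry: "<store path>\0<size>\n", then `size` raw bytes.
        # The path is partially encoded (store.encodedir) on the wire.
        name, size = fp.readline().rstrip(b'\n').split(b'\0', 1)
        files[name] = read_exact(fp, int(size))
    return total_bytes, files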
@@ -1,813 +1,810 @@
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import os
10 import os
11 import sys
11 import sys
12 import tempfile
12 import tempfile
13 import urllib
13 import urllib
14
14
15 from .i18n import _
15 from .i18n import _
16 from .node import (
16 from .node import (
17 bin,
17 bin,
18 hex,
18 hex,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 encoding,
24 encoding,
25 error,
25 error,
26 exchange,
26 exchange,
27 peer,
27 peer,
28 pushkey as pushkeymod,
28 pushkey as pushkeymod,
29 streamclone,
29 streamclone,
30 util,
30 util,
31 )
31 )
32
32
33 class abstractserverproto(object):
33 class abstractserverproto(object):
34 """abstract class that summarizes the protocol API
34 """abstract class that summarizes the protocol API
35
35
36 Used as reference and documentation.
36 Used as reference and documentation.
37 """
37 """
38
38
39 def getargs(self, args):
39 def getargs(self, args):
40 """return the value for arguments in <args>
40 """return the value for arguments in <args>
41
41
42 returns a list of values (same order as <args>)"""
42 returns a list of values (same order as <args>)"""
43 raise NotImplementedError()
43 raise NotImplementedError()
44
44
45 def getfile(self, fp):
45 def getfile(self, fp):
46 """write the whole content of a file into a file like object
46 """write the whole content of a file into a file like object
47
47
48 The file is in the form::
48 The file is in the form::
49
49
50 (<chunk-size>\n<chunk>)+0\n
50 (<chunk-size>\n<chunk>)+0\n
51
51
52 chunk size is the ascii version of the int.
52 chunk size is the ascii version of the int.
53 """
53 """
54 raise NotImplementedError()
54 raise NotImplementedError()
55
55
56 def redirect(self):
56 def redirect(self):
57 """may setup interception for stdout and stderr
57 """may setup interception for stdout and stderr
58
58
59 See also the `restore` method."""
59 See also the `restore` method."""
60 raise NotImplementedError()
60 raise NotImplementedError()
61
61
62 # If the `redirect` function does install interception, the `restore`
62 # If the `redirect` function does install interception, the `restore`
63 # function MUST be defined. If interception is not used, this function
63 # function MUST be defined. If interception is not used, this function
64 # MUST NOT be defined.
64 # MUST NOT be defined.
65 #
65 #
66 # left commented here on purpose
66 # left commented here on purpose
67 #
67 #
68 #def restore(self):
68 #def restore(self):
69 # """reinstall previous stdout and stderr and return intercepted stdout
69 # """reinstall previous stdout and stderr and return intercepted stdout
70 # """
70 # """
71 # raise NotImplementedError()
71 # raise NotImplementedError()
72
72
73 def groupchunks(self, cg):
73 def groupchunks(self, cg):
74 """return 4096 chunks from a changegroup object
74 """return 4096 chunks from a changegroup object
75
75
76 Some protocols may have compressed the contents."""
76 Some protocols may have compressed the contents."""
77 raise NotImplementedError()
77 raise NotImplementedError()
78
78
79 class remotebatch(peer.batcher):
79 class remotebatch(peer.batcher):
80 '''batches the queued calls; uses as few roundtrips as possible'''
80 '''batches the queued calls; uses as few roundtrips as possible'''
81 def __init__(self, remote):
81 def __init__(self, remote):
82 '''remote must support _submitbatch(encbatch) and
82 '''remote must support _submitbatch(encbatch) and
83 _submitone(op, encargs)'''
83 _submitone(op, encargs)'''
84 peer.batcher.__init__(self)
84 peer.batcher.__init__(self)
85 self.remote = remote
85 self.remote = remote
86 def submit(self):
86 def submit(self):
87 req, rsp = [], []
87 req, rsp = [], []
88 for name, args, opts, resref in self.calls:
88 for name, args, opts, resref in self.calls:
89 mtd = getattr(self.remote, name)
89 mtd = getattr(self.remote, name)
90 batchablefn = getattr(mtd, 'batchable', None)
90 batchablefn = getattr(mtd, 'batchable', None)
91 if batchablefn is not None:
91 if batchablefn is not None:
92 batchable = batchablefn(mtd.im_self, *args, **opts)
92 batchable = batchablefn(mtd.im_self, *args, **opts)
93 encargsorres, encresref = batchable.next()
93 encargsorres, encresref = batchable.next()
94 if encresref:
94 if encresref:
95 req.append((name, encargsorres,))
95 req.append((name, encargsorres,))
96 rsp.append((batchable, encresref, resref,))
96 rsp.append((batchable, encresref, resref,))
97 else:
97 else:
98 resref.set(encargsorres)
98 resref.set(encargsorres)
99 else:
99 else:
100 if req:
100 if req:
101 self._submitreq(req, rsp)
101 self._submitreq(req, rsp)
102 req, rsp = [], []
102 req, rsp = [], []
103 resref.set(mtd(*args, **opts))
103 resref.set(mtd(*args, **opts))
104 if req:
104 if req:
105 self._submitreq(req, rsp)
105 self._submitreq(req, rsp)
106 def _submitreq(self, req, rsp):
106 def _submitreq(self, req, rsp):
107 encresults = self.remote._submitbatch(req)
107 encresults = self.remote._submitbatch(req)
108 for encres, r in zip(encresults, rsp):
108 for encres, r in zip(encresults, rsp):
109 batchable, encresref, resref = r
109 batchable, encresref, resref = r
110 encresref.set(encres)
110 encresref.set(encres)
111 resref.set(batchable.next())
111 resref.set(batchable.next())
112
112
113 # Forward a couple of names from peer to make wireproto interactions
113 # Forward a couple of names from peer to make wireproto interactions
114 # slightly more sensible.
114 # slightly more sensible.
115 batchable = peer.batchable
115 batchable = peer.batchable
116 future = peer.future
116 future = peer.future
117
117
118 # list of nodes encoding / decoding
118 # list of nodes encoding / decoding
119
119
120 def decodelist(l, sep=' '):
120 def decodelist(l, sep=' '):
121 if l:
121 if l:
122 return map(bin, l.split(sep))
122 return map(bin, l.split(sep))
123 return []
123 return []
124
124
125 def encodelist(l, sep=' '):
125 def encodelist(l, sep=' '):
126 try:
126 try:
127 return sep.join(map(hex, l))
127 return sep.join(map(hex, l))
128 except TypeError:
128 except TypeError:
129 raise
129 raise
130
130
131 # batched call argument encoding
131 # batched call argument encoding
132
132
133 def escapearg(plain):
133 def escapearg(plain):
134 return (plain
134 return (plain
135 .replace(':', ':c')
135 .replace(':', ':c')
136 .replace(',', ':o')
136 .replace(',', ':o')
137 .replace(';', ':s')
137 .replace(';', ':s')
138 .replace('=', ':e'))
138 .replace('=', ':e'))
139
139
140 def unescapearg(escaped):
140 def unescapearg(escaped):
141 return (escaped
141 return (escaped
142 .replace(':e', '=')
142 .replace(':e', '=')
143 .replace(':s', ';')
143 .replace(':s', ';')
144 .replace(':o', ',')
144 .replace(':o', ',')
145 .replace(':c', ':'))
145 .replace(':c', ':'))
146
146
147 # mapping of options accepted by getbundle and their types
147 # mapping of options accepted by getbundle and their types
148 #
148 #
149 # Meant to be extended by extensions. It is extensions responsibility to ensure
149 # Meant to be extended by extensions. It is extensions responsibility to ensure
150 # such options are properly processed in exchange.getbundle.
150 # such options are properly processed in exchange.getbundle.
151 #
151 #
152 # supported types are:
152 # supported types are:
153 #
153 #
154 # :nodes: list of binary nodes
154 # :nodes: list of binary nodes
155 # :csv: list of comma-separated values
155 # :csv: list of comma-separated values
156 # :scsv: list of comma-separated values return as set
156 # :scsv: list of comma-separated values return as set
157 # :plain: string with no transformation needed.
157 # :plain: string with no transformation needed.
158 gboptsmap = {'heads': 'nodes',
158 gboptsmap = {'heads': 'nodes',
159 'common': 'nodes',
159 'common': 'nodes',
160 'obsmarkers': 'boolean',
160 'obsmarkers': 'boolean',
161 'bundlecaps': 'scsv',
161 'bundlecaps': 'scsv',
162 'listkeys': 'csv',
162 'listkeys': 'csv',
163 'cg': 'boolean'}
163 'cg': 'boolean'}
164
164
165 # client side
165 # client side
166
166
167 class wirepeer(peer.peerrepository):
167 class wirepeer(peer.peerrepository):
168
168
169 def batch(self):
169 def batch(self):
170 if self.capable('batch'):
170 if self.capable('batch'):
171 return remotebatch(self)
171 return remotebatch(self)
172 else:
172 else:
173 return peer.localbatch(self)
173 return peer.localbatch(self)
174 def _submitbatch(self, req):
174 def _submitbatch(self, req):
175 cmds = []
175 cmds = []
176 for op, argsdict in req:
176 for op, argsdict in req:
177 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
177 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
178 for k, v in argsdict.iteritems())
178 for k, v in argsdict.iteritems())
179 cmds.append('%s %s' % (op, args))
179 cmds.append('%s %s' % (op, args))
180 rsp = self._call("batch", cmds=';'.join(cmds))
180 rsp = self._call("batch", cmds=';'.join(cmds))
181 return [unescapearg(r) for r in rsp.split(';')]
181 return [unescapearg(r) for r in rsp.split(';')]
182 def _submitone(self, op, args):
182 def _submitone(self, op, args):
183 return self._call(op, **args)
183 return self._call(op, **args)
184
184
185 @batchable
185 @batchable
186 def lookup(self, key):
186 def lookup(self, key):
187 self.requirecap('lookup', _('look up remote revision'))
187 self.requirecap('lookup', _('look up remote revision'))
188 f = future()
188 f = future()
189 yield {'key': encoding.fromlocal(key)}, f
189 yield {'key': encoding.fromlocal(key)}, f
190 d = f.value
190 d = f.value
191 success, data = d[:-1].split(" ", 1)
191 success, data = d[:-1].split(" ", 1)
192 if int(success):
192 if int(success):
193 yield bin(data)
193 yield bin(data)
194 self._abort(error.RepoError(data))
194 self._abort(error.RepoError(data))
195
195
196 @batchable
196 @batchable
197 def heads(self):
197 def heads(self):
198 f = future()
198 f = future()
199 yield {}, f
199 yield {}, f
200 d = f.value
200 d = f.value
201 try:
201 try:
202 yield decodelist(d[:-1])
202 yield decodelist(d[:-1])
203 except ValueError:
203 except ValueError:
204 self._abort(error.ResponseError(_("unexpected response:"), d))
204 self._abort(error.ResponseError(_("unexpected response:"), d))
205
205
206 @batchable
206 @batchable
207 def known(self, nodes):
207 def known(self, nodes):
208 f = future()
208 f = future()
209 yield {'nodes': encodelist(nodes)}, f
209 yield {'nodes': encodelist(nodes)}, f
210 d = f.value
210 d = f.value
211 try:
211 try:
212 yield [bool(int(b)) for b in d]
212 yield [bool(int(b)) for b in d]
213 except ValueError:
213 except ValueError:
214 self._abort(error.ResponseError(_("unexpected response:"), d))
214 self._abort(error.ResponseError(_("unexpected response:"), d))
215
215
216 @batchable
216 @batchable
217 def branchmap(self):
217 def branchmap(self):
218 f = future()
218 f = future()
219 yield {}, f
219 yield {}, f
220 d = f.value
220 d = f.value
221 try:
221 try:
222 branchmap = {}
222 branchmap = {}
223 for branchpart in d.splitlines():
223 for branchpart in d.splitlines():
224 branchname, branchheads = branchpart.split(' ', 1)
224 branchname, branchheads = branchpart.split(' ', 1)
225 branchname = encoding.tolocal(urllib.unquote(branchname))
225 branchname = encoding.tolocal(urllib.unquote(branchname))
226 branchheads = decodelist(branchheads)
226 branchheads = decodelist(branchheads)
227 branchmap[branchname] = branchheads
227 branchmap[branchname] = branchheads
228 yield branchmap
228 yield branchmap
229 except TypeError:
229 except TypeError:
230 self._abort(error.ResponseError(_("unexpected response:"), d))
230 self._abort(error.ResponseError(_("unexpected response:"), d))
231
231
232 def branches(self, nodes):
232 def branches(self, nodes):
233 n = encodelist(nodes)
233 n = encodelist(nodes)
234 d = self._call("branches", nodes=n)
234 d = self._call("branches", nodes=n)
235 try:
235 try:
236 br = [tuple(decodelist(b)) for b in d.splitlines()]
236 br = [tuple(decodelist(b)) for b in d.splitlines()]
237 return br
237 return br
238 except ValueError:
238 except ValueError:
239 self._abort(error.ResponseError(_("unexpected response:"), d))
239 self._abort(error.ResponseError(_("unexpected response:"), d))
240
240
241 def between(self, pairs):
241 def between(self, pairs):
242 batch = 8 # avoid giant requests
242 batch = 8 # avoid giant requests
243 r = []
243 r = []
244 for i in xrange(0, len(pairs), batch):
244 for i in xrange(0, len(pairs), batch):
245 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
245 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
246 d = self._call("between", pairs=n)
246 d = self._call("between", pairs=n)
247 try:
247 try:
248 r.extend(l and decodelist(l) or [] for l in d.splitlines())
248 r.extend(l and decodelist(l) or [] for l in d.splitlines())
249 except ValueError:
249 except ValueError:
250 self._abort(error.ResponseError(_("unexpected response:"), d))
250 self._abort(error.ResponseError(_("unexpected response:"), d))
251 return r
251 return r
252
252
253 @batchable
253 @batchable
254 def pushkey(self, namespace, key, old, new):
254 def pushkey(self, namespace, key, old, new):
255 if not self.capable('pushkey'):
255 if not self.capable('pushkey'):
256 yield False, None
256 yield False, None
257 f = future()
257 f = future()
258 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
258 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
259 yield {'namespace': encoding.fromlocal(namespace),
259 yield {'namespace': encoding.fromlocal(namespace),
260 'key': encoding.fromlocal(key),
260 'key': encoding.fromlocal(key),
261 'old': encoding.fromlocal(old),
261 'old': encoding.fromlocal(old),
262 'new': encoding.fromlocal(new)}, f
262 'new': encoding.fromlocal(new)}, f
263 d = f.value
263 d = f.value
264 d, output = d.split('\n', 1)
264 d, output = d.split('\n', 1)
265 try:
265 try:
266 d = bool(int(d))
266 d = bool(int(d))
267 except ValueError:
267 except ValueError:
268 raise error.ResponseError(
268 raise error.ResponseError(
269 _('push failed (unexpected response):'), d)
269 _('push failed (unexpected response):'), d)
270 for l in output.splitlines(True):
270 for l in output.splitlines(True):
271 self.ui.status(_('remote: '), l)
271 self.ui.status(_('remote: '), l)
272 yield d
272 yield d
273
273
274 @batchable
274 @batchable
275 def listkeys(self, namespace):
275 def listkeys(self, namespace):
276 if not self.capable('pushkey'):
276 if not self.capable('pushkey'):
277 yield {}, None
277 yield {}, None
278 f = future()
278 f = future()
279 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
279 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
280 yield {'namespace': encoding.fromlocal(namespace)}, f
280 yield {'namespace': encoding.fromlocal(namespace)}, f
281 d = f.value
281 d = f.value
282 self.ui.debug('received listkey for "%s": %i bytes\n'
282 self.ui.debug('received listkey for "%s": %i bytes\n'
283 % (namespace, len(d)))
283 % (namespace, len(d)))
284 yield pushkeymod.decodekeys(d)
284 yield pushkeymod.decodekeys(d)
285
285
286 def stream_out(self):
286 def stream_out(self):
287 return self._callstream('stream_out')
287 return self._callstream('stream_out')
288
288
289 def changegroup(self, nodes, kind):
289 def changegroup(self, nodes, kind):
290 n = encodelist(nodes)
290 n = encodelist(nodes)
291 f = self._callcompressable("changegroup", roots=n)
291 f = self._callcompressable("changegroup", roots=n)
292 return changegroupmod.cg1unpacker(f, 'UN')
292 return changegroupmod.cg1unpacker(f, 'UN')
293
293
294 def changegroupsubset(self, bases, heads, kind):
294 def changegroupsubset(self, bases, heads, kind):
295 self.requirecap('changegroupsubset', _('look up remote changes'))
295 self.requirecap('changegroupsubset', _('look up remote changes'))
296 bases = encodelist(bases)
296 bases = encodelist(bases)
297 heads = encodelist(heads)
297 heads = encodelist(heads)
298 f = self._callcompressable("changegroupsubset",
298 f = self._callcompressable("changegroupsubset",
299 bases=bases, heads=heads)
299 bases=bases, heads=heads)
300 return changegroupmod.cg1unpacker(f, 'UN')
300 return changegroupmod.cg1unpacker(f, 'UN')
301
301
302 def getbundle(self, source, **kwargs):
302 def getbundle(self, source, **kwargs):
303 self.requirecap('getbundle', _('look up remote changes'))
303 self.requirecap('getbundle', _('look up remote changes'))
304 opts = {}
304 opts = {}
305 bundlecaps = kwargs.get('bundlecaps')
305 bundlecaps = kwargs.get('bundlecaps')
306 if bundlecaps is not None:
306 if bundlecaps is not None:
307 kwargs['bundlecaps'] = sorted(bundlecaps)
307 kwargs['bundlecaps'] = sorted(bundlecaps)
308 else:
308 else:
309 bundlecaps = () # kwargs could have it to None
309 bundlecaps = () # kwargs could have it to None
310 for key, value in kwargs.iteritems():
310 for key, value in kwargs.iteritems():
311 if value is None:
311 if value is None:
312 continue
312 continue
313 keytype = gboptsmap.get(key)
313 keytype = gboptsmap.get(key)
314 if keytype is None:
314 if keytype is None:
315 assert False, 'unexpected'
315 assert False, 'unexpected'
316 elif keytype == 'nodes':
316 elif keytype == 'nodes':
317 value = encodelist(value)
317 value = encodelist(value)
318 elif keytype in ('csv', 'scsv'):
318 elif keytype in ('csv', 'scsv'):
319 value = ','.join(value)
319 value = ','.join(value)
320 elif keytype == 'boolean':
320 elif keytype == 'boolean':
321 value = '%i' % bool(value)
321 value = '%i' % bool(value)
322 elif keytype != 'plain':
322 elif keytype != 'plain':
323 raise KeyError('unknown getbundle option type %s'
323 raise KeyError('unknown getbundle option type %s'
324 % keytype)
324 % keytype)
325 opts[key] = value
325 opts[key] = value
326 f = self._callcompressable("getbundle", **opts)
326 f = self._callcompressable("getbundle", **opts)
327 if any((cap.startswith('HG2') for cap in bundlecaps)):
327 if any((cap.startswith('HG2') for cap in bundlecaps)):
328 return bundle2.getunbundler(self.ui, f)
328 return bundle2.getunbundler(self.ui, f)
329 else:
329 else:
330 return changegroupmod.cg1unpacker(f, 'UN')
330 return changegroupmod.cg1unpacker(f, 'UN')
331
331
332 def unbundle(self, cg, heads, source):
332 def unbundle(self, cg, heads, source):
333 '''Send cg (a readable file-like object representing the
333 '''Send cg (a readable file-like object representing the
334 changegroup to push, typically a chunkbuffer object) to the
334 changegroup to push, typically a chunkbuffer object) to the
335 remote server as a bundle.
335 remote server as a bundle.
336
336
337 When pushing a bundle10 stream, return an integer indicating the
337 When pushing a bundle10 stream, return an integer indicating the
338 result of the push (see localrepository.addchangegroup()).
338 result of the push (see localrepository.addchangegroup()).
339
339
340 When pushing a bundle20 stream, return a bundle20 stream.'''
340 When pushing a bundle20 stream, return a bundle20 stream.'''
341
341
342 if heads != ['force'] and self.capable('unbundlehash'):
342 if heads != ['force'] and self.capable('unbundlehash'):
343 heads = encodelist(['hashed',
343 heads = encodelist(['hashed',
344 util.sha1(''.join(sorted(heads))).digest()])
344 util.sha1(''.join(sorted(heads))).digest()])
345 else:
345 else:
346 heads = encodelist(heads)
346 heads = encodelist(heads)
347
347
348 if util.safehasattr(cg, 'deltaheader'):
348 if util.safehasattr(cg, 'deltaheader'):
349 # this a bundle10, do the old style call sequence
349 # this a bundle10, do the old style call sequence
350 ret, output = self._callpush("unbundle", cg, heads=heads)
350 ret, output = self._callpush("unbundle", cg, heads=heads)
351 if ret == "":
351 if ret == "":
352 raise error.ResponseError(
352 raise error.ResponseError(
353 _('push failed:'), output)
353 _('push failed:'), output)
354 try:
354 try:
355 ret = int(ret)
355 ret = int(ret)
356 except ValueError:
356 except ValueError:
357 raise error.ResponseError(
357 raise error.ResponseError(
358 _('push failed (unexpected response):'), ret)
358 _('push failed (unexpected response):'), ret)
359
359
360 for l in output.splitlines(True):
360 for l in output.splitlines(True):
361 self.ui.status(_('remote: '), l)
361 self.ui.status(_('remote: '), l)
362 else:
362 else:
363 # bundle2 push. Send a stream, fetch a stream.
363 # bundle2 push. Send a stream, fetch a stream.
364 stream = self._calltwowaystream('unbundle', cg, heads=heads)
364 stream = self._calltwowaystream('unbundle', cg, heads=heads)
365 ret = bundle2.getunbundler(self.ui, stream)
365 ret = bundle2.getunbundler(self.ui, stream)
366 return ret
366 return ret
367
367
368 def debugwireargs(self, one, two, three=None, four=None, five=None):
368 def debugwireargs(self, one, two, three=None, four=None, five=None):
369 # don't pass optional arguments left at their default value
369 # don't pass optional arguments left at their default value
370 opts = {}
370 opts = {}
371 if three is not None:
371 if three is not None:
372 opts['three'] = three
372 opts['three'] = three
373 if four is not None:
373 if four is not None:
374 opts['four'] = four
374 opts['four'] = four
375 return self._call('debugwireargs', one=one, two=two, **opts)
375 return self._call('debugwireargs', one=one, two=two, **opts)
376
376
377 def _call(self, cmd, **args):
377 def _call(self, cmd, **args):
378 """execute <cmd> on the server
378 """execute <cmd> on the server
379
379
380 The command is expected to return a simple string.
380 The command is expected to return a simple string.
381
381
382 returns the server reply as a string."""
382 returns the server reply as a string."""
383 raise NotImplementedError()
383 raise NotImplementedError()
384
384
385 def _callstream(self, cmd, **args):
385 def _callstream(self, cmd, **args):
386 """execute <cmd> on the server
386 """execute <cmd> on the server
387
387
388 The command is expected to return a stream.
388 The command is expected to return a stream.
389
389
390 returns the server reply as a file like object."""
390 returns the server reply as a file like object."""
391 raise NotImplementedError()
391 raise NotImplementedError()
392
392
393 def _callcompressable(self, cmd, **args):
393 def _callcompressable(self, cmd, **args):
394 """execute <cmd> on the server
394 """execute <cmd> on the server
395
395
396 The command is expected to return a stream.
396 The command is expected to return a stream.
397
397
398 The stream may have been compressed in some implementations. This
398 The stream may have been compressed in some implementations. This
399 function takes care of the decompression. This is the only difference
399 function takes care of the decompression. This is the only difference
400 with _callstream.
400 with _callstream.
401
401
402 returns the server reply as a file like object.
402 returns the server reply as a file like object.
403 """
403 """
404 raise NotImplementedError()
404 raise NotImplementedError()
405
405
406 def _callpush(self, cmd, fp, **args):
406 def _callpush(self, cmd, fp, **args):
407 """execute a <cmd> on server
407 """execute a <cmd> on server
408
408
409 The command is expected to be related to a push. Push has a special
409 The command is expected to be related to a push. Push has a special
410 return method.
410 return method.
411
411
412 returns the server reply as a (ret, output) tuple. ret is either
412 returns the server reply as a (ret, output) tuple. ret is either
413 empty (error) or a stringified int.
413 empty (error) or a stringified int.
414 """
414 """
415 raise NotImplementedError()
415 raise NotImplementedError()
416
416
417 def _calltwowaystream(self, cmd, fp, **args):
417 def _calltwowaystream(self, cmd, fp, **args):
418 """execute <cmd> on server
418 """execute <cmd> on server
419
419
420 The command will send a stream to the server and get a stream in reply.
420 The command will send a stream to the server and get a stream in reply.
421 """
421 """
422 raise NotImplementedError()
422 raise NotImplementedError()
423
423
424 def _abort(self, exception):
424 def _abort(self, exception):
425 """clearly abort the wire protocol connection and raise the exception
425 """clearly abort the wire protocol connection and raise the exception
426 """
426 """
427 raise NotImplementedError()
427 raise NotImplementedError()
428
428
429 # server side
429 # server side
430
430
431 # wire protocol command can either return a string or one of these classes.
431 # wire protocol command can either return a string or one of these classes.
432 class streamres(object):
432 class streamres(object):
433 """wireproto reply: binary stream
433 """wireproto reply: binary stream
434
434
435 The call was successful and the result is a stream.
435 The call was successful and the result is a stream.
436 Iterate on the `self.gen` attribute to retrieve chunks.
436 Iterate on the `self.gen` attribute to retrieve chunks.
437 """
437 """
438 def __init__(self, gen):
438 def __init__(self, gen):
439 self.gen = gen
439 self.gen = gen
440
440
441 class pushres(object):
441 class pushres(object):
442 """wireproto reply: success with simple integer return
442 """wireproto reply: success with simple integer return
443
443
444 The call was successful and returned an integer contained in `self.res`.
444 The call was successful and returned an integer contained in `self.res`.
445 """
445 """
446 def __init__(self, res):
446 def __init__(self, res):
447 self.res = res
447 self.res = res
448
448
449 class pusherr(object):
449 class pusherr(object):
450 """wireproto reply: failure
450 """wireproto reply: failure
451
451
452 The call failed. The `self.res` attribute contains the error message.
452 The call failed. The `self.res` attribute contains the error message.
453 """
453 """
454 def __init__(self, res):
454 def __init__(self, res):
455 self.res = res
455 self.res = res
456
456
457 class ooberror(object):
457 class ooberror(object):
458 """wireproto reply: failure of a batch of operation
458 """wireproto reply: failure of a batch of operation
459
459
460 Something failed during a batch call. The error message is stored in
460 Something failed during a batch call. The error message is stored in
461 `self.message`.
461 `self.message`.
462 """
462 """
463 def __init__(self, message):
463 def __init__(self, message):
464 self.message = message
464 self.message = message
465
465
466 def dispatch(repo, proto, command):
466 def dispatch(repo, proto, command):
467 repo = repo.filtered("served")
467 repo = repo.filtered("served")
468 func, spec = commands[command]
468 func, spec = commands[command]
469 args = proto.getargs(spec)
469 args = proto.getargs(spec)
470 return func(repo, proto, *args)
470 return func(repo, proto, *args)
471
471
472 def options(cmd, keys, others):
472 def options(cmd, keys, others):
473 opts = {}
473 opts = {}
474 for k in keys:
474 for k in keys:
475 if k in others:
475 if k in others:
476 opts[k] = others[k]
476 opts[k] = others[k]
477 del others[k]
477 del others[k]
478 if others:
478 if others:
479 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
479 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
480 % (cmd, ",".join(others)))
480 % (cmd, ",".join(others)))
481 return opts
481 return opts
482
482
483 # list of commands
483 # list of commands
484 commands = {}
484 commands = {}
485
485
486 def wireprotocommand(name, args=''):
486 def wireprotocommand(name, args=''):
487 """decorator for wire protocol command"""
487 """decorator for wire protocol command"""
488 def register(func):
488 def register(func):
489 commands[name] = (func, args)
489 commands[name] = (func, args)
490 return func
490 return func
491 return register
491 return register
492
492
493 @wireprotocommand('batch', 'cmds *')
493 @wireprotocommand('batch', 'cmds *')
494 def batch(repo, proto, cmds, others):
494 def batch(repo, proto, cmds, others):
495 repo = repo.filtered("served")
495 repo = repo.filtered("served")
496 res = []
496 res = []
497 for pair in cmds.split(';'):
497 for pair in cmds.split(';'):
498 op, args = pair.split(' ', 1)
498 op, args = pair.split(' ', 1)
499 vals = {}
499 vals = {}
500 for a in args.split(','):
500 for a in args.split(','):
501 if a:
501 if a:
502 n, v = a.split('=')
502 n, v = a.split('=')
503 vals[n] = unescapearg(v)
503 vals[n] = unescapearg(v)
504 func, spec = commands[op]
504 func, spec = commands[op]
505 if spec:
505 if spec:
506 keys = spec.split()
506 keys = spec.split()
507 data = {}
507 data = {}
508 for k in keys:
508 for k in keys:
509 if k == '*':
509 if k == '*':
510 star = {}
510 star = {}
511 for key in vals.keys():
511 for key in vals.keys():
512 if key not in keys:
512 if key not in keys:
513 star[key] = vals[key]
513 star[key] = vals[key]
514 data['*'] = star
514 data['*'] = star
515 else:
515 else:
516 data[k] = vals[k]
516 data[k] = vals[k]
517 result = func(repo, proto, *[data[k] for k in keys])
517 result = func(repo, proto, *[data[k] for k in keys])
518 else:
518 else:
519 result = func(repo, proto)
519 result = func(repo, proto)
520 if isinstance(result, ooberror):
520 if isinstance(result, ooberror):
521 return result
521 return result
522 res.append(escapearg(result))
522 res.append(escapearg(result))
523 return ';'.join(res)
523 return ';'.join(res)
524
524
525 @wireprotocommand('between', 'pairs')
525 @wireprotocommand('between', 'pairs')
526 def between(repo, proto, pairs):
526 def between(repo, proto, pairs):
527 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
527 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
528 r = []
528 r = []
529 for b in repo.between(pairs):
529 for b in repo.between(pairs):
530 r.append(encodelist(b) + "\n")
530 r.append(encodelist(b) + "\n")
531 return "".join(r)
531 return "".join(r)
532
532
533 @wireprotocommand('branchmap')
533 @wireprotocommand('branchmap')
534 def branchmap(repo, proto):
534 def branchmap(repo, proto):
535 branchmap = repo.branchmap()
535 branchmap = repo.branchmap()
536 heads = []
536 heads = []
537 for branch, nodes in branchmap.iteritems():
537 for branch, nodes in branchmap.iteritems():
538 branchname = urllib.quote(encoding.fromlocal(branch))
538 branchname = urllib.quote(encoding.fromlocal(branch))
539 branchnodes = encodelist(nodes)
539 branchnodes = encodelist(nodes)
540 heads.append('%s %s' % (branchname, branchnodes))
540 heads.append('%s %s' % (branchname, branchnodes))
541 return '\n'.join(heads)
541 return '\n'.join(heads)
542
542
543 @wireprotocommand('branches', 'nodes')
543 @wireprotocommand('branches', 'nodes')
544 def branches(repo, proto, nodes):
544 def branches(repo, proto, nodes):
545 nodes = decodelist(nodes)
545 nodes = decodelist(nodes)
546 r = []
546 r = []
547 for b in repo.branches(nodes):
547 for b in repo.branches(nodes):
548 r.append(encodelist(b) + "\n")
548 r.append(encodelist(b) + "\n")
549 return "".join(r)
549 return "".join(r)
550
550
551
551
552 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
552 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
553 'known', 'getbundle', 'unbundlehash', 'batch']
553 'known', 'getbundle', 'unbundlehash', 'batch']
554
554
555 def _capabilities(repo, proto):
555 def _capabilities(repo, proto):
556 """return a list of capabilities for a repo
556 """return a list of capabilities for a repo
557
557
558 This function exists to allow extensions to easily wrap capabilities
558 This function exists to allow extensions to easily wrap capabilities
559 computation
559 computation
560
560
561 - returns a lists: easy to alter
561 - returns a lists: easy to alter
562 - change done here will be propagated to both `capabilities` and `hello`
562 - change done here will be propagated to both `capabilities` and `hello`
563 command without any other action needed.
563 command without any other action needed.
564 """
564 """
565 # copy to prevent modification of the global list
565 # copy to prevent modification of the global list
566 caps = list(wireprotocaps)
566 caps = list(wireprotocaps)
567 if _allowstream(repo.ui):
567 if streamclone.allowservergeneration(repo.ui):
568 if repo.ui.configbool('server', 'preferuncompressed', False):
568 if repo.ui.configbool('server', 'preferuncompressed', False):
569 caps.append('stream-preferred')
569 caps.append('stream-preferred')
570 requiredformats = repo.requirements & repo.supportedformats
570 requiredformats = repo.requirements & repo.supportedformats
571 # if our local revlogs are just revlogv1, add 'stream' cap
571 # if our local revlogs are just revlogv1, add 'stream' cap
572 if not requiredformats - set(('revlogv1',)):
572 if not requiredformats - set(('revlogv1',)):
573 caps.append('stream')
573 caps.append('stream')
574 # otherwise, add 'streamreqs' detailing our local revlog format
574 # otherwise, add 'streamreqs' detailing our local revlog format
575 else:
575 else:
576 caps.append('streamreqs=%s' % ','.join(requiredformats))
576 caps.append('streamreqs=%s' % ','.join(requiredformats))
577 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
577 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
578 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
578 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
579 caps.append('bundle2=' + urllib.quote(capsblob))
579 caps.append('bundle2=' + urllib.quote(capsblob))
580 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
580 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
581 caps.append(
581 caps.append(
582 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
582 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
583 return caps
583 return caps
584
584
585 # If you are writing an extension and consider wrapping this function. Wrap
585 # If you are writing an extension and consider wrapping this function. Wrap
586 # `_capabilities` instead.
586 # `_capabilities` instead.
587 @wireprotocommand('capabilities')
587 @wireprotocommand('capabilities')
588 def capabilities(repo, proto):
588 def capabilities(repo, proto):
589 return ' '.join(_capabilities(repo, proto))
589 return ' '.join(_capabilities(repo, proto))
590
590
591 @wireprotocommand('changegroup', 'roots')
591 @wireprotocommand('changegroup', 'roots')
592 def changegroup(repo, proto, roots):
592 def changegroup(repo, proto, roots):
593 nodes = decodelist(roots)
593 nodes = decodelist(roots)
594 cg = changegroupmod.changegroup(repo, nodes, 'serve')
594 cg = changegroupmod.changegroup(repo, nodes, 'serve')
595 return streamres(proto.groupchunks(cg))
595 return streamres(proto.groupchunks(cg))
596
596
597 @wireprotocommand('changegroupsubset', 'bases heads')
597 @wireprotocommand('changegroupsubset', 'bases heads')
598 def changegroupsubset(repo, proto, bases, heads):
598 def changegroupsubset(repo, proto, bases, heads):
599 bases = decodelist(bases)
599 bases = decodelist(bases)
600 heads = decodelist(heads)
600 heads = decodelist(heads)
601 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
601 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
602 return streamres(proto.groupchunks(cg))
602 return streamres(proto.groupchunks(cg))
603
603
604 @wireprotocommand('debugwireargs', 'one two *')
604 @wireprotocommand('debugwireargs', 'one two *')
605 def debugwireargs(repo, proto, one, two, others):
605 def debugwireargs(repo, proto, one, two, others):
606 # only accept optional args from the known set
606 # only accept optional args from the known set
607 opts = options('debugwireargs', ['three', 'four'], others)
607 opts = options('debugwireargs', ['three', 'four'], others)
608 return repo.debugwireargs(one, two, **opts)
608 return repo.debugwireargs(one, two, **opts)
609
609
610 # List of options accepted by getbundle.
610 # List of options accepted by getbundle.
611 #
611 #
612 # Meant to be extended by extensions. It is the extension's responsibility to
612 # Meant to be extended by extensions. It is the extension's responsibility to
613 # ensure such options are properly processed in exchange.getbundle.
613 # ensure such options are properly processed in exchange.getbundle.
614 gboptslist = ['heads', 'common', 'bundlecaps']
614 gboptslist = ['heads', 'common', 'bundlecaps']
615
615
616 @wireprotocommand('getbundle', '*')
616 @wireprotocommand('getbundle', '*')
617 def getbundle(repo, proto, others):
617 def getbundle(repo, proto, others):
618 opts = options('getbundle', gboptsmap.keys(), others)
618 opts = options('getbundle', gboptsmap.keys(), others)
619 for k, v in opts.iteritems():
619 for k, v in opts.iteritems():
620 keytype = gboptsmap[k]
620 keytype = gboptsmap[k]
621 if keytype == 'nodes':
621 if keytype == 'nodes':
622 opts[k] = decodelist(v)
622 opts[k] = decodelist(v)
623 elif keytype == 'csv':
623 elif keytype == 'csv':
624 opts[k] = list(v.split(','))
624 opts[k] = list(v.split(','))
625 elif keytype == 'scsv':
625 elif keytype == 'scsv':
626 opts[k] = set(v.split(','))
626 opts[k] = set(v.split(','))
627 elif keytype == 'boolean':
627 elif keytype == 'boolean':
628 opts[k] = bool(v)
628 opts[k] = bool(v)
629 elif keytype != 'plain':
629 elif keytype != 'plain':
630 raise KeyError('unknown getbundle option type %s'
630 raise KeyError('unknown getbundle option type %s'
631 % keytype)
631 % keytype)
632 cg = exchange.getbundle(repo, 'serve', **opts)
632 cg = exchange.getbundle(repo, 'serve', **opts)
633 return streamres(proto.groupchunks(cg))
633 return streamres(proto.groupchunks(cg))
634
634
635 @wireprotocommand('heads')
635 @wireprotocommand('heads')
636 def heads(repo, proto):
636 def heads(repo, proto):
637 h = repo.heads()
637 h = repo.heads()
638 return encodelist(h) + "\n"
638 return encodelist(h) + "\n"
639
639
640 @wireprotocommand('hello')
640 @wireprotocommand('hello')
641 def hello(repo, proto):
641 def hello(repo, proto):
642 '''the hello command returns a set of lines describing various
642 '''the hello command returns a set of lines describing various
643 interesting things about the server, in an RFC822-like format.
643 interesting things about the server, in an RFC822-like format.
644 Currently the only one defined is "capabilities", which
644 Currently the only one defined is "capabilities", which
645 consists of a line in the form:
645 consists of a line in the form:
646
646
647 capabilities: space separated list of tokens
647 capabilities: space separated list of tokens
648 '''
648 '''
649 return "capabilities: %s\n" % (capabilities(repo, proto))
649 return "capabilities: %s\n" % (capabilities(repo, proto))
650
650
651 @wireprotocommand('listkeys', 'namespace')
651 @wireprotocommand('listkeys', 'namespace')
652 def listkeys(repo, proto, namespace):
652 def listkeys(repo, proto, namespace):
653 d = repo.listkeys(encoding.tolocal(namespace)).items()
653 d = repo.listkeys(encoding.tolocal(namespace)).items()
654 return pushkeymod.encodekeys(d)
654 return pushkeymod.encodekeys(d)
655
655
656 @wireprotocommand('lookup', 'key')
656 @wireprotocommand('lookup', 'key')
657 def lookup(repo, proto, key):
657 def lookup(repo, proto, key):
658 try:
658 try:
659 k = encoding.tolocal(key)
659 k = encoding.tolocal(key)
660 c = repo[k]
660 c = repo[k]
661 r = c.hex()
661 r = c.hex()
662 success = 1
662 success = 1
663 except Exception as inst:
663 except Exception as inst:
664 r = str(inst)
664 r = str(inst)
665 success = 0
665 success = 0
666 return "%s %s\n" % (success, r)
666 return "%s %s\n" % (success, r)
667
667
668 @wireprotocommand('known', 'nodes *')
668 @wireprotocommand('known', 'nodes *')
669 def known(repo, proto, nodes, others):
669 def known(repo, proto, nodes, others):
670 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
670 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
671
671
672 @wireprotocommand('pushkey', 'namespace key old new')
672 @wireprotocommand('pushkey', 'namespace key old new')
673 def pushkey(repo, proto, namespace, key, old, new):
673 def pushkey(repo, proto, namespace, key, old, new):
674 # compatibility with pre-1.8 clients which were accidentally
674 # compatibility with pre-1.8 clients which were accidentally
675 # sending raw binary nodes rather than utf-8-encoded hex
675 # sending raw binary nodes rather than utf-8-encoded hex
676 if len(new) == 20 and new.encode('string-escape') != new:
676 if len(new) == 20 and new.encode('string-escape') != new:
677 # looks like it could be a binary node
677 # looks like it could be a binary node
678 try:
678 try:
679 new.decode('utf-8')
679 new.decode('utf-8')
680 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
680 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
681 except UnicodeDecodeError:
681 except UnicodeDecodeError:
682 pass # binary, leave unmodified
682 pass # binary, leave unmodified
683 else:
683 else:
684 new = encoding.tolocal(new) # normal path
684 new = encoding.tolocal(new) # normal path
685
685
686 if util.safehasattr(proto, 'restore'):
686 if util.safehasattr(proto, 'restore'):
687
687
688 proto.redirect()
688 proto.redirect()
689
689
690 try:
690 try:
691 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
691 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
692 encoding.tolocal(old), new) or False
692 encoding.tolocal(old), new) or False
693 except util.Abort:
693 except util.Abort:
694 r = False
694 r = False
695
695
696 output = proto.restore()
696 output = proto.restore()
697
697
698 return '%s\n%s' % (int(r), output)
698 return '%s\n%s' % (int(r), output)
699
699
700 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
700 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
701 encoding.tolocal(old), new)
701 encoding.tolocal(old), new)
702 return '%s\n' % int(r)
702 return '%s\n' % int(r)
703
703
704 def _allowstream(ui):
705 return ui.configbool('server', 'uncompressed', True, untrusted=True)
706
707 @wireprotocommand('stream_out')
704 @wireprotocommand('stream_out')
708 def stream(repo, proto):
705 def stream(repo, proto):
709 '''If the server supports streaming clone, it advertises the "stream"
706 '''If the server supports streaming clone, it advertises the "stream"
710 capability with a value representing the version and flags of the repo
707 capability with a value representing the version and flags of the repo
711 it is serving. Client checks to see if it understands the format.
708 it is serving. Client checks to see if it understands the format.
712 '''
709 '''
713 if not _allowstream(repo.ui):
710 if not streamclone.allowservergeneration(repo.ui):
714 return '1\n'
711 return '1\n'
715
712
716 def getstream(it):
713 def getstream(it):
717 yield '0\n'
714 yield '0\n'
718 for chunk in it:
715 for chunk in it:
719 yield chunk
716 yield chunk
720
717
721 try:
718 try:
722 # LockError may be raised before the first result is yielded. Don't
719 # LockError may be raised before the first result is yielded. Don't
723 # emit output until we're sure we got the lock successfully.
720 # emit output until we're sure we got the lock successfully.
724 it = streamclone.generatev1(repo)
721 it = streamclone.generatev1(repo)
725 return streamres(getstream(it))
722 return streamres(getstream(it))
726 except error.LockError:
723 except error.LockError:
727 return '2\n'
724 return '2\n'
728
725
729 @wireprotocommand('unbundle', 'heads')
726 @wireprotocommand('unbundle', 'heads')
730 def unbundle(repo, proto, heads):
727 def unbundle(repo, proto, heads):
731 their_heads = decodelist(heads)
728 their_heads = decodelist(heads)
732
729
733 try:
730 try:
734 proto.redirect()
731 proto.redirect()
735
732
736 exchange.check_heads(repo, their_heads, 'preparing changes')
733 exchange.check_heads(repo, their_heads, 'preparing changes')
737
734
738 # write bundle data to temporary file because it can be big
735 # write bundle data to temporary file because it can be big
739 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
736 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
740 fp = os.fdopen(fd, 'wb+')
737 fp = os.fdopen(fd, 'wb+')
741 r = 0
738 r = 0
742 try:
739 try:
743 proto.getfile(fp)
740 proto.getfile(fp)
744 fp.seek(0)
741 fp.seek(0)
745 gen = exchange.readbundle(repo.ui, fp, None)
742 gen = exchange.readbundle(repo.ui, fp, None)
746 r = exchange.unbundle(repo, gen, their_heads, 'serve',
743 r = exchange.unbundle(repo, gen, their_heads, 'serve',
747 proto._client())
744 proto._client())
748 if util.safehasattr(r, 'addpart'):
745 if util.safehasattr(r, 'addpart'):
749 # The return looks streamable, we are in the bundle2 case and
746 # The return looks streamable, we are in the bundle2 case and
750 # should return a stream.
747 # should return a stream.
751 return streamres(r.getchunks())
748 return streamres(r.getchunks())
752 return pushres(r)
749 return pushres(r)
753
750
754 finally:
751 finally:
755 fp.close()
752 fp.close()
756 os.unlink(tempname)
753 os.unlink(tempname)
757
754
758 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
755 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
759 # handle non-bundle2 case first
756 # handle non-bundle2 case first
760 if not getattr(exc, 'duringunbundle2', False):
757 if not getattr(exc, 'duringunbundle2', False):
761 try:
758 try:
762 raise
759 raise
763 except util.Abort:
760 except util.Abort:
764 # The old code we moved used sys.stderr directly.
761 # The old code we moved used sys.stderr directly.
765 # We did not change it to minimise code change.
762 # We did not change it to minimise code change.
766 # This need to be moved to something proper.
763 # This need to be moved to something proper.
767 # Feel free to do it.
764 # Feel free to do it.
768 sys.stderr.write("abort: %s\n" % exc)
765 sys.stderr.write("abort: %s\n" % exc)
769 return pushres(0)
766 return pushres(0)
770 except error.PushRaced:
767 except error.PushRaced:
771 return pusherr(str(exc))
768 return pusherr(str(exc))
772
769
773 bundler = bundle2.bundle20(repo.ui)
770 bundler = bundle2.bundle20(repo.ui)
774 for out in getattr(exc, '_bundle2salvagedoutput', ()):
771 for out in getattr(exc, '_bundle2salvagedoutput', ()):
775 bundler.addpart(out)
772 bundler.addpart(out)
776 try:
773 try:
777 try:
774 try:
778 raise
775 raise
779 except error.PushkeyFailed as exc:
776 except error.PushkeyFailed as exc:
780 # check client caps
777 # check client caps
781 remotecaps = getattr(exc, '_replycaps', None)
778 remotecaps = getattr(exc, '_replycaps', None)
782 if (remotecaps is not None
779 if (remotecaps is not None
783 and 'pushkey' not in remotecaps.get('error', ())):
780 and 'pushkey' not in remotecaps.get('error', ())):
784 # no support remote side, fallback to Abort handler.
781 # no support remote side, fallback to Abort handler.
785 raise
782 raise
786 part = bundler.newpart('error:pushkey')
783 part = bundler.newpart('error:pushkey')
787 part.addparam('in-reply-to', exc.partid)
784 part.addparam('in-reply-to', exc.partid)
788 if exc.namespace is not None:
785 if exc.namespace is not None:
789 part.addparam('namespace', exc.namespace, mandatory=False)
786 part.addparam('namespace', exc.namespace, mandatory=False)
790 if exc.key is not None:
787 if exc.key is not None:
791 part.addparam('key', exc.key, mandatory=False)
788 part.addparam('key', exc.key, mandatory=False)
792 if exc.new is not None:
789 if exc.new is not None:
793 part.addparam('new', exc.new, mandatory=False)
790 part.addparam('new', exc.new, mandatory=False)
794 if exc.old is not None:
791 if exc.old is not None:
795 part.addparam('old', exc.old, mandatory=False)
792 part.addparam('old', exc.old, mandatory=False)
796 if exc.ret is not None:
793 if exc.ret is not None:
797 part.addparam('ret', exc.ret, mandatory=False)
794 part.addparam('ret', exc.ret, mandatory=False)
798 except error.BundleValueError as exc:
795 except error.BundleValueError as exc:
799 errpart = bundler.newpart('error:unsupportedcontent')
796 errpart = bundler.newpart('error:unsupportedcontent')
800 if exc.parttype is not None:
797 if exc.parttype is not None:
801 errpart.addparam('parttype', exc.parttype)
798 errpart.addparam('parttype', exc.parttype)
802 if exc.params:
799 if exc.params:
803 errpart.addparam('params', '\0'.join(exc.params))
800 errpart.addparam('params', '\0'.join(exc.params))
804 except util.Abort as exc:
801 except util.Abort as exc:
805 manargs = [('message', str(exc))]
802 manargs = [('message', str(exc))]
806 advargs = []
803 advargs = []
807 if exc.hint is not None:
804 if exc.hint is not None:
808 advargs.append(('hint', exc.hint))
805 advargs.append(('hint', exc.hint))
809 bundler.addpart(bundle2.bundlepart('error:abort',
806 bundler.addpart(bundle2.bundlepart('error:abort',
810 manargs, advargs))
807 manargs, advargs))
811 except error.PushRaced as exc:
808 except error.PushRaced as exc:
812 bundler.newpart('error:pushraced', [('message', str(exc))])
809 bundler.newpart('error:pushraced', [('message', str(exc))])
813 return streamres(bundler.getchunks())
810 return streamres(bundler.getchunks())
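The practical effect of this move is that the policy check for serving stream clones now lives in streamclone.allowservergeneration(ui) instead of the private wireproto._allowstream(). An extension that previously wrapped the wireproto helper would now wrap the streamclone function, roughly as sketched below. This is a hypothetical extension: the nostreamext module name and its 'forbidstream' config knob are made up for the example.

# Hypothetical extension sketch: after this change, the function to wrap is
# streamclone.allowservergeneration(), not wireproto._allowstream().
from mercurial import extensions, streamclone

def _allowservergeneration(orig, ui):
    # Apply an extra, extension-specific veto before deferring to the
    # original 'server.uncompressed' check ('nostreamext.forbidstream'
    # is an invented config option for this sketch).
    if ui.configbool('nostreamext', 'forbidstream', False):
        return False
    return orig(ui)

def uisetup(ui):
    extensions.wrapfunction(streamclone, 'allowservergeneration',
                            _allowservergeneration)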