streamclone: move payload header generation into own function...
Gregory Szorc -
r26469:fb743268 default
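The change splits the old streaming-clone generator in two: generatev1() now returns the file count, byte count and a data iterator, while a new generatev1wireproto() re-adds the '<filecount> <bytecount>' header line for wire-protocol use. A minimal sketch of the new shape follows; it is not the Mercurial code itself, and the repository store walk is replaced by a plain list of (name, size, data) tuples so the example stays self-contained.

    def generatev1(entries):
        # entries stands in for the store walk in the real function; the
        # counts are returned instead of a header line being yielded
        totalbytes = sum(size for _, size, _ in entries)
        def emitrevlogdata():
            for name, size, data in entries:
                yield b'%s\0%d\n' % (name, size)  # per-file entry header
                yield data                        # raw file contents
        return len(entries), totalbytes, emitrevlogdata()

    def generatev1wireproto(entries):
        # wire-protocol wrapper: prepend the payload header that the old
        # generatev1() used to emit itself
        filecount, bytecount, it = generatev1(entries)
        yield b'%d %d\n' % (filecount, bytecount)
        for chunk in it:
            yield chunk

    # b''.join(generatev1wireproto([(b'data/a.i', 1, b'X')]))
    # -> b'1 1\ndata/a.i\x001\nX'

Keeping the counts out of the generator presumably lets other consumers, such as a bundle2 part, apply their own framing around the same data iterator.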
mercurial/streamclone.py
@@ -1,279 +1,291 @@
1 1 # streamclone.py - producing and consuming streaming repository data
2 2 #
3 3 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import time
11 11
12 12 from .i18n import _
13 13 from . import (
14 14 branchmap,
15 15 error,
16 16 store,
17 17 util,
18 18 )
19 19
20 20 def canperformstreamclone(pullop, bailifbundle2supported=False):
21 21 """Whether it is possible to perform a streaming clone as part of pull.
22 22
23 23 ``bailifbundle2supported`` will cause the function to return False if
24 24 bundle2 stream clones are supported. It should only be called by the
25 25 legacy stream clone code path.
26 26
27 27 Returns a tuple of (supported, requirements). ``supported`` is True if
28 28 streaming clone is supported and False otherwise. ``requirements`` is
29 29 a set of repo requirements from the remote, or ``None`` if stream clone
30 30 isn't supported.
31 31 """
32 32 repo = pullop.repo
33 33 remote = pullop.remote
34 34
35 35 bundle2supported = False
36 36 if pullop.canusebundle2:
37 37 if 'v1' in pullop.remotebundle2caps.get('stream', []):
38 38 bundle2supported = True
39 39 # else
40 40 # Server doesn't support bundle2 stream clone or doesn't support
41 41 # the versions we support. Fall back and possibly allow legacy.
42 42
43 43 # Ensures legacy code path uses available bundle2.
44 44 if bailifbundle2supported and bundle2supported:
45 45 return False, None
46 46 # Ensures bundle2 doesn't try to do a stream clone if it isn't supported.
47 47 #elif not bailifbundle2supported and not bundle2supported:
48 48 # return False, None
49 49
50 50 # Streaming clone only works on empty repositories.
51 51 if len(repo):
52 52 return False, None
53 53
54 54 # Streaming clone only works if all data is being requested.
55 55 if pullop.heads:
56 56 return False, None
57 57
58 58 streamrequested = pullop.streamclonerequested
59 59
60 60 # If we don't have a preference, let the server decide for us. This
61 61 # likely only comes into play in LANs.
62 62 if streamrequested is None:
63 63 # The server can advertise whether to prefer streaming clone.
64 64 streamrequested = remote.capable('stream-preferred')
65 65
66 66 if not streamrequested:
67 67 return False, None
68 68
69 69 # In order for stream clone to work, the client has to support all the
70 70 # requirements advertised by the server.
71 71 #
72 72 # The server advertises its requirements via the "stream" and "streamreqs"
73 73 # capability. "stream" (a value-less capability) is advertised if and only
74 74 # if the only requirement is "revlogv1." Else, the "streamreqs" capability
75 75 # is advertised and contains a comma-delimited list of requirements.
76 76 requirements = set()
77 77 if remote.capable('stream'):
78 78 requirements.add('revlogv1')
79 79 else:
80 80 streamreqs = remote.capable('streamreqs')
81 81 # This is weird and shouldn't happen with modern servers.
82 82 if not streamreqs:
83 83 return False, None
84 84
85 85 streamreqs = set(streamreqs.split(','))
86 86 # Server requires something we don't support. Bail.
87 87 if streamreqs - repo.supportedformats:
88 88 return False, None
89 89 requirements = streamreqs
90 90
91 91 return True, requirements
92 92
93 93 def maybeperformlegacystreamclone(pullop):
94 94 """Possibly perform a legacy stream clone operation.
95 95
96 96 Legacy stream clones are performed as part of pull but before all other
97 97 operations.
98 98
99 99 A legacy stream clone will not be performed if a bundle2 stream clone is
100 100 supported.
101 101 """
102 102 supported, requirements = canperformstreamclone(pullop)
103 103
104 104 if not supported:
105 105 return
106 106
107 107 repo = pullop.repo
108 108 remote = pullop.remote
109 109
110 110 # Save remote branchmap. We will use it later to speed up branchcache
111 111 # creation.
112 112 rbranchmap = None
113 113 if remote.capable('branchmap'):
114 114 rbranchmap = remote.branchmap()
115 115
116 116 fp = remote.stream_out()
117 117 l = fp.readline()
118 118 try:
119 119 resp = int(l)
120 120 except ValueError:
121 121 raise error.ResponseError(
122 122 _('unexpected response from remote server:'), l)
123 123 if resp == 1:
124 124 raise util.Abort(_('operation forbidden by server'))
125 125 elif resp == 2:
126 126 raise util.Abort(_('locking the remote repository failed'))
127 127 elif resp != 0:
128 128 raise util.Abort(_('the server sent an unknown error code'))
129 129
130 130 l = fp.readline()
131 131 try:
132 132 filecount, bytecount = map(int, l.split(' ', 1))
133 133 except (ValueError, TypeError):
134 134 raise error.ResponseError(
135 135 _('unexpected response from remote server:'), l)
136 136
137 137 lock = repo.lock()
138 138 try:
139 139 consumev1(repo, fp, filecount, bytecount)
140 140
141 141 # new requirements = old non-format requirements +
142 142 # new format-related remote requirements
143 143 # requirements from the streamed-in repository
144 144 repo.requirements = requirements | (
145 145 repo.requirements - repo.supportedformats)
146 146 repo._applyopenerreqs()
147 147 repo._writerequirements()
148 148
149 149 if rbranchmap:
150 150 branchmap.replacecache(repo, rbranchmap)
151 151
152 152 repo.invalidate()
153 153 finally:
154 154 lock.release()
155 155
156 156 def allowservergeneration(ui):
157 157 """Whether streaming clones are allowed from the server."""
158 158 return ui.configbool('server', 'uncompressed', True, untrusted=True)
159 159
160 160 # This is its own function so extensions can override it.
161 161 def _walkstreamfiles(repo):
162 162 return repo.store.walk()
163 163
164 164 def generatev1(repo):
165 165 """Emit content for version 1 of a streaming clone.
166 166
167 This is a generator of raw chunks that constitute a streaming clone.
167 This returns a 3-tuple of (file count, byte size, data iterator).
168 168
169 The stream begins with a line of 2 space-delimited integers containing the
170 number of entries and total bytes size.
169 The data iterator consists of N entries for each file being transferred.
170 Each file entry starts as a line with the file name and integer size
171 delimited by a null byte.
171 172
172 Next, are N entries for each file being transferred. Each file entry starts
173 as a line with the file name and integer size delimited by a null byte.
174 173 The raw file data follows. Following the raw file data is the next file
175 174 entry, or EOF.
176 175
177 176 When used on the wire protocol, an additional line indicating protocol
178 177 success will be prepended to the stream. This function is not responsible
179 178 for adding it.
180 179
181 180 This function will obtain a repository lock to ensure a consistent view of
182 181 the store is captured. It therefore may raise LockError.
183 182 """
184 183 entries = []
185 184 total_bytes = 0
186 185 # Get consistent snapshot of repo, lock during scan.
187 186 lock = repo.lock()
188 187 try:
189 188 repo.ui.debug('scanning\n')
190 189 for name, ename, size in _walkstreamfiles(repo):
191 190 if size:
192 191 entries.append((name, size))
193 192 total_bytes += size
194 193 finally:
195 194 lock.release()
196 195
197 196 repo.ui.debug('%d files, %d bytes to transfer\n' %
198 197 (len(entries), total_bytes))
199 yield '%d %d\n' % (len(entries), total_bytes)
200 198
201 199 svfs = repo.svfs
202 200 oldaudit = svfs.mustaudit
203 201 debugflag = repo.ui.debugflag
204 202 svfs.mustaudit = False
205 203
206 try:
207 for name, size in entries:
208 if debugflag:
209 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
210 # partially encode name over the wire for backwards compat
211 yield '%s\0%d\n' % (store.encodedir(name), size)
212 if size <= 65536:
213 fp = svfs(name)
214 try:
215 data = fp.read(size)
216 finally:
217 fp.close()
218 yield data
219 else:
220 for chunk in util.filechunkiter(svfs(name), limit=size):
221 yield chunk
222 finally:
223 svfs.mustaudit = oldaudit
204 def emitrevlogdata():
205 try:
206 for name, size in entries:
207 if debugflag:
208 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
209 # partially encode name over the wire for backwards compat
210 yield '%s\0%d\n' % (store.encodedir(name), size)
211 if size <= 65536:
212 fp = svfs(name)
213 try:
214 data = fp.read(size)
215 finally:
216 fp.close()
217 yield data
218 else:
219 for chunk in util.filechunkiter(svfs(name), limit=size):
220 yield chunk
221 finally:
222 svfs.mustaudit = oldaudit
223
224 return len(entries), total_bytes, emitrevlogdata()
225
226 def generatev1wireproto(repo):
227 """Emit content for version 1 of streaming clone suitable for the wire.
228
229 This is the data output from ``generatev1()`` with a header line
230 indicating file count and byte size.
231 """
232 filecount, bytecount, it = generatev1(repo)
233 yield '%d %d\n' % (filecount, bytecount)
234 for chunk in it:
235 yield chunk
224 236
225 237 def consumev1(repo, fp, filecount, bytecount):
226 238 """Apply the contents from version 1 of a streaming clone file handle.
227 239
228 240 This takes the output from "streamout" and applies it to the specified
229 241 repository.
230 242
231 243 Like "streamout," the status line added by the wire protocol is not handled
232 244 by this function.
233 245 """
234 246 lock = repo.lock()
235 247 try:
236 248 repo.ui.status(_('streaming all changes\n'))
237 249 repo.ui.status(_('%d files to transfer, %s of data\n') %
238 250 (filecount, util.bytecount(bytecount)))
239 251 handled_bytes = 0
240 252 repo.ui.progress(_('clone'), 0, total=bytecount)
241 253 start = time.time()
242 254
243 255 tr = repo.transaction(_('clone'))
244 256 try:
245 257 for i in xrange(filecount):
246 258 # XXX doesn't support '\n' or '\r' in filenames
247 259 l = fp.readline()
248 260 try:
249 261 name, size = l.split('\0', 1)
250 262 size = int(size)
251 263 except (ValueError, TypeError):
252 264 raise error.ResponseError(
253 265 _('unexpected response from remote server:'), l)
254 266 if repo.ui.debugflag:
255 267 repo.ui.debug('adding %s (%s)\n' %
256 268 (name, util.bytecount(size)))
257 269 # for backwards compat, name was partially encoded
258 270 ofp = repo.svfs(store.decodedir(name), 'w')
259 271 for chunk in util.filechunkiter(fp, limit=size):
260 272 handled_bytes += len(chunk)
261 273 repo.ui.progress(_('clone'), handled_bytes, total=bytecount)
262 274 ofp.write(chunk)
263 275 ofp.close()
264 276 tr.close()
265 277 finally:
266 278 tr.release()
267 279
268 280 # Writing straight to files circumvented the inmemory caches
269 281 repo.invalidate()
270 282
271 283 elapsed = time.time() - start
272 284 if elapsed <= 0:
273 285 elapsed = 0.001
274 286 repo.ui.progress(_('clone'), None)
275 287 repo.ui.status(_('transferred %s in %.1f seconds (%s/sec)\n') %
276 288 (util.bytecount(bytecount), elapsed,
277 289 util.bytecount(bytecount / elapsed)))
278 290 finally:
279 291 lock.release()
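The payload format described in generatev1()'s docstring above is simple to parse: each entry is a '<name>\0<size>' line followed by exactly <size> bytes of raw file data. A minimal reader sketch, assuming Python 3 bytes I/O (the module above is Python 2) and skipping the store.decodedir() name handling and repository writes that consumev1() performs:

    import io

    def iterstreamv1(fp, filecount):
        # fp must be positioned just past the '<filecount> <bytecount>'
        # header line; yields (name, data) pairs for each transferred file
        for _ in range(filecount):
            header = fp.readline()            # b'<name>\0<size>\n'
            name, size = header.rstrip(b'\n').split(b'\0', 1)
            size = int(size)
            data = fp.read(size)              # raw revlog file contents
            if len(data) != size:
                raise ValueError('truncated entry for %r' % name)
            yield name, data

    # a stream carrying a single one-byte file:
    fp = io.BytesIO(b'data/a.i\x001\nX')
    assert list(iterstreamv1(fp, 1)) == [(b'data/a.i', b'X')]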
mercurial/wireproto.py
@@ -1,810 +1,810 @@
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import os
11 11 import sys
12 12 import tempfile
13 13 import urllib
14 14
15 15 from .i18n import _
16 16 from .node import (
17 17 bin,
18 18 hex,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 encoding,
25 25 error,
26 26 exchange,
27 27 peer,
28 28 pushkey as pushkeymod,
29 29 streamclone,
30 30 util,
31 31 )
32 32
33 33 class abstractserverproto(object):
34 34 """abstract class that summarizes the protocol API
35 35
36 36 Used as reference and documentation.
37 37 """
38 38
39 39 def getargs(self, args):
40 40 """return the value for arguments in <args>
41 41
42 42 returns a list of values (same order as <args>)"""
43 43 raise NotImplementedError()
44 44
45 45 def getfile(self, fp):
46 46 """write the whole content of a file into a file like object
47 47
48 48 The file is in the form::
49 49
50 50 (<chunk-size>\n<chunk>)+0\n
51 51
52 52 chunk size is the ascii version of the int.
53 53 """
54 54 raise NotImplementedError()
55 55
56 56 def redirect(self):
57 57 """may setup interception for stdout and stderr
58 58
59 59 See also the `restore` method."""
60 60 raise NotImplementedError()
61 61
62 62 # If the `redirect` function does install interception, the `restore`
63 63 # function MUST be defined. If interception is not used, this function
64 64 # MUST NOT be defined.
65 65 #
66 66 # left commented here on purpose
67 67 #
68 68 #def restore(self):
69 69 # """reinstall previous stdout and stderr and return intercepted stdout
70 70 # """
71 71 # raise NotImplementedError()
72 72
73 73 def groupchunks(self, cg):
74 74 """return 4096 chunks from a changegroup object
75 75
76 76 Some protocols may have compressed the contents."""
77 77 raise NotImplementedError()
78 78
79 79 class remotebatch(peer.batcher):
80 80 '''batches the queued calls; uses as few roundtrips as possible'''
81 81 def __init__(self, remote):
82 82 '''remote must support _submitbatch(encbatch) and
83 83 _submitone(op, encargs)'''
84 84 peer.batcher.__init__(self)
85 85 self.remote = remote
86 86 def submit(self):
87 87 req, rsp = [], []
88 88 for name, args, opts, resref in self.calls:
89 89 mtd = getattr(self.remote, name)
90 90 batchablefn = getattr(mtd, 'batchable', None)
91 91 if batchablefn is not None:
92 92 batchable = batchablefn(mtd.im_self, *args, **opts)
93 93 encargsorres, encresref = batchable.next()
94 94 if encresref:
95 95 req.append((name, encargsorres,))
96 96 rsp.append((batchable, encresref, resref,))
97 97 else:
98 98 resref.set(encargsorres)
99 99 else:
100 100 if req:
101 101 self._submitreq(req, rsp)
102 102 req, rsp = [], []
103 103 resref.set(mtd(*args, **opts))
104 104 if req:
105 105 self._submitreq(req, rsp)
106 106 def _submitreq(self, req, rsp):
107 107 encresults = self.remote._submitbatch(req)
108 108 for encres, r in zip(encresults, rsp):
109 109 batchable, encresref, resref = r
110 110 encresref.set(encres)
111 111 resref.set(batchable.next())
112 112
113 113 # Forward a couple of names from peer to make wireproto interactions
114 114 # slightly more sensible.
115 115 batchable = peer.batchable
116 116 future = peer.future
117 117
118 118 # list of nodes encoding / decoding
119 119
120 120 def decodelist(l, sep=' '):
121 121 if l:
122 122 return map(bin, l.split(sep))
123 123 return []
124 124
125 125 def encodelist(l, sep=' '):
126 126 try:
127 127 return sep.join(map(hex, l))
128 128 except TypeError:
129 129 raise
130 130
131 131 # batched call argument encoding
132 132
133 133 def escapearg(plain):
134 134 return (plain
135 135 .replace(':', ':c')
136 136 .replace(',', ':o')
137 137 .replace(';', ':s')
138 138 .replace('=', ':e'))
139 139
140 140 def unescapearg(escaped):
141 141 return (escaped
142 142 .replace(':e', '=')
143 143 .replace(':s', ';')
144 144 .replace(':o', ',')
145 145 .replace(':c', ':'))
146 146
147 147 # mapping of options accepted by getbundle and their types
148 148 #
149 149 # Meant to be extended by extensions. It is the extension's responsibility to ensure
150 150 # such options are properly processed in exchange.getbundle.
151 151 #
152 152 # supported types are:
153 153 #
154 154 # :nodes: list of binary nodes
155 155 # :csv: list of comma-separated values
156 156 # :scsv: list of comma-separated values return as set
157 157 # :plain: string with no transformation needed.
158 158 gboptsmap = {'heads': 'nodes',
159 159 'common': 'nodes',
160 160 'obsmarkers': 'boolean',
161 161 'bundlecaps': 'scsv',
162 162 'listkeys': 'csv',
163 163 'cg': 'boolean'}
164 164
165 165 # client side
166 166
167 167 class wirepeer(peer.peerrepository):
168 168
169 169 def batch(self):
170 170 if self.capable('batch'):
171 171 return remotebatch(self)
172 172 else:
173 173 return peer.localbatch(self)
174 174 def _submitbatch(self, req):
175 175 cmds = []
176 176 for op, argsdict in req:
177 177 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
178 178 for k, v in argsdict.iteritems())
179 179 cmds.append('%s %s' % (op, args))
180 180 rsp = self._call("batch", cmds=';'.join(cmds))
181 181 return [unescapearg(r) for r in rsp.split(';')]
182 182 def _submitone(self, op, args):
183 183 return self._call(op, **args)
184 184
185 185 @batchable
186 186 def lookup(self, key):
187 187 self.requirecap('lookup', _('look up remote revision'))
188 188 f = future()
189 189 yield {'key': encoding.fromlocal(key)}, f
190 190 d = f.value
191 191 success, data = d[:-1].split(" ", 1)
192 192 if int(success):
193 193 yield bin(data)
194 194 self._abort(error.RepoError(data))
195 195
196 196 @batchable
197 197 def heads(self):
198 198 f = future()
199 199 yield {}, f
200 200 d = f.value
201 201 try:
202 202 yield decodelist(d[:-1])
203 203 except ValueError:
204 204 self._abort(error.ResponseError(_("unexpected response:"), d))
205 205
206 206 @batchable
207 207 def known(self, nodes):
208 208 f = future()
209 209 yield {'nodes': encodelist(nodes)}, f
210 210 d = f.value
211 211 try:
212 212 yield [bool(int(b)) for b in d]
213 213 except ValueError:
214 214 self._abort(error.ResponseError(_("unexpected response:"), d))
215 215
216 216 @batchable
217 217 def branchmap(self):
218 218 f = future()
219 219 yield {}, f
220 220 d = f.value
221 221 try:
222 222 branchmap = {}
223 223 for branchpart in d.splitlines():
224 224 branchname, branchheads = branchpart.split(' ', 1)
225 225 branchname = encoding.tolocal(urllib.unquote(branchname))
226 226 branchheads = decodelist(branchheads)
227 227 branchmap[branchname] = branchheads
228 228 yield branchmap
229 229 except TypeError:
230 230 self._abort(error.ResponseError(_("unexpected response:"), d))
231 231
232 232 def branches(self, nodes):
233 233 n = encodelist(nodes)
234 234 d = self._call("branches", nodes=n)
235 235 try:
236 236 br = [tuple(decodelist(b)) for b in d.splitlines()]
237 237 return br
238 238 except ValueError:
239 239 self._abort(error.ResponseError(_("unexpected response:"), d))
240 240
241 241 def between(self, pairs):
242 242 batch = 8 # avoid giant requests
243 243 r = []
244 244 for i in xrange(0, len(pairs), batch):
245 245 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
246 246 d = self._call("between", pairs=n)
247 247 try:
248 248 r.extend(l and decodelist(l) or [] for l in d.splitlines())
249 249 except ValueError:
250 250 self._abort(error.ResponseError(_("unexpected response:"), d))
251 251 return r
252 252
253 253 @batchable
254 254 def pushkey(self, namespace, key, old, new):
255 255 if not self.capable('pushkey'):
256 256 yield False, None
257 257 f = future()
258 258 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
259 259 yield {'namespace': encoding.fromlocal(namespace),
260 260 'key': encoding.fromlocal(key),
261 261 'old': encoding.fromlocal(old),
262 262 'new': encoding.fromlocal(new)}, f
263 263 d = f.value
264 264 d, output = d.split('\n', 1)
265 265 try:
266 266 d = bool(int(d))
267 267 except ValueError:
268 268 raise error.ResponseError(
269 269 _('push failed (unexpected response):'), d)
270 270 for l in output.splitlines(True):
271 271 self.ui.status(_('remote: '), l)
272 272 yield d
273 273
274 274 @batchable
275 275 def listkeys(self, namespace):
276 276 if not self.capable('pushkey'):
277 277 yield {}, None
278 278 f = future()
279 279 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
280 280 yield {'namespace': encoding.fromlocal(namespace)}, f
281 281 d = f.value
282 282 self.ui.debug('received listkey for "%s": %i bytes\n'
283 283 % (namespace, len(d)))
284 284 yield pushkeymod.decodekeys(d)
285 285
286 286 def stream_out(self):
287 287 return self._callstream('stream_out')
288 288
289 289 def changegroup(self, nodes, kind):
290 290 n = encodelist(nodes)
291 291 f = self._callcompressable("changegroup", roots=n)
292 292 return changegroupmod.cg1unpacker(f, 'UN')
293 293
294 294 def changegroupsubset(self, bases, heads, kind):
295 295 self.requirecap('changegroupsubset', _('look up remote changes'))
296 296 bases = encodelist(bases)
297 297 heads = encodelist(heads)
298 298 f = self._callcompressable("changegroupsubset",
299 299 bases=bases, heads=heads)
300 300 return changegroupmod.cg1unpacker(f, 'UN')
301 301
302 302 def getbundle(self, source, **kwargs):
303 303 self.requirecap('getbundle', _('look up remote changes'))
304 304 opts = {}
305 305 bundlecaps = kwargs.get('bundlecaps')
306 306 if bundlecaps is not None:
307 307 kwargs['bundlecaps'] = sorted(bundlecaps)
308 308 else:
309 309 bundlecaps = () # kwargs could have it to None
310 310 for key, value in kwargs.iteritems():
311 311 if value is None:
312 312 continue
313 313 keytype = gboptsmap.get(key)
314 314 if keytype is None:
315 315 assert False, 'unexpected'
316 316 elif keytype == 'nodes':
317 317 value = encodelist(value)
318 318 elif keytype in ('csv', 'scsv'):
319 319 value = ','.join(value)
320 320 elif keytype == 'boolean':
321 321 value = '%i' % bool(value)
322 322 elif keytype != 'plain':
323 323 raise KeyError('unknown getbundle option type %s'
324 324 % keytype)
325 325 opts[key] = value
326 326 f = self._callcompressable("getbundle", **opts)
327 327 if any((cap.startswith('HG2') for cap in bundlecaps)):
328 328 return bundle2.getunbundler(self.ui, f)
329 329 else:
330 330 return changegroupmod.cg1unpacker(f, 'UN')
331 331
332 332 def unbundle(self, cg, heads, source):
333 333 '''Send cg (a readable file-like object representing the
334 334 changegroup to push, typically a chunkbuffer object) to the
335 335 remote server as a bundle.
336 336
337 337 When pushing a bundle10 stream, return an integer indicating the
338 338 result of the push (see localrepository.addchangegroup()).
339 339
340 340 When pushing a bundle20 stream, return a bundle20 stream.'''
341 341
342 342 if heads != ['force'] and self.capable('unbundlehash'):
343 343 heads = encodelist(['hashed',
344 344 util.sha1(''.join(sorted(heads))).digest()])
345 345 else:
346 346 heads = encodelist(heads)
347 347
348 348 if util.safehasattr(cg, 'deltaheader'):
349 349 # this a bundle10, do the old style call sequence
350 350 ret, output = self._callpush("unbundle", cg, heads=heads)
351 351 if ret == "":
352 352 raise error.ResponseError(
353 353 _('push failed:'), output)
354 354 try:
355 355 ret = int(ret)
356 356 except ValueError:
357 357 raise error.ResponseError(
358 358 _('push failed (unexpected response):'), ret)
359 359
360 360 for l in output.splitlines(True):
361 361 self.ui.status(_('remote: '), l)
362 362 else:
363 363 # bundle2 push. Send a stream, fetch a stream.
364 364 stream = self._calltwowaystream('unbundle', cg, heads=heads)
365 365 ret = bundle2.getunbundler(self.ui, stream)
366 366 return ret
367 367
368 368 def debugwireargs(self, one, two, three=None, four=None, five=None):
369 369 # don't pass optional arguments left at their default value
370 370 opts = {}
371 371 if three is not None:
372 372 opts['three'] = three
373 373 if four is not None:
374 374 opts['four'] = four
375 375 return self._call('debugwireargs', one=one, two=two, **opts)
376 376
377 377 def _call(self, cmd, **args):
378 378 """execute <cmd> on the server
379 379
380 380 The command is expected to return a simple string.
381 381
382 382 returns the server reply as a string."""
383 383 raise NotImplementedError()
384 384
385 385 def _callstream(self, cmd, **args):
386 386 """execute <cmd> on the server
387 387
388 388 The command is expected to return a stream.
389 389
390 390 returns the server reply as a file like object."""
391 391 raise NotImplementedError()
392 392
393 393 def _callcompressable(self, cmd, **args):
394 394 """execute <cmd> on the server
395 395
396 396 The command is expected to return a stream.
397 397
398 398 The stream may have been compressed in some implementations. This
399 399 function takes care of the decompression. This is the only difference
400 400 with _callstream.
401 401
402 402 returns the server reply as a file like object.
403 403 """
404 404 raise NotImplementedError()
405 405
406 406 def _callpush(self, cmd, fp, **args):
407 407 """execute a <cmd> on server
408 408
409 409 The command is expected to be related to a push. Push has a special
410 410 return method.
411 411
412 412 returns the server reply as a (ret, output) tuple. ret is either
413 413 empty (error) or a stringified int.
414 414 """
415 415 raise NotImplementedError()
416 416
417 417 def _calltwowaystream(self, cmd, fp, **args):
418 418 """execute <cmd> on server
419 419
420 420 The command will send a stream to the server and get a stream in reply.
421 421 """
422 422 raise NotImplementedError()
423 423
424 424 def _abort(self, exception):
425 425 """clearly abort the wire protocol connection and raise the exception
426 426 """
427 427 raise NotImplementedError()
428 428
429 429 # server side
430 430
431 431 # wire protocol command can either return a string or one of these classes.
432 432 class streamres(object):
433 433 """wireproto reply: binary stream
434 434
435 435 The call was successful and the result is a stream.
436 436 Iterate on the `self.gen` attribute to retrieve chunks.
437 437 """
438 438 def __init__(self, gen):
439 439 self.gen = gen
440 440
441 441 class pushres(object):
442 442 """wireproto reply: success with simple integer return
443 443
444 444 The call was successful and returned an integer contained in `self.res`.
445 445 """
446 446 def __init__(self, res):
447 447 self.res = res
448 448
449 449 class pusherr(object):
450 450 """wireproto reply: failure
451 451
452 452 The call failed. The `self.res` attribute contains the error message.
453 453 """
454 454 def __init__(self, res):
455 455 self.res = res
456 456
457 457 class ooberror(object):
458 458 """wireproto reply: failure of a batch of operation
459 459
460 460 Something failed during a batch call. The error message is stored in
461 461 `self.message`.
462 462 """
463 463 def __init__(self, message):
464 464 self.message = message
465 465
466 466 def dispatch(repo, proto, command):
467 467 repo = repo.filtered("served")
468 468 func, spec = commands[command]
469 469 args = proto.getargs(spec)
470 470 return func(repo, proto, *args)
471 471
472 472 def options(cmd, keys, others):
473 473 opts = {}
474 474 for k in keys:
475 475 if k in others:
476 476 opts[k] = others[k]
477 477 del others[k]
478 478 if others:
479 479 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
480 480 % (cmd, ",".join(others)))
481 481 return opts
482 482
483 483 # list of commands
484 484 commands = {}
485 485
486 486 def wireprotocommand(name, args=''):
487 487 """decorator for wire protocol command"""
488 488 def register(func):
489 489 commands[name] = (func, args)
490 490 return func
491 491 return register
492 492
493 493 @wireprotocommand('batch', 'cmds *')
494 494 def batch(repo, proto, cmds, others):
495 495 repo = repo.filtered("served")
496 496 res = []
497 497 for pair in cmds.split(';'):
498 498 op, args = pair.split(' ', 1)
499 499 vals = {}
500 500 for a in args.split(','):
501 501 if a:
502 502 n, v = a.split('=')
503 503 vals[n] = unescapearg(v)
504 504 func, spec = commands[op]
505 505 if spec:
506 506 keys = spec.split()
507 507 data = {}
508 508 for k in keys:
509 509 if k == '*':
510 510 star = {}
511 511 for key in vals.keys():
512 512 if key not in keys:
513 513 star[key] = vals[key]
514 514 data['*'] = star
515 515 else:
516 516 data[k] = vals[k]
517 517 result = func(repo, proto, *[data[k] for k in keys])
518 518 else:
519 519 result = func(repo, proto)
520 520 if isinstance(result, ooberror):
521 521 return result
522 522 res.append(escapearg(result))
523 523 return ';'.join(res)
524 524
525 525 @wireprotocommand('between', 'pairs')
526 526 def between(repo, proto, pairs):
527 527 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
528 528 r = []
529 529 for b in repo.between(pairs):
530 530 r.append(encodelist(b) + "\n")
531 531 return "".join(r)
532 532
533 533 @wireprotocommand('branchmap')
534 534 def branchmap(repo, proto):
535 535 branchmap = repo.branchmap()
536 536 heads = []
537 537 for branch, nodes in branchmap.iteritems():
538 538 branchname = urllib.quote(encoding.fromlocal(branch))
539 539 branchnodes = encodelist(nodes)
540 540 heads.append('%s %s' % (branchname, branchnodes))
541 541 return '\n'.join(heads)
542 542
543 543 @wireprotocommand('branches', 'nodes')
544 544 def branches(repo, proto, nodes):
545 545 nodes = decodelist(nodes)
546 546 r = []
547 547 for b in repo.branches(nodes):
548 548 r.append(encodelist(b) + "\n")
549 549 return "".join(r)
550 550
551 551
552 552 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
553 553 'known', 'getbundle', 'unbundlehash', 'batch']
554 554
555 555 def _capabilities(repo, proto):
556 556 """return a list of capabilities for a repo
557 557
558 558 This function exists to allow extensions to easily wrap capabilities
559 559 computation
560 560
561 561 - returns a list: easy to alter
562 562 - changes done here will be propagated to both the `capabilities` and
563 563 `hello` commands without any other action needed.
564 564 """
565 565 # copy to prevent modification of the global list
566 566 caps = list(wireprotocaps)
567 567 if streamclone.allowservergeneration(repo.ui):
568 568 if repo.ui.configbool('server', 'preferuncompressed', False):
569 569 caps.append('stream-preferred')
570 570 requiredformats = repo.requirements & repo.supportedformats
571 571 # if our local revlogs are just revlogv1, add 'stream' cap
572 572 if not requiredformats - set(('revlogv1',)):
573 573 caps.append('stream')
574 574 # otherwise, add 'streamreqs' detailing our local revlog format
575 575 else:
576 576 caps.append('streamreqs=%s' % ','.join(requiredformats))
577 577 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
578 578 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
579 579 caps.append('bundle2=' + urllib.quote(capsblob))
580 580 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
581 581 caps.append(
582 582 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
583 583 return caps
584 584
585 585 # If you are writing an extension and consider wrapping this function. Wrap
586 586 # `_capabilities` instead.
587 587 @wireprotocommand('capabilities')
588 588 def capabilities(repo, proto):
589 589 return ' '.join(_capabilities(repo, proto))
590 590
591 591 @wireprotocommand('changegroup', 'roots')
592 592 def changegroup(repo, proto, roots):
593 593 nodes = decodelist(roots)
594 594 cg = changegroupmod.changegroup(repo, nodes, 'serve')
595 595 return streamres(proto.groupchunks(cg))
596 596
597 597 @wireprotocommand('changegroupsubset', 'bases heads')
598 598 def changegroupsubset(repo, proto, bases, heads):
599 599 bases = decodelist(bases)
600 600 heads = decodelist(heads)
601 601 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
602 602 return streamres(proto.groupchunks(cg))
603 603
604 604 @wireprotocommand('debugwireargs', 'one two *')
605 605 def debugwireargs(repo, proto, one, two, others):
606 606 # only accept optional args from the known set
607 607 opts = options('debugwireargs', ['three', 'four'], others)
608 608 return repo.debugwireargs(one, two, **opts)
609 609
610 610 # List of options accepted by getbundle.
611 611 #
612 612 # Meant to be extended by extensions. It is the extension's responsibility to
613 613 # ensure such options are properly processed in exchange.getbundle.
614 614 gboptslist = ['heads', 'common', 'bundlecaps']
615 615
616 616 @wireprotocommand('getbundle', '*')
617 617 def getbundle(repo, proto, others):
618 618 opts = options('getbundle', gboptsmap.keys(), others)
619 619 for k, v in opts.iteritems():
620 620 keytype = gboptsmap[k]
621 621 if keytype == 'nodes':
622 622 opts[k] = decodelist(v)
623 623 elif keytype == 'csv':
624 624 opts[k] = list(v.split(','))
625 625 elif keytype == 'scsv':
626 626 opts[k] = set(v.split(','))
627 627 elif keytype == 'boolean':
628 628 opts[k] = bool(v)
629 629 elif keytype != 'plain':
630 630 raise KeyError('unknown getbundle option type %s'
631 631 % keytype)
632 632 cg = exchange.getbundle(repo, 'serve', **opts)
633 633 return streamres(proto.groupchunks(cg))
634 634
635 635 @wireprotocommand('heads')
636 636 def heads(repo, proto):
637 637 h = repo.heads()
638 638 return encodelist(h) + "\n"
639 639
640 640 @wireprotocommand('hello')
641 641 def hello(repo, proto):
642 642 '''the hello command returns a set of lines describing various
643 643 interesting things about the server, in an RFC822-like format.
644 644 Currently the only one defined is "capabilities", which
645 645 consists of a line in the form:
646 646
647 647 capabilities: space separated list of tokens
648 648 '''
649 649 return "capabilities: %s\n" % (capabilities(repo, proto))
650 650
651 651 @wireprotocommand('listkeys', 'namespace')
652 652 def listkeys(repo, proto, namespace):
653 653 d = repo.listkeys(encoding.tolocal(namespace)).items()
654 654 return pushkeymod.encodekeys(d)
655 655
656 656 @wireprotocommand('lookup', 'key')
657 657 def lookup(repo, proto, key):
658 658 try:
659 659 k = encoding.tolocal(key)
660 660 c = repo[k]
661 661 r = c.hex()
662 662 success = 1
663 663 except Exception as inst:
664 664 r = str(inst)
665 665 success = 0
666 666 return "%s %s\n" % (success, r)
667 667
668 668 @wireprotocommand('known', 'nodes *')
669 669 def known(repo, proto, nodes, others):
670 670 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
671 671
672 672 @wireprotocommand('pushkey', 'namespace key old new')
673 673 def pushkey(repo, proto, namespace, key, old, new):
674 674 # compatibility with pre-1.8 clients which were accidentally
675 675 # sending raw binary nodes rather than utf-8-encoded hex
676 676 if len(new) == 20 and new.encode('string-escape') != new:
677 677 # looks like it could be a binary node
678 678 try:
679 679 new.decode('utf-8')
680 680 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
681 681 except UnicodeDecodeError:
682 682 pass # binary, leave unmodified
683 683 else:
684 684 new = encoding.tolocal(new) # normal path
685 685
686 686 if util.safehasattr(proto, 'restore'):
687 687
688 688 proto.redirect()
689 689
690 690 try:
691 691 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
692 692 encoding.tolocal(old), new) or False
693 693 except util.Abort:
694 694 r = False
695 695
696 696 output = proto.restore()
697 697
698 698 return '%s\n%s' % (int(r), output)
699 699
700 700 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
701 701 encoding.tolocal(old), new)
702 702 return '%s\n' % int(r)
703 703
704 704 @wireprotocommand('stream_out')
705 705 def stream(repo, proto):
706 706 '''If the server supports streaming clone, it advertises the "stream"
707 707 capability with a value representing the version and flags of the repo
708 708 it is serving. Client checks to see if it understands the format.
709 709 '''
710 710 if not streamclone.allowservergeneration(repo.ui):
711 711 return '1\n'
712 712
713 713 def getstream(it):
714 714 yield '0\n'
715 715 for chunk in it:
716 716 yield chunk
717 717
718 718 try:
719 719 # LockError may be raised before the first result is yielded. Don't
720 720 # emit output until we're sure we got the lock successfully.
721 it = streamclone.generatev1(repo)
721 it = streamclone.generatev1wireproto(repo)
722 722 return streamres(getstream(it))
723 723 except error.LockError:
724 724 return '2\n'
725 725
726 726 @wireprotocommand('unbundle', 'heads')
727 727 def unbundle(repo, proto, heads):
728 728 their_heads = decodelist(heads)
729 729
730 730 try:
731 731 proto.redirect()
732 732
733 733 exchange.check_heads(repo, their_heads, 'preparing changes')
734 734
735 735 # write bundle data to temporary file because it can be big
736 736 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
737 737 fp = os.fdopen(fd, 'wb+')
738 738 r = 0
739 739 try:
740 740 proto.getfile(fp)
741 741 fp.seek(0)
742 742 gen = exchange.readbundle(repo.ui, fp, None)
743 743 r = exchange.unbundle(repo, gen, their_heads, 'serve',
744 744 proto._client())
745 745 if util.safehasattr(r, 'addpart'):
746 746 # The return looks streamable, we are in the bundle2 case and
747 747 # should return a stream.
748 748 return streamres(r.getchunks())
749 749 return pushres(r)
750 750
751 751 finally:
752 752 fp.close()
753 753 os.unlink(tempname)
754 754
755 755 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
756 756 # handle non-bundle2 case first
757 757 if not getattr(exc, 'duringunbundle2', False):
758 758 try:
759 759 raise
760 760 except util.Abort:
761 761 # The old code we moved used sys.stderr directly.
762 762 # We did not change it to minimise code change.
763 763 # This needs to be moved to something proper.
764 764 # Feel free to do it.
765 765 sys.stderr.write("abort: %s\n" % exc)
766 766 return pushres(0)
767 767 except error.PushRaced:
768 768 return pusherr(str(exc))
769 769
770 770 bundler = bundle2.bundle20(repo.ui)
771 771 for out in getattr(exc, '_bundle2salvagedoutput', ()):
772 772 bundler.addpart(out)
773 773 try:
774 774 try:
775 775 raise
776 776 except error.PushkeyFailed as exc:
777 777 # check client caps
778 778 remotecaps = getattr(exc, '_replycaps', None)
779 779 if (remotecaps is not None
780 780 and 'pushkey' not in remotecaps.get('error', ())):
781 781 # remote side has no support, fall back to Abort handler.
782 782 raise
783 783 part = bundler.newpart('error:pushkey')
784 784 part.addparam('in-reply-to', exc.partid)
785 785 if exc.namespace is not None:
786 786 part.addparam('namespace', exc.namespace, mandatory=False)
787 787 if exc.key is not None:
788 788 part.addparam('key', exc.key, mandatory=False)
789 789 if exc.new is not None:
790 790 part.addparam('new', exc.new, mandatory=False)
791 791 if exc.old is not None:
792 792 part.addparam('old', exc.old, mandatory=False)
793 793 if exc.ret is not None:
794 794 part.addparam('ret', exc.ret, mandatory=False)
795 795 except error.BundleValueError as exc:
796 796 errpart = bundler.newpart('error:unsupportedcontent')
797 797 if exc.parttype is not None:
798 798 errpart.addparam('parttype', exc.parttype)
799 799 if exc.params:
800 800 errpart.addparam('params', '\0'.join(exc.params))
801 801 except util.Abort as exc:
802 802 manargs = [('message', str(exc))]
803 803 advargs = []
804 804 if exc.hint is not None:
805 805 advargs.append(('hint', exc.hint))
806 806 bundler.addpart(bundle2.bundlepart('error:abort',
807 807 manargs, advargs))
808 808 except error.PushRaced as exc:
809 809 bundler.newpart('error:pushraced', [('message', str(exc))])
810 810 return streamres(bundler.getchunks())
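On the wire, the stream_out command above prepends a one-line status code to the output of generatev1wireproto(), and the legacy client path in streamclone.py (maybeperformlegacystreamclone) checks that code before reading the file/byte counts. A small client-side sketch of that framing, assuming a binary file-like response object and simplified error types (the real code raises util.Abort and error.ResponseError):

    def readstreamresponse(fp):
        # status line: 0 = OK, 1 = operation forbidden, 2 = remote lock failed
        resp = int(fp.readline())
        if resp == 1:
            raise RuntimeError('operation forbidden by server')
        if resp == 2:
            raise RuntimeError('locking the remote repository failed')
        if resp != 0:
            raise RuntimeError('the server sent an unknown error code')
        # next comes '<filecount> <bytecount>', then the file entries
        filecount, bytecount = map(int, fp.readline().split(b' ', 1))
        return filecount, bytecount, fp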