##// END OF EJS Templates
bundle2: refactor error bundle creation for the wireprotocol...
Pierre-Yves David -
r24796:61ff209f default
parent child Browse files
Show More
@@ -1,875 +1,878 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, store, exchange
12 import peer, error, encoding, util, store, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 try:
175 try:
176 return sep.join(map(hex, l))
176 return sep.join(map(hex, l))
177 except TypeError:
177 except TypeError:
178 print l
178 print l
179 raise
179 raise
180
180
181 # batched call argument encoding
181 # batched call argument encoding
182
182
183 def escapearg(plain):
183 def escapearg(plain):
184 return (plain
184 return (plain
185 .replace(':', '::')
185 .replace(':', '::')
186 .replace(',', ':,')
186 .replace(',', ':,')
187 .replace(';', ':;')
187 .replace(';', ':;')
188 .replace('=', ':='))
188 .replace('=', ':='))
189
189
190 def unescapearg(escaped):
190 def unescapearg(escaped):
191 return (escaped
191 return (escaped
192 .replace(':=', '=')
192 .replace(':=', '=')
193 .replace(':;', ';')
193 .replace(':;', ';')
194 .replace(':,', ',')
194 .replace(':,', ',')
195 .replace('::', ':'))
195 .replace('::', ':'))
196
196
197 # mapping of options accepted by getbundle and their types
197 # mapping of options accepted by getbundle and their types
198 #
198 #
199 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # Meant to be extended by extensions. It is extensions responsibility to ensure
200 # such options are properly processed in exchange.getbundle.
200 # such options are properly processed in exchange.getbundle.
201 #
201 #
202 # supported types are:
202 # supported types are:
203 #
203 #
204 # :nodes: list of binary nodes
204 # :nodes: list of binary nodes
205 # :csv: list of comma-separated values
205 # :csv: list of comma-separated values
206 # :plain: string with no transformation needed.
206 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
207 gboptsmap = {'heads': 'nodes',
208 'common': 'nodes',
208 'common': 'nodes',
209 'obsmarkers': 'boolean',
209 'obsmarkers': 'boolean',
210 'bundlecaps': 'csv',
210 'bundlecaps': 'csv',
211 'listkeys': 'csv',
211 'listkeys': 'csv',
212 'cg': 'boolean'}
212 'cg': 'boolean'}
213
213
214 # client side
214 # client side
215
215
216 class wirepeer(peer.peerrepository):
216 class wirepeer(peer.peerrepository):
217
217
218 def batch(self):
218 def batch(self):
219 return remotebatch(self)
219 return remotebatch(self)
220 def _submitbatch(self, req):
220 def _submitbatch(self, req):
221 cmds = []
221 cmds = []
222 for op, argsdict in req:
222 for op, argsdict in req:
223 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
223 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
224 cmds.append('%s %s' % (op, args))
224 cmds.append('%s %s' % (op, args))
225 rsp = self._call("batch", cmds=';'.join(cmds))
225 rsp = self._call("batch", cmds=';'.join(cmds))
226 return rsp.split(';')
226 return rsp.split(';')
227 def _submitone(self, op, args):
227 def _submitone(self, op, args):
228 return self._call(op, **args)
228 return self._call(op, **args)
229
229
230 @batchable
230 @batchable
231 def lookup(self, key):
231 def lookup(self, key):
232 self.requirecap('lookup', _('look up remote revision'))
232 self.requirecap('lookup', _('look up remote revision'))
233 f = future()
233 f = future()
234 yield {'key': encoding.fromlocal(key)}, f
234 yield {'key': encoding.fromlocal(key)}, f
235 d = f.value
235 d = f.value
236 success, data = d[:-1].split(" ", 1)
236 success, data = d[:-1].split(" ", 1)
237 if int(success):
237 if int(success):
238 yield bin(data)
238 yield bin(data)
239 self._abort(error.RepoError(data))
239 self._abort(error.RepoError(data))
240
240
241 @batchable
241 @batchable
242 def heads(self):
242 def heads(self):
243 f = future()
243 f = future()
244 yield {}, f
244 yield {}, f
245 d = f.value
245 d = f.value
246 try:
246 try:
247 yield decodelist(d[:-1])
247 yield decodelist(d[:-1])
248 except ValueError:
248 except ValueError:
249 self._abort(error.ResponseError(_("unexpected response:"), d))
249 self._abort(error.ResponseError(_("unexpected response:"), d))
250
250
251 @batchable
251 @batchable
252 def known(self, nodes):
252 def known(self, nodes):
253 f = future()
253 f = future()
254 yield {'nodes': encodelist(nodes)}, f
254 yield {'nodes': encodelist(nodes)}, f
255 d = f.value
255 d = f.value
256 try:
256 try:
257 yield [bool(int(b)) for b in d]
257 yield [bool(int(b)) for b in d]
258 except ValueError:
258 except ValueError:
259 self._abort(error.ResponseError(_("unexpected response:"), d))
259 self._abort(error.ResponseError(_("unexpected response:"), d))
260
260
261 @batchable
261 @batchable
262 def branchmap(self):
262 def branchmap(self):
263 f = future()
263 f = future()
264 yield {}, f
264 yield {}, f
265 d = f.value
265 d = f.value
266 try:
266 try:
267 branchmap = {}
267 branchmap = {}
268 for branchpart in d.splitlines():
268 for branchpart in d.splitlines():
269 branchname, branchheads = branchpart.split(' ', 1)
269 branchname, branchheads = branchpart.split(' ', 1)
270 branchname = encoding.tolocal(urllib.unquote(branchname))
270 branchname = encoding.tolocal(urllib.unquote(branchname))
271 branchheads = decodelist(branchheads)
271 branchheads = decodelist(branchheads)
272 branchmap[branchname] = branchheads
272 branchmap[branchname] = branchheads
273 yield branchmap
273 yield branchmap
274 except TypeError:
274 except TypeError:
275 self._abort(error.ResponseError(_("unexpected response:"), d))
275 self._abort(error.ResponseError(_("unexpected response:"), d))
276
276
277 def branches(self, nodes):
277 def branches(self, nodes):
278 n = encodelist(nodes)
278 n = encodelist(nodes)
279 d = self._call("branches", nodes=n)
279 d = self._call("branches", nodes=n)
280 try:
280 try:
281 br = [tuple(decodelist(b)) for b in d.splitlines()]
281 br = [tuple(decodelist(b)) for b in d.splitlines()]
282 return br
282 return br
283 except ValueError:
283 except ValueError:
284 self._abort(error.ResponseError(_("unexpected response:"), d))
284 self._abort(error.ResponseError(_("unexpected response:"), d))
285
285
286 def between(self, pairs):
286 def between(self, pairs):
287 batch = 8 # avoid giant requests
287 batch = 8 # avoid giant requests
288 r = []
288 r = []
289 for i in xrange(0, len(pairs), batch):
289 for i in xrange(0, len(pairs), batch):
290 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
290 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
291 d = self._call("between", pairs=n)
291 d = self._call("between", pairs=n)
292 try:
292 try:
293 r.extend(l and decodelist(l) or [] for l in d.splitlines())
293 r.extend(l and decodelist(l) or [] for l in d.splitlines())
294 except ValueError:
294 except ValueError:
295 self._abort(error.ResponseError(_("unexpected response:"), d))
295 self._abort(error.ResponseError(_("unexpected response:"), d))
296 return r
296 return r
297
297
298 @batchable
298 @batchable
299 def pushkey(self, namespace, key, old, new):
299 def pushkey(self, namespace, key, old, new):
300 if not self.capable('pushkey'):
300 if not self.capable('pushkey'):
301 yield False, None
301 yield False, None
302 f = future()
302 f = future()
303 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
303 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
304 yield {'namespace': encoding.fromlocal(namespace),
304 yield {'namespace': encoding.fromlocal(namespace),
305 'key': encoding.fromlocal(key),
305 'key': encoding.fromlocal(key),
306 'old': encoding.fromlocal(old),
306 'old': encoding.fromlocal(old),
307 'new': encoding.fromlocal(new)}, f
307 'new': encoding.fromlocal(new)}, f
308 d = f.value
308 d = f.value
309 d, output = d.split('\n', 1)
309 d, output = d.split('\n', 1)
310 try:
310 try:
311 d = bool(int(d))
311 d = bool(int(d))
312 except ValueError:
312 except ValueError:
313 raise error.ResponseError(
313 raise error.ResponseError(
314 _('push failed (unexpected response):'), d)
314 _('push failed (unexpected response):'), d)
315 for l in output.splitlines(True):
315 for l in output.splitlines(True):
316 self.ui.status(_('remote: '), l)
316 self.ui.status(_('remote: '), l)
317 yield d
317 yield d
318
318
319 @batchable
319 @batchable
320 def listkeys(self, namespace):
320 def listkeys(self, namespace):
321 if not self.capable('pushkey'):
321 if not self.capable('pushkey'):
322 yield {}, None
322 yield {}, None
323 f = future()
323 f = future()
324 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
324 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
325 yield {'namespace': encoding.fromlocal(namespace)}, f
325 yield {'namespace': encoding.fromlocal(namespace)}, f
326 d = f.value
326 d = f.value
327 yield pushkeymod.decodekeys(d)
327 yield pushkeymod.decodekeys(d)
328
328
329 def stream_out(self):
329 def stream_out(self):
330 return self._callstream('stream_out')
330 return self._callstream('stream_out')
331
331
332 def changegroup(self, nodes, kind):
332 def changegroup(self, nodes, kind):
333 n = encodelist(nodes)
333 n = encodelist(nodes)
334 f = self._callcompressable("changegroup", roots=n)
334 f = self._callcompressable("changegroup", roots=n)
335 return changegroupmod.cg1unpacker(f, 'UN')
335 return changegroupmod.cg1unpacker(f, 'UN')
336
336
337 def changegroupsubset(self, bases, heads, kind):
337 def changegroupsubset(self, bases, heads, kind):
338 self.requirecap('changegroupsubset', _('look up remote changes'))
338 self.requirecap('changegroupsubset', _('look up remote changes'))
339 bases = encodelist(bases)
339 bases = encodelist(bases)
340 heads = encodelist(heads)
340 heads = encodelist(heads)
341 f = self._callcompressable("changegroupsubset",
341 f = self._callcompressable("changegroupsubset",
342 bases=bases, heads=heads)
342 bases=bases, heads=heads)
343 return changegroupmod.cg1unpacker(f, 'UN')
343 return changegroupmod.cg1unpacker(f, 'UN')
344
344
345 def getbundle(self, source, **kwargs):
345 def getbundle(self, source, **kwargs):
346 self.requirecap('getbundle', _('look up remote changes'))
346 self.requirecap('getbundle', _('look up remote changes'))
347 opts = {}
347 opts = {}
348 for key, value in kwargs.iteritems():
348 for key, value in kwargs.iteritems():
349 if value is None:
349 if value is None:
350 continue
350 continue
351 keytype = gboptsmap.get(key)
351 keytype = gboptsmap.get(key)
352 if keytype is None:
352 if keytype is None:
353 assert False, 'unexpected'
353 assert False, 'unexpected'
354 elif keytype == 'nodes':
354 elif keytype == 'nodes':
355 value = encodelist(value)
355 value = encodelist(value)
356 elif keytype == 'csv':
356 elif keytype == 'csv':
357 value = ','.join(value)
357 value = ','.join(value)
358 elif keytype == 'boolean':
358 elif keytype == 'boolean':
359 value = '%i' % bool(value)
359 value = '%i' % bool(value)
360 elif keytype != 'plain':
360 elif keytype != 'plain':
361 raise KeyError('unknown getbundle option type %s'
361 raise KeyError('unknown getbundle option type %s'
362 % keytype)
362 % keytype)
363 opts[key] = value
363 opts[key] = value
364 f = self._callcompressable("getbundle", **opts)
364 f = self._callcompressable("getbundle", **opts)
365 bundlecaps = kwargs.get('bundlecaps')
365 bundlecaps = kwargs.get('bundlecaps')
366 if bundlecaps is None:
366 if bundlecaps is None:
367 bundlecaps = () # kwargs could have it to None
367 bundlecaps = () # kwargs could have it to None
368 if util.any((cap.startswith('HG2') for cap in bundlecaps)):
368 if util.any((cap.startswith('HG2') for cap in bundlecaps)):
369 return bundle2.getunbundler(self.ui, f)
369 return bundle2.getunbundler(self.ui, f)
370 else:
370 else:
371 return changegroupmod.cg1unpacker(f, 'UN')
371 return changegroupmod.cg1unpacker(f, 'UN')
372
372
373 def unbundle(self, cg, heads, source):
373 def unbundle(self, cg, heads, source):
374 '''Send cg (a readable file-like object representing the
374 '''Send cg (a readable file-like object representing the
375 changegroup to push, typically a chunkbuffer object) to the
375 changegroup to push, typically a chunkbuffer object) to the
376 remote server as a bundle.
376 remote server as a bundle.
377
377
378 When pushing a bundle10 stream, return an integer indicating the
378 When pushing a bundle10 stream, return an integer indicating the
379 result of the push (see localrepository.addchangegroup()).
379 result of the push (see localrepository.addchangegroup()).
380
380
381 When pushing a bundle20 stream, return a bundle20 stream.'''
381 When pushing a bundle20 stream, return a bundle20 stream.'''
382
382
383 if heads != ['force'] and self.capable('unbundlehash'):
383 if heads != ['force'] and self.capable('unbundlehash'):
384 heads = encodelist(['hashed',
384 heads = encodelist(['hashed',
385 util.sha1(''.join(sorted(heads))).digest()])
385 util.sha1(''.join(sorted(heads))).digest()])
386 else:
386 else:
387 heads = encodelist(heads)
387 heads = encodelist(heads)
388
388
389 if util.safehasattr(cg, 'deltaheader'):
389 if util.safehasattr(cg, 'deltaheader'):
390 # this a bundle10, do the old style call sequence
390 # this a bundle10, do the old style call sequence
391 ret, output = self._callpush("unbundle", cg, heads=heads)
391 ret, output = self._callpush("unbundle", cg, heads=heads)
392 if ret == "":
392 if ret == "":
393 raise error.ResponseError(
393 raise error.ResponseError(
394 _('push failed:'), output)
394 _('push failed:'), output)
395 try:
395 try:
396 ret = int(ret)
396 ret = int(ret)
397 except ValueError:
397 except ValueError:
398 raise error.ResponseError(
398 raise error.ResponseError(
399 _('push failed (unexpected response):'), ret)
399 _('push failed (unexpected response):'), ret)
400
400
401 for l in output.splitlines(True):
401 for l in output.splitlines(True):
402 self.ui.status(_('remote: '), l)
402 self.ui.status(_('remote: '), l)
403 else:
403 else:
404 # bundle2 push. Send a stream, fetch a stream.
404 # bundle2 push. Send a stream, fetch a stream.
405 stream = self._calltwowaystream('unbundle', cg, heads=heads)
405 stream = self._calltwowaystream('unbundle', cg, heads=heads)
406 ret = bundle2.getunbundler(self.ui, stream)
406 ret = bundle2.getunbundler(self.ui, stream)
407 return ret
407 return ret
408
408
409 def debugwireargs(self, one, two, three=None, four=None, five=None):
409 def debugwireargs(self, one, two, three=None, four=None, five=None):
410 # don't pass optional arguments left at their default value
410 # don't pass optional arguments left at their default value
411 opts = {}
411 opts = {}
412 if three is not None:
412 if three is not None:
413 opts['three'] = three
413 opts['three'] = three
414 if four is not None:
414 if four is not None:
415 opts['four'] = four
415 opts['four'] = four
416 return self._call('debugwireargs', one=one, two=two, **opts)
416 return self._call('debugwireargs', one=one, two=two, **opts)
417
417
418 def _call(self, cmd, **args):
418 def _call(self, cmd, **args):
419 """execute <cmd> on the server
419 """execute <cmd> on the server
420
420
421 The command is expected to return a simple string.
421 The command is expected to return a simple string.
422
422
423 returns the server reply as a string."""
423 returns the server reply as a string."""
424 raise NotImplementedError()
424 raise NotImplementedError()
425
425
426 def _callstream(self, cmd, **args):
426 def _callstream(self, cmd, **args):
427 """execute <cmd> on the server
427 """execute <cmd> on the server
428
428
429 The command is expected to return a stream.
429 The command is expected to return a stream.
430
430
431 returns the server reply as a file like object."""
431 returns the server reply as a file like object."""
432 raise NotImplementedError()
432 raise NotImplementedError()
433
433
434 def _callcompressable(self, cmd, **args):
434 def _callcompressable(self, cmd, **args):
435 """execute <cmd> on the server
435 """execute <cmd> on the server
436
436
437 The command is expected to return a stream.
437 The command is expected to return a stream.
438
438
439 The stream may have been compressed in some implementations. This
439 The stream may have been compressed in some implementations. This
440 function takes care of the decompression. This is the only difference
440 function takes care of the decompression. This is the only difference
441 with _callstream.
441 with _callstream.
442
442
443 returns the server reply as a file like object.
443 returns the server reply as a file like object.
444 """
444 """
445 raise NotImplementedError()
445 raise NotImplementedError()
446
446
447 def _callpush(self, cmd, fp, **args):
447 def _callpush(self, cmd, fp, **args):
448 """execute a <cmd> on server
448 """execute a <cmd> on server
449
449
450 The command is expected to be related to a push. Push has a special
450 The command is expected to be related to a push. Push has a special
451 return method.
451 return method.
452
452
453 returns the server reply as a (ret, output) tuple. ret is either
453 returns the server reply as a (ret, output) tuple. ret is either
454 empty (error) or a stringified int.
454 empty (error) or a stringified int.
455 """
455 """
456 raise NotImplementedError()
456 raise NotImplementedError()
457
457
458 def _calltwowaystream(self, cmd, fp, **args):
458 def _calltwowaystream(self, cmd, fp, **args):
459 """execute <cmd> on server
459 """execute <cmd> on server
460
460
461 The command will send a stream to the server and get a stream in reply.
461 The command will send a stream to the server and get a stream in reply.
462 """
462 """
463 raise NotImplementedError()
463 raise NotImplementedError()
464
464
465 def _abort(self, exception):
465 def _abort(self, exception):
466 """clearly abort the wire protocol connection and raise the exception
466 """clearly abort the wire protocol connection and raise the exception
467 """
467 """
468 raise NotImplementedError()
468 raise NotImplementedError()
469
469
470 # server side
470 # server side
471
471
472 # wire protocol command can either return a string or one of these classes.
472 # wire protocol command can either return a string or one of these classes.
473 class streamres(object):
473 class streamres(object):
474 """wireproto reply: binary stream
474 """wireproto reply: binary stream
475
475
476 The call was successful and the result is a stream.
476 The call was successful and the result is a stream.
477 Iterate on the `self.gen` attribute to retrieve chunks.
477 Iterate on the `self.gen` attribute to retrieve chunks.
478 """
478 """
479 def __init__(self, gen):
479 def __init__(self, gen):
480 self.gen = gen
480 self.gen = gen
481
481
482 class pushres(object):
482 class pushres(object):
483 """wireproto reply: success with simple integer return
483 """wireproto reply: success with simple integer return
484
484
485 The call was successful and returned an integer contained in `self.res`.
485 The call was successful and returned an integer contained in `self.res`.
486 """
486 """
487 def __init__(self, res):
487 def __init__(self, res):
488 self.res = res
488 self.res = res
489
489
490 class pusherr(object):
490 class pusherr(object):
491 """wireproto reply: failure
491 """wireproto reply: failure
492
492
493 The call failed. The `self.res` attribute contains the error message.
493 The call failed. The `self.res` attribute contains the error message.
494 """
494 """
495 def __init__(self, res):
495 def __init__(self, res):
496 self.res = res
496 self.res = res
497
497
498 class ooberror(object):
498 class ooberror(object):
499 """wireproto reply: failure of a batch of operation
499 """wireproto reply: failure of a batch of operation
500
500
501 Something failed during a batch call. The error message is stored in
501 Something failed during a batch call. The error message is stored in
502 `self.message`.
502 `self.message`.
503 """
503 """
504 def __init__(self, message):
504 def __init__(self, message):
505 self.message = message
505 self.message = message
506
506
507 def dispatch(repo, proto, command):
507 def dispatch(repo, proto, command):
508 repo = repo.filtered("served")
508 repo = repo.filtered("served")
509 func, spec = commands[command]
509 func, spec = commands[command]
510 args = proto.getargs(spec)
510 args = proto.getargs(spec)
511 return func(repo, proto, *args)
511 return func(repo, proto, *args)
512
512
513 def options(cmd, keys, others):
513 def options(cmd, keys, others):
514 opts = {}
514 opts = {}
515 for k in keys:
515 for k in keys:
516 if k in others:
516 if k in others:
517 opts[k] = others[k]
517 opts[k] = others[k]
518 del others[k]
518 del others[k]
519 if others:
519 if others:
520 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
520 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
521 % (cmd, ",".join(others)))
521 % (cmd, ",".join(others)))
522 return opts
522 return opts
523
523
524 # list of commands
524 # list of commands
525 commands = {}
525 commands = {}
526
526
527 def wireprotocommand(name, args=''):
527 def wireprotocommand(name, args=''):
528 """decorator for wire protocol command"""
528 """decorator for wire protocol command"""
529 def register(func):
529 def register(func):
530 commands[name] = (func, args)
530 commands[name] = (func, args)
531 return func
531 return func
532 return register
532 return register
533
533
534 @wireprotocommand('batch', 'cmds *')
534 @wireprotocommand('batch', 'cmds *')
535 def batch(repo, proto, cmds, others):
535 def batch(repo, proto, cmds, others):
536 repo = repo.filtered("served")
536 repo = repo.filtered("served")
537 res = []
537 res = []
538 for pair in cmds.split(';'):
538 for pair in cmds.split(';'):
539 op, args = pair.split(' ', 1)
539 op, args = pair.split(' ', 1)
540 vals = {}
540 vals = {}
541 for a in args.split(','):
541 for a in args.split(','):
542 if a:
542 if a:
543 n, v = a.split('=')
543 n, v = a.split('=')
544 vals[n] = unescapearg(v)
544 vals[n] = unescapearg(v)
545 func, spec = commands[op]
545 func, spec = commands[op]
546 if spec:
546 if spec:
547 keys = spec.split()
547 keys = spec.split()
548 data = {}
548 data = {}
549 for k in keys:
549 for k in keys:
550 if k == '*':
550 if k == '*':
551 star = {}
551 star = {}
552 for key in vals.keys():
552 for key in vals.keys():
553 if key not in keys:
553 if key not in keys:
554 star[key] = vals[key]
554 star[key] = vals[key]
555 data['*'] = star
555 data['*'] = star
556 else:
556 else:
557 data[k] = vals[k]
557 data[k] = vals[k]
558 result = func(repo, proto, *[data[k] for k in keys])
558 result = func(repo, proto, *[data[k] for k in keys])
559 else:
559 else:
560 result = func(repo, proto)
560 result = func(repo, proto)
561 if isinstance(result, ooberror):
561 if isinstance(result, ooberror):
562 return result
562 return result
563 res.append(escapearg(result))
563 res.append(escapearg(result))
564 return ';'.join(res)
564 return ';'.join(res)
565
565
566 @wireprotocommand('between', 'pairs')
566 @wireprotocommand('between', 'pairs')
567 def between(repo, proto, pairs):
567 def between(repo, proto, pairs):
568 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
568 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
569 r = []
569 r = []
570 for b in repo.between(pairs):
570 for b in repo.between(pairs):
571 r.append(encodelist(b) + "\n")
571 r.append(encodelist(b) + "\n")
572 return "".join(r)
572 return "".join(r)
573
573
574 @wireprotocommand('branchmap')
574 @wireprotocommand('branchmap')
575 def branchmap(repo, proto):
575 def branchmap(repo, proto):
576 branchmap = repo.branchmap()
576 branchmap = repo.branchmap()
577 heads = []
577 heads = []
578 for branch, nodes in branchmap.iteritems():
578 for branch, nodes in branchmap.iteritems():
579 branchname = urllib.quote(encoding.fromlocal(branch))
579 branchname = urllib.quote(encoding.fromlocal(branch))
580 branchnodes = encodelist(nodes)
580 branchnodes = encodelist(nodes)
581 heads.append('%s %s' % (branchname, branchnodes))
581 heads.append('%s %s' % (branchname, branchnodes))
582 return '\n'.join(heads)
582 return '\n'.join(heads)
583
583
584 @wireprotocommand('branches', 'nodes')
584 @wireprotocommand('branches', 'nodes')
585 def branches(repo, proto, nodes):
585 def branches(repo, proto, nodes):
586 nodes = decodelist(nodes)
586 nodes = decodelist(nodes)
587 r = []
587 r = []
588 for b in repo.branches(nodes):
588 for b in repo.branches(nodes):
589 r.append(encodelist(b) + "\n")
589 r.append(encodelist(b) + "\n")
590 return "".join(r)
590 return "".join(r)
591
591
592
592
593 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
593 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
594 'known', 'getbundle', 'unbundlehash', 'batch']
594 'known', 'getbundle', 'unbundlehash', 'batch']
595
595
596 def _capabilities(repo, proto):
596 def _capabilities(repo, proto):
597 """return a list of capabilities for a repo
597 """return a list of capabilities for a repo
598
598
599 This function exists to allow extensions to easily wrap capabilities
599 This function exists to allow extensions to easily wrap capabilities
600 computation
600 computation
601
601
602 - returns a lists: easy to alter
602 - returns a lists: easy to alter
603 - change done here will be propagated to both `capabilities` and `hello`
603 - change done here will be propagated to both `capabilities` and `hello`
604 command without any other action needed.
604 command without any other action needed.
605 """
605 """
606 # copy to prevent modification of the global list
606 # copy to prevent modification of the global list
607 caps = list(wireprotocaps)
607 caps = list(wireprotocaps)
608 if _allowstream(repo.ui):
608 if _allowstream(repo.ui):
609 if repo.ui.configbool('server', 'preferuncompressed', False):
609 if repo.ui.configbool('server', 'preferuncompressed', False):
610 caps.append('stream-preferred')
610 caps.append('stream-preferred')
611 requiredformats = repo.requirements & repo.supportedformats
611 requiredformats = repo.requirements & repo.supportedformats
612 # if our local revlogs are just revlogv1, add 'stream' cap
612 # if our local revlogs are just revlogv1, add 'stream' cap
613 if not requiredformats - set(('revlogv1',)):
613 if not requiredformats - set(('revlogv1',)):
614 caps.append('stream')
614 caps.append('stream')
615 # otherwise, add 'streamreqs' detailing our local revlog format
615 # otherwise, add 'streamreqs' detailing our local revlog format
616 else:
616 else:
617 caps.append('streamreqs=%s' % ','.join(requiredformats))
617 caps.append('streamreqs=%s' % ','.join(requiredformats))
618 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
618 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
619 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
619 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
620 caps.append('bundle2=' + urllib.quote(capsblob))
620 caps.append('bundle2=' + urllib.quote(capsblob))
621 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
621 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
622 caps.append('httpheader=1024')
622 caps.append('httpheader=1024')
623 return caps
623 return caps
624
624
625 # If you are writing an extension and consider wrapping this function. Wrap
625 # If you are writing an extension and consider wrapping this function. Wrap
626 # `_capabilities` instead.
626 # `_capabilities` instead.
627 @wireprotocommand('capabilities')
627 @wireprotocommand('capabilities')
628 def capabilities(repo, proto):
628 def capabilities(repo, proto):
629 return ' '.join(_capabilities(repo, proto))
629 return ' '.join(_capabilities(repo, proto))
630
630
631 @wireprotocommand('changegroup', 'roots')
631 @wireprotocommand('changegroup', 'roots')
632 def changegroup(repo, proto, roots):
632 def changegroup(repo, proto, roots):
633 nodes = decodelist(roots)
633 nodes = decodelist(roots)
634 cg = changegroupmod.changegroup(repo, nodes, 'serve')
634 cg = changegroupmod.changegroup(repo, nodes, 'serve')
635 return streamres(proto.groupchunks(cg))
635 return streamres(proto.groupchunks(cg))
636
636
637 @wireprotocommand('changegroupsubset', 'bases heads')
637 @wireprotocommand('changegroupsubset', 'bases heads')
638 def changegroupsubset(repo, proto, bases, heads):
638 def changegroupsubset(repo, proto, bases, heads):
639 bases = decodelist(bases)
639 bases = decodelist(bases)
640 heads = decodelist(heads)
640 heads = decodelist(heads)
641 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
641 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
642 return streamres(proto.groupchunks(cg))
642 return streamres(proto.groupchunks(cg))
643
643
644 @wireprotocommand('debugwireargs', 'one two *')
644 @wireprotocommand('debugwireargs', 'one two *')
645 def debugwireargs(repo, proto, one, two, others):
645 def debugwireargs(repo, proto, one, two, others):
646 # only accept optional args from the known set
646 # only accept optional args from the known set
647 opts = options('debugwireargs', ['three', 'four'], others)
647 opts = options('debugwireargs', ['three', 'four'], others)
648 return repo.debugwireargs(one, two, **opts)
648 return repo.debugwireargs(one, two, **opts)
649
649
650 # List of options accepted by getbundle.
650 # List of options accepted by getbundle.
651 #
651 #
652 # Meant to be extended by extensions. It is the extension's responsibility to
652 # Meant to be extended by extensions. It is the extension's responsibility to
653 # ensure such options are properly processed in exchange.getbundle.
653 # ensure such options are properly processed in exchange.getbundle.
654 gboptslist = ['heads', 'common', 'bundlecaps']
654 gboptslist = ['heads', 'common', 'bundlecaps']
655
655
656 @wireprotocommand('getbundle', '*')
656 @wireprotocommand('getbundle', '*')
657 def getbundle(repo, proto, others):
657 def getbundle(repo, proto, others):
658 opts = options('getbundle', gboptsmap.keys(), others)
658 opts = options('getbundle', gboptsmap.keys(), others)
659 for k, v in opts.iteritems():
659 for k, v in opts.iteritems():
660 keytype = gboptsmap[k]
660 keytype = gboptsmap[k]
661 if keytype == 'nodes':
661 if keytype == 'nodes':
662 opts[k] = decodelist(v)
662 opts[k] = decodelist(v)
663 elif keytype == 'csv':
663 elif keytype == 'csv':
664 opts[k] = set(v.split(','))
664 opts[k] = set(v.split(','))
665 elif keytype == 'boolean':
665 elif keytype == 'boolean':
666 opts[k] = bool(v)
666 opts[k] = bool(v)
667 elif keytype != 'plain':
667 elif keytype != 'plain':
668 raise KeyError('unknown getbundle option type %s'
668 raise KeyError('unknown getbundle option type %s'
669 % keytype)
669 % keytype)
670 cg = exchange.getbundle(repo, 'serve', **opts)
670 cg = exchange.getbundle(repo, 'serve', **opts)
671 return streamres(proto.groupchunks(cg))
671 return streamres(proto.groupchunks(cg))
672
672
673 @wireprotocommand('heads')
673 @wireprotocommand('heads')
674 def heads(repo, proto):
674 def heads(repo, proto):
675 h = repo.heads()
675 h = repo.heads()
676 return encodelist(h) + "\n"
676 return encodelist(h) + "\n"
677
677
678 @wireprotocommand('hello')
678 @wireprotocommand('hello')
679 def hello(repo, proto):
679 def hello(repo, proto):
680 '''the hello command returns a set of lines describing various
680 '''the hello command returns a set of lines describing various
681 interesting things about the server, in an RFC822-like format.
681 interesting things about the server, in an RFC822-like format.
682 Currently the only one defined is "capabilities", which
682 Currently the only one defined is "capabilities", which
683 consists of a line in the form:
683 consists of a line in the form:
684
684
685 capabilities: space separated list of tokens
685 capabilities: space separated list of tokens
686 '''
686 '''
687 return "capabilities: %s\n" % (capabilities(repo, proto))
687 return "capabilities: %s\n" % (capabilities(repo, proto))
688
688
689 @wireprotocommand('listkeys', 'namespace')
689 @wireprotocommand('listkeys', 'namespace')
690 def listkeys(repo, proto, namespace):
690 def listkeys(repo, proto, namespace):
691 d = repo.listkeys(encoding.tolocal(namespace)).items()
691 d = repo.listkeys(encoding.tolocal(namespace)).items()
692 return pushkeymod.encodekeys(d)
692 return pushkeymod.encodekeys(d)
693
693
694 @wireprotocommand('lookup', 'key')
694 @wireprotocommand('lookup', 'key')
695 def lookup(repo, proto, key):
695 def lookup(repo, proto, key):
696 try:
696 try:
697 k = encoding.tolocal(key)
697 k = encoding.tolocal(key)
698 c = repo[k]
698 c = repo[k]
699 r = c.hex()
699 r = c.hex()
700 success = 1
700 success = 1
701 except Exception, inst:
701 except Exception, inst:
702 r = str(inst)
702 r = str(inst)
703 success = 0
703 success = 0
704 return "%s %s\n" % (success, r)
704 return "%s %s\n" % (success, r)
705
705
706 @wireprotocommand('known', 'nodes *')
706 @wireprotocommand('known', 'nodes *')
707 def known(repo, proto, nodes, others):
707 def known(repo, proto, nodes, others):
708 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
708 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
709
709
710 @wireprotocommand('pushkey', 'namespace key old new')
710 @wireprotocommand('pushkey', 'namespace key old new')
711 def pushkey(repo, proto, namespace, key, old, new):
711 def pushkey(repo, proto, namespace, key, old, new):
712 # compatibility with pre-1.8 clients which were accidentally
712 # compatibility with pre-1.8 clients which were accidentally
713 # sending raw binary nodes rather than utf-8-encoded hex
713 # sending raw binary nodes rather than utf-8-encoded hex
714 if len(new) == 20 and new.encode('string-escape') != new:
714 if len(new) == 20 and new.encode('string-escape') != new:
715 # looks like it could be a binary node
715 # looks like it could be a binary node
716 try:
716 try:
717 new.decode('utf-8')
717 new.decode('utf-8')
718 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
718 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
719 except UnicodeDecodeError:
719 except UnicodeDecodeError:
720 pass # binary, leave unmodified
720 pass # binary, leave unmodified
721 else:
721 else:
722 new = encoding.tolocal(new) # normal path
722 new = encoding.tolocal(new) # normal path
723
723
724 if util.safehasattr(proto, 'restore'):
724 if util.safehasattr(proto, 'restore'):
725
725
726 proto.redirect()
726 proto.redirect()
727
727
728 try:
728 try:
729 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
729 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
730 encoding.tolocal(old), new) or False
730 encoding.tolocal(old), new) or False
731 except util.Abort:
731 except util.Abort:
732 r = False
732 r = False
733
733
734 output = proto.restore()
734 output = proto.restore()
735
735
736 return '%s\n%s' % (int(r), output)
736 return '%s\n%s' % (int(r), output)
737
737
738 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
738 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
739 encoding.tolocal(old), new)
739 encoding.tolocal(old), new)
740 return '%s\n' % int(r)
740 return '%s\n' % int(r)
741
741
742 def _allowstream(ui):
742 def _allowstream(ui):
743 return ui.configbool('server', 'uncompressed', True, untrusted=True)
743 return ui.configbool('server', 'uncompressed', True, untrusted=True)
744
744
745 def _walkstreamfiles(repo):
745 def _walkstreamfiles(repo):
746 # this is it's own function so extensions can override it
746 # this is it's own function so extensions can override it
747 return repo.store.walk()
747 return repo.store.walk()
748
748
749 @wireprotocommand('stream_out')
749 @wireprotocommand('stream_out')
750 def stream(repo, proto):
750 def stream(repo, proto):
751 '''If the server supports streaming clone, it advertises the "stream"
751 '''If the server supports streaming clone, it advertises the "stream"
752 capability with a value representing the version and flags of the repo
752 capability with a value representing the version and flags of the repo
753 it is serving. Client checks to see if it understands the format.
753 it is serving. Client checks to see if it understands the format.
754
754
755 The format is simple: the server writes out a line with the amount
755 The format is simple: the server writes out a line with the amount
756 of files, then the total amount of bytes to be transferred (separated
756 of files, then the total amount of bytes to be transferred (separated
757 by a space). Then, for each file, the server first writes the filename
757 by a space). Then, for each file, the server first writes the filename
758 and file size (separated by the null character), then the file contents.
758 and file size (separated by the null character), then the file contents.
759 '''
759 '''
760
760
761 if not _allowstream(repo.ui):
761 if not _allowstream(repo.ui):
762 return '1\n'
762 return '1\n'
763
763
764 entries = []
764 entries = []
765 total_bytes = 0
765 total_bytes = 0
766 try:
766 try:
767 # get consistent snapshot of repo, lock during scan
767 # get consistent snapshot of repo, lock during scan
768 lock = repo.lock()
768 lock = repo.lock()
769 try:
769 try:
770 repo.ui.debug('scanning\n')
770 repo.ui.debug('scanning\n')
771 for name, ename, size in _walkstreamfiles(repo):
771 for name, ename, size in _walkstreamfiles(repo):
772 if size:
772 if size:
773 entries.append((name, size))
773 entries.append((name, size))
774 total_bytes += size
774 total_bytes += size
775 finally:
775 finally:
776 lock.release()
776 lock.release()
777 except error.LockError:
777 except error.LockError:
778 return '2\n' # error: 2
778 return '2\n' # error: 2
779
779
780 def streamer(repo, entries, total):
780 def streamer(repo, entries, total):
781 '''stream out all metadata files in repository.'''
781 '''stream out all metadata files in repository.'''
782 yield '0\n' # success
782 yield '0\n' # success
783 repo.ui.debug('%d files, %d bytes to transfer\n' %
783 repo.ui.debug('%d files, %d bytes to transfer\n' %
784 (len(entries), total_bytes))
784 (len(entries), total_bytes))
785 yield '%d %d\n' % (len(entries), total_bytes)
785 yield '%d %d\n' % (len(entries), total_bytes)
786
786
787 sopener = repo.svfs
787 sopener = repo.svfs
788 oldaudit = sopener.mustaudit
788 oldaudit = sopener.mustaudit
789 debugflag = repo.ui.debugflag
789 debugflag = repo.ui.debugflag
790 sopener.mustaudit = False
790 sopener.mustaudit = False
791
791
792 try:
792 try:
793 for name, size in entries:
793 for name, size in entries:
794 if debugflag:
794 if debugflag:
795 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
795 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
796 # partially encode name over the wire for backwards compat
796 # partially encode name over the wire for backwards compat
797 yield '%s\0%d\n' % (store.encodedir(name), size)
797 yield '%s\0%d\n' % (store.encodedir(name), size)
798 if size <= 65536:
798 if size <= 65536:
799 fp = sopener(name)
799 fp = sopener(name)
800 try:
800 try:
801 data = fp.read(size)
801 data = fp.read(size)
802 finally:
802 finally:
803 fp.close()
803 fp.close()
804 yield data
804 yield data
805 else:
805 else:
806 for chunk in util.filechunkiter(sopener(name), limit=size):
806 for chunk in util.filechunkiter(sopener(name), limit=size):
807 yield chunk
807 yield chunk
808 # replace with "finally:" when support for python 2.4 has been dropped
808 # replace with "finally:" when support for python 2.4 has been dropped
809 except Exception:
809 except Exception:
810 sopener.mustaudit = oldaudit
810 sopener.mustaudit = oldaudit
811 raise
811 raise
812 sopener.mustaudit = oldaudit
812 sopener.mustaudit = oldaudit
813
813
814 return streamres(streamer(repo, entries, total_bytes))
814 return streamres(streamer(repo, entries, total_bytes))
815
815
816 @wireprotocommand('unbundle', 'heads')
816 @wireprotocommand('unbundle', 'heads')
817 def unbundle(repo, proto, heads):
817 def unbundle(repo, proto, heads):
818 their_heads = decodelist(heads)
818 their_heads = decodelist(heads)
819
819
820 try:
820 try:
821 proto.redirect()
821 proto.redirect()
822
822
823 exchange.check_heads(repo, their_heads, 'preparing changes')
823 exchange.check_heads(repo, their_heads, 'preparing changes')
824
824
825 # write bundle data to temporary file because it can be big
825 # write bundle data to temporary file because it can be big
826 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
826 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
827 fp = os.fdopen(fd, 'wb+')
827 fp = os.fdopen(fd, 'wb+')
828 r = 0
828 r = 0
829 try:
829 try:
830 proto.getfile(fp)
830 proto.getfile(fp)
831 fp.seek(0)
831 fp.seek(0)
832 gen = exchange.readbundle(repo.ui, fp, None)
832 gen = exchange.readbundle(repo.ui, fp, None)
833 r = exchange.unbundle(repo, gen, their_heads, 'serve',
833 r = exchange.unbundle(repo, gen, their_heads, 'serve',
834 proto._client())
834 proto._client())
835 if util.safehasattr(r, 'addpart'):
835 if util.safehasattr(r, 'addpart'):
836 # The return looks streamable, we are in the bundle2 case and
836 # The return looks streamable, we are in the bundle2 case and
837 # should return a stream.
837 # should return a stream.
838 return streamres(r.getchunks())
838 return streamres(r.getchunks())
839 return pushres(r)
839 return pushres(r)
840
840
841 finally:
841 finally:
842 fp.close()
842 fp.close()
843 os.unlink(tempname)
843 os.unlink(tempname)
844
845 except (error.BundleValueError, util.Abort, error.PushRaced), exc:
846 # handle non-bundle2 case first
847 if not getattr(exc, 'duringunbundle2', False):
848 try:
849 raise
850 except util.Abort:
851 # The old code we moved used sys.stderr directly.
852 # We did not change it to minimise code change.
853 # This need to be moved to something proper.
854 # Feel free to do it.
855 sys.stderr.write("abort: %s\n" % exc)
856 return pushres(0)
857 except error.PushRaced:
858 return pusherr(str(exc))
859
860 bundler = bundle2.bundle20(repo.ui)
861 try:
862 raise
844 except error.BundleValueError, exc:
863 except error.BundleValueError, exc:
845 bundler = bundle2.bundle20(repo.ui)
846 errpart = bundler.newpart('error:unsupportedcontent')
864 errpart = bundler.newpart('error:unsupportedcontent')
847 if exc.parttype is not None:
865 if exc.parttype is not None:
848 errpart.addparam('parttype', exc.parttype)
866 errpart.addparam('parttype', exc.parttype)
849 if exc.params:
867 if exc.params:
850 errpart.addparam('params', '\0'.join(exc.params))
868 errpart.addparam('params', '\0'.join(exc.params))
851 return streamres(bundler.getchunks())
869 except util.Abort, exc:
852 except util.Abort, inst:
870 manargs = [('message', str(exc))]
853 # The old code we moved used sys.stderr directly.
854 # We did not change it to minimise code change.
855 # This need to be moved to something proper.
856 # Feel free to do it.
857 if getattr(inst, 'duringunbundle2', False):
858 bundler = bundle2.bundle20(repo.ui)
859 manargs = [('message', str(inst))]
860 advargs = []
871 advargs = []
861 if inst.hint is not None:
872 if exc.hint is not None:
862 advargs.append(('hint', inst.hint))
873 advargs.append(('hint', exc.hint))
863 bundler.addpart(bundle2.bundlepart('error:abort',
874 bundler.addpart(bundle2.bundlepart('error:abort',
864 manargs, advargs))
875 manargs, advargs))
865 return streamres(bundler.getchunks())
866 else:
867 sys.stderr.write("abort: %s\n" % inst)
868 return pushres(0)
869 except error.PushRaced, exc:
876 except error.PushRaced, exc:
870 if getattr(exc, 'duringunbundle2', False):
871 bundler = bundle2.bundle20(repo.ui)
872 bundler.newpart('error:pushraced', [('message', str(exc))])
877 bundler.newpart('error:pushraced', [('message', str(exc))])
873 return streamres(bundler.getchunks())
878 return streamres(bundler.getchunks())
874 else:
875 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now