##// END OF EJS Templates
getbundle: sort bundlecaps before exchanging then over the wire...
Pierre-Yves David -
r25128:631766d1 default
parent child Browse files
Show More
@@ -1,880 +1,882 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, store, exchange
12 import peer, error, encoding, util, store, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 try:
175 try:
176 return sep.join(map(hex, l))
176 return sep.join(map(hex, l))
177 except TypeError:
177 except TypeError:
178 print l
178 print l
179 raise
179 raise
180
180
181 # batched call argument encoding
181 # batched call argument encoding
182
182
183 def escapearg(plain):
183 def escapearg(plain):
184 return (plain
184 return (plain
185 .replace(':', '::')
185 .replace(':', '::')
186 .replace(',', ':,')
186 .replace(',', ':,')
187 .replace(';', ':;')
187 .replace(';', ':;')
188 .replace('=', ':='))
188 .replace('=', ':='))
189
189
190 def unescapearg(escaped):
190 def unescapearg(escaped):
191 return (escaped
191 return (escaped
192 .replace(':=', '=')
192 .replace(':=', '=')
193 .replace(':;', ';')
193 .replace(':;', ';')
194 .replace(':,', ',')
194 .replace(':,', ',')
195 .replace('::', ':'))
195 .replace('::', ':'))
196
196
197 # mapping of options accepted by getbundle and their types
197 # mapping of options accepted by getbundle and their types
198 #
198 #
199 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # Meant to be extended by extensions. It is extensions responsibility to ensure
200 # such options are properly processed in exchange.getbundle.
200 # such options are properly processed in exchange.getbundle.
201 #
201 #
202 # supported types are:
202 # supported types are:
203 #
203 #
204 # :nodes: list of binary nodes
204 # :nodes: list of binary nodes
205 # :csv: list of comma-separated values
205 # :csv: list of comma-separated values
206 # :plain: string with no transformation needed.
206 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
207 gboptsmap = {'heads': 'nodes',
208 'common': 'nodes',
208 'common': 'nodes',
209 'obsmarkers': 'boolean',
209 'obsmarkers': 'boolean',
210 'bundlecaps': 'csv',
210 'bundlecaps': 'csv',
211 'listkeys': 'csv',
211 'listkeys': 'csv',
212 'cg': 'boolean'}
212 'cg': 'boolean'}
213
213
214 # client side
214 # client side
215
215
216 class wirepeer(peer.peerrepository):
216 class wirepeer(peer.peerrepository):
217
217
218 def batch(self):
218 def batch(self):
219 return remotebatch(self)
219 return remotebatch(self)
220 def _submitbatch(self, req):
220 def _submitbatch(self, req):
221 cmds = []
221 cmds = []
222 for op, argsdict in req:
222 for op, argsdict in req:
223 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
223 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
224 cmds.append('%s %s' % (op, args))
224 cmds.append('%s %s' % (op, args))
225 rsp = self._call("batch", cmds=';'.join(cmds))
225 rsp = self._call("batch", cmds=';'.join(cmds))
226 return rsp.split(';')
226 return rsp.split(';')
227 def _submitone(self, op, args):
227 def _submitone(self, op, args):
228 return self._call(op, **args)
228 return self._call(op, **args)
229
229
230 @batchable
230 @batchable
231 def lookup(self, key):
231 def lookup(self, key):
232 self.requirecap('lookup', _('look up remote revision'))
232 self.requirecap('lookup', _('look up remote revision'))
233 f = future()
233 f = future()
234 yield {'key': encoding.fromlocal(key)}, f
234 yield {'key': encoding.fromlocal(key)}, f
235 d = f.value
235 d = f.value
236 success, data = d[:-1].split(" ", 1)
236 success, data = d[:-1].split(" ", 1)
237 if int(success):
237 if int(success):
238 yield bin(data)
238 yield bin(data)
239 self._abort(error.RepoError(data))
239 self._abort(error.RepoError(data))
240
240
241 @batchable
241 @batchable
242 def heads(self):
242 def heads(self):
243 f = future()
243 f = future()
244 yield {}, f
244 yield {}, f
245 d = f.value
245 d = f.value
246 try:
246 try:
247 yield decodelist(d[:-1])
247 yield decodelist(d[:-1])
248 except ValueError:
248 except ValueError:
249 self._abort(error.ResponseError(_("unexpected response:"), d))
249 self._abort(error.ResponseError(_("unexpected response:"), d))
250
250
251 @batchable
251 @batchable
252 def known(self, nodes):
252 def known(self, nodes):
253 f = future()
253 f = future()
254 yield {'nodes': encodelist(nodes)}, f
254 yield {'nodes': encodelist(nodes)}, f
255 d = f.value
255 d = f.value
256 try:
256 try:
257 yield [bool(int(b)) for b in d]
257 yield [bool(int(b)) for b in d]
258 except ValueError:
258 except ValueError:
259 self._abort(error.ResponseError(_("unexpected response:"), d))
259 self._abort(error.ResponseError(_("unexpected response:"), d))
260
260
261 @batchable
261 @batchable
262 def branchmap(self):
262 def branchmap(self):
263 f = future()
263 f = future()
264 yield {}, f
264 yield {}, f
265 d = f.value
265 d = f.value
266 try:
266 try:
267 branchmap = {}
267 branchmap = {}
268 for branchpart in d.splitlines():
268 for branchpart in d.splitlines():
269 branchname, branchheads = branchpart.split(' ', 1)
269 branchname, branchheads = branchpart.split(' ', 1)
270 branchname = encoding.tolocal(urllib.unquote(branchname))
270 branchname = encoding.tolocal(urllib.unquote(branchname))
271 branchheads = decodelist(branchheads)
271 branchheads = decodelist(branchheads)
272 branchmap[branchname] = branchheads
272 branchmap[branchname] = branchheads
273 yield branchmap
273 yield branchmap
274 except TypeError:
274 except TypeError:
275 self._abort(error.ResponseError(_("unexpected response:"), d))
275 self._abort(error.ResponseError(_("unexpected response:"), d))
276
276
277 def branches(self, nodes):
277 def branches(self, nodes):
278 n = encodelist(nodes)
278 n = encodelist(nodes)
279 d = self._call("branches", nodes=n)
279 d = self._call("branches", nodes=n)
280 try:
280 try:
281 br = [tuple(decodelist(b)) for b in d.splitlines()]
281 br = [tuple(decodelist(b)) for b in d.splitlines()]
282 return br
282 return br
283 except ValueError:
283 except ValueError:
284 self._abort(error.ResponseError(_("unexpected response:"), d))
284 self._abort(error.ResponseError(_("unexpected response:"), d))
285
285
286 def between(self, pairs):
286 def between(self, pairs):
287 batch = 8 # avoid giant requests
287 batch = 8 # avoid giant requests
288 r = []
288 r = []
289 for i in xrange(0, len(pairs), batch):
289 for i in xrange(0, len(pairs), batch):
290 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
290 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
291 d = self._call("between", pairs=n)
291 d = self._call("between", pairs=n)
292 try:
292 try:
293 r.extend(l and decodelist(l) or [] for l in d.splitlines())
293 r.extend(l and decodelist(l) or [] for l in d.splitlines())
294 except ValueError:
294 except ValueError:
295 self._abort(error.ResponseError(_("unexpected response:"), d))
295 self._abort(error.ResponseError(_("unexpected response:"), d))
296 return r
296 return r
297
297
298 @batchable
298 @batchable
299 def pushkey(self, namespace, key, old, new):
299 def pushkey(self, namespace, key, old, new):
300 if not self.capable('pushkey'):
300 if not self.capable('pushkey'):
301 yield False, None
301 yield False, None
302 f = future()
302 f = future()
303 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
303 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
304 yield {'namespace': encoding.fromlocal(namespace),
304 yield {'namespace': encoding.fromlocal(namespace),
305 'key': encoding.fromlocal(key),
305 'key': encoding.fromlocal(key),
306 'old': encoding.fromlocal(old),
306 'old': encoding.fromlocal(old),
307 'new': encoding.fromlocal(new)}, f
307 'new': encoding.fromlocal(new)}, f
308 d = f.value
308 d = f.value
309 d, output = d.split('\n', 1)
309 d, output = d.split('\n', 1)
310 try:
310 try:
311 d = bool(int(d))
311 d = bool(int(d))
312 except ValueError:
312 except ValueError:
313 raise error.ResponseError(
313 raise error.ResponseError(
314 _('push failed (unexpected response):'), d)
314 _('push failed (unexpected response):'), d)
315 for l in output.splitlines(True):
315 for l in output.splitlines(True):
316 self.ui.status(_('remote: '), l)
316 self.ui.status(_('remote: '), l)
317 yield d
317 yield d
318
318
319 @batchable
319 @batchable
320 def listkeys(self, namespace):
320 def listkeys(self, namespace):
321 if not self.capable('pushkey'):
321 if not self.capable('pushkey'):
322 yield {}, None
322 yield {}, None
323 f = future()
323 f = future()
324 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
324 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
325 yield {'namespace': encoding.fromlocal(namespace)}, f
325 yield {'namespace': encoding.fromlocal(namespace)}, f
326 d = f.value
326 d = f.value
327 yield pushkeymod.decodekeys(d)
327 yield pushkeymod.decodekeys(d)
328
328
329 def stream_out(self):
329 def stream_out(self):
330 return self._callstream('stream_out')
330 return self._callstream('stream_out')
331
331
332 def changegroup(self, nodes, kind):
332 def changegroup(self, nodes, kind):
333 n = encodelist(nodes)
333 n = encodelist(nodes)
334 f = self._callcompressable("changegroup", roots=n)
334 f = self._callcompressable("changegroup", roots=n)
335 return changegroupmod.cg1unpacker(f, 'UN')
335 return changegroupmod.cg1unpacker(f, 'UN')
336
336
337 def changegroupsubset(self, bases, heads, kind):
337 def changegroupsubset(self, bases, heads, kind):
338 self.requirecap('changegroupsubset', _('look up remote changes'))
338 self.requirecap('changegroupsubset', _('look up remote changes'))
339 bases = encodelist(bases)
339 bases = encodelist(bases)
340 heads = encodelist(heads)
340 heads = encodelist(heads)
341 f = self._callcompressable("changegroupsubset",
341 f = self._callcompressable("changegroupsubset",
342 bases=bases, heads=heads)
342 bases=bases, heads=heads)
343 return changegroupmod.cg1unpacker(f, 'UN')
343 return changegroupmod.cg1unpacker(f, 'UN')
344
344
345 def getbundle(self, source, **kwargs):
345 def getbundle(self, source, **kwargs):
346 self.requirecap('getbundle', _('look up remote changes'))
346 self.requirecap('getbundle', _('look up remote changes'))
347 opts = {}
347 opts = {}
348 bundlecaps = kwargs.get('bundlecaps')
349 if bundlecaps is not None:
350 kwargs['bundlecaps'] = sorted(bundlecaps)
351 else:
352 bundlecaps = () # kwargs could have it to None
348 for key, value in kwargs.iteritems():
353 for key, value in kwargs.iteritems():
349 if value is None:
354 if value is None:
350 continue
355 continue
351 keytype = gboptsmap.get(key)
356 keytype = gboptsmap.get(key)
352 if keytype is None:
357 if keytype is None:
353 assert False, 'unexpected'
358 assert False, 'unexpected'
354 elif keytype == 'nodes':
359 elif keytype == 'nodes':
355 value = encodelist(value)
360 value = encodelist(value)
356 elif keytype == 'csv':
361 elif keytype == 'csv':
357 value = ','.join(value)
362 value = ','.join(value)
358 elif keytype == 'boolean':
363 elif keytype == 'boolean':
359 value = '%i' % bool(value)
364 value = '%i' % bool(value)
360 elif keytype != 'plain':
365 elif keytype != 'plain':
361 raise KeyError('unknown getbundle option type %s'
366 raise KeyError('unknown getbundle option type %s'
362 % keytype)
367 % keytype)
363 opts[key] = value
368 opts[key] = value
364 f = self._callcompressable("getbundle", **opts)
369 f = self._callcompressable("getbundle", **opts)
365 bundlecaps = kwargs.get('bundlecaps')
366 if bundlecaps is None:
367 bundlecaps = () # kwargs could have it to None
368 if util.any((cap.startswith('HG2') for cap in bundlecaps)):
370 if util.any((cap.startswith('HG2') for cap in bundlecaps)):
369 return bundle2.getunbundler(self.ui, f)
371 return bundle2.getunbundler(self.ui, f)
370 else:
372 else:
371 return changegroupmod.cg1unpacker(f, 'UN')
373 return changegroupmod.cg1unpacker(f, 'UN')
372
374
373 def unbundle(self, cg, heads, source):
375 def unbundle(self, cg, heads, source):
374 '''Send cg (a readable file-like object representing the
376 '''Send cg (a readable file-like object representing the
375 changegroup to push, typically a chunkbuffer object) to the
377 changegroup to push, typically a chunkbuffer object) to the
376 remote server as a bundle.
378 remote server as a bundle.
377
379
378 When pushing a bundle10 stream, return an integer indicating the
380 When pushing a bundle10 stream, return an integer indicating the
379 result of the push (see localrepository.addchangegroup()).
381 result of the push (see localrepository.addchangegroup()).
380
382
381 When pushing a bundle20 stream, return a bundle20 stream.'''
383 When pushing a bundle20 stream, return a bundle20 stream.'''
382
384
383 if heads != ['force'] and self.capable('unbundlehash'):
385 if heads != ['force'] and self.capable('unbundlehash'):
384 heads = encodelist(['hashed',
386 heads = encodelist(['hashed',
385 util.sha1(''.join(sorted(heads))).digest()])
387 util.sha1(''.join(sorted(heads))).digest()])
386 else:
388 else:
387 heads = encodelist(heads)
389 heads = encodelist(heads)
388
390
389 if util.safehasattr(cg, 'deltaheader'):
391 if util.safehasattr(cg, 'deltaheader'):
390 # this a bundle10, do the old style call sequence
392 # this a bundle10, do the old style call sequence
391 ret, output = self._callpush("unbundle", cg, heads=heads)
393 ret, output = self._callpush("unbundle", cg, heads=heads)
392 if ret == "":
394 if ret == "":
393 raise error.ResponseError(
395 raise error.ResponseError(
394 _('push failed:'), output)
396 _('push failed:'), output)
395 try:
397 try:
396 ret = int(ret)
398 ret = int(ret)
397 except ValueError:
399 except ValueError:
398 raise error.ResponseError(
400 raise error.ResponseError(
399 _('push failed (unexpected response):'), ret)
401 _('push failed (unexpected response):'), ret)
400
402
401 for l in output.splitlines(True):
403 for l in output.splitlines(True):
402 self.ui.status(_('remote: '), l)
404 self.ui.status(_('remote: '), l)
403 else:
405 else:
404 # bundle2 push. Send a stream, fetch a stream.
406 # bundle2 push. Send a stream, fetch a stream.
405 stream = self._calltwowaystream('unbundle', cg, heads=heads)
407 stream = self._calltwowaystream('unbundle', cg, heads=heads)
406 ret = bundle2.getunbundler(self.ui, stream)
408 ret = bundle2.getunbundler(self.ui, stream)
407 return ret
409 return ret
408
410
409 def debugwireargs(self, one, two, three=None, four=None, five=None):
411 def debugwireargs(self, one, two, three=None, four=None, five=None):
410 # don't pass optional arguments left at their default value
412 # don't pass optional arguments left at their default value
411 opts = {}
413 opts = {}
412 if three is not None:
414 if three is not None:
413 opts['three'] = three
415 opts['three'] = three
414 if four is not None:
416 if four is not None:
415 opts['four'] = four
417 opts['four'] = four
416 return self._call('debugwireargs', one=one, two=two, **opts)
418 return self._call('debugwireargs', one=one, two=two, **opts)
417
419
418 def _call(self, cmd, **args):
420 def _call(self, cmd, **args):
419 """execute <cmd> on the server
421 """execute <cmd> on the server
420
422
421 The command is expected to return a simple string.
423 The command is expected to return a simple string.
422
424
423 returns the server reply as a string."""
425 returns the server reply as a string."""
424 raise NotImplementedError()
426 raise NotImplementedError()
425
427
426 def _callstream(self, cmd, **args):
428 def _callstream(self, cmd, **args):
427 """execute <cmd> on the server
429 """execute <cmd> on the server
428
430
429 The command is expected to return a stream.
431 The command is expected to return a stream.
430
432
431 returns the server reply as a file like object."""
433 returns the server reply as a file like object."""
432 raise NotImplementedError()
434 raise NotImplementedError()
433
435
434 def _callcompressable(self, cmd, **args):
436 def _callcompressable(self, cmd, **args):
435 """execute <cmd> on the server
437 """execute <cmd> on the server
436
438
437 The command is expected to return a stream.
439 The command is expected to return a stream.
438
440
439 The stream may have been compressed in some implementations. This
441 The stream may have been compressed in some implementations. This
440 function takes care of the decompression. This is the only difference
442 function takes care of the decompression. This is the only difference
441 with _callstream.
443 with _callstream.
442
444
443 returns the server reply as a file like object.
445 returns the server reply as a file like object.
444 """
446 """
445 raise NotImplementedError()
447 raise NotImplementedError()
446
448
447 def _callpush(self, cmd, fp, **args):
449 def _callpush(self, cmd, fp, **args):
448 """execute a <cmd> on server
450 """execute a <cmd> on server
449
451
450 The command is expected to be related to a push. Push has a special
452 The command is expected to be related to a push. Push has a special
451 return method.
453 return method.
452
454
453 returns the server reply as a (ret, output) tuple. ret is either
455 returns the server reply as a (ret, output) tuple. ret is either
454 empty (error) or a stringified int.
456 empty (error) or a stringified int.
455 """
457 """
456 raise NotImplementedError()
458 raise NotImplementedError()
457
459
458 def _calltwowaystream(self, cmd, fp, **args):
460 def _calltwowaystream(self, cmd, fp, **args):
459 """execute <cmd> on server
461 """execute <cmd> on server
460
462
461 The command will send a stream to the server and get a stream in reply.
463 The command will send a stream to the server and get a stream in reply.
462 """
464 """
463 raise NotImplementedError()
465 raise NotImplementedError()
464
466
465 def _abort(self, exception):
467 def _abort(self, exception):
466 """clearly abort the wire protocol connection and raise the exception
468 """clearly abort the wire protocol connection and raise the exception
467 """
469 """
468 raise NotImplementedError()
470 raise NotImplementedError()
469
471
470 # server side
472 # server side
471
473
472 # wire protocol command can either return a string or one of these classes.
474 # wire protocol command can either return a string or one of these classes.
473 class streamres(object):
475 class streamres(object):
474 """wireproto reply: binary stream
476 """wireproto reply: binary stream
475
477
476 The call was successful and the result is a stream.
478 The call was successful and the result is a stream.
477 Iterate on the `self.gen` attribute to retrieve chunks.
479 Iterate on the `self.gen` attribute to retrieve chunks.
478 """
480 """
479 def __init__(self, gen):
481 def __init__(self, gen):
480 self.gen = gen
482 self.gen = gen
481
483
482 class pushres(object):
484 class pushres(object):
483 """wireproto reply: success with simple integer return
485 """wireproto reply: success with simple integer return
484
486
485 The call was successful and returned an integer contained in `self.res`.
487 The call was successful and returned an integer contained in `self.res`.
486 """
488 """
487 def __init__(self, res):
489 def __init__(self, res):
488 self.res = res
490 self.res = res
489
491
490 class pusherr(object):
492 class pusherr(object):
491 """wireproto reply: failure
493 """wireproto reply: failure
492
494
493 The call failed. The `self.res` attribute contains the error message.
495 The call failed. The `self.res` attribute contains the error message.
494 """
496 """
495 def __init__(self, res):
497 def __init__(self, res):
496 self.res = res
498 self.res = res
497
499
498 class ooberror(object):
500 class ooberror(object):
499 """wireproto reply: failure of a batch of operation
501 """wireproto reply: failure of a batch of operation
500
502
501 Something failed during a batch call. The error message is stored in
503 Something failed during a batch call. The error message is stored in
502 `self.message`.
504 `self.message`.
503 """
505 """
504 def __init__(self, message):
506 def __init__(self, message):
505 self.message = message
507 self.message = message
506
508
507 def dispatch(repo, proto, command):
509 def dispatch(repo, proto, command):
508 repo = repo.filtered("served")
510 repo = repo.filtered("served")
509 func, spec = commands[command]
511 func, spec = commands[command]
510 args = proto.getargs(spec)
512 args = proto.getargs(spec)
511 return func(repo, proto, *args)
513 return func(repo, proto, *args)
512
514
513 def options(cmd, keys, others):
515 def options(cmd, keys, others):
514 opts = {}
516 opts = {}
515 for k in keys:
517 for k in keys:
516 if k in others:
518 if k in others:
517 opts[k] = others[k]
519 opts[k] = others[k]
518 del others[k]
520 del others[k]
519 if others:
521 if others:
520 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
522 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
521 % (cmd, ",".join(others)))
523 % (cmd, ",".join(others)))
522 return opts
524 return opts
523
525
524 # list of commands
526 # list of commands
525 commands = {}
527 commands = {}
526
528
527 def wireprotocommand(name, args=''):
529 def wireprotocommand(name, args=''):
528 """decorator for wire protocol command"""
530 """decorator for wire protocol command"""
529 def register(func):
531 def register(func):
530 commands[name] = (func, args)
532 commands[name] = (func, args)
531 return func
533 return func
532 return register
534 return register
533
535
534 @wireprotocommand('batch', 'cmds *')
536 @wireprotocommand('batch', 'cmds *')
535 def batch(repo, proto, cmds, others):
537 def batch(repo, proto, cmds, others):
536 repo = repo.filtered("served")
538 repo = repo.filtered("served")
537 res = []
539 res = []
538 for pair in cmds.split(';'):
540 for pair in cmds.split(';'):
539 op, args = pair.split(' ', 1)
541 op, args = pair.split(' ', 1)
540 vals = {}
542 vals = {}
541 for a in args.split(','):
543 for a in args.split(','):
542 if a:
544 if a:
543 n, v = a.split('=')
545 n, v = a.split('=')
544 vals[n] = unescapearg(v)
546 vals[n] = unescapearg(v)
545 func, spec = commands[op]
547 func, spec = commands[op]
546 if spec:
548 if spec:
547 keys = spec.split()
549 keys = spec.split()
548 data = {}
550 data = {}
549 for k in keys:
551 for k in keys:
550 if k == '*':
552 if k == '*':
551 star = {}
553 star = {}
552 for key in vals.keys():
554 for key in vals.keys():
553 if key not in keys:
555 if key not in keys:
554 star[key] = vals[key]
556 star[key] = vals[key]
555 data['*'] = star
557 data['*'] = star
556 else:
558 else:
557 data[k] = vals[k]
559 data[k] = vals[k]
558 result = func(repo, proto, *[data[k] for k in keys])
560 result = func(repo, proto, *[data[k] for k in keys])
559 else:
561 else:
560 result = func(repo, proto)
562 result = func(repo, proto)
561 if isinstance(result, ooberror):
563 if isinstance(result, ooberror):
562 return result
564 return result
563 res.append(escapearg(result))
565 res.append(escapearg(result))
564 return ';'.join(res)
566 return ';'.join(res)
565
567
566 @wireprotocommand('between', 'pairs')
568 @wireprotocommand('between', 'pairs')
567 def between(repo, proto, pairs):
569 def between(repo, proto, pairs):
568 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
570 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
569 r = []
571 r = []
570 for b in repo.between(pairs):
572 for b in repo.between(pairs):
571 r.append(encodelist(b) + "\n")
573 r.append(encodelist(b) + "\n")
572 return "".join(r)
574 return "".join(r)
573
575
574 @wireprotocommand('branchmap')
576 @wireprotocommand('branchmap')
575 def branchmap(repo, proto):
577 def branchmap(repo, proto):
576 branchmap = repo.branchmap()
578 branchmap = repo.branchmap()
577 heads = []
579 heads = []
578 for branch, nodes in branchmap.iteritems():
580 for branch, nodes in branchmap.iteritems():
579 branchname = urllib.quote(encoding.fromlocal(branch))
581 branchname = urllib.quote(encoding.fromlocal(branch))
580 branchnodes = encodelist(nodes)
582 branchnodes = encodelist(nodes)
581 heads.append('%s %s' % (branchname, branchnodes))
583 heads.append('%s %s' % (branchname, branchnodes))
582 return '\n'.join(heads)
584 return '\n'.join(heads)
583
585
584 @wireprotocommand('branches', 'nodes')
586 @wireprotocommand('branches', 'nodes')
585 def branches(repo, proto, nodes):
587 def branches(repo, proto, nodes):
586 nodes = decodelist(nodes)
588 nodes = decodelist(nodes)
587 r = []
589 r = []
588 for b in repo.branches(nodes):
590 for b in repo.branches(nodes):
589 r.append(encodelist(b) + "\n")
591 r.append(encodelist(b) + "\n")
590 return "".join(r)
592 return "".join(r)
591
593
592
594
593 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
595 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
594 'known', 'getbundle', 'unbundlehash', 'batch']
596 'known', 'getbundle', 'unbundlehash', 'batch']
595
597
596 def _capabilities(repo, proto):
598 def _capabilities(repo, proto):
597 """return a list of capabilities for a repo
599 """return a list of capabilities for a repo
598
600
599 This function exists to allow extensions to easily wrap capabilities
601 This function exists to allow extensions to easily wrap capabilities
600 computation
602 computation
601
603
602 - returns a lists: easy to alter
604 - returns a lists: easy to alter
603 - change done here will be propagated to both `capabilities` and `hello`
605 - change done here will be propagated to both `capabilities` and `hello`
604 command without any other action needed.
606 command without any other action needed.
605 """
607 """
606 # copy to prevent modification of the global list
608 # copy to prevent modification of the global list
607 caps = list(wireprotocaps)
609 caps = list(wireprotocaps)
608 if _allowstream(repo.ui):
610 if _allowstream(repo.ui):
609 if repo.ui.configbool('server', 'preferuncompressed', False):
611 if repo.ui.configbool('server', 'preferuncompressed', False):
610 caps.append('stream-preferred')
612 caps.append('stream-preferred')
611 requiredformats = repo.requirements & repo.supportedformats
613 requiredformats = repo.requirements & repo.supportedformats
612 # if our local revlogs are just revlogv1, add 'stream' cap
614 # if our local revlogs are just revlogv1, add 'stream' cap
613 if not requiredformats - set(('revlogv1',)):
615 if not requiredformats - set(('revlogv1',)):
614 caps.append('stream')
616 caps.append('stream')
615 # otherwise, add 'streamreqs' detailing our local revlog format
617 # otherwise, add 'streamreqs' detailing our local revlog format
616 else:
618 else:
617 caps.append('streamreqs=%s' % ','.join(requiredformats))
619 caps.append('streamreqs=%s' % ','.join(requiredformats))
618 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
620 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
619 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
621 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
620 caps.append('bundle2=' + urllib.quote(capsblob))
622 caps.append('bundle2=' + urllib.quote(capsblob))
621 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
623 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
622 caps.append('httpheader=1024')
624 caps.append('httpheader=1024')
623 return caps
625 return caps
624
626
625 # If you are writing an extension and consider wrapping this function. Wrap
627 # If you are writing an extension and consider wrapping this function. Wrap
626 # `_capabilities` instead.
628 # `_capabilities` instead.
627 @wireprotocommand('capabilities')
629 @wireprotocommand('capabilities')
628 def capabilities(repo, proto):
630 def capabilities(repo, proto):
629 return ' '.join(_capabilities(repo, proto))
631 return ' '.join(_capabilities(repo, proto))
630
632
631 @wireprotocommand('changegroup', 'roots')
633 @wireprotocommand('changegroup', 'roots')
632 def changegroup(repo, proto, roots):
634 def changegroup(repo, proto, roots):
633 nodes = decodelist(roots)
635 nodes = decodelist(roots)
634 cg = changegroupmod.changegroup(repo, nodes, 'serve')
636 cg = changegroupmod.changegroup(repo, nodes, 'serve')
635 return streamres(proto.groupchunks(cg))
637 return streamres(proto.groupchunks(cg))
636
638
637 @wireprotocommand('changegroupsubset', 'bases heads')
639 @wireprotocommand('changegroupsubset', 'bases heads')
638 def changegroupsubset(repo, proto, bases, heads):
640 def changegroupsubset(repo, proto, bases, heads):
639 bases = decodelist(bases)
641 bases = decodelist(bases)
640 heads = decodelist(heads)
642 heads = decodelist(heads)
641 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
643 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
642 return streamres(proto.groupchunks(cg))
644 return streamres(proto.groupchunks(cg))
643
645
644 @wireprotocommand('debugwireargs', 'one two *')
646 @wireprotocommand('debugwireargs', 'one two *')
645 def debugwireargs(repo, proto, one, two, others):
647 def debugwireargs(repo, proto, one, two, others):
646 # only accept optional args from the known set
648 # only accept optional args from the known set
647 opts = options('debugwireargs', ['three', 'four'], others)
649 opts = options('debugwireargs', ['three', 'four'], others)
648 return repo.debugwireargs(one, two, **opts)
650 return repo.debugwireargs(one, two, **opts)
649
651
650 # List of options accepted by getbundle.
652 # List of options accepted by getbundle.
651 #
653 #
652 # Meant to be extended by extensions. It is the extension's responsibility to
654 # Meant to be extended by extensions. It is the extension's responsibility to
653 # ensure such options are properly processed in exchange.getbundle.
655 # ensure such options are properly processed in exchange.getbundle.
654 gboptslist = ['heads', 'common', 'bundlecaps']
656 gboptslist = ['heads', 'common', 'bundlecaps']
655
657
656 @wireprotocommand('getbundle', '*')
658 @wireprotocommand('getbundle', '*')
657 def getbundle(repo, proto, others):
659 def getbundle(repo, proto, others):
658 opts = options('getbundle', gboptsmap.keys(), others)
660 opts = options('getbundle', gboptsmap.keys(), others)
659 for k, v in opts.iteritems():
661 for k, v in opts.iteritems():
660 keytype = gboptsmap[k]
662 keytype = gboptsmap[k]
661 if keytype == 'nodes':
663 if keytype == 'nodes':
662 opts[k] = decodelist(v)
664 opts[k] = decodelist(v)
663 elif keytype == 'csv':
665 elif keytype == 'csv':
664 opts[k] = set(v.split(','))
666 opts[k] = set(v.split(','))
665 elif keytype == 'boolean':
667 elif keytype == 'boolean':
666 opts[k] = bool(v)
668 opts[k] = bool(v)
667 elif keytype != 'plain':
669 elif keytype != 'plain':
668 raise KeyError('unknown getbundle option type %s'
670 raise KeyError('unknown getbundle option type %s'
669 % keytype)
671 % keytype)
670 cg = exchange.getbundle(repo, 'serve', **opts)
672 cg = exchange.getbundle(repo, 'serve', **opts)
671 return streamres(proto.groupchunks(cg))
673 return streamres(proto.groupchunks(cg))
672
674
673 @wireprotocommand('heads')
675 @wireprotocommand('heads')
674 def heads(repo, proto):
676 def heads(repo, proto):
675 h = repo.heads()
677 h = repo.heads()
676 return encodelist(h) + "\n"
678 return encodelist(h) + "\n"
677
679
678 @wireprotocommand('hello')
680 @wireprotocommand('hello')
679 def hello(repo, proto):
681 def hello(repo, proto):
680 '''the hello command returns a set of lines describing various
682 '''the hello command returns a set of lines describing various
681 interesting things about the server, in an RFC822-like format.
683 interesting things about the server, in an RFC822-like format.
682 Currently the only one defined is "capabilities", which
684 Currently the only one defined is "capabilities", which
683 consists of a line in the form:
685 consists of a line in the form:
684
686
685 capabilities: space separated list of tokens
687 capabilities: space separated list of tokens
686 '''
688 '''
687 return "capabilities: %s\n" % (capabilities(repo, proto))
689 return "capabilities: %s\n" % (capabilities(repo, proto))
688
690
689 @wireprotocommand('listkeys', 'namespace')
691 @wireprotocommand('listkeys', 'namespace')
690 def listkeys(repo, proto, namespace):
692 def listkeys(repo, proto, namespace):
691 d = repo.listkeys(encoding.tolocal(namespace)).items()
693 d = repo.listkeys(encoding.tolocal(namespace)).items()
692 return pushkeymod.encodekeys(d)
694 return pushkeymod.encodekeys(d)
693
695
694 @wireprotocommand('lookup', 'key')
696 @wireprotocommand('lookup', 'key')
695 def lookup(repo, proto, key):
697 def lookup(repo, proto, key):
696 try:
698 try:
697 k = encoding.tolocal(key)
699 k = encoding.tolocal(key)
698 c = repo[k]
700 c = repo[k]
699 r = c.hex()
701 r = c.hex()
700 success = 1
702 success = 1
701 except Exception, inst:
703 except Exception, inst:
702 r = str(inst)
704 r = str(inst)
703 success = 0
705 success = 0
704 return "%s %s\n" % (success, r)
706 return "%s %s\n" % (success, r)
705
707
706 @wireprotocommand('known', 'nodes *')
708 @wireprotocommand('known', 'nodes *')
707 def known(repo, proto, nodes, others):
709 def known(repo, proto, nodes, others):
708 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
710 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
709
711
710 @wireprotocommand('pushkey', 'namespace key old new')
712 @wireprotocommand('pushkey', 'namespace key old new')
711 def pushkey(repo, proto, namespace, key, old, new):
713 def pushkey(repo, proto, namespace, key, old, new):
712 # compatibility with pre-1.8 clients which were accidentally
714 # compatibility with pre-1.8 clients which were accidentally
713 # sending raw binary nodes rather than utf-8-encoded hex
715 # sending raw binary nodes rather than utf-8-encoded hex
714 if len(new) == 20 and new.encode('string-escape') != new:
716 if len(new) == 20 and new.encode('string-escape') != new:
715 # looks like it could be a binary node
717 # looks like it could be a binary node
716 try:
718 try:
717 new.decode('utf-8')
719 new.decode('utf-8')
718 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
720 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
719 except UnicodeDecodeError:
721 except UnicodeDecodeError:
720 pass # binary, leave unmodified
722 pass # binary, leave unmodified
721 else:
723 else:
722 new = encoding.tolocal(new) # normal path
724 new = encoding.tolocal(new) # normal path
723
725
724 if util.safehasattr(proto, 'restore'):
726 if util.safehasattr(proto, 'restore'):
725
727
726 proto.redirect()
728 proto.redirect()
727
729
728 try:
730 try:
729 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
731 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
730 encoding.tolocal(old), new) or False
732 encoding.tolocal(old), new) or False
731 except util.Abort:
733 except util.Abort:
732 r = False
734 r = False
733
735
734 output = proto.restore()
736 output = proto.restore()
735
737
736 return '%s\n%s' % (int(r), output)
738 return '%s\n%s' % (int(r), output)
737
739
738 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
740 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
739 encoding.tolocal(old), new)
741 encoding.tolocal(old), new)
740 return '%s\n' % int(r)
742 return '%s\n' % int(r)
741
743
742 def _allowstream(ui):
744 def _allowstream(ui):
743 return ui.configbool('server', 'uncompressed', True, untrusted=True)
745 return ui.configbool('server', 'uncompressed', True, untrusted=True)
744
746
745 def _walkstreamfiles(repo):
747 def _walkstreamfiles(repo):
746 # this is it's own function so extensions can override it
748 # this is it's own function so extensions can override it
747 return repo.store.walk()
749 return repo.store.walk()
748
750
749 @wireprotocommand('stream_out')
751 @wireprotocommand('stream_out')
750 def stream(repo, proto):
752 def stream(repo, proto):
751 '''If the server supports streaming clone, it advertises the "stream"
753 '''If the server supports streaming clone, it advertises the "stream"
752 capability with a value representing the version and flags of the repo
754 capability with a value representing the version and flags of the repo
753 it is serving. Client checks to see if it understands the format.
755 it is serving. Client checks to see if it understands the format.
754
756
755 The format is simple: the server writes out a line with the amount
757 The format is simple: the server writes out a line with the amount
756 of files, then the total amount of bytes to be transferred (separated
758 of files, then the total amount of bytes to be transferred (separated
757 by a space). Then, for each file, the server first writes the filename
759 by a space). Then, for each file, the server first writes the filename
758 and file size (separated by the null character), then the file contents.
760 and file size (separated by the null character), then the file contents.
759 '''
761 '''
760
762
761 if not _allowstream(repo.ui):
763 if not _allowstream(repo.ui):
762 return '1\n'
764 return '1\n'
763
765
764 entries = []
766 entries = []
765 total_bytes = 0
767 total_bytes = 0
766 try:
768 try:
767 # get consistent snapshot of repo, lock during scan
769 # get consistent snapshot of repo, lock during scan
768 lock = repo.lock()
770 lock = repo.lock()
769 try:
771 try:
770 repo.ui.debug('scanning\n')
772 repo.ui.debug('scanning\n')
771 for name, ename, size in _walkstreamfiles(repo):
773 for name, ename, size in _walkstreamfiles(repo):
772 if size:
774 if size:
773 entries.append((name, size))
775 entries.append((name, size))
774 total_bytes += size
776 total_bytes += size
775 finally:
777 finally:
776 lock.release()
778 lock.release()
777 except error.LockError:
779 except error.LockError:
778 return '2\n' # error: 2
780 return '2\n' # error: 2
779
781
780 def streamer(repo, entries, total):
782 def streamer(repo, entries, total):
781 '''stream out all metadata files in repository.'''
783 '''stream out all metadata files in repository.'''
782 yield '0\n' # success
784 yield '0\n' # success
783 repo.ui.debug('%d files, %d bytes to transfer\n' %
785 repo.ui.debug('%d files, %d bytes to transfer\n' %
784 (len(entries), total_bytes))
786 (len(entries), total_bytes))
785 yield '%d %d\n' % (len(entries), total_bytes)
787 yield '%d %d\n' % (len(entries), total_bytes)
786
788
787 sopener = repo.svfs
789 sopener = repo.svfs
788 oldaudit = sopener.mustaudit
790 oldaudit = sopener.mustaudit
789 debugflag = repo.ui.debugflag
791 debugflag = repo.ui.debugflag
790 sopener.mustaudit = False
792 sopener.mustaudit = False
791
793
792 try:
794 try:
793 for name, size in entries:
795 for name, size in entries:
794 if debugflag:
796 if debugflag:
795 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
797 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
796 # partially encode name over the wire for backwards compat
798 # partially encode name over the wire for backwards compat
797 yield '%s\0%d\n' % (store.encodedir(name), size)
799 yield '%s\0%d\n' % (store.encodedir(name), size)
798 if size <= 65536:
800 if size <= 65536:
799 fp = sopener(name)
801 fp = sopener(name)
800 try:
802 try:
801 data = fp.read(size)
803 data = fp.read(size)
802 finally:
804 finally:
803 fp.close()
805 fp.close()
804 yield data
806 yield data
805 else:
807 else:
806 for chunk in util.filechunkiter(sopener(name), limit=size):
808 for chunk in util.filechunkiter(sopener(name), limit=size):
807 yield chunk
809 yield chunk
808 # replace with "finally:" when support for python 2.4 has been dropped
810 # replace with "finally:" when support for python 2.4 has been dropped
809 except Exception:
811 except Exception:
810 sopener.mustaudit = oldaudit
812 sopener.mustaudit = oldaudit
811 raise
813 raise
812 sopener.mustaudit = oldaudit
814 sopener.mustaudit = oldaudit
813
815
814 return streamres(streamer(repo, entries, total_bytes))
816 return streamres(streamer(repo, entries, total_bytes))
815
817
816 @wireprotocommand('unbundle', 'heads')
818 @wireprotocommand('unbundle', 'heads')
817 def unbundle(repo, proto, heads):
819 def unbundle(repo, proto, heads):
818 their_heads = decodelist(heads)
820 their_heads = decodelist(heads)
819
821
820 try:
822 try:
821 proto.redirect()
823 proto.redirect()
822
824
823 exchange.check_heads(repo, their_heads, 'preparing changes')
825 exchange.check_heads(repo, their_heads, 'preparing changes')
824
826
825 # write bundle data to temporary file because it can be big
827 # write bundle data to temporary file because it can be big
826 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
828 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
827 fp = os.fdopen(fd, 'wb+')
829 fp = os.fdopen(fd, 'wb+')
828 r = 0
830 r = 0
829 try:
831 try:
830 proto.getfile(fp)
832 proto.getfile(fp)
831 fp.seek(0)
833 fp.seek(0)
832 gen = exchange.readbundle(repo.ui, fp, None)
834 gen = exchange.readbundle(repo.ui, fp, None)
833 r = exchange.unbundle(repo, gen, their_heads, 'serve',
835 r = exchange.unbundle(repo, gen, their_heads, 'serve',
834 proto._client())
836 proto._client())
835 if util.safehasattr(r, 'addpart'):
837 if util.safehasattr(r, 'addpart'):
836 # The return looks streamable, we are in the bundle2 case and
838 # The return looks streamable, we are in the bundle2 case and
837 # should return a stream.
839 # should return a stream.
838 return streamres(r.getchunks())
840 return streamres(r.getchunks())
839 return pushres(r)
841 return pushres(r)
840
842
841 finally:
843 finally:
842 fp.close()
844 fp.close()
843 os.unlink(tempname)
845 os.unlink(tempname)
844
846
845 except (error.BundleValueError, util.Abort, error.PushRaced), exc:
847 except (error.BundleValueError, util.Abort, error.PushRaced), exc:
846 # handle non-bundle2 case first
848 # handle non-bundle2 case first
847 if not getattr(exc, 'duringunbundle2', False):
849 if not getattr(exc, 'duringunbundle2', False):
848 try:
850 try:
849 raise
851 raise
850 except util.Abort:
852 except util.Abort:
851 # The old code we moved used sys.stderr directly.
853 # The old code we moved used sys.stderr directly.
852 # We did not change it to minimise code change.
854 # We did not change it to minimise code change.
853 # This need to be moved to something proper.
855 # This need to be moved to something proper.
854 # Feel free to do it.
856 # Feel free to do it.
855 sys.stderr.write("abort: %s\n" % exc)
857 sys.stderr.write("abort: %s\n" % exc)
856 return pushres(0)
858 return pushres(0)
857 except error.PushRaced:
859 except error.PushRaced:
858 return pusherr(str(exc))
860 return pusherr(str(exc))
859
861
860 bundler = bundle2.bundle20(repo.ui)
862 bundler = bundle2.bundle20(repo.ui)
861 for out in getattr(exc, '_bundle2salvagedoutput', ()):
863 for out in getattr(exc, '_bundle2salvagedoutput', ()):
862 bundler.addpart(out)
864 bundler.addpart(out)
863 try:
865 try:
864 raise
866 raise
865 except error.BundleValueError, exc:
867 except error.BundleValueError, exc:
866 errpart = bundler.newpart('error:unsupportedcontent')
868 errpart = bundler.newpart('error:unsupportedcontent')
867 if exc.parttype is not None:
869 if exc.parttype is not None:
868 errpart.addparam('parttype', exc.parttype)
870 errpart.addparam('parttype', exc.parttype)
869 if exc.params:
871 if exc.params:
870 errpart.addparam('params', '\0'.join(exc.params))
872 errpart.addparam('params', '\0'.join(exc.params))
871 except util.Abort, exc:
873 except util.Abort, exc:
872 manargs = [('message', str(exc))]
874 manargs = [('message', str(exc))]
873 advargs = []
875 advargs = []
874 if exc.hint is not None:
876 if exc.hint is not None:
875 advargs.append(('hint', exc.hint))
877 advargs.append(('hint', exc.hint))
876 bundler.addpart(bundle2.bundlepart('error:abort',
878 bundler.addpart(bundle2.bundlepart('error:abort',
877 manargs, advargs))
879 manargs, advargs))
878 except error.PushRaced, exc:
880 except error.PushRaced, exc:
879 bundler.newpart('error:pushraced', [('message', str(exc))])
881 bundler.newpart('error:pushraced', [('message', str(exc))])
880 return streamres(bundler.getchunks())
882 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now