##// END OF EJS Templates
wireproto: rephrase the error message for unknown getbundle parameters...
Pierre-Yves David -
r21728:0f73ed62 default
parent child Browse files
Show More
@@ -1,863 +1,863 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, store, exchange
12 import peer, error, encoding, util, store, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 return sep.join(map(hex, l))
175 return sep.join(map(hex, l))
176
176
177 # batched call argument encoding
177 # batched call argument encoding
178
178
179 def escapearg(plain):
179 def escapearg(plain):
180 return (plain
180 return (plain
181 .replace(':', '::')
181 .replace(':', '::')
182 .replace(',', ':,')
182 .replace(',', ':,')
183 .replace(';', ':;')
183 .replace(';', ':;')
184 .replace('=', ':='))
184 .replace('=', ':='))
185
185
186 def unescapearg(escaped):
186 def unescapearg(escaped):
187 return (escaped
187 return (escaped
188 .replace(':=', '=')
188 .replace(':=', '=')
189 .replace(':;', ';')
189 .replace(':;', ';')
190 .replace(':,', ',')
190 .replace(':,', ',')
191 .replace('::', ':'))
191 .replace('::', ':'))
192
192
193 # mapping of options accepted by getbundle and their types
193 # mapping of options accepted by getbundle and their types
194 #
194 #
195 # Meant to be extended by extensions. It is extensions responsibility to ensure
195 # Meant to be extended by extensions. It is extensions responsibility to ensure
196 # such options are properly processed in exchange.getbundle.
196 # such options are properly processed in exchange.getbundle.
197 #
197 #
198 # supported types are:
198 # supported types are:
199 #
199 #
200 # :nodes: list of binary nodes
200 # :nodes: list of binary nodes
201 # :csv: list of comma-separated values
201 # :csv: list of comma-separated values
202 # :plain: string with no transformation needed.
202 # :plain: string with no transformation needed.
203 gboptsmap = {'heads': 'nodes',
203 gboptsmap = {'heads': 'nodes',
204 'common': 'nodes',
204 'common': 'nodes',
205 'bundlecaps': 'csv',
205 'bundlecaps': 'csv',
206 'listkeys': 'csv'}
206 'listkeys': 'csv'}
207
207
208 # client side
208 # client side
209
209
210 class wirepeer(peer.peerrepository):
210 class wirepeer(peer.peerrepository):
211
211
212 def batch(self):
212 def batch(self):
213 return remotebatch(self)
213 return remotebatch(self)
214 def _submitbatch(self, req):
214 def _submitbatch(self, req):
215 cmds = []
215 cmds = []
216 for op, argsdict in req:
216 for op, argsdict in req:
217 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
217 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
218 cmds.append('%s %s' % (op, args))
218 cmds.append('%s %s' % (op, args))
219 rsp = self._call("batch", cmds=';'.join(cmds))
219 rsp = self._call("batch", cmds=';'.join(cmds))
220 return rsp.split(';')
220 return rsp.split(';')
221 def _submitone(self, op, args):
221 def _submitone(self, op, args):
222 return self._call(op, **args)
222 return self._call(op, **args)
223
223
224 @batchable
224 @batchable
225 def lookup(self, key):
225 def lookup(self, key):
226 self.requirecap('lookup', _('look up remote revision'))
226 self.requirecap('lookup', _('look up remote revision'))
227 f = future()
227 f = future()
228 yield {'key': encoding.fromlocal(key)}, f
228 yield {'key': encoding.fromlocal(key)}, f
229 d = f.value
229 d = f.value
230 success, data = d[:-1].split(" ", 1)
230 success, data = d[:-1].split(" ", 1)
231 if int(success):
231 if int(success):
232 yield bin(data)
232 yield bin(data)
233 self._abort(error.RepoError(data))
233 self._abort(error.RepoError(data))
234
234
235 @batchable
235 @batchable
236 def heads(self):
236 def heads(self):
237 f = future()
237 f = future()
238 yield {}, f
238 yield {}, f
239 d = f.value
239 d = f.value
240 try:
240 try:
241 yield decodelist(d[:-1])
241 yield decodelist(d[:-1])
242 except ValueError:
242 except ValueError:
243 self._abort(error.ResponseError(_("unexpected response:"), d))
243 self._abort(error.ResponseError(_("unexpected response:"), d))
244
244
245 @batchable
245 @batchable
246 def known(self, nodes):
246 def known(self, nodes):
247 f = future()
247 f = future()
248 yield {'nodes': encodelist(nodes)}, f
248 yield {'nodes': encodelist(nodes)}, f
249 d = f.value
249 d = f.value
250 try:
250 try:
251 yield [bool(int(f)) for f in d]
251 yield [bool(int(f)) for f in d]
252 except ValueError:
252 except ValueError:
253 self._abort(error.ResponseError(_("unexpected response:"), d))
253 self._abort(error.ResponseError(_("unexpected response:"), d))
254
254
255 @batchable
255 @batchable
256 def branchmap(self):
256 def branchmap(self):
257 f = future()
257 f = future()
258 yield {}, f
258 yield {}, f
259 d = f.value
259 d = f.value
260 try:
260 try:
261 branchmap = {}
261 branchmap = {}
262 for branchpart in d.splitlines():
262 for branchpart in d.splitlines():
263 branchname, branchheads = branchpart.split(' ', 1)
263 branchname, branchheads = branchpart.split(' ', 1)
264 branchname = encoding.tolocal(urllib.unquote(branchname))
264 branchname = encoding.tolocal(urllib.unquote(branchname))
265 branchheads = decodelist(branchheads)
265 branchheads = decodelist(branchheads)
266 branchmap[branchname] = branchheads
266 branchmap[branchname] = branchheads
267 yield branchmap
267 yield branchmap
268 except TypeError:
268 except TypeError:
269 self._abort(error.ResponseError(_("unexpected response:"), d))
269 self._abort(error.ResponseError(_("unexpected response:"), d))
270
270
271 def branches(self, nodes):
271 def branches(self, nodes):
272 n = encodelist(nodes)
272 n = encodelist(nodes)
273 d = self._call("branches", nodes=n)
273 d = self._call("branches", nodes=n)
274 try:
274 try:
275 br = [tuple(decodelist(b)) for b in d.splitlines()]
275 br = [tuple(decodelist(b)) for b in d.splitlines()]
276 return br
276 return br
277 except ValueError:
277 except ValueError:
278 self._abort(error.ResponseError(_("unexpected response:"), d))
278 self._abort(error.ResponseError(_("unexpected response:"), d))
279
279
280 def between(self, pairs):
280 def between(self, pairs):
281 batch = 8 # avoid giant requests
281 batch = 8 # avoid giant requests
282 r = []
282 r = []
283 for i in xrange(0, len(pairs), batch):
283 for i in xrange(0, len(pairs), batch):
284 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
284 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
285 d = self._call("between", pairs=n)
285 d = self._call("between", pairs=n)
286 try:
286 try:
287 r.extend(l and decodelist(l) or [] for l in d.splitlines())
287 r.extend(l and decodelist(l) or [] for l in d.splitlines())
288 except ValueError:
288 except ValueError:
289 self._abort(error.ResponseError(_("unexpected response:"), d))
289 self._abort(error.ResponseError(_("unexpected response:"), d))
290 return r
290 return r
291
291
292 @batchable
292 @batchable
293 def pushkey(self, namespace, key, old, new):
293 def pushkey(self, namespace, key, old, new):
294 if not self.capable('pushkey'):
294 if not self.capable('pushkey'):
295 yield False, None
295 yield False, None
296 f = future()
296 f = future()
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 yield {'namespace': encoding.fromlocal(namespace),
298 yield {'namespace': encoding.fromlocal(namespace),
299 'key': encoding.fromlocal(key),
299 'key': encoding.fromlocal(key),
300 'old': encoding.fromlocal(old),
300 'old': encoding.fromlocal(old),
301 'new': encoding.fromlocal(new)}, f
301 'new': encoding.fromlocal(new)}, f
302 d = f.value
302 d = f.value
303 d, output = d.split('\n', 1)
303 d, output = d.split('\n', 1)
304 try:
304 try:
305 d = bool(int(d))
305 d = bool(int(d))
306 except ValueError:
306 except ValueError:
307 raise error.ResponseError(
307 raise error.ResponseError(
308 _('push failed (unexpected response):'), d)
308 _('push failed (unexpected response):'), d)
309 for l in output.splitlines(True):
309 for l in output.splitlines(True):
310 self.ui.status(_('remote: '), l)
310 self.ui.status(_('remote: '), l)
311 yield d
311 yield d
312
312
313 @batchable
313 @batchable
314 def listkeys(self, namespace):
314 def listkeys(self, namespace):
315 if not self.capable('pushkey'):
315 if not self.capable('pushkey'):
316 yield {}, None
316 yield {}, None
317 f = future()
317 f = future()
318 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
318 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
319 yield {'namespace': encoding.fromlocal(namespace)}, f
319 yield {'namespace': encoding.fromlocal(namespace)}, f
320 d = f.value
320 d = f.value
321 yield pushkeymod.decodekeys(d)
321 yield pushkeymod.decodekeys(d)
322
322
323 def stream_out(self):
323 def stream_out(self):
324 return self._callstream('stream_out')
324 return self._callstream('stream_out')
325
325
326 def changegroup(self, nodes, kind):
326 def changegroup(self, nodes, kind):
327 n = encodelist(nodes)
327 n = encodelist(nodes)
328 f = self._callcompressable("changegroup", roots=n)
328 f = self._callcompressable("changegroup", roots=n)
329 return changegroupmod.unbundle10(f, 'UN')
329 return changegroupmod.unbundle10(f, 'UN')
330
330
331 def changegroupsubset(self, bases, heads, kind):
331 def changegroupsubset(self, bases, heads, kind):
332 self.requirecap('changegroupsubset', _('look up remote changes'))
332 self.requirecap('changegroupsubset', _('look up remote changes'))
333 bases = encodelist(bases)
333 bases = encodelist(bases)
334 heads = encodelist(heads)
334 heads = encodelist(heads)
335 f = self._callcompressable("changegroupsubset",
335 f = self._callcompressable("changegroupsubset",
336 bases=bases, heads=heads)
336 bases=bases, heads=heads)
337 return changegroupmod.unbundle10(f, 'UN')
337 return changegroupmod.unbundle10(f, 'UN')
338
338
339 def getbundle(self, source, **kwargs):
339 def getbundle(self, source, **kwargs):
340 self.requirecap('getbundle', _('look up remote changes'))
340 self.requirecap('getbundle', _('look up remote changes'))
341 opts = {}
341 opts = {}
342 for key, value in kwargs.iteritems():
342 for key, value in kwargs.iteritems():
343 if value is None:
343 if value is None:
344 continue
344 continue
345 keytype = gboptsmap.get(key)
345 keytype = gboptsmap.get(key)
346 if keytype is None:
346 if keytype is None:
347 assert False, 'unexpected'
347 assert False, 'unexpected'
348 elif keytype == 'nodes':
348 elif keytype == 'nodes':
349 value = encodelist(value)
349 value = encodelist(value)
350 elif keytype == 'csv':
350 elif keytype == 'csv':
351 value = ','.join(value)
351 value = ','.join(value)
352 elif keytype != 'plain':
352 elif keytype != 'plain':
353 raise KeyError('unknown getbundle option type %s'
353 raise KeyError('unknown getbundle option type %s'
354 % keytype)
354 % keytype)
355 opts[key] = value
355 opts[key] = value
356 f = self._callcompressable("getbundle", **opts)
356 f = self._callcompressable("getbundle", **opts)
357 bundlecaps = kwargs.get('bundlecaps')
357 bundlecaps = kwargs.get('bundlecaps')
358 if bundlecaps is not None and 'HG2X' in bundlecaps:
358 if bundlecaps is not None and 'HG2X' in bundlecaps:
359 return bundle2.unbundle20(self.ui, f)
359 return bundle2.unbundle20(self.ui, f)
360 else:
360 else:
361 return changegroupmod.unbundle10(f, 'UN')
361 return changegroupmod.unbundle10(f, 'UN')
362
362
363 def unbundle(self, cg, heads, source):
363 def unbundle(self, cg, heads, source):
364 '''Send cg (a readable file-like object representing the
364 '''Send cg (a readable file-like object representing the
365 changegroup to push, typically a chunkbuffer object) to the
365 changegroup to push, typically a chunkbuffer object) to the
366 remote server as a bundle.
366 remote server as a bundle.
367
367
368 When pushing a bundle10 stream, return an integer indicating the
368 When pushing a bundle10 stream, return an integer indicating the
369 result of the push (see localrepository.addchangegroup()).
369 result of the push (see localrepository.addchangegroup()).
370
370
371 When pushing a bundle20 stream, return a bundle20 stream.'''
371 When pushing a bundle20 stream, return a bundle20 stream.'''
372
372
373 if heads != ['force'] and self.capable('unbundlehash'):
373 if heads != ['force'] and self.capable('unbundlehash'):
374 heads = encodelist(['hashed',
374 heads = encodelist(['hashed',
375 util.sha1(''.join(sorted(heads))).digest()])
375 util.sha1(''.join(sorted(heads))).digest()])
376 else:
376 else:
377 heads = encodelist(heads)
377 heads = encodelist(heads)
378
378
379 if util.safehasattr(cg, 'deltaheader'):
379 if util.safehasattr(cg, 'deltaheader'):
380 # this a bundle10, do the old style call sequence
380 # this a bundle10, do the old style call sequence
381 ret, output = self._callpush("unbundle", cg, heads=heads)
381 ret, output = self._callpush("unbundle", cg, heads=heads)
382 if ret == "":
382 if ret == "":
383 raise error.ResponseError(
383 raise error.ResponseError(
384 _('push failed:'), output)
384 _('push failed:'), output)
385 try:
385 try:
386 ret = int(ret)
386 ret = int(ret)
387 except ValueError:
387 except ValueError:
388 raise error.ResponseError(
388 raise error.ResponseError(
389 _('push failed (unexpected response):'), ret)
389 _('push failed (unexpected response):'), ret)
390
390
391 for l in output.splitlines(True):
391 for l in output.splitlines(True):
392 self.ui.status(_('remote: '), l)
392 self.ui.status(_('remote: '), l)
393 else:
393 else:
394 # bundle2 push. Send a stream, fetch a stream.
394 # bundle2 push. Send a stream, fetch a stream.
395 stream = self._calltwowaystream('unbundle', cg, heads=heads)
395 stream = self._calltwowaystream('unbundle', cg, heads=heads)
396 ret = bundle2.unbundle20(self.ui, stream)
396 ret = bundle2.unbundle20(self.ui, stream)
397 return ret
397 return ret
398
398
399 def debugwireargs(self, one, two, three=None, four=None, five=None):
399 def debugwireargs(self, one, two, three=None, four=None, five=None):
400 # don't pass optional arguments left at their default value
400 # don't pass optional arguments left at their default value
401 opts = {}
401 opts = {}
402 if three is not None:
402 if three is not None:
403 opts['three'] = three
403 opts['three'] = three
404 if four is not None:
404 if four is not None:
405 opts['four'] = four
405 opts['four'] = four
406 return self._call('debugwireargs', one=one, two=two, **opts)
406 return self._call('debugwireargs', one=one, two=two, **opts)
407
407
408 def _call(self, cmd, **args):
408 def _call(self, cmd, **args):
409 """execute <cmd> on the server
409 """execute <cmd> on the server
410
410
411 The command is expected to return a simple string.
411 The command is expected to return a simple string.
412
412
413 returns the server reply as a string."""
413 returns the server reply as a string."""
414 raise NotImplementedError()
414 raise NotImplementedError()
415
415
416 def _callstream(self, cmd, **args):
416 def _callstream(self, cmd, **args):
417 """execute <cmd> on the server
417 """execute <cmd> on the server
418
418
419 The command is expected to return a stream.
419 The command is expected to return a stream.
420
420
421 returns the server reply as a file like object."""
421 returns the server reply as a file like object."""
422 raise NotImplementedError()
422 raise NotImplementedError()
423
423
424 def _callcompressable(self, cmd, **args):
424 def _callcompressable(self, cmd, **args):
425 """execute <cmd> on the server
425 """execute <cmd> on the server
426
426
427 The command is expected to return a stream.
427 The command is expected to return a stream.
428
428
429 The stream may have been compressed in some implementations. This
429 The stream may have been compressed in some implementations. This
430 function takes care of the decompression. This is the only difference
430 function takes care of the decompression. This is the only difference
431 with _callstream.
431 with _callstream.
432
432
433 returns the server reply as a file like object.
433 returns the server reply as a file like object.
434 """
434 """
435 raise NotImplementedError()
435 raise NotImplementedError()
436
436
437 def _callpush(self, cmd, fp, **args):
437 def _callpush(self, cmd, fp, **args):
438 """execute a <cmd> on server
438 """execute a <cmd> on server
439
439
440 The command is expected to be related to a push. Push has a special
440 The command is expected to be related to a push. Push has a special
441 return method.
441 return method.
442
442
443 returns the server reply as a (ret, output) tuple. ret is either
443 returns the server reply as a (ret, output) tuple. ret is either
444 empty (error) or a stringified int.
444 empty (error) or a stringified int.
445 """
445 """
446 raise NotImplementedError()
446 raise NotImplementedError()
447
447
448 def _calltwowaystream(self, cmd, fp, **args):
448 def _calltwowaystream(self, cmd, fp, **args):
449 """execute <cmd> on server
449 """execute <cmd> on server
450
450
451 The command will send a stream to the server and get a stream in reply.
451 The command will send a stream to the server and get a stream in reply.
452 """
452 """
453 raise NotImplementedError()
453 raise NotImplementedError()
454
454
455 def _abort(self, exception):
455 def _abort(self, exception):
456 """clearly abort the wire protocol connection and raise the exception
456 """clearly abort the wire protocol connection and raise the exception
457 """
457 """
458 raise NotImplementedError()
458 raise NotImplementedError()
459
459
460 # server side
460 # server side
461
461
462 # wire protocol command can either return a string or one of these classes.
462 # wire protocol command can either return a string or one of these classes.
463 class streamres(object):
463 class streamres(object):
464 """wireproto reply: binary stream
464 """wireproto reply: binary stream
465
465
466 The call was successful and the result is a stream.
466 The call was successful and the result is a stream.
467 Iterate on the `self.gen` attribute to retrieve chunks.
467 Iterate on the `self.gen` attribute to retrieve chunks.
468 """
468 """
469 def __init__(self, gen):
469 def __init__(self, gen):
470 self.gen = gen
470 self.gen = gen
471
471
472 class pushres(object):
472 class pushres(object):
473 """wireproto reply: success with simple integer return
473 """wireproto reply: success with simple integer return
474
474
475 The call was successful and returned an integer contained in `self.res`.
475 The call was successful and returned an integer contained in `self.res`.
476 """
476 """
477 def __init__(self, res):
477 def __init__(self, res):
478 self.res = res
478 self.res = res
479
479
480 class pusherr(object):
480 class pusherr(object):
481 """wireproto reply: failure
481 """wireproto reply: failure
482
482
483 The call failed. The `self.res` attribute contains the error message.
483 The call failed. The `self.res` attribute contains the error message.
484 """
484 """
485 def __init__(self, res):
485 def __init__(self, res):
486 self.res = res
486 self.res = res
487
487
488 class ooberror(object):
488 class ooberror(object):
489 """wireproto reply: failure of a batch of operation
489 """wireproto reply: failure of a batch of operation
490
490
491 Something failed during a batch call. The error message is stored in
491 Something failed during a batch call. The error message is stored in
492 `self.message`.
492 `self.message`.
493 """
493 """
494 def __init__(self, message):
494 def __init__(self, message):
495 self.message = message
495 self.message = message
496
496
497 def dispatch(repo, proto, command):
497 def dispatch(repo, proto, command):
498 repo = repo.filtered("served")
498 repo = repo.filtered("served")
499 func, spec = commands[command]
499 func, spec = commands[command]
500 args = proto.getargs(spec)
500 args = proto.getargs(spec)
501 return func(repo, proto, *args)
501 return func(repo, proto, *args)
502
502
503 def options(cmd, keys, others):
503 def options(cmd, keys, others):
504 opts = {}
504 opts = {}
505 for k in keys:
505 for k in keys:
506 if k in others:
506 if k in others:
507 opts[k] = others[k]
507 opts[k] = others[k]
508 del others[k]
508 del others[k]
509 if others:
509 if others:
510 sys.stderr.write("abort: %s got unexpected arguments %s\n"
510 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
511 % (cmd, ",".join(others)))
511 % (cmd, ",".join(others)))
512 return opts
512 return opts
513
513
514 # list of commands
514 # list of commands
515 commands = {}
515 commands = {}
516
516
517 def wireprotocommand(name, args=''):
517 def wireprotocommand(name, args=''):
518 """decorator for wire protocol command"""
518 """decorator for wire protocol command"""
519 def register(func):
519 def register(func):
520 commands[name] = (func, args)
520 commands[name] = (func, args)
521 return func
521 return func
522 return register
522 return register
523
523
524 @wireprotocommand('batch', 'cmds *')
524 @wireprotocommand('batch', 'cmds *')
525 def batch(repo, proto, cmds, others):
525 def batch(repo, proto, cmds, others):
526 repo = repo.filtered("served")
526 repo = repo.filtered("served")
527 res = []
527 res = []
528 for pair in cmds.split(';'):
528 for pair in cmds.split(';'):
529 op, args = pair.split(' ', 1)
529 op, args = pair.split(' ', 1)
530 vals = {}
530 vals = {}
531 for a in args.split(','):
531 for a in args.split(','):
532 if a:
532 if a:
533 n, v = a.split('=')
533 n, v = a.split('=')
534 vals[n] = unescapearg(v)
534 vals[n] = unescapearg(v)
535 func, spec = commands[op]
535 func, spec = commands[op]
536 if spec:
536 if spec:
537 keys = spec.split()
537 keys = spec.split()
538 data = {}
538 data = {}
539 for k in keys:
539 for k in keys:
540 if k == '*':
540 if k == '*':
541 star = {}
541 star = {}
542 for key in vals.keys():
542 for key in vals.keys():
543 if key not in keys:
543 if key not in keys:
544 star[key] = vals[key]
544 star[key] = vals[key]
545 data['*'] = star
545 data['*'] = star
546 else:
546 else:
547 data[k] = vals[k]
547 data[k] = vals[k]
548 result = func(repo, proto, *[data[k] for k in keys])
548 result = func(repo, proto, *[data[k] for k in keys])
549 else:
549 else:
550 result = func(repo, proto)
550 result = func(repo, proto)
551 if isinstance(result, ooberror):
551 if isinstance(result, ooberror):
552 return result
552 return result
553 res.append(escapearg(result))
553 res.append(escapearg(result))
554 return ';'.join(res)
554 return ';'.join(res)
555
555
556 @wireprotocommand('between', 'pairs')
556 @wireprotocommand('between', 'pairs')
557 def between(repo, proto, pairs):
557 def between(repo, proto, pairs):
558 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
558 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
559 r = []
559 r = []
560 for b in repo.between(pairs):
560 for b in repo.between(pairs):
561 r.append(encodelist(b) + "\n")
561 r.append(encodelist(b) + "\n")
562 return "".join(r)
562 return "".join(r)
563
563
564 @wireprotocommand('branchmap')
564 @wireprotocommand('branchmap')
565 def branchmap(repo, proto):
565 def branchmap(repo, proto):
566 branchmap = repo.branchmap()
566 branchmap = repo.branchmap()
567 heads = []
567 heads = []
568 for branch, nodes in branchmap.iteritems():
568 for branch, nodes in branchmap.iteritems():
569 branchname = urllib.quote(encoding.fromlocal(branch))
569 branchname = urllib.quote(encoding.fromlocal(branch))
570 branchnodes = encodelist(nodes)
570 branchnodes = encodelist(nodes)
571 heads.append('%s %s' % (branchname, branchnodes))
571 heads.append('%s %s' % (branchname, branchnodes))
572 return '\n'.join(heads)
572 return '\n'.join(heads)
573
573
574 @wireprotocommand('branches', 'nodes')
574 @wireprotocommand('branches', 'nodes')
575 def branches(repo, proto, nodes):
575 def branches(repo, proto, nodes):
576 nodes = decodelist(nodes)
576 nodes = decodelist(nodes)
577 r = []
577 r = []
578 for b in repo.branches(nodes):
578 for b in repo.branches(nodes):
579 r.append(encodelist(b) + "\n")
579 r.append(encodelist(b) + "\n")
580 return "".join(r)
580 return "".join(r)
581
581
582
582
583 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
583 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
584 'known', 'getbundle', 'unbundlehash', 'batch']
584 'known', 'getbundle', 'unbundlehash', 'batch']
585
585
586 def _capabilities(repo, proto):
586 def _capabilities(repo, proto):
587 """return a list of capabilities for a repo
587 """return a list of capabilities for a repo
588
588
589 This function exists to allow extensions to easily wrap capabilities
589 This function exists to allow extensions to easily wrap capabilities
590 computation
590 computation
591
591
592 - returns a lists: easy to alter
592 - returns a lists: easy to alter
593 - change done here will be propagated to both `capabilities` and `hello`
593 - change done here will be propagated to both `capabilities` and `hello`
594 command without any other action needed.
594 command without any other action needed.
595 """
595 """
596 # copy to prevent modification of the global list
596 # copy to prevent modification of the global list
597 caps = list(wireprotocaps)
597 caps = list(wireprotocaps)
598 if _allowstream(repo.ui):
598 if _allowstream(repo.ui):
599 if repo.ui.configbool('server', 'preferuncompressed', False):
599 if repo.ui.configbool('server', 'preferuncompressed', False):
600 caps.append('stream-preferred')
600 caps.append('stream-preferred')
601 requiredformats = repo.requirements & repo.supportedformats
601 requiredformats = repo.requirements & repo.supportedformats
602 # if our local revlogs are just revlogv1, add 'stream' cap
602 # if our local revlogs are just revlogv1, add 'stream' cap
603 if not requiredformats - set(('revlogv1',)):
603 if not requiredformats - set(('revlogv1',)):
604 caps.append('stream')
604 caps.append('stream')
605 # otherwise, add 'streamreqs' detailing our local revlog format
605 # otherwise, add 'streamreqs' detailing our local revlog format
606 else:
606 else:
607 caps.append('streamreqs=%s' % ','.join(requiredformats))
607 caps.append('streamreqs=%s' % ','.join(requiredformats))
608 if repo.ui.configbool('experimental', 'bundle2-exp', False):
608 if repo.ui.configbool('experimental', 'bundle2-exp', False):
609 capsblob = bundle2.encodecaps(repo.bundle2caps)
609 capsblob = bundle2.encodecaps(repo.bundle2caps)
610 caps.append('bundle2-exp=' + urllib.quote(capsblob))
610 caps.append('bundle2-exp=' + urllib.quote(capsblob))
611 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
611 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
612 caps.append('httpheader=1024')
612 caps.append('httpheader=1024')
613 return caps
613 return caps
614
614
615 # If you are writing an extension and consider wrapping this function. Wrap
615 # If you are writing an extension and consider wrapping this function. Wrap
616 # `_capabilities` instead.
616 # `_capabilities` instead.
617 @wireprotocommand('capabilities')
617 @wireprotocommand('capabilities')
618 def capabilities(repo, proto):
618 def capabilities(repo, proto):
619 return ' '.join(_capabilities(repo, proto))
619 return ' '.join(_capabilities(repo, proto))
620
620
621 @wireprotocommand('changegroup', 'roots')
621 @wireprotocommand('changegroup', 'roots')
622 def changegroup(repo, proto, roots):
622 def changegroup(repo, proto, roots):
623 nodes = decodelist(roots)
623 nodes = decodelist(roots)
624 cg = changegroupmod.changegroup(repo, nodes, 'serve')
624 cg = changegroupmod.changegroup(repo, nodes, 'serve')
625 return streamres(proto.groupchunks(cg))
625 return streamres(proto.groupchunks(cg))
626
626
627 @wireprotocommand('changegroupsubset', 'bases heads')
627 @wireprotocommand('changegroupsubset', 'bases heads')
628 def changegroupsubset(repo, proto, bases, heads):
628 def changegroupsubset(repo, proto, bases, heads):
629 bases = decodelist(bases)
629 bases = decodelist(bases)
630 heads = decodelist(heads)
630 heads = decodelist(heads)
631 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
631 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
632 return streamres(proto.groupchunks(cg))
632 return streamres(proto.groupchunks(cg))
633
633
634 @wireprotocommand('debugwireargs', 'one two *')
634 @wireprotocommand('debugwireargs', 'one two *')
635 def debugwireargs(repo, proto, one, two, others):
635 def debugwireargs(repo, proto, one, two, others):
636 # only accept optional args from the known set
636 # only accept optional args from the known set
637 opts = options('debugwireargs', ['three', 'four'], others)
637 opts = options('debugwireargs', ['three', 'four'], others)
638 return repo.debugwireargs(one, two, **opts)
638 return repo.debugwireargs(one, two, **opts)
639
639
640 # List of options accepted by getbundle.
640 # List of options accepted by getbundle.
641 #
641 #
642 # Meant to be extended by extensions. It is the extension's responsibility to
642 # Meant to be extended by extensions. It is the extension's responsibility to
643 # ensure such options are properly processed in exchange.getbundle.
643 # ensure such options are properly processed in exchange.getbundle.
644 gboptslist = ['heads', 'common', 'bundlecaps']
644 gboptslist = ['heads', 'common', 'bundlecaps']
645
645
646 @wireprotocommand('getbundle', '*')
646 @wireprotocommand('getbundle', '*')
647 def getbundle(repo, proto, others):
647 def getbundle(repo, proto, others):
648 opts = options('getbundle', gboptsmap.keys(), others)
648 opts = options('getbundle', gboptsmap.keys(), others)
649 for k, v in opts.iteritems():
649 for k, v in opts.iteritems():
650 keytype = gboptsmap[k]
650 keytype = gboptsmap[k]
651 if keytype == 'nodes':
651 if keytype == 'nodes':
652 opts[k] = decodelist(v)
652 opts[k] = decodelist(v)
653 elif keytype == 'csv':
653 elif keytype == 'csv':
654 opts[k] = set(v.split(','))
654 opts[k] = set(v.split(','))
655 elif keytype != 'plain':
655 elif keytype != 'plain':
656 raise KeyError('unknown getbundle option type %s'
656 raise KeyError('unknown getbundle option type %s'
657 % keytype)
657 % keytype)
658 cg = exchange.getbundle(repo, 'serve', **opts)
658 cg = exchange.getbundle(repo, 'serve', **opts)
659 return streamres(proto.groupchunks(cg))
659 return streamres(proto.groupchunks(cg))
660
660
661 @wireprotocommand('heads')
661 @wireprotocommand('heads')
662 def heads(repo, proto):
662 def heads(repo, proto):
663 h = repo.heads()
663 h = repo.heads()
664 return encodelist(h) + "\n"
664 return encodelist(h) + "\n"
665
665
666 @wireprotocommand('hello')
666 @wireprotocommand('hello')
667 def hello(repo, proto):
667 def hello(repo, proto):
668 '''the hello command returns a set of lines describing various
668 '''the hello command returns a set of lines describing various
669 interesting things about the server, in an RFC822-like format.
669 interesting things about the server, in an RFC822-like format.
670 Currently the only one defined is "capabilities", which
670 Currently the only one defined is "capabilities", which
671 consists of a line in the form:
671 consists of a line in the form:
672
672
673 capabilities: space separated list of tokens
673 capabilities: space separated list of tokens
674 '''
674 '''
675 return "capabilities: %s\n" % (capabilities(repo, proto))
675 return "capabilities: %s\n" % (capabilities(repo, proto))
676
676
677 @wireprotocommand('listkeys', 'namespace')
677 @wireprotocommand('listkeys', 'namespace')
678 def listkeys(repo, proto, namespace):
678 def listkeys(repo, proto, namespace):
679 d = repo.listkeys(encoding.tolocal(namespace)).items()
679 d = repo.listkeys(encoding.tolocal(namespace)).items()
680 return pushkeymod.encodekeys(d)
680 return pushkeymod.encodekeys(d)
681
681
682 @wireprotocommand('lookup', 'key')
682 @wireprotocommand('lookup', 'key')
683 def lookup(repo, proto, key):
683 def lookup(repo, proto, key):
684 try:
684 try:
685 k = encoding.tolocal(key)
685 k = encoding.tolocal(key)
686 c = repo[k]
686 c = repo[k]
687 r = c.hex()
687 r = c.hex()
688 success = 1
688 success = 1
689 except Exception, inst:
689 except Exception, inst:
690 r = str(inst)
690 r = str(inst)
691 success = 0
691 success = 0
692 return "%s %s\n" % (success, r)
692 return "%s %s\n" % (success, r)
693
693
694 @wireprotocommand('known', 'nodes *')
694 @wireprotocommand('known', 'nodes *')
695 def known(repo, proto, nodes, others):
695 def known(repo, proto, nodes, others):
696 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
696 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
697
697
698 @wireprotocommand('pushkey', 'namespace key old new')
698 @wireprotocommand('pushkey', 'namespace key old new')
699 def pushkey(repo, proto, namespace, key, old, new):
699 def pushkey(repo, proto, namespace, key, old, new):
700 # compatibility with pre-1.8 clients which were accidentally
700 # compatibility with pre-1.8 clients which were accidentally
701 # sending raw binary nodes rather than utf-8-encoded hex
701 # sending raw binary nodes rather than utf-8-encoded hex
702 if len(new) == 20 and new.encode('string-escape') != new:
702 if len(new) == 20 and new.encode('string-escape') != new:
703 # looks like it could be a binary node
703 # looks like it could be a binary node
704 try:
704 try:
705 new.decode('utf-8')
705 new.decode('utf-8')
706 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
706 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
707 except UnicodeDecodeError:
707 except UnicodeDecodeError:
708 pass # binary, leave unmodified
708 pass # binary, leave unmodified
709 else:
709 else:
710 new = encoding.tolocal(new) # normal path
710 new = encoding.tolocal(new) # normal path
711
711
712 if util.safehasattr(proto, 'restore'):
712 if util.safehasattr(proto, 'restore'):
713
713
714 proto.redirect()
714 proto.redirect()
715
715
716 try:
716 try:
717 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
717 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
718 encoding.tolocal(old), new) or False
718 encoding.tolocal(old), new) or False
719 except util.Abort:
719 except util.Abort:
720 r = False
720 r = False
721
721
722 output = proto.restore()
722 output = proto.restore()
723
723
724 return '%s\n%s' % (int(r), output)
724 return '%s\n%s' % (int(r), output)
725
725
726 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
726 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
727 encoding.tolocal(old), new)
727 encoding.tolocal(old), new)
728 return '%s\n' % int(r)
728 return '%s\n' % int(r)
729
729
730 def _allowstream(ui):
730 def _allowstream(ui):
731 return ui.configbool('server', 'uncompressed', True, untrusted=True)
731 return ui.configbool('server', 'uncompressed', True, untrusted=True)
732
732
733 def _walkstreamfiles(repo):
733 def _walkstreamfiles(repo):
734 # this is it's own function so extensions can override it
734 # this is it's own function so extensions can override it
735 return repo.store.walk()
735 return repo.store.walk()
736
736
737 @wireprotocommand('stream_out')
737 @wireprotocommand('stream_out')
738 def stream(repo, proto):
738 def stream(repo, proto):
739 '''If the server supports streaming clone, it advertises the "stream"
739 '''If the server supports streaming clone, it advertises the "stream"
740 capability with a value representing the version and flags of the repo
740 capability with a value representing the version and flags of the repo
741 it is serving. Client checks to see if it understands the format.
741 it is serving. Client checks to see if it understands the format.
742
742
743 The format is simple: the server writes out a line with the amount
743 The format is simple: the server writes out a line with the amount
744 of files, then the total amount of bytes to be transferred (separated
744 of files, then the total amount of bytes to be transferred (separated
745 by a space). Then, for each file, the server first writes the filename
745 by a space). Then, for each file, the server first writes the filename
746 and file size (separated by the null character), then the file contents.
746 and file size (separated by the null character), then the file contents.
747 '''
747 '''
748
748
749 if not _allowstream(repo.ui):
749 if not _allowstream(repo.ui):
750 return '1\n'
750 return '1\n'
751
751
752 entries = []
752 entries = []
753 total_bytes = 0
753 total_bytes = 0
754 try:
754 try:
755 # get consistent snapshot of repo, lock during scan
755 # get consistent snapshot of repo, lock during scan
756 lock = repo.lock()
756 lock = repo.lock()
757 try:
757 try:
758 repo.ui.debug('scanning\n')
758 repo.ui.debug('scanning\n')
759 for name, ename, size in _walkstreamfiles(repo):
759 for name, ename, size in _walkstreamfiles(repo):
760 if size:
760 if size:
761 entries.append((name, size))
761 entries.append((name, size))
762 total_bytes += size
762 total_bytes += size
763 finally:
763 finally:
764 lock.release()
764 lock.release()
765 except error.LockError:
765 except error.LockError:
766 return '2\n' # error: 2
766 return '2\n' # error: 2
767
767
768 def streamer(repo, entries, total):
768 def streamer(repo, entries, total):
769 '''stream out all metadata files in repository.'''
769 '''stream out all metadata files in repository.'''
770 yield '0\n' # success
770 yield '0\n' # success
771 repo.ui.debug('%d files, %d bytes to transfer\n' %
771 repo.ui.debug('%d files, %d bytes to transfer\n' %
772 (len(entries), total_bytes))
772 (len(entries), total_bytes))
773 yield '%d %d\n' % (len(entries), total_bytes)
773 yield '%d %d\n' % (len(entries), total_bytes)
774
774
775 sopener = repo.sopener
775 sopener = repo.sopener
776 oldaudit = sopener.mustaudit
776 oldaudit = sopener.mustaudit
777 debugflag = repo.ui.debugflag
777 debugflag = repo.ui.debugflag
778 sopener.mustaudit = False
778 sopener.mustaudit = False
779
779
780 try:
780 try:
781 for name, size in entries:
781 for name, size in entries:
782 if debugflag:
782 if debugflag:
783 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
783 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
784 # partially encode name over the wire for backwards compat
784 # partially encode name over the wire for backwards compat
785 yield '%s\0%d\n' % (store.encodedir(name), size)
785 yield '%s\0%d\n' % (store.encodedir(name), size)
786 if size <= 65536:
786 if size <= 65536:
787 fp = sopener(name)
787 fp = sopener(name)
788 try:
788 try:
789 data = fp.read(size)
789 data = fp.read(size)
790 finally:
790 finally:
791 fp.close()
791 fp.close()
792 yield data
792 yield data
793 else:
793 else:
794 for chunk in util.filechunkiter(sopener(name), limit=size):
794 for chunk in util.filechunkiter(sopener(name), limit=size):
795 yield chunk
795 yield chunk
796 # replace with "finally:" when support for python 2.4 has been dropped
796 # replace with "finally:" when support for python 2.4 has been dropped
797 except Exception:
797 except Exception:
798 sopener.mustaudit = oldaudit
798 sopener.mustaudit = oldaudit
799 raise
799 raise
800 sopener.mustaudit = oldaudit
800 sopener.mustaudit = oldaudit
801
801
802 return streamres(streamer(repo, entries, total_bytes))
802 return streamres(streamer(repo, entries, total_bytes))
803
803
804 @wireprotocommand('unbundle', 'heads')
804 @wireprotocommand('unbundle', 'heads')
805 def unbundle(repo, proto, heads):
805 def unbundle(repo, proto, heads):
806 their_heads = decodelist(heads)
806 their_heads = decodelist(heads)
807
807
808 try:
808 try:
809 proto.redirect()
809 proto.redirect()
810
810
811 exchange.check_heads(repo, their_heads, 'preparing changes')
811 exchange.check_heads(repo, their_heads, 'preparing changes')
812
812
813 # write bundle data to temporary file because it can be big
813 # write bundle data to temporary file because it can be big
814 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
814 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
815 fp = os.fdopen(fd, 'wb+')
815 fp = os.fdopen(fd, 'wb+')
816 r = 0
816 r = 0
817 try:
817 try:
818 proto.getfile(fp)
818 proto.getfile(fp)
819 fp.seek(0)
819 fp.seek(0)
820 gen = exchange.readbundle(repo.ui, fp, None)
820 gen = exchange.readbundle(repo.ui, fp, None)
821 r = exchange.unbundle(repo, gen, their_heads, 'serve',
821 r = exchange.unbundle(repo, gen, their_heads, 'serve',
822 proto._client())
822 proto._client())
823 if util.safehasattr(r, 'addpart'):
823 if util.safehasattr(r, 'addpart'):
824 # The return looks streameable, we are in the bundle2 case and
824 # The return looks streameable, we are in the bundle2 case and
825 # should return a stream.
825 # should return a stream.
826 return streamres(r.getchunks())
826 return streamres(r.getchunks())
827 return pushres(r)
827 return pushres(r)
828
828
829 finally:
829 finally:
830 fp.close()
830 fp.close()
831 os.unlink(tempname)
831 os.unlink(tempname)
832 except error.BundleValueError, exc:
832 except error.BundleValueError, exc:
833 bundler = bundle2.bundle20(repo.ui)
833 bundler = bundle2.bundle20(repo.ui)
834 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
834 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
835 if exc.parttype is not None:
835 if exc.parttype is not None:
836 errpart.addparam('parttype', exc.parttype)
836 errpart.addparam('parttype', exc.parttype)
837 if exc.params:
837 if exc.params:
838 errpart.addparam('params', '\0'.join(exc.params))
838 errpart.addparam('params', '\0'.join(exc.params))
839 return streamres(bundler.getchunks())
839 return streamres(bundler.getchunks())
840 except util.Abort, inst:
840 except util.Abort, inst:
841 # The old code we moved used sys.stderr directly.
841 # The old code we moved used sys.stderr directly.
842 # We did not change it to minimise code change.
842 # We did not change it to minimise code change.
843 # This need to be moved to something proper.
843 # This need to be moved to something proper.
844 # Feel free to do it.
844 # Feel free to do it.
845 if getattr(inst, 'duringunbundle2', False):
845 if getattr(inst, 'duringunbundle2', False):
846 bundler = bundle2.bundle20(repo.ui)
846 bundler = bundle2.bundle20(repo.ui)
847 manargs = [('message', str(inst))]
847 manargs = [('message', str(inst))]
848 advargs = []
848 advargs = []
849 if inst.hint is not None:
849 if inst.hint is not None:
850 advargs.append(('hint', inst.hint))
850 advargs.append(('hint', inst.hint))
851 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
851 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
852 manargs, advargs))
852 manargs, advargs))
853 return streamres(bundler.getchunks())
853 return streamres(bundler.getchunks())
854 else:
854 else:
855 sys.stderr.write("abort: %s\n" % inst)
855 sys.stderr.write("abort: %s\n" % inst)
856 return pushres(0)
856 return pushres(0)
857 except error.PushRaced, exc:
857 except error.PushRaced, exc:
858 if getattr(exc, 'duringunbundle2', False):
858 if getattr(exc, 'duringunbundle2', False):
859 bundler = bundle2.bundle20(repo.ui)
859 bundler = bundle2.bundle20(repo.ui)
860 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
860 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
861 return streamres(bundler.getchunks())
861 return streamres(bundler.getchunks())
862 else:
862 else:
863 return pusherr(str(exc))
863 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now