##// END OF EJS Templates
wireproto: add a ``boolean`` type for getbundle parameters...
Pierre-Yves David -
r21988:12cd3827 default
parent child Browse files
Show More
@@ -1,863 +1,867 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, store, exchange
12 import peer, error, encoding, util, store, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 return sep.join(map(hex, l))
175 return sep.join(map(hex, l))
176
176
177 # batched call argument encoding
177 # batched call argument encoding
178
178
179 def escapearg(plain):
179 def escapearg(plain):
180 return (plain
180 return (plain
181 .replace(':', '::')
181 .replace(':', '::')
182 .replace(',', ':,')
182 .replace(',', ':,')
183 .replace(';', ':;')
183 .replace(';', ':;')
184 .replace('=', ':='))
184 .replace('=', ':='))
185
185
186 def unescapearg(escaped):
186 def unescapearg(escaped):
187 return (escaped
187 return (escaped
188 .replace(':=', '=')
188 .replace(':=', '=')
189 .replace(':;', ';')
189 .replace(':;', ';')
190 .replace(':,', ',')
190 .replace(':,', ',')
191 .replace('::', ':'))
191 .replace('::', ':'))
192
192
193 # mapping of options accepted by getbundle and their types
193 # mapping of options accepted by getbundle and their types
194 #
194 #
195 # Meant to be extended by extensions. It is extensions responsibility to ensure
195 # Meant to be extended by extensions. It is extensions responsibility to ensure
196 # such options are properly processed in exchange.getbundle.
196 # such options are properly processed in exchange.getbundle.
197 #
197 #
198 # supported types are:
198 # supported types are:
199 #
199 #
200 # :nodes: list of binary nodes
200 # :nodes: list of binary nodes
201 # :csv: list of comma-separated values
201 # :csv: list of comma-separated values
202 # :plain: string with no transformation needed.
202 # :plain: string with no transformation needed.
203 gboptsmap = {'heads': 'nodes',
203 gboptsmap = {'heads': 'nodes',
204 'common': 'nodes',
204 'common': 'nodes',
205 'bundlecaps': 'csv',
205 'bundlecaps': 'csv',
206 'listkeys': 'csv'}
206 'listkeys': 'csv'}
207
207
208 # client side
208 # client side
209
209
210 class wirepeer(peer.peerrepository):
210 class wirepeer(peer.peerrepository):
211
211
212 def batch(self):
212 def batch(self):
213 return remotebatch(self)
213 return remotebatch(self)
214 def _submitbatch(self, req):
214 def _submitbatch(self, req):
215 cmds = []
215 cmds = []
216 for op, argsdict in req:
216 for op, argsdict in req:
217 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
217 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
218 cmds.append('%s %s' % (op, args))
218 cmds.append('%s %s' % (op, args))
219 rsp = self._call("batch", cmds=';'.join(cmds))
219 rsp = self._call("batch", cmds=';'.join(cmds))
220 return rsp.split(';')
220 return rsp.split(';')
221 def _submitone(self, op, args):
221 def _submitone(self, op, args):
222 return self._call(op, **args)
222 return self._call(op, **args)
223
223
224 @batchable
224 @batchable
225 def lookup(self, key):
225 def lookup(self, key):
226 self.requirecap('lookup', _('look up remote revision'))
226 self.requirecap('lookup', _('look up remote revision'))
227 f = future()
227 f = future()
228 yield {'key': encoding.fromlocal(key)}, f
228 yield {'key': encoding.fromlocal(key)}, f
229 d = f.value
229 d = f.value
230 success, data = d[:-1].split(" ", 1)
230 success, data = d[:-1].split(" ", 1)
231 if int(success):
231 if int(success):
232 yield bin(data)
232 yield bin(data)
233 self._abort(error.RepoError(data))
233 self._abort(error.RepoError(data))
234
234
235 @batchable
235 @batchable
236 def heads(self):
236 def heads(self):
237 f = future()
237 f = future()
238 yield {}, f
238 yield {}, f
239 d = f.value
239 d = f.value
240 try:
240 try:
241 yield decodelist(d[:-1])
241 yield decodelist(d[:-1])
242 except ValueError:
242 except ValueError:
243 self._abort(error.ResponseError(_("unexpected response:"), d))
243 self._abort(error.ResponseError(_("unexpected response:"), d))
244
244
245 @batchable
245 @batchable
246 def known(self, nodes):
246 def known(self, nodes):
247 f = future()
247 f = future()
248 yield {'nodes': encodelist(nodes)}, f
248 yield {'nodes': encodelist(nodes)}, f
249 d = f.value
249 d = f.value
250 try:
250 try:
251 yield [bool(int(f)) for f in d]
251 yield [bool(int(f)) for f in d]
252 except ValueError:
252 except ValueError:
253 self._abort(error.ResponseError(_("unexpected response:"), d))
253 self._abort(error.ResponseError(_("unexpected response:"), d))
254
254
255 @batchable
255 @batchable
256 def branchmap(self):
256 def branchmap(self):
257 f = future()
257 f = future()
258 yield {}, f
258 yield {}, f
259 d = f.value
259 d = f.value
260 try:
260 try:
261 branchmap = {}
261 branchmap = {}
262 for branchpart in d.splitlines():
262 for branchpart in d.splitlines():
263 branchname, branchheads = branchpart.split(' ', 1)
263 branchname, branchheads = branchpart.split(' ', 1)
264 branchname = encoding.tolocal(urllib.unquote(branchname))
264 branchname = encoding.tolocal(urllib.unquote(branchname))
265 branchheads = decodelist(branchheads)
265 branchheads = decodelist(branchheads)
266 branchmap[branchname] = branchheads
266 branchmap[branchname] = branchheads
267 yield branchmap
267 yield branchmap
268 except TypeError:
268 except TypeError:
269 self._abort(error.ResponseError(_("unexpected response:"), d))
269 self._abort(error.ResponseError(_("unexpected response:"), d))
270
270
271 def branches(self, nodes):
271 def branches(self, nodes):
272 n = encodelist(nodes)
272 n = encodelist(nodes)
273 d = self._call("branches", nodes=n)
273 d = self._call("branches", nodes=n)
274 try:
274 try:
275 br = [tuple(decodelist(b)) for b in d.splitlines()]
275 br = [tuple(decodelist(b)) for b in d.splitlines()]
276 return br
276 return br
277 except ValueError:
277 except ValueError:
278 self._abort(error.ResponseError(_("unexpected response:"), d))
278 self._abort(error.ResponseError(_("unexpected response:"), d))
279
279
280 def between(self, pairs):
280 def between(self, pairs):
281 batch = 8 # avoid giant requests
281 batch = 8 # avoid giant requests
282 r = []
282 r = []
283 for i in xrange(0, len(pairs), batch):
283 for i in xrange(0, len(pairs), batch):
284 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
284 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
285 d = self._call("between", pairs=n)
285 d = self._call("between", pairs=n)
286 try:
286 try:
287 r.extend(l and decodelist(l) or [] for l in d.splitlines())
287 r.extend(l and decodelist(l) or [] for l in d.splitlines())
288 except ValueError:
288 except ValueError:
289 self._abort(error.ResponseError(_("unexpected response:"), d))
289 self._abort(error.ResponseError(_("unexpected response:"), d))
290 return r
290 return r
291
291
292 @batchable
292 @batchable
293 def pushkey(self, namespace, key, old, new):
293 def pushkey(self, namespace, key, old, new):
294 if not self.capable('pushkey'):
294 if not self.capable('pushkey'):
295 yield False, None
295 yield False, None
296 f = future()
296 f = future()
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 yield {'namespace': encoding.fromlocal(namespace),
298 yield {'namespace': encoding.fromlocal(namespace),
299 'key': encoding.fromlocal(key),
299 'key': encoding.fromlocal(key),
300 'old': encoding.fromlocal(old),
300 'old': encoding.fromlocal(old),
301 'new': encoding.fromlocal(new)}, f
301 'new': encoding.fromlocal(new)}, f
302 d = f.value
302 d = f.value
303 d, output = d.split('\n', 1)
303 d, output = d.split('\n', 1)
304 try:
304 try:
305 d = bool(int(d))
305 d = bool(int(d))
306 except ValueError:
306 except ValueError:
307 raise error.ResponseError(
307 raise error.ResponseError(
308 _('push failed (unexpected response):'), d)
308 _('push failed (unexpected response):'), d)
309 for l in output.splitlines(True):
309 for l in output.splitlines(True):
310 self.ui.status(_('remote: '), l)
310 self.ui.status(_('remote: '), l)
311 yield d
311 yield d
312
312
313 @batchable
313 @batchable
314 def listkeys(self, namespace):
314 def listkeys(self, namespace):
315 if not self.capable('pushkey'):
315 if not self.capable('pushkey'):
316 yield {}, None
316 yield {}, None
317 f = future()
317 f = future()
318 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
318 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
319 yield {'namespace': encoding.fromlocal(namespace)}, f
319 yield {'namespace': encoding.fromlocal(namespace)}, f
320 d = f.value
320 d = f.value
321 yield pushkeymod.decodekeys(d)
321 yield pushkeymod.decodekeys(d)
322
322
323 def stream_out(self):
323 def stream_out(self):
324 return self._callstream('stream_out')
324 return self._callstream('stream_out')
325
325
326 def changegroup(self, nodes, kind):
326 def changegroup(self, nodes, kind):
327 n = encodelist(nodes)
327 n = encodelist(nodes)
328 f = self._callcompressable("changegroup", roots=n)
328 f = self._callcompressable("changegroup", roots=n)
329 return changegroupmod.unbundle10(f, 'UN')
329 return changegroupmod.unbundle10(f, 'UN')
330
330
331 def changegroupsubset(self, bases, heads, kind):
331 def changegroupsubset(self, bases, heads, kind):
332 self.requirecap('changegroupsubset', _('look up remote changes'))
332 self.requirecap('changegroupsubset', _('look up remote changes'))
333 bases = encodelist(bases)
333 bases = encodelist(bases)
334 heads = encodelist(heads)
334 heads = encodelist(heads)
335 f = self._callcompressable("changegroupsubset",
335 f = self._callcompressable("changegroupsubset",
336 bases=bases, heads=heads)
336 bases=bases, heads=heads)
337 return changegroupmod.unbundle10(f, 'UN')
337 return changegroupmod.unbundle10(f, 'UN')
338
338
339 def getbundle(self, source, **kwargs):
339 def getbundle(self, source, **kwargs):
340 self.requirecap('getbundle', _('look up remote changes'))
340 self.requirecap('getbundle', _('look up remote changes'))
341 opts = {}
341 opts = {}
342 for key, value in kwargs.iteritems():
342 for key, value in kwargs.iteritems():
343 if value is None:
343 if value is None:
344 continue
344 continue
345 keytype = gboptsmap.get(key)
345 keytype = gboptsmap.get(key)
346 if keytype is None:
346 if keytype is None:
347 assert False, 'unexpected'
347 assert False, 'unexpected'
348 elif keytype == 'nodes':
348 elif keytype == 'nodes':
349 value = encodelist(value)
349 value = encodelist(value)
350 elif keytype == 'csv':
350 elif keytype == 'csv':
351 value = ','.join(value)
351 value = ','.join(value)
352 elif keytype == 'boolean':
353 value = bool(value)
352 elif keytype != 'plain':
354 elif keytype != 'plain':
353 raise KeyError('unknown getbundle option type %s'
355 raise KeyError('unknown getbundle option type %s'
354 % keytype)
356 % keytype)
355 opts[key] = value
357 opts[key] = value
356 f = self._callcompressable("getbundle", **opts)
358 f = self._callcompressable("getbundle", **opts)
357 bundlecaps = kwargs.get('bundlecaps')
359 bundlecaps = kwargs.get('bundlecaps')
358 if bundlecaps is not None and 'HG2X' in bundlecaps:
360 if bundlecaps is not None and 'HG2X' in bundlecaps:
359 return bundle2.unbundle20(self.ui, f)
361 return bundle2.unbundle20(self.ui, f)
360 else:
362 else:
361 return changegroupmod.unbundle10(f, 'UN')
363 return changegroupmod.unbundle10(f, 'UN')
362
364
363 def unbundle(self, cg, heads, source):
365 def unbundle(self, cg, heads, source):
364 '''Send cg (a readable file-like object representing the
366 '''Send cg (a readable file-like object representing the
365 changegroup to push, typically a chunkbuffer object) to the
367 changegroup to push, typically a chunkbuffer object) to the
366 remote server as a bundle.
368 remote server as a bundle.
367
369
368 When pushing a bundle10 stream, return an integer indicating the
370 When pushing a bundle10 stream, return an integer indicating the
369 result of the push (see localrepository.addchangegroup()).
371 result of the push (see localrepository.addchangegroup()).
370
372
371 When pushing a bundle20 stream, return a bundle20 stream.'''
373 When pushing a bundle20 stream, return a bundle20 stream.'''
372
374
373 if heads != ['force'] and self.capable('unbundlehash'):
375 if heads != ['force'] and self.capable('unbundlehash'):
374 heads = encodelist(['hashed',
376 heads = encodelist(['hashed',
375 util.sha1(''.join(sorted(heads))).digest()])
377 util.sha1(''.join(sorted(heads))).digest()])
376 else:
378 else:
377 heads = encodelist(heads)
379 heads = encodelist(heads)
378
380
379 if util.safehasattr(cg, 'deltaheader'):
381 if util.safehasattr(cg, 'deltaheader'):
380 # this a bundle10, do the old style call sequence
382 # this a bundle10, do the old style call sequence
381 ret, output = self._callpush("unbundle", cg, heads=heads)
383 ret, output = self._callpush("unbundle", cg, heads=heads)
382 if ret == "":
384 if ret == "":
383 raise error.ResponseError(
385 raise error.ResponseError(
384 _('push failed:'), output)
386 _('push failed:'), output)
385 try:
387 try:
386 ret = int(ret)
388 ret = int(ret)
387 except ValueError:
389 except ValueError:
388 raise error.ResponseError(
390 raise error.ResponseError(
389 _('push failed (unexpected response):'), ret)
391 _('push failed (unexpected response):'), ret)
390
392
391 for l in output.splitlines(True):
393 for l in output.splitlines(True):
392 self.ui.status(_('remote: '), l)
394 self.ui.status(_('remote: '), l)
393 else:
395 else:
394 # bundle2 push. Send a stream, fetch a stream.
396 # bundle2 push. Send a stream, fetch a stream.
395 stream = self._calltwowaystream('unbundle', cg, heads=heads)
397 stream = self._calltwowaystream('unbundle', cg, heads=heads)
396 ret = bundle2.unbundle20(self.ui, stream)
398 ret = bundle2.unbundle20(self.ui, stream)
397 return ret
399 return ret
398
400
399 def debugwireargs(self, one, two, three=None, four=None, five=None):
401 def debugwireargs(self, one, two, three=None, four=None, five=None):
400 # don't pass optional arguments left at their default value
402 # don't pass optional arguments left at their default value
401 opts = {}
403 opts = {}
402 if three is not None:
404 if three is not None:
403 opts['three'] = three
405 opts['three'] = three
404 if four is not None:
406 if four is not None:
405 opts['four'] = four
407 opts['four'] = four
406 return self._call('debugwireargs', one=one, two=two, **opts)
408 return self._call('debugwireargs', one=one, two=two, **opts)
407
409
408 def _call(self, cmd, **args):
410 def _call(self, cmd, **args):
409 """execute <cmd> on the server
411 """execute <cmd> on the server
410
412
411 The command is expected to return a simple string.
413 The command is expected to return a simple string.
412
414
413 returns the server reply as a string."""
415 returns the server reply as a string."""
414 raise NotImplementedError()
416 raise NotImplementedError()
415
417
416 def _callstream(self, cmd, **args):
418 def _callstream(self, cmd, **args):
417 """execute <cmd> on the server
419 """execute <cmd> on the server
418
420
419 The command is expected to return a stream.
421 The command is expected to return a stream.
420
422
421 returns the server reply as a file like object."""
423 returns the server reply as a file like object."""
422 raise NotImplementedError()
424 raise NotImplementedError()
423
425
424 def _callcompressable(self, cmd, **args):
426 def _callcompressable(self, cmd, **args):
425 """execute <cmd> on the server
427 """execute <cmd> on the server
426
428
427 The command is expected to return a stream.
429 The command is expected to return a stream.
428
430
429 The stream may have been compressed in some implementations. This
431 The stream may have been compressed in some implementations. This
430 function takes care of the decompression. This is the only difference
432 function takes care of the decompression. This is the only difference
431 with _callstream.
433 with _callstream.
432
434
433 returns the server reply as a file like object.
435 returns the server reply as a file like object.
434 """
436 """
435 raise NotImplementedError()
437 raise NotImplementedError()
436
438
437 def _callpush(self, cmd, fp, **args):
439 def _callpush(self, cmd, fp, **args):
438 """execute a <cmd> on server
440 """execute a <cmd> on server
439
441
440 The command is expected to be related to a push. Push has a special
442 The command is expected to be related to a push. Push has a special
441 return method.
443 return method.
442
444
443 returns the server reply as a (ret, output) tuple. ret is either
445 returns the server reply as a (ret, output) tuple. ret is either
444 empty (error) or a stringified int.
446 empty (error) or a stringified int.
445 """
447 """
446 raise NotImplementedError()
448 raise NotImplementedError()
447
449
448 def _calltwowaystream(self, cmd, fp, **args):
450 def _calltwowaystream(self, cmd, fp, **args):
449 """execute <cmd> on server
451 """execute <cmd> on server
450
452
451 The command will send a stream to the server and get a stream in reply.
453 The command will send a stream to the server and get a stream in reply.
452 """
454 """
453 raise NotImplementedError()
455 raise NotImplementedError()
454
456
455 def _abort(self, exception):
457 def _abort(self, exception):
456 """clearly abort the wire protocol connection and raise the exception
458 """clearly abort the wire protocol connection and raise the exception
457 """
459 """
458 raise NotImplementedError()
460 raise NotImplementedError()
459
461
460 # server side
462 # server side
461
463
462 # wire protocol command can either return a string or one of these classes.
464 # wire protocol command can either return a string or one of these classes.
463 class streamres(object):
465 class streamres(object):
464 """wireproto reply: binary stream
466 """wireproto reply: binary stream
465
467
466 The call was successful and the result is a stream.
468 The call was successful and the result is a stream.
467 Iterate on the `self.gen` attribute to retrieve chunks.
469 Iterate on the `self.gen` attribute to retrieve chunks.
468 """
470 """
469 def __init__(self, gen):
471 def __init__(self, gen):
470 self.gen = gen
472 self.gen = gen
471
473
472 class pushres(object):
474 class pushres(object):
473 """wireproto reply: success with simple integer return
475 """wireproto reply: success with simple integer return
474
476
475 The call was successful and returned an integer contained in `self.res`.
477 The call was successful and returned an integer contained in `self.res`.
476 """
478 """
477 def __init__(self, res):
479 def __init__(self, res):
478 self.res = res
480 self.res = res
479
481
480 class pusherr(object):
482 class pusherr(object):
481 """wireproto reply: failure
483 """wireproto reply: failure
482
484
483 The call failed. The `self.res` attribute contains the error message.
485 The call failed. The `self.res` attribute contains the error message.
484 """
486 """
485 def __init__(self, res):
487 def __init__(self, res):
486 self.res = res
488 self.res = res
487
489
488 class ooberror(object):
490 class ooberror(object):
489 """wireproto reply: failure of a batch of operation
491 """wireproto reply: failure of a batch of operation
490
492
491 Something failed during a batch call. The error message is stored in
493 Something failed during a batch call. The error message is stored in
492 `self.message`.
494 `self.message`.
493 """
495 """
494 def __init__(self, message):
496 def __init__(self, message):
495 self.message = message
497 self.message = message
496
498
497 def dispatch(repo, proto, command):
499 def dispatch(repo, proto, command):
498 repo = repo.filtered("served")
500 repo = repo.filtered("served")
499 func, spec = commands[command]
501 func, spec = commands[command]
500 args = proto.getargs(spec)
502 args = proto.getargs(spec)
501 return func(repo, proto, *args)
503 return func(repo, proto, *args)
502
504
503 def options(cmd, keys, others):
505 def options(cmd, keys, others):
504 opts = {}
506 opts = {}
505 for k in keys:
507 for k in keys:
506 if k in others:
508 if k in others:
507 opts[k] = others[k]
509 opts[k] = others[k]
508 del others[k]
510 del others[k]
509 if others:
511 if others:
510 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
512 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
511 % (cmd, ",".join(others)))
513 % (cmd, ",".join(others)))
512 return opts
514 return opts
513
515
514 # list of commands
516 # list of commands
515 commands = {}
517 commands = {}
516
518
517 def wireprotocommand(name, args=''):
519 def wireprotocommand(name, args=''):
518 """decorator for wire protocol command"""
520 """decorator for wire protocol command"""
519 def register(func):
521 def register(func):
520 commands[name] = (func, args)
522 commands[name] = (func, args)
521 return func
523 return func
522 return register
524 return register
523
525
524 @wireprotocommand('batch', 'cmds *')
526 @wireprotocommand('batch', 'cmds *')
525 def batch(repo, proto, cmds, others):
527 def batch(repo, proto, cmds, others):
526 repo = repo.filtered("served")
528 repo = repo.filtered("served")
527 res = []
529 res = []
528 for pair in cmds.split(';'):
530 for pair in cmds.split(';'):
529 op, args = pair.split(' ', 1)
531 op, args = pair.split(' ', 1)
530 vals = {}
532 vals = {}
531 for a in args.split(','):
533 for a in args.split(','):
532 if a:
534 if a:
533 n, v = a.split('=')
535 n, v = a.split('=')
534 vals[n] = unescapearg(v)
536 vals[n] = unescapearg(v)
535 func, spec = commands[op]
537 func, spec = commands[op]
536 if spec:
538 if spec:
537 keys = spec.split()
539 keys = spec.split()
538 data = {}
540 data = {}
539 for k in keys:
541 for k in keys:
540 if k == '*':
542 if k == '*':
541 star = {}
543 star = {}
542 for key in vals.keys():
544 for key in vals.keys():
543 if key not in keys:
545 if key not in keys:
544 star[key] = vals[key]
546 star[key] = vals[key]
545 data['*'] = star
547 data['*'] = star
546 else:
548 else:
547 data[k] = vals[k]
549 data[k] = vals[k]
548 result = func(repo, proto, *[data[k] for k in keys])
550 result = func(repo, proto, *[data[k] for k in keys])
549 else:
551 else:
550 result = func(repo, proto)
552 result = func(repo, proto)
551 if isinstance(result, ooberror):
553 if isinstance(result, ooberror):
552 return result
554 return result
553 res.append(escapearg(result))
555 res.append(escapearg(result))
554 return ';'.join(res)
556 return ';'.join(res)
555
557
556 @wireprotocommand('between', 'pairs')
558 @wireprotocommand('between', 'pairs')
557 def between(repo, proto, pairs):
559 def between(repo, proto, pairs):
558 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
560 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
559 r = []
561 r = []
560 for b in repo.between(pairs):
562 for b in repo.between(pairs):
561 r.append(encodelist(b) + "\n")
563 r.append(encodelist(b) + "\n")
562 return "".join(r)
564 return "".join(r)
563
565
564 @wireprotocommand('branchmap')
566 @wireprotocommand('branchmap')
565 def branchmap(repo, proto):
567 def branchmap(repo, proto):
566 branchmap = repo.branchmap()
568 branchmap = repo.branchmap()
567 heads = []
569 heads = []
568 for branch, nodes in branchmap.iteritems():
570 for branch, nodes in branchmap.iteritems():
569 branchname = urllib.quote(encoding.fromlocal(branch))
571 branchname = urllib.quote(encoding.fromlocal(branch))
570 branchnodes = encodelist(nodes)
572 branchnodes = encodelist(nodes)
571 heads.append('%s %s' % (branchname, branchnodes))
573 heads.append('%s %s' % (branchname, branchnodes))
572 return '\n'.join(heads)
574 return '\n'.join(heads)
573
575
574 @wireprotocommand('branches', 'nodes')
576 @wireprotocommand('branches', 'nodes')
575 def branches(repo, proto, nodes):
577 def branches(repo, proto, nodes):
576 nodes = decodelist(nodes)
578 nodes = decodelist(nodes)
577 r = []
579 r = []
578 for b in repo.branches(nodes):
580 for b in repo.branches(nodes):
579 r.append(encodelist(b) + "\n")
581 r.append(encodelist(b) + "\n")
580 return "".join(r)
582 return "".join(r)
581
583
582
584
583 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
585 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
584 'known', 'getbundle', 'unbundlehash', 'batch']
586 'known', 'getbundle', 'unbundlehash', 'batch']
585
587
586 def _capabilities(repo, proto):
588 def _capabilities(repo, proto):
587 """return a list of capabilities for a repo
589 """return a list of capabilities for a repo
588
590
589 This function exists to allow extensions to easily wrap capabilities
591 This function exists to allow extensions to easily wrap capabilities
590 computation
592 computation
591
593
592 - returns a lists: easy to alter
594 - returns a lists: easy to alter
593 - change done here will be propagated to both `capabilities` and `hello`
595 - change done here will be propagated to both `capabilities` and `hello`
594 command without any other action needed.
596 command without any other action needed.
595 """
597 """
596 # copy to prevent modification of the global list
598 # copy to prevent modification of the global list
597 caps = list(wireprotocaps)
599 caps = list(wireprotocaps)
598 if _allowstream(repo.ui):
600 if _allowstream(repo.ui):
599 if repo.ui.configbool('server', 'preferuncompressed', False):
601 if repo.ui.configbool('server', 'preferuncompressed', False):
600 caps.append('stream-preferred')
602 caps.append('stream-preferred')
601 requiredformats = repo.requirements & repo.supportedformats
603 requiredformats = repo.requirements & repo.supportedformats
602 # if our local revlogs are just revlogv1, add 'stream' cap
604 # if our local revlogs are just revlogv1, add 'stream' cap
603 if not requiredformats - set(('revlogv1',)):
605 if not requiredformats - set(('revlogv1',)):
604 caps.append('stream')
606 caps.append('stream')
605 # otherwise, add 'streamreqs' detailing our local revlog format
607 # otherwise, add 'streamreqs' detailing our local revlog format
606 else:
608 else:
607 caps.append('streamreqs=%s' % ','.join(requiredformats))
609 caps.append('streamreqs=%s' % ','.join(requiredformats))
608 if repo.ui.configbool('experimental', 'bundle2-exp', False):
610 if repo.ui.configbool('experimental', 'bundle2-exp', False):
609 capsblob = bundle2.encodecaps(repo.bundle2caps)
611 capsblob = bundle2.encodecaps(repo.bundle2caps)
610 caps.append('bundle2-exp=' + urllib.quote(capsblob))
612 caps.append('bundle2-exp=' + urllib.quote(capsblob))
611 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
613 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
612 caps.append('httpheader=1024')
614 caps.append('httpheader=1024')
613 return caps
615 return caps
614
616
615 # If you are writing an extension and consider wrapping this function. Wrap
617 # If you are writing an extension and consider wrapping this function. Wrap
616 # `_capabilities` instead.
618 # `_capabilities` instead.
617 @wireprotocommand('capabilities')
619 @wireprotocommand('capabilities')
618 def capabilities(repo, proto):
620 def capabilities(repo, proto):
619 return ' '.join(_capabilities(repo, proto))
621 return ' '.join(_capabilities(repo, proto))
620
622
621 @wireprotocommand('changegroup', 'roots')
623 @wireprotocommand('changegroup', 'roots')
622 def changegroup(repo, proto, roots):
624 def changegroup(repo, proto, roots):
623 nodes = decodelist(roots)
625 nodes = decodelist(roots)
624 cg = changegroupmod.changegroup(repo, nodes, 'serve')
626 cg = changegroupmod.changegroup(repo, nodes, 'serve')
625 return streamres(proto.groupchunks(cg))
627 return streamres(proto.groupchunks(cg))
626
628
627 @wireprotocommand('changegroupsubset', 'bases heads')
629 @wireprotocommand('changegroupsubset', 'bases heads')
628 def changegroupsubset(repo, proto, bases, heads):
630 def changegroupsubset(repo, proto, bases, heads):
629 bases = decodelist(bases)
631 bases = decodelist(bases)
630 heads = decodelist(heads)
632 heads = decodelist(heads)
631 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
633 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
632 return streamres(proto.groupchunks(cg))
634 return streamres(proto.groupchunks(cg))
633
635
634 @wireprotocommand('debugwireargs', 'one two *')
636 @wireprotocommand('debugwireargs', 'one two *')
635 def debugwireargs(repo, proto, one, two, others):
637 def debugwireargs(repo, proto, one, two, others):
636 # only accept optional args from the known set
638 # only accept optional args from the known set
637 opts = options('debugwireargs', ['three', 'four'], others)
639 opts = options('debugwireargs', ['three', 'four'], others)
638 return repo.debugwireargs(one, two, **opts)
640 return repo.debugwireargs(one, two, **opts)
639
641
640 # List of options accepted by getbundle.
642 # List of options accepted by getbundle.
641 #
643 #
642 # Meant to be extended by extensions. It is the extension's responsibility to
644 # Meant to be extended by extensions. It is the extension's responsibility to
643 # ensure such options are properly processed in exchange.getbundle.
645 # ensure such options are properly processed in exchange.getbundle.
644 gboptslist = ['heads', 'common', 'bundlecaps']
646 gboptslist = ['heads', 'common', 'bundlecaps']
645
647
646 @wireprotocommand('getbundle', '*')
648 @wireprotocommand('getbundle', '*')
647 def getbundle(repo, proto, others):
649 def getbundle(repo, proto, others):
648 opts = options('getbundle', gboptsmap.keys(), others)
650 opts = options('getbundle', gboptsmap.keys(), others)
649 for k, v in opts.iteritems():
651 for k, v in opts.iteritems():
650 keytype = gboptsmap[k]
652 keytype = gboptsmap[k]
651 if keytype == 'nodes':
653 if keytype == 'nodes':
652 opts[k] = decodelist(v)
654 opts[k] = decodelist(v)
653 elif keytype == 'csv':
655 elif keytype == 'csv':
654 opts[k] = set(v.split(','))
656 opts[k] = set(v.split(','))
657 elif keytype == 'boolean':
658 opts[k] = '%i' % bool(v)
655 elif keytype != 'plain':
659 elif keytype != 'plain':
656 raise KeyError('unknown getbundle option type %s'
660 raise KeyError('unknown getbundle option type %s'
657 % keytype)
661 % keytype)
658 cg = exchange.getbundle(repo, 'serve', **opts)
662 cg = exchange.getbundle(repo, 'serve', **opts)
659 return streamres(proto.groupchunks(cg))
663 return streamres(proto.groupchunks(cg))
660
664
661 @wireprotocommand('heads')
665 @wireprotocommand('heads')
662 def heads(repo, proto):
666 def heads(repo, proto):
663 h = repo.heads()
667 h = repo.heads()
664 return encodelist(h) + "\n"
668 return encodelist(h) + "\n"
665
669
666 @wireprotocommand('hello')
670 @wireprotocommand('hello')
667 def hello(repo, proto):
671 def hello(repo, proto):
668 '''the hello command returns a set of lines describing various
672 '''the hello command returns a set of lines describing various
669 interesting things about the server, in an RFC822-like format.
673 interesting things about the server, in an RFC822-like format.
670 Currently the only one defined is "capabilities", which
674 Currently the only one defined is "capabilities", which
671 consists of a line in the form:
675 consists of a line in the form:
672
676
673 capabilities: space separated list of tokens
677 capabilities: space separated list of tokens
674 '''
678 '''
675 return "capabilities: %s\n" % (capabilities(repo, proto))
679 return "capabilities: %s\n" % (capabilities(repo, proto))
676
680
677 @wireprotocommand('listkeys', 'namespace')
681 @wireprotocommand('listkeys', 'namespace')
678 def listkeys(repo, proto, namespace):
682 def listkeys(repo, proto, namespace):
679 d = repo.listkeys(encoding.tolocal(namespace)).items()
683 d = repo.listkeys(encoding.tolocal(namespace)).items()
680 return pushkeymod.encodekeys(d)
684 return pushkeymod.encodekeys(d)
681
685
682 @wireprotocommand('lookup', 'key')
686 @wireprotocommand('lookup', 'key')
683 def lookup(repo, proto, key):
687 def lookup(repo, proto, key):
684 try:
688 try:
685 k = encoding.tolocal(key)
689 k = encoding.tolocal(key)
686 c = repo[k]
690 c = repo[k]
687 r = c.hex()
691 r = c.hex()
688 success = 1
692 success = 1
689 except Exception, inst:
693 except Exception, inst:
690 r = str(inst)
694 r = str(inst)
691 success = 0
695 success = 0
692 return "%s %s\n" % (success, r)
696 return "%s %s\n" % (success, r)
693
697
694 @wireprotocommand('known', 'nodes *')
698 @wireprotocommand('known', 'nodes *')
695 def known(repo, proto, nodes, others):
699 def known(repo, proto, nodes, others):
696 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
700 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
697
701
698 @wireprotocommand('pushkey', 'namespace key old new')
702 @wireprotocommand('pushkey', 'namespace key old new')
699 def pushkey(repo, proto, namespace, key, old, new):
703 def pushkey(repo, proto, namespace, key, old, new):
700 # compatibility with pre-1.8 clients which were accidentally
704 # compatibility with pre-1.8 clients which were accidentally
701 # sending raw binary nodes rather than utf-8-encoded hex
705 # sending raw binary nodes rather than utf-8-encoded hex
702 if len(new) == 20 and new.encode('string-escape') != new:
706 if len(new) == 20 and new.encode('string-escape') != new:
703 # looks like it could be a binary node
707 # looks like it could be a binary node
704 try:
708 try:
705 new.decode('utf-8')
709 new.decode('utf-8')
706 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
710 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
707 except UnicodeDecodeError:
711 except UnicodeDecodeError:
708 pass # binary, leave unmodified
712 pass # binary, leave unmodified
709 else:
713 else:
710 new = encoding.tolocal(new) # normal path
714 new = encoding.tolocal(new) # normal path
711
715
712 if util.safehasattr(proto, 'restore'):
716 if util.safehasattr(proto, 'restore'):
713
717
714 proto.redirect()
718 proto.redirect()
715
719
716 try:
720 try:
717 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
721 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
718 encoding.tolocal(old), new) or False
722 encoding.tolocal(old), new) or False
719 except util.Abort:
723 except util.Abort:
720 r = False
724 r = False
721
725
722 output = proto.restore()
726 output = proto.restore()
723
727
724 return '%s\n%s' % (int(r), output)
728 return '%s\n%s' % (int(r), output)
725
729
726 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
730 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
727 encoding.tolocal(old), new)
731 encoding.tolocal(old), new)
728 return '%s\n' % int(r)
732 return '%s\n' % int(r)
729
733
730 def _allowstream(ui):
734 def _allowstream(ui):
731 return ui.configbool('server', 'uncompressed', True, untrusted=True)
735 return ui.configbool('server', 'uncompressed', True, untrusted=True)
732
736
733 def _walkstreamfiles(repo):
737 def _walkstreamfiles(repo):
734 # this is it's own function so extensions can override it
738 # this is it's own function so extensions can override it
735 return repo.store.walk()
739 return repo.store.walk()
736
740
737 @wireprotocommand('stream_out')
741 @wireprotocommand('stream_out')
738 def stream(repo, proto):
742 def stream(repo, proto):
739 '''If the server supports streaming clone, it advertises the "stream"
743 '''If the server supports streaming clone, it advertises the "stream"
740 capability with a value representing the version and flags of the repo
744 capability with a value representing the version and flags of the repo
741 it is serving. Client checks to see if it understands the format.
745 it is serving. Client checks to see if it understands the format.
742
746
743 The format is simple: the server writes out a line with the amount
747 The format is simple: the server writes out a line with the amount
744 of files, then the total amount of bytes to be transferred (separated
748 of files, then the total amount of bytes to be transferred (separated
745 by a space). Then, for each file, the server first writes the filename
749 by a space). Then, for each file, the server first writes the filename
746 and file size (separated by the null character), then the file contents.
750 and file size (separated by the null character), then the file contents.
747 '''
751 '''
748
752
749 if not _allowstream(repo.ui):
753 if not _allowstream(repo.ui):
750 return '1\n'
754 return '1\n'
751
755
752 entries = []
756 entries = []
753 total_bytes = 0
757 total_bytes = 0
754 try:
758 try:
755 # get consistent snapshot of repo, lock during scan
759 # get consistent snapshot of repo, lock during scan
756 lock = repo.lock()
760 lock = repo.lock()
757 try:
761 try:
758 repo.ui.debug('scanning\n')
762 repo.ui.debug('scanning\n')
759 for name, ename, size in _walkstreamfiles(repo):
763 for name, ename, size in _walkstreamfiles(repo):
760 if size:
764 if size:
761 entries.append((name, size))
765 entries.append((name, size))
762 total_bytes += size
766 total_bytes += size
763 finally:
767 finally:
764 lock.release()
768 lock.release()
765 except error.LockError:
769 except error.LockError:
766 return '2\n' # error: 2
770 return '2\n' # error: 2
767
771
768 def streamer(repo, entries, total):
772 def streamer(repo, entries, total):
769 '''stream out all metadata files in repository.'''
773 '''stream out all metadata files in repository.'''
770 yield '0\n' # success
774 yield '0\n' # success
771 repo.ui.debug('%d files, %d bytes to transfer\n' %
775 repo.ui.debug('%d files, %d bytes to transfer\n' %
772 (len(entries), total_bytes))
776 (len(entries), total_bytes))
773 yield '%d %d\n' % (len(entries), total_bytes)
777 yield '%d %d\n' % (len(entries), total_bytes)
774
778
775 sopener = repo.sopener
779 sopener = repo.sopener
776 oldaudit = sopener.mustaudit
780 oldaudit = sopener.mustaudit
777 debugflag = repo.ui.debugflag
781 debugflag = repo.ui.debugflag
778 sopener.mustaudit = False
782 sopener.mustaudit = False
779
783
780 try:
784 try:
781 for name, size in entries:
785 for name, size in entries:
782 if debugflag:
786 if debugflag:
783 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
787 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
784 # partially encode name over the wire for backwards compat
788 # partially encode name over the wire for backwards compat
785 yield '%s\0%d\n' % (store.encodedir(name), size)
789 yield '%s\0%d\n' % (store.encodedir(name), size)
786 if size <= 65536:
790 if size <= 65536:
787 fp = sopener(name)
791 fp = sopener(name)
788 try:
792 try:
789 data = fp.read(size)
793 data = fp.read(size)
790 finally:
794 finally:
791 fp.close()
795 fp.close()
792 yield data
796 yield data
793 else:
797 else:
794 for chunk in util.filechunkiter(sopener(name), limit=size):
798 for chunk in util.filechunkiter(sopener(name), limit=size):
795 yield chunk
799 yield chunk
796 # replace with "finally:" when support for python 2.4 has been dropped
800 # replace with "finally:" when support for python 2.4 has been dropped
797 except Exception:
801 except Exception:
798 sopener.mustaudit = oldaudit
802 sopener.mustaudit = oldaudit
799 raise
803 raise
800 sopener.mustaudit = oldaudit
804 sopener.mustaudit = oldaudit
801
805
802 return streamres(streamer(repo, entries, total_bytes))
806 return streamres(streamer(repo, entries, total_bytes))
803
807
804 @wireprotocommand('unbundle', 'heads')
808 @wireprotocommand('unbundle', 'heads')
805 def unbundle(repo, proto, heads):
809 def unbundle(repo, proto, heads):
806 their_heads = decodelist(heads)
810 their_heads = decodelist(heads)
807
811
808 try:
812 try:
809 proto.redirect()
813 proto.redirect()
810
814
811 exchange.check_heads(repo, their_heads, 'preparing changes')
815 exchange.check_heads(repo, their_heads, 'preparing changes')
812
816
813 # write bundle data to temporary file because it can be big
817 # write bundle data to temporary file because it can be big
814 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
818 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
815 fp = os.fdopen(fd, 'wb+')
819 fp = os.fdopen(fd, 'wb+')
816 r = 0
820 r = 0
817 try:
821 try:
818 proto.getfile(fp)
822 proto.getfile(fp)
819 fp.seek(0)
823 fp.seek(0)
820 gen = exchange.readbundle(repo.ui, fp, None)
824 gen = exchange.readbundle(repo.ui, fp, None)
821 r = exchange.unbundle(repo, gen, their_heads, 'serve',
825 r = exchange.unbundle(repo, gen, their_heads, 'serve',
822 proto._client())
826 proto._client())
823 if util.safehasattr(r, 'addpart'):
827 if util.safehasattr(r, 'addpart'):
824 # The return looks streameable, we are in the bundle2 case and
828 # The return looks streameable, we are in the bundle2 case and
825 # should return a stream.
829 # should return a stream.
826 return streamres(r.getchunks())
830 return streamres(r.getchunks())
827 return pushres(r)
831 return pushres(r)
828
832
829 finally:
833 finally:
830 fp.close()
834 fp.close()
831 os.unlink(tempname)
835 os.unlink(tempname)
832 except error.BundleValueError, exc:
836 except error.BundleValueError, exc:
833 bundler = bundle2.bundle20(repo.ui)
837 bundler = bundle2.bundle20(repo.ui)
834 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
838 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
835 if exc.parttype is not None:
839 if exc.parttype is not None:
836 errpart.addparam('parttype', exc.parttype)
840 errpart.addparam('parttype', exc.parttype)
837 if exc.params:
841 if exc.params:
838 errpart.addparam('params', '\0'.join(exc.params))
842 errpart.addparam('params', '\0'.join(exc.params))
839 return streamres(bundler.getchunks())
843 return streamres(bundler.getchunks())
840 except util.Abort, inst:
844 except util.Abort, inst:
841 # The old code we moved used sys.stderr directly.
845 # The old code we moved used sys.stderr directly.
842 # We did not change it to minimise code change.
846 # We did not change it to minimise code change.
843 # This need to be moved to something proper.
847 # This need to be moved to something proper.
844 # Feel free to do it.
848 # Feel free to do it.
845 if getattr(inst, 'duringunbundle2', False):
849 if getattr(inst, 'duringunbundle2', False):
846 bundler = bundle2.bundle20(repo.ui)
850 bundler = bundle2.bundle20(repo.ui)
847 manargs = [('message', str(inst))]
851 manargs = [('message', str(inst))]
848 advargs = []
852 advargs = []
849 if inst.hint is not None:
853 if inst.hint is not None:
850 advargs.append(('hint', inst.hint))
854 advargs.append(('hint', inst.hint))
851 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
855 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
852 manargs, advargs))
856 manargs, advargs))
853 return streamres(bundler.getchunks())
857 return streamres(bundler.getchunks())
854 else:
858 else:
855 sys.stderr.write("abort: %s\n" % inst)
859 sys.stderr.write("abort: %s\n" % inst)
856 return pushres(0)
860 return pushres(0)
857 except error.PushRaced, exc:
861 except error.PushRaced, exc:
858 if getattr(exc, 'duringunbundle2', False):
862 if getattr(exc, 'duringunbundle2', False):
859 bundler = bundle2.bundle20(repo.ui)
863 bundler = bundle2.bundle20(repo.ui)
860 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
864 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
861 return streamres(bundler.getchunks())
865 return streamres(bundler.getchunks())
862 else:
866 else:
863 return pusherr(str(exc))
867 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now