##// END OF EJS Templates
wireprotocol: fix 'boolean' handling...
Pierre-Yves David -
r22351:7e6dd496 default
parent child Browse files
Show More
@@ -1,868 +1,868
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, store, exchange
12 import peer, error, encoding, util, store, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 return sep.join(map(hex, l))
175 return sep.join(map(hex, l))
176
176
177 # batched call argument encoding
177 # batched call argument encoding
178
178
179 def escapearg(plain):
179 def escapearg(plain):
180 return (plain
180 return (plain
181 .replace(':', '::')
181 .replace(':', '::')
182 .replace(',', ':,')
182 .replace(',', ':,')
183 .replace(';', ':;')
183 .replace(';', ':;')
184 .replace('=', ':='))
184 .replace('=', ':='))
185
185
186 def unescapearg(escaped):
186 def unescapearg(escaped):
187 return (escaped
187 return (escaped
188 .replace(':=', '=')
188 .replace(':=', '=')
189 .replace(':;', ';')
189 .replace(':;', ';')
190 .replace(':,', ',')
190 .replace(':,', ',')
191 .replace('::', ':'))
191 .replace('::', ':'))
192
192
193 # mapping of options accepted by getbundle and their types
193 # mapping of options accepted by getbundle and their types
194 #
194 #
195 # Meant to be extended by extensions. It is extensions responsibility to ensure
195 # Meant to be extended by extensions. It is extensions responsibility to ensure
196 # such options are properly processed in exchange.getbundle.
196 # such options are properly processed in exchange.getbundle.
197 #
197 #
198 # supported types are:
198 # supported types are:
199 #
199 #
200 # :nodes: list of binary nodes
200 # :nodes: list of binary nodes
201 # :csv: list of comma-separated values
201 # :csv: list of comma-separated values
202 # :plain: string with no transformation needed.
202 # :plain: string with no transformation needed.
203 gboptsmap = {'heads': 'nodes',
203 gboptsmap = {'heads': 'nodes',
204 'common': 'nodes',
204 'common': 'nodes',
205 'bundlecaps': 'csv',
205 'bundlecaps': 'csv',
206 'listkeys': 'csv',
206 'listkeys': 'csv',
207 'cg': 'boolean'}
207 'cg': 'boolean'}
208
208
209 # client side
209 # client side
210
210
211 class wirepeer(peer.peerrepository):
211 class wirepeer(peer.peerrepository):
212
212
213 def batch(self):
213 def batch(self):
214 return remotebatch(self)
214 return remotebatch(self)
215 def _submitbatch(self, req):
215 def _submitbatch(self, req):
216 cmds = []
216 cmds = []
217 for op, argsdict in req:
217 for op, argsdict in req:
218 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
218 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
219 cmds.append('%s %s' % (op, args))
219 cmds.append('%s %s' % (op, args))
220 rsp = self._call("batch", cmds=';'.join(cmds))
220 rsp = self._call("batch", cmds=';'.join(cmds))
221 return rsp.split(';')
221 return rsp.split(';')
222 def _submitone(self, op, args):
222 def _submitone(self, op, args):
223 return self._call(op, **args)
223 return self._call(op, **args)
224
224
225 @batchable
225 @batchable
226 def lookup(self, key):
226 def lookup(self, key):
227 self.requirecap('lookup', _('look up remote revision'))
227 self.requirecap('lookup', _('look up remote revision'))
228 f = future()
228 f = future()
229 yield {'key': encoding.fromlocal(key)}, f
229 yield {'key': encoding.fromlocal(key)}, f
230 d = f.value
230 d = f.value
231 success, data = d[:-1].split(" ", 1)
231 success, data = d[:-1].split(" ", 1)
232 if int(success):
232 if int(success):
233 yield bin(data)
233 yield bin(data)
234 self._abort(error.RepoError(data))
234 self._abort(error.RepoError(data))
235
235
236 @batchable
236 @batchable
237 def heads(self):
237 def heads(self):
238 f = future()
238 f = future()
239 yield {}, f
239 yield {}, f
240 d = f.value
240 d = f.value
241 try:
241 try:
242 yield decodelist(d[:-1])
242 yield decodelist(d[:-1])
243 except ValueError:
243 except ValueError:
244 self._abort(error.ResponseError(_("unexpected response:"), d))
244 self._abort(error.ResponseError(_("unexpected response:"), d))
245
245
246 @batchable
246 @batchable
247 def known(self, nodes):
247 def known(self, nodes):
248 f = future()
248 f = future()
249 yield {'nodes': encodelist(nodes)}, f
249 yield {'nodes': encodelist(nodes)}, f
250 d = f.value
250 d = f.value
251 try:
251 try:
252 yield [bool(int(b)) for b in d]
252 yield [bool(int(b)) for b in d]
253 except ValueError:
253 except ValueError:
254 self._abort(error.ResponseError(_("unexpected response:"), d))
254 self._abort(error.ResponseError(_("unexpected response:"), d))
255
255
256 @batchable
256 @batchable
257 def branchmap(self):
257 def branchmap(self):
258 f = future()
258 f = future()
259 yield {}, f
259 yield {}, f
260 d = f.value
260 d = f.value
261 try:
261 try:
262 branchmap = {}
262 branchmap = {}
263 for branchpart in d.splitlines():
263 for branchpart in d.splitlines():
264 branchname, branchheads = branchpart.split(' ', 1)
264 branchname, branchheads = branchpart.split(' ', 1)
265 branchname = encoding.tolocal(urllib.unquote(branchname))
265 branchname = encoding.tolocal(urllib.unquote(branchname))
266 branchheads = decodelist(branchheads)
266 branchheads = decodelist(branchheads)
267 branchmap[branchname] = branchheads
267 branchmap[branchname] = branchheads
268 yield branchmap
268 yield branchmap
269 except TypeError:
269 except TypeError:
270 self._abort(error.ResponseError(_("unexpected response:"), d))
270 self._abort(error.ResponseError(_("unexpected response:"), d))
271
271
272 def branches(self, nodes):
272 def branches(self, nodes):
273 n = encodelist(nodes)
273 n = encodelist(nodes)
274 d = self._call("branches", nodes=n)
274 d = self._call("branches", nodes=n)
275 try:
275 try:
276 br = [tuple(decodelist(b)) for b in d.splitlines()]
276 br = [tuple(decodelist(b)) for b in d.splitlines()]
277 return br
277 return br
278 except ValueError:
278 except ValueError:
279 self._abort(error.ResponseError(_("unexpected response:"), d))
279 self._abort(error.ResponseError(_("unexpected response:"), d))
280
280
281 def between(self, pairs):
281 def between(self, pairs):
282 batch = 8 # avoid giant requests
282 batch = 8 # avoid giant requests
283 r = []
283 r = []
284 for i in xrange(0, len(pairs), batch):
284 for i in xrange(0, len(pairs), batch):
285 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
285 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
286 d = self._call("between", pairs=n)
286 d = self._call("between", pairs=n)
287 try:
287 try:
288 r.extend(l and decodelist(l) or [] for l in d.splitlines())
288 r.extend(l and decodelist(l) or [] for l in d.splitlines())
289 except ValueError:
289 except ValueError:
290 self._abort(error.ResponseError(_("unexpected response:"), d))
290 self._abort(error.ResponseError(_("unexpected response:"), d))
291 return r
291 return r
292
292
293 @batchable
293 @batchable
294 def pushkey(self, namespace, key, old, new):
294 def pushkey(self, namespace, key, old, new):
295 if not self.capable('pushkey'):
295 if not self.capable('pushkey'):
296 yield False, None
296 yield False, None
297 f = future()
297 f = future()
298 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
299 yield {'namespace': encoding.fromlocal(namespace),
299 yield {'namespace': encoding.fromlocal(namespace),
300 'key': encoding.fromlocal(key),
300 'key': encoding.fromlocal(key),
301 'old': encoding.fromlocal(old),
301 'old': encoding.fromlocal(old),
302 'new': encoding.fromlocal(new)}, f
302 'new': encoding.fromlocal(new)}, f
303 d = f.value
303 d = f.value
304 d, output = d.split('\n', 1)
304 d, output = d.split('\n', 1)
305 try:
305 try:
306 d = bool(int(d))
306 d = bool(int(d))
307 except ValueError:
307 except ValueError:
308 raise error.ResponseError(
308 raise error.ResponseError(
309 _('push failed (unexpected response):'), d)
309 _('push failed (unexpected response):'), d)
310 for l in output.splitlines(True):
310 for l in output.splitlines(True):
311 self.ui.status(_('remote: '), l)
311 self.ui.status(_('remote: '), l)
312 yield d
312 yield d
313
313
314 @batchable
314 @batchable
315 def listkeys(self, namespace):
315 def listkeys(self, namespace):
316 if not self.capable('pushkey'):
316 if not self.capable('pushkey'):
317 yield {}, None
317 yield {}, None
318 f = future()
318 f = future()
319 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
319 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
320 yield {'namespace': encoding.fromlocal(namespace)}, f
320 yield {'namespace': encoding.fromlocal(namespace)}, f
321 d = f.value
321 d = f.value
322 yield pushkeymod.decodekeys(d)
322 yield pushkeymod.decodekeys(d)
323
323
324 def stream_out(self):
324 def stream_out(self):
325 return self._callstream('stream_out')
325 return self._callstream('stream_out')
326
326
327 def changegroup(self, nodes, kind):
327 def changegroup(self, nodes, kind):
328 n = encodelist(nodes)
328 n = encodelist(nodes)
329 f = self._callcompressable("changegroup", roots=n)
329 f = self._callcompressable("changegroup", roots=n)
330 return changegroupmod.unbundle10(f, 'UN')
330 return changegroupmod.unbundle10(f, 'UN')
331
331
332 def changegroupsubset(self, bases, heads, kind):
332 def changegroupsubset(self, bases, heads, kind):
333 self.requirecap('changegroupsubset', _('look up remote changes'))
333 self.requirecap('changegroupsubset', _('look up remote changes'))
334 bases = encodelist(bases)
334 bases = encodelist(bases)
335 heads = encodelist(heads)
335 heads = encodelist(heads)
336 f = self._callcompressable("changegroupsubset",
336 f = self._callcompressable("changegroupsubset",
337 bases=bases, heads=heads)
337 bases=bases, heads=heads)
338 return changegroupmod.unbundle10(f, 'UN')
338 return changegroupmod.unbundle10(f, 'UN')
339
339
340 def getbundle(self, source, **kwargs):
340 def getbundle(self, source, **kwargs):
341 self.requirecap('getbundle', _('look up remote changes'))
341 self.requirecap('getbundle', _('look up remote changes'))
342 opts = {}
342 opts = {}
343 for key, value in kwargs.iteritems():
343 for key, value in kwargs.iteritems():
344 if value is None:
344 if value is None:
345 continue
345 continue
346 keytype = gboptsmap.get(key)
346 keytype = gboptsmap.get(key)
347 if keytype is None:
347 if keytype is None:
348 assert False, 'unexpected'
348 assert False, 'unexpected'
349 elif keytype == 'nodes':
349 elif keytype == 'nodes':
350 value = encodelist(value)
350 value = encodelist(value)
351 elif keytype == 'csv':
351 elif keytype == 'csv':
352 value = ','.join(value)
352 value = ','.join(value)
353 elif keytype == 'boolean':
353 elif keytype == 'boolean':
354 value = bool(value)
354 value = '%i' % bool(value)
355 elif keytype != 'plain':
355 elif keytype != 'plain':
356 raise KeyError('unknown getbundle option type %s'
356 raise KeyError('unknown getbundle option type %s'
357 % keytype)
357 % keytype)
358 opts[key] = value
358 opts[key] = value
359 f = self._callcompressable("getbundle", **opts)
359 f = self._callcompressable("getbundle", **opts)
360 bundlecaps = kwargs.get('bundlecaps')
360 bundlecaps = kwargs.get('bundlecaps')
361 if bundlecaps is not None and 'HG2X' in bundlecaps:
361 if bundlecaps is not None and 'HG2X' in bundlecaps:
362 return bundle2.unbundle20(self.ui, f)
362 return bundle2.unbundle20(self.ui, f)
363 else:
363 else:
364 return changegroupmod.unbundle10(f, 'UN')
364 return changegroupmod.unbundle10(f, 'UN')
365
365
366 def unbundle(self, cg, heads, source):
366 def unbundle(self, cg, heads, source):
367 '''Send cg (a readable file-like object representing the
367 '''Send cg (a readable file-like object representing the
368 changegroup to push, typically a chunkbuffer object) to the
368 changegroup to push, typically a chunkbuffer object) to the
369 remote server as a bundle.
369 remote server as a bundle.
370
370
371 When pushing a bundle10 stream, return an integer indicating the
371 When pushing a bundle10 stream, return an integer indicating the
372 result of the push (see localrepository.addchangegroup()).
372 result of the push (see localrepository.addchangegroup()).
373
373
374 When pushing a bundle20 stream, return a bundle20 stream.'''
374 When pushing a bundle20 stream, return a bundle20 stream.'''
375
375
376 if heads != ['force'] and self.capable('unbundlehash'):
376 if heads != ['force'] and self.capable('unbundlehash'):
377 heads = encodelist(['hashed',
377 heads = encodelist(['hashed',
378 util.sha1(''.join(sorted(heads))).digest()])
378 util.sha1(''.join(sorted(heads))).digest()])
379 else:
379 else:
380 heads = encodelist(heads)
380 heads = encodelist(heads)
381
381
382 if util.safehasattr(cg, 'deltaheader'):
382 if util.safehasattr(cg, 'deltaheader'):
383 # this a bundle10, do the old style call sequence
383 # this a bundle10, do the old style call sequence
384 ret, output = self._callpush("unbundle", cg, heads=heads)
384 ret, output = self._callpush("unbundle", cg, heads=heads)
385 if ret == "":
385 if ret == "":
386 raise error.ResponseError(
386 raise error.ResponseError(
387 _('push failed:'), output)
387 _('push failed:'), output)
388 try:
388 try:
389 ret = int(ret)
389 ret = int(ret)
390 except ValueError:
390 except ValueError:
391 raise error.ResponseError(
391 raise error.ResponseError(
392 _('push failed (unexpected response):'), ret)
392 _('push failed (unexpected response):'), ret)
393
393
394 for l in output.splitlines(True):
394 for l in output.splitlines(True):
395 self.ui.status(_('remote: '), l)
395 self.ui.status(_('remote: '), l)
396 else:
396 else:
397 # bundle2 push. Send a stream, fetch a stream.
397 # bundle2 push. Send a stream, fetch a stream.
398 stream = self._calltwowaystream('unbundle', cg, heads=heads)
398 stream = self._calltwowaystream('unbundle', cg, heads=heads)
399 ret = bundle2.unbundle20(self.ui, stream)
399 ret = bundle2.unbundle20(self.ui, stream)
400 return ret
400 return ret
401
401
402 def debugwireargs(self, one, two, three=None, four=None, five=None):
402 def debugwireargs(self, one, two, three=None, four=None, five=None):
403 # don't pass optional arguments left at their default value
403 # don't pass optional arguments left at their default value
404 opts = {}
404 opts = {}
405 if three is not None:
405 if three is not None:
406 opts['three'] = three
406 opts['three'] = three
407 if four is not None:
407 if four is not None:
408 opts['four'] = four
408 opts['four'] = four
409 return self._call('debugwireargs', one=one, two=two, **opts)
409 return self._call('debugwireargs', one=one, two=two, **opts)
410
410
411 def _call(self, cmd, **args):
411 def _call(self, cmd, **args):
412 """execute <cmd> on the server
412 """execute <cmd> on the server
413
413
414 The command is expected to return a simple string.
414 The command is expected to return a simple string.
415
415
416 returns the server reply as a string."""
416 returns the server reply as a string."""
417 raise NotImplementedError()
417 raise NotImplementedError()
418
418
419 def _callstream(self, cmd, **args):
419 def _callstream(self, cmd, **args):
420 """execute <cmd> on the server
420 """execute <cmd> on the server
421
421
422 The command is expected to return a stream.
422 The command is expected to return a stream.
423
423
424 returns the server reply as a file like object."""
424 returns the server reply as a file like object."""
425 raise NotImplementedError()
425 raise NotImplementedError()
426
426
427 def _callcompressable(self, cmd, **args):
427 def _callcompressable(self, cmd, **args):
428 """execute <cmd> on the server
428 """execute <cmd> on the server
429
429
430 The command is expected to return a stream.
430 The command is expected to return a stream.
431
431
432 The stream may have been compressed in some implementations. This
432 The stream may have been compressed in some implementations. This
433 function takes care of the decompression. This is the only difference
433 function takes care of the decompression. This is the only difference
434 with _callstream.
434 with _callstream.
435
435
436 returns the server reply as a file like object.
436 returns the server reply as a file like object.
437 """
437 """
438 raise NotImplementedError()
438 raise NotImplementedError()
439
439
440 def _callpush(self, cmd, fp, **args):
440 def _callpush(self, cmd, fp, **args):
441 """execute a <cmd> on server
441 """execute a <cmd> on server
442
442
443 The command is expected to be related to a push. Push has a special
443 The command is expected to be related to a push. Push has a special
444 return method.
444 return method.
445
445
446 returns the server reply as a (ret, output) tuple. ret is either
446 returns the server reply as a (ret, output) tuple. ret is either
447 empty (error) or a stringified int.
447 empty (error) or a stringified int.
448 """
448 """
449 raise NotImplementedError()
449 raise NotImplementedError()
450
450
451 def _calltwowaystream(self, cmd, fp, **args):
451 def _calltwowaystream(self, cmd, fp, **args):
452 """execute <cmd> on server
452 """execute <cmd> on server
453
453
454 The command will send a stream to the server and get a stream in reply.
454 The command will send a stream to the server and get a stream in reply.
455 """
455 """
456 raise NotImplementedError()
456 raise NotImplementedError()
457
457
458 def _abort(self, exception):
458 def _abort(self, exception):
459 """clearly abort the wire protocol connection and raise the exception
459 """clearly abort the wire protocol connection and raise the exception
460 """
460 """
461 raise NotImplementedError()
461 raise NotImplementedError()
462
462
463 # server side
463 # server side
464
464
465 # wire protocol command can either return a string or one of these classes.
465 # wire protocol command can either return a string or one of these classes.
466 class streamres(object):
466 class streamres(object):
467 """wireproto reply: binary stream
467 """wireproto reply: binary stream
468
468
469 The call was successful and the result is a stream.
469 The call was successful and the result is a stream.
470 Iterate on the `self.gen` attribute to retrieve chunks.
470 Iterate on the `self.gen` attribute to retrieve chunks.
471 """
471 """
472 def __init__(self, gen):
472 def __init__(self, gen):
473 self.gen = gen
473 self.gen = gen
474
474
475 class pushres(object):
475 class pushres(object):
476 """wireproto reply: success with simple integer return
476 """wireproto reply: success with simple integer return
477
477
478 The call was successful and returned an integer contained in `self.res`.
478 The call was successful and returned an integer contained in `self.res`.
479 """
479 """
480 def __init__(self, res):
480 def __init__(self, res):
481 self.res = res
481 self.res = res
482
482
483 class pusherr(object):
483 class pusherr(object):
484 """wireproto reply: failure
484 """wireproto reply: failure
485
485
486 The call failed. The `self.res` attribute contains the error message.
486 The call failed. The `self.res` attribute contains the error message.
487 """
487 """
488 def __init__(self, res):
488 def __init__(self, res):
489 self.res = res
489 self.res = res
490
490
491 class ooberror(object):
491 class ooberror(object):
492 """wireproto reply: failure of a batch of operation
492 """wireproto reply: failure of a batch of operation
493
493
494 Something failed during a batch call. The error message is stored in
494 Something failed during a batch call. The error message is stored in
495 `self.message`.
495 `self.message`.
496 """
496 """
497 def __init__(self, message):
497 def __init__(self, message):
498 self.message = message
498 self.message = message
499
499
500 def dispatch(repo, proto, command):
500 def dispatch(repo, proto, command):
501 repo = repo.filtered("served")
501 repo = repo.filtered("served")
502 func, spec = commands[command]
502 func, spec = commands[command]
503 args = proto.getargs(spec)
503 args = proto.getargs(spec)
504 return func(repo, proto, *args)
504 return func(repo, proto, *args)
505
505
506 def options(cmd, keys, others):
506 def options(cmd, keys, others):
507 opts = {}
507 opts = {}
508 for k in keys:
508 for k in keys:
509 if k in others:
509 if k in others:
510 opts[k] = others[k]
510 opts[k] = others[k]
511 del others[k]
511 del others[k]
512 if others:
512 if others:
513 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
513 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
514 % (cmd, ",".join(others)))
514 % (cmd, ",".join(others)))
515 return opts
515 return opts
516
516
517 # list of commands
517 # list of commands
518 commands = {}
518 commands = {}
519
519
520 def wireprotocommand(name, args=''):
520 def wireprotocommand(name, args=''):
521 """decorator for wire protocol command"""
521 """decorator for wire protocol command"""
522 def register(func):
522 def register(func):
523 commands[name] = (func, args)
523 commands[name] = (func, args)
524 return func
524 return func
525 return register
525 return register
526
526
527 @wireprotocommand('batch', 'cmds *')
527 @wireprotocommand('batch', 'cmds *')
528 def batch(repo, proto, cmds, others):
528 def batch(repo, proto, cmds, others):
529 repo = repo.filtered("served")
529 repo = repo.filtered("served")
530 res = []
530 res = []
531 for pair in cmds.split(';'):
531 for pair in cmds.split(';'):
532 op, args = pair.split(' ', 1)
532 op, args = pair.split(' ', 1)
533 vals = {}
533 vals = {}
534 for a in args.split(','):
534 for a in args.split(','):
535 if a:
535 if a:
536 n, v = a.split('=')
536 n, v = a.split('=')
537 vals[n] = unescapearg(v)
537 vals[n] = unescapearg(v)
538 func, spec = commands[op]
538 func, spec = commands[op]
539 if spec:
539 if spec:
540 keys = spec.split()
540 keys = spec.split()
541 data = {}
541 data = {}
542 for k in keys:
542 for k in keys:
543 if k == '*':
543 if k == '*':
544 star = {}
544 star = {}
545 for key in vals.keys():
545 for key in vals.keys():
546 if key not in keys:
546 if key not in keys:
547 star[key] = vals[key]
547 star[key] = vals[key]
548 data['*'] = star
548 data['*'] = star
549 else:
549 else:
550 data[k] = vals[k]
550 data[k] = vals[k]
551 result = func(repo, proto, *[data[k] for k in keys])
551 result = func(repo, proto, *[data[k] for k in keys])
552 else:
552 else:
553 result = func(repo, proto)
553 result = func(repo, proto)
554 if isinstance(result, ooberror):
554 if isinstance(result, ooberror):
555 return result
555 return result
556 res.append(escapearg(result))
556 res.append(escapearg(result))
557 return ';'.join(res)
557 return ';'.join(res)
558
558
559 @wireprotocommand('between', 'pairs')
559 @wireprotocommand('between', 'pairs')
560 def between(repo, proto, pairs):
560 def between(repo, proto, pairs):
561 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
561 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
562 r = []
562 r = []
563 for b in repo.between(pairs):
563 for b in repo.between(pairs):
564 r.append(encodelist(b) + "\n")
564 r.append(encodelist(b) + "\n")
565 return "".join(r)
565 return "".join(r)
566
566
567 @wireprotocommand('branchmap')
567 @wireprotocommand('branchmap')
568 def branchmap(repo, proto):
568 def branchmap(repo, proto):
569 branchmap = repo.branchmap()
569 branchmap = repo.branchmap()
570 heads = []
570 heads = []
571 for branch, nodes in branchmap.iteritems():
571 for branch, nodes in branchmap.iteritems():
572 branchname = urllib.quote(encoding.fromlocal(branch))
572 branchname = urllib.quote(encoding.fromlocal(branch))
573 branchnodes = encodelist(nodes)
573 branchnodes = encodelist(nodes)
574 heads.append('%s %s' % (branchname, branchnodes))
574 heads.append('%s %s' % (branchname, branchnodes))
575 return '\n'.join(heads)
575 return '\n'.join(heads)
576
576
577 @wireprotocommand('branches', 'nodes')
577 @wireprotocommand('branches', 'nodes')
578 def branches(repo, proto, nodes):
578 def branches(repo, proto, nodes):
579 nodes = decodelist(nodes)
579 nodes = decodelist(nodes)
580 r = []
580 r = []
581 for b in repo.branches(nodes):
581 for b in repo.branches(nodes):
582 r.append(encodelist(b) + "\n")
582 r.append(encodelist(b) + "\n")
583 return "".join(r)
583 return "".join(r)
584
584
585
585
586 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
586 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
587 'known', 'getbundle', 'unbundlehash', 'batch']
587 'known', 'getbundle', 'unbundlehash', 'batch']
588
588
589 def _capabilities(repo, proto):
589 def _capabilities(repo, proto):
590 """return a list of capabilities for a repo
590 """return a list of capabilities for a repo
591
591
592 This function exists to allow extensions to easily wrap capabilities
592 This function exists to allow extensions to easily wrap capabilities
593 computation
593 computation
594
594
595 - returns a lists: easy to alter
595 - returns a lists: easy to alter
596 - change done here will be propagated to both `capabilities` and `hello`
596 - change done here will be propagated to both `capabilities` and `hello`
597 command without any other action needed.
597 command without any other action needed.
598 """
598 """
599 # copy to prevent modification of the global list
599 # copy to prevent modification of the global list
600 caps = list(wireprotocaps)
600 caps = list(wireprotocaps)
601 if _allowstream(repo.ui):
601 if _allowstream(repo.ui):
602 if repo.ui.configbool('server', 'preferuncompressed', False):
602 if repo.ui.configbool('server', 'preferuncompressed', False):
603 caps.append('stream-preferred')
603 caps.append('stream-preferred')
604 requiredformats = repo.requirements & repo.supportedformats
604 requiredformats = repo.requirements & repo.supportedformats
605 # if our local revlogs are just revlogv1, add 'stream' cap
605 # if our local revlogs are just revlogv1, add 'stream' cap
606 if not requiredformats - set(('revlogv1',)):
606 if not requiredformats - set(('revlogv1',)):
607 caps.append('stream')
607 caps.append('stream')
608 # otherwise, add 'streamreqs' detailing our local revlog format
608 # otherwise, add 'streamreqs' detailing our local revlog format
609 else:
609 else:
610 caps.append('streamreqs=%s' % ','.join(requiredformats))
610 caps.append('streamreqs=%s' % ','.join(requiredformats))
611 if repo.ui.configbool('experimental', 'bundle2-exp', False):
611 if repo.ui.configbool('experimental', 'bundle2-exp', False):
612 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
612 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
613 caps.append('bundle2-exp=' + urllib.quote(capsblob))
613 caps.append('bundle2-exp=' + urllib.quote(capsblob))
614 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
614 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
615 caps.append('httpheader=1024')
615 caps.append('httpheader=1024')
616 return caps
616 return caps
617
617
618 # If you are writing an extension and consider wrapping this function. Wrap
618 # If you are writing an extension and consider wrapping this function. Wrap
619 # `_capabilities` instead.
619 # `_capabilities` instead.
620 @wireprotocommand('capabilities')
620 @wireprotocommand('capabilities')
621 def capabilities(repo, proto):
621 def capabilities(repo, proto):
622 return ' '.join(_capabilities(repo, proto))
622 return ' '.join(_capabilities(repo, proto))
623
623
624 @wireprotocommand('changegroup', 'roots')
624 @wireprotocommand('changegroup', 'roots')
625 def changegroup(repo, proto, roots):
625 def changegroup(repo, proto, roots):
626 nodes = decodelist(roots)
626 nodes = decodelist(roots)
627 cg = changegroupmod.changegroup(repo, nodes, 'serve')
627 cg = changegroupmod.changegroup(repo, nodes, 'serve')
628 return streamres(proto.groupchunks(cg))
628 return streamres(proto.groupchunks(cg))
629
629
630 @wireprotocommand('changegroupsubset', 'bases heads')
630 @wireprotocommand('changegroupsubset', 'bases heads')
631 def changegroupsubset(repo, proto, bases, heads):
631 def changegroupsubset(repo, proto, bases, heads):
632 bases = decodelist(bases)
632 bases = decodelist(bases)
633 heads = decodelist(heads)
633 heads = decodelist(heads)
634 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
634 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
635 return streamres(proto.groupchunks(cg))
635 return streamres(proto.groupchunks(cg))
636
636
637 @wireprotocommand('debugwireargs', 'one two *')
637 @wireprotocommand('debugwireargs', 'one two *')
638 def debugwireargs(repo, proto, one, two, others):
638 def debugwireargs(repo, proto, one, two, others):
639 # only accept optional args from the known set
639 # only accept optional args from the known set
640 opts = options('debugwireargs', ['three', 'four'], others)
640 opts = options('debugwireargs', ['three', 'four'], others)
641 return repo.debugwireargs(one, two, **opts)
641 return repo.debugwireargs(one, two, **opts)
642
642
643 # List of options accepted by getbundle.
643 # List of options accepted by getbundle.
644 #
644 #
645 # Meant to be extended by extensions. It is the extension's responsibility to
645 # Meant to be extended by extensions. It is the extension's responsibility to
646 # ensure such options are properly processed in exchange.getbundle.
646 # ensure such options are properly processed in exchange.getbundle.
647 gboptslist = ['heads', 'common', 'bundlecaps']
647 gboptslist = ['heads', 'common', 'bundlecaps']
648
648
649 @wireprotocommand('getbundle', '*')
649 @wireprotocommand('getbundle', '*')
650 def getbundle(repo, proto, others):
650 def getbundle(repo, proto, others):
651 opts = options('getbundle', gboptsmap.keys(), others)
651 opts = options('getbundle', gboptsmap.keys(), others)
652 for k, v in opts.iteritems():
652 for k, v in opts.iteritems():
653 keytype = gboptsmap[k]
653 keytype = gboptsmap[k]
654 if keytype == 'nodes':
654 if keytype == 'nodes':
655 opts[k] = decodelist(v)
655 opts[k] = decodelist(v)
656 elif keytype == 'csv':
656 elif keytype == 'csv':
657 opts[k] = set(v.split(','))
657 opts[k] = set(v.split(','))
658 elif keytype == 'boolean':
658 elif keytype == 'boolean':
659 opts[k] = '%i' % bool(v)
659 opts[k] = bool(v)
660 elif keytype != 'plain':
660 elif keytype != 'plain':
661 raise KeyError('unknown getbundle option type %s'
661 raise KeyError('unknown getbundle option type %s'
662 % keytype)
662 % keytype)
663 cg = exchange.getbundle(repo, 'serve', **opts)
663 cg = exchange.getbundle(repo, 'serve', **opts)
664 return streamres(proto.groupchunks(cg))
664 return streamres(proto.groupchunks(cg))
665
665
666 @wireprotocommand('heads')
666 @wireprotocommand('heads')
667 def heads(repo, proto):
667 def heads(repo, proto):
668 h = repo.heads()
668 h = repo.heads()
669 return encodelist(h) + "\n"
669 return encodelist(h) + "\n"
670
670
671 @wireprotocommand('hello')
671 @wireprotocommand('hello')
672 def hello(repo, proto):
672 def hello(repo, proto):
673 '''the hello command returns a set of lines describing various
673 '''the hello command returns a set of lines describing various
674 interesting things about the server, in an RFC822-like format.
674 interesting things about the server, in an RFC822-like format.
675 Currently the only one defined is "capabilities", which
675 Currently the only one defined is "capabilities", which
676 consists of a line in the form:
676 consists of a line in the form:
677
677
678 capabilities: space separated list of tokens
678 capabilities: space separated list of tokens
679 '''
679 '''
680 return "capabilities: %s\n" % (capabilities(repo, proto))
680 return "capabilities: %s\n" % (capabilities(repo, proto))
681
681
682 @wireprotocommand('listkeys', 'namespace')
682 @wireprotocommand('listkeys', 'namespace')
683 def listkeys(repo, proto, namespace):
683 def listkeys(repo, proto, namespace):
684 d = repo.listkeys(encoding.tolocal(namespace)).items()
684 d = repo.listkeys(encoding.tolocal(namespace)).items()
685 return pushkeymod.encodekeys(d)
685 return pushkeymod.encodekeys(d)
686
686
687 @wireprotocommand('lookup', 'key')
687 @wireprotocommand('lookup', 'key')
688 def lookup(repo, proto, key):
688 def lookup(repo, proto, key):
689 try:
689 try:
690 k = encoding.tolocal(key)
690 k = encoding.tolocal(key)
691 c = repo[k]
691 c = repo[k]
692 r = c.hex()
692 r = c.hex()
693 success = 1
693 success = 1
694 except Exception, inst:
694 except Exception, inst:
695 r = str(inst)
695 r = str(inst)
696 success = 0
696 success = 0
697 return "%s %s\n" % (success, r)
697 return "%s %s\n" % (success, r)
698
698
699 @wireprotocommand('known', 'nodes *')
699 @wireprotocommand('known', 'nodes *')
700 def known(repo, proto, nodes, others):
700 def known(repo, proto, nodes, others):
701 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
701 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
702
702
703 @wireprotocommand('pushkey', 'namespace key old new')
703 @wireprotocommand('pushkey', 'namespace key old new')
704 def pushkey(repo, proto, namespace, key, old, new):
704 def pushkey(repo, proto, namespace, key, old, new):
705 # compatibility with pre-1.8 clients which were accidentally
705 # compatibility with pre-1.8 clients which were accidentally
706 # sending raw binary nodes rather than utf-8-encoded hex
706 # sending raw binary nodes rather than utf-8-encoded hex
707 if len(new) == 20 and new.encode('string-escape') != new:
707 if len(new) == 20 and new.encode('string-escape') != new:
708 # looks like it could be a binary node
708 # looks like it could be a binary node
709 try:
709 try:
710 new.decode('utf-8')
710 new.decode('utf-8')
711 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
711 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
712 except UnicodeDecodeError:
712 except UnicodeDecodeError:
713 pass # binary, leave unmodified
713 pass # binary, leave unmodified
714 else:
714 else:
715 new = encoding.tolocal(new) # normal path
715 new = encoding.tolocal(new) # normal path
716
716
717 if util.safehasattr(proto, 'restore'):
717 if util.safehasattr(proto, 'restore'):
718
718
719 proto.redirect()
719 proto.redirect()
720
720
721 try:
721 try:
722 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
722 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
723 encoding.tolocal(old), new) or False
723 encoding.tolocal(old), new) or False
724 except util.Abort:
724 except util.Abort:
725 r = False
725 r = False
726
726
727 output = proto.restore()
727 output = proto.restore()
728
728
729 return '%s\n%s' % (int(r), output)
729 return '%s\n%s' % (int(r), output)
730
730
731 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
731 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
732 encoding.tolocal(old), new)
732 encoding.tolocal(old), new)
733 return '%s\n' % int(r)
733 return '%s\n' % int(r)
734
734
735 def _allowstream(ui):
735 def _allowstream(ui):
736 return ui.configbool('server', 'uncompressed', True, untrusted=True)
736 return ui.configbool('server', 'uncompressed', True, untrusted=True)
737
737
738 def _walkstreamfiles(repo):
738 def _walkstreamfiles(repo):
739 # this is it's own function so extensions can override it
739 # this is it's own function so extensions can override it
740 return repo.store.walk()
740 return repo.store.walk()
741
741
742 @wireprotocommand('stream_out')
742 @wireprotocommand('stream_out')
743 def stream(repo, proto):
743 def stream(repo, proto):
744 '''If the server supports streaming clone, it advertises the "stream"
744 '''If the server supports streaming clone, it advertises the "stream"
745 capability with a value representing the version and flags of the repo
745 capability with a value representing the version and flags of the repo
746 it is serving. Client checks to see if it understands the format.
746 it is serving. Client checks to see if it understands the format.
747
747
748 The format is simple: the server writes out a line with the amount
748 The format is simple: the server writes out a line with the amount
749 of files, then the total amount of bytes to be transferred (separated
749 of files, then the total amount of bytes to be transferred (separated
750 by a space). Then, for each file, the server first writes the filename
750 by a space). Then, for each file, the server first writes the filename
751 and file size (separated by the null character), then the file contents.
751 and file size (separated by the null character), then the file contents.
752 '''
752 '''
753
753
754 if not _allowstream(repo.ui):
754 if not _allowstream(repo.ui):
755 return '1\n'
755 return '1\n'
756
756
757 entries = []
757 entries = []
758 total_bytes = 0
758 total_bytes = 0
759 try:
759 try:
760 # get consistent snapshot of repo, lock during scan
760 # get consistent snapshot of repo, lock during scan
761 lock = repo.lock()
761 lock = repo.lock()
762 try:
762 try:
763 repo.ui.debug('scanning\n')
763 repo.ui.debug('scanning\n')
764 for name, ename, size in _walkstreamfiles(repo):
764 for name, ename, size in _walkstreamfiles(repo):
765 if size:
765 if size:
766 entries.append((name, size))
766 entries.append((name, size))
767 total_bytes += size
767 total_bytes += size
768 finally:
768 finally:
769 lock.release()
769 lock.release()
770 except error.LockError:
770 except error.LockError:
771 return '2\n' # error: 2
771 return '2\n' # error: 2
772
772
773 def streamer(repo, entries, total):
773 def streamer(repo, entries, total):
774 '''stream out all metadata files in repository.'''
774 '''stream out all metadata files in repository.'''
775 yield '0\n' # success
775 yield '0\n' # success
776 repo.ui.debug('%d files, %d bytes to transfer\n' %
776 repo.ui.debug('%d files, %d bytes to transfer\n' %
777 (len(entries), total_bytes))
777 (len(entries), total_bytes))
778 yield '%d %d\n' % (len(entries), total_bytes)
778 yield '%d %d\n' % (len(entries), total_bytes)
779
779
780 sopener = repo.sopener
780 sopener = repo.sopener
781 oldaudit = sopener.mustaudit
781 oldaudit = sopener.mustaudit
782 debugflag = repo.ui.debugflag
782 debugflag = repo.ui.debugflag
783 sopener.mustaudit = False
783 sopener.mustaudit = False
784
784
785 try:
785 try:
786 for name, size in entries:
786 for name, size in entries:
787 if debugflag:
787 if debugflag:
788 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
788 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
789 # partially encode name over the wire for backwards compat
789 # partially encode name over the wire for backwards compat
790 yield '%s\0%d\n' % (store.encodedir(name), size)
790 yield '%s\0%d\n' % (store.encodedir(name), size)
791 if size <= 65536:
791 if size <= 65536:
792 fp = sopener(name)
792 fp = sopener(name)
793 try:
793 try:
794 data = fp.read(size)
794 data = fp.read(size)
795 finally:
795 finally:
796 fp.close()
796 fp.close()
797 yield data
797 yield data
798 else:
798 else:
799 for chunk in util.filechunkiter(sopener(name), limit=size):
799 for chunk in util.filechunkiter(sopener(name), limit=size):
800 yield chunk
800 yield chunk
801 # replace with "finally:" when support for python 2.4 has been dropped
801 # replace with "finally:" when support for python 2.4 has been dropped
802 except Exception:
802 except Exception:
803 sopener.mustaudit = oldaudit
803 sopener.mustaudit = oldaudit
804 raise
804 raise
805 sopener.mustaudit = oldaudit
805 sopener.mustaudit = oldaudit
806
806
807 return streamres(streamer(repo, entries, total_bytes))
807 return streamres(streamer(repo, entries, total_bytes))
808
808
809 @wireprotocommand('unbundle', 'heads')
809 @wireprotocommand('unbundle', 'heads')
810 def unbundle(repo, proto, heads):
810 def unbundle(repo, proto, heads):
811 their_heads = decodelist(heads)
811 their_heads = decodelist(heads)
812
812
813 try:
813 try:
814 proto.redirect()
814 proto.redirect()
815
815
816 exchange.check_heads(repo, their_heads, 'preparing changes')
816 exchange.check_heads(repo, their_heads, 'preparing changes')
817
817
818 # write bundle data to temporary file because it can be big
818 # write bundle data to temporary file because it can be big
819 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
819 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
820 fp = os.fdopen(fd, 'wb+')
820 fp = os.fdopen(fd, 'wb+')
821 r = 0
821 r = 0
822 try:
822 try:
823 proto.getfile(fp)
823 proto.getfile(fp)
824 fp.seek(0)
824 fp.seek(0)
825 gen = exchange.readbundle(repo.ui, fp, None)
825 gen = exchange.readbundle(repo.ui, fp, None)
826 r = exchange.unbundle(repo, gen, their_heads, 'serve',
826 r = exchange.unbundle(repo, gen, their_heads, 'serve',
827 proto._client())
827 proto._client())
828 if util.safehasattr(r, 'addpart'):
828 if util.safehasattr(r, 'addpart'):
829 # The return looks streameable, we are in the bundle2 case and
829 # The return looks streameable, we are in the bundle2 case and
830 # should return a stream.
830 # should return a stream.
831 return streamres(r.getchunks())
831 return streamres(r.getchunks())
832 return pushres(r)
832 return pushres(r)
833
833
834 finally:
834 finally:
835 fp.close()
835 fp.close()
836 os.unlink(tempname)
836 os.unlink(tempname)
837 except error.BundleValueError, exc:
837 except error.BundleValueError, exc:
838 bundler = bundle2.bundle20(repo.ui)
838 bundler = bundle2.bundle20(repo.ui)
839 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
839 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
840 if exc.parttype is not None:
840 if exc.parttype is not None:
841 errpart.addparam('parttype', exc.parttype)
841 errpart.addparam('parttype', exc.parttype)
842 if exc.params:
842 if exc.params:
843 errpart.addparam('params', '\0'.join(exc.params))
843 errpart.addparam('params', '\0'.join(exc.params))
844 return streamres(bundler.getchunks())
844 return streamres(bundler.getchunks())
845 except util.Abort, inst:
845 except util.Abort, inst:
846 # The old code we moved used sys.stderr directly.
846 # The old code we moved used sys.stderr directly.
847 # We did not change it to minimise code change.
847 # We did not change it to minimise code change.
848 # This need to be moved to something proper.
848 # This need to be moved to something proper.
849 # Feel free to do it.
849 # Feel free to do it.
850 if getattr(inst, 'duringunbundle2', False):
850 if getattr(inst, 'duringunbundle2', False):
851 bundler = bundle2.bundle20(repo.ui)
851 bundler = bundle2.bundle20(repo.ui)
852 manargs = [('message', str(inst))]
852 manargs = [('message', str(inst))]
853 advargs = []
853 advargs = []
854 if inst.hint is not None:
854 if inst.hint is not None:
855 advargs.append(('hint', inst.hint))
855 advargs.append(('hint', inst.hint))
856 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
856 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
857 manargs, advargs))
857 manargs, advargs))
858 return streamres(bundler.getchunks())
858 return streamres(bundler.getchunks())
859 else:
859 else:
860 sys.stderr.write("abort: %s\n" % inst)
860 sys.stderr.write("abort: %s\n" % inst)
861 return pushres(0)
861 return pushres(0)
862 except error.PushRaced, exc:
862 except error.PushRaced, exc:
863 if getattr(exc, 'duringunbundle2', False):
863 if getattr(exc, 'duringunbundle2', False):
864 bundler = bundle2.bundle20(repo.ui)
864 bundler = bundle2.bundle20(repo.ui)
865 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
865 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
866 return streamres(bundler.getchunks())
866 return streamres(bundler.getchunks())
867 else:
867 else:
868 return pusherr(str(exc))
868 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now