##// END OF EJS Templates
wireproto: use pushkey.decodekey
Pierre-Yves David -
r21653:4188cae7 default
parent child Browse files
Show More
@@ -1,866 +1,862
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, store, exchange
12 import peer, error, encoding, util, store, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 return sep.join(map(hex, l))
175 return sep.join(map(hex, l))
176
176
177 # batched call argument encoding
177 # batched call argument encoding
178
178
179 def escapearg(plain):
179 def escapearg(plain):
180 return (plain
180 return (plain
181 .replace(':', '::')
181 .replace(':', '::')
182 .replace(',', ':,')
182 .replace(',', ':,')
183 .replace(';', ':;')
183 .replace(';', ':;')
184 .replace('=', ':='))
184 .replace('=', ':='))
185
185
186 def unescapearg(escaped):
186 def unescapearg(escaped):
187 return (escaped
187 return (escaped
188 .replace(':=', '=')
188 .replace(':=', '=')
189 .replace(':;', ';')
189 .replace(':;', ';')
190 .replace(':,', ',')
190 .replace(':,', ',')
191 .replace('::', ':'))
191 .replace('::', ':'))
192
192
193 # mapping of options accepted by getbundle and their types
193 # mapping of options accepted by getbundle and their types
194 #
194 #
195 # Meant to be extended by extensions. It is extensions responsibility to ensure
195 # Meant to be extended by extensions. It is extensions responsibility to ensure
196 # such options are properly processed in exchange.getbundle.
196 # such options are properly processed in exchange.getbundle.
197 #
197 #
198 # supported types are:
198 # supported types are:
199 #
199 #
200 # :nodes: list of binary nodes
200 # :nodes: list of binary nodes
201 # :csv: list of comma-separated values
201 # :csv: list of comma-separated values
202 # :plain: string with no transformation needed.
202 # :plain: string with no transformation needed.
203 gboptsmap = {'heads': 'nodes',
203 gboptsmap = {'heads': 'nodes',
204 'common': 'nodes',
204 'common': 'nodes',
205 'bundlecaps': 'csv'}
205 'bundlecaps': 'csv'}
206
206
207 # client side
207 # client side
208
208
209 class wirepeer(peer.peerrepository):
209 class wirepeer(peer.peerrepository):
210
210
211 def batch(self):
211 def batch(self):
212 return remotebatch(self)
212 return remotebatch(self)
213 def _submitbatch(self, req):
213 def _submitbatch(self, req):
214 cmds = []
214 cmds = []
215 for op, argsdict in req:
215 for op, argsdict in req:
216 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
216 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
217 cmds.append('%s %s' % (op, args))
217 cmds.append('%s %s' % (op, args))
218 rsp = self._call("batch", cmds=';'.join(cmds))
218 rsp = self._call("batch", cmds=';'.join(cmds))
219 return rsp.split(';')
219 return rsp.split(';')
220 def _submitone(self, op, args):
220 def _submitone(self, op, args):
221 return self._call(op, **args)
221 return self._call(op, **args)
222
222
223 @batchable
223 @batchable
224 def lookup(self, key):
224 def lookup(self, key):
225 self.requirecap('lookup', _('look up remote revision'))
225 self.requirecap('lookup', _('look up remote revision'))
226 f = future()
226 f = future()
227 yield {'key': encoding.fromlocal(key)}, f
227 yield {'key': encoding.fromlocal(key)}, f
228 d = f.value
228 d = f.value
229 success, data = d[:-1].split(" ", 1)
229 success, data = d[:-1].split(" ", 1)
230 if int(success):
230 if int(success):
231 yield bin(data)
231 yield bin(data)
232 self._abort(error.RepoError(data))
232 self._abort(error.RepoError(data))
233
233
234 @batchable
234 @batchable
235 def heads(self):
235 def heads(self):
236 f = future()
236 f = future()
237 yield {}, f
237 yield {}, f
238 d = f.value
238 d = f.value
239 try:
239 try:
240 yield decodelist(d[:-1])
240 yield decodelist(d[:-1])
241 except ValueError:
241 except ValueError:
242 self._abort(error.ResponseError(_("unexpected response:"), d))
242 self._abort(error.ResponseError(_("unexpected response:"), d))
243
243
244 @batchable
244 @batchable
245 def known(self, nodes):
245 def known(self, nodes):
246 f = future()
246 f = future()
247 yield {'nodes': encodelist(nodes)}, f
247 yield {'nodes': encodelist(nodes)}, f
248 d = f.value
248 d = f.value
249 try:
249 try:
250 yield [bool(int(f)) for f in d]
250 yield [bool(int(f)) for f in d]
251 except ValueError:
251 except ValueError:
252 self._abort(error.ResponseError(_("unexpected response:"), d))
252 self._abort(error.ResponseError(_("unexpected response:"), d))
253
253
254 @batchable
254 @batchable
255 def branchmap(self):
255 def branchmap(self):
256 f = future()
256 f = future()
257 yield {}, f
257 yield {}, f
258 d = f.value
258 d = f.value
259 try:
259 try:
260 branchmap = {}
260 branchmap = {}
261 for branchpart in d.splitlines():
261 for branchpart in d.splitlines():
262 branchname, branchheads = branchpart.split(' ', 1)
262 branchname, branchheads = branchpart.split(' ', 1)
263 branchname = encoding.tolocal(urllib.unquote(branchname))
263 branchname = encoding.tolocal(urllib.unquote(branchname))
264 branchheads = decodelist(branchheads)
264 branchheads = decodelist(branchheads)
265 branchmap[branchname] = branchheads
265 branchmap[branchname] = branchheads
266 yield branchmap
266 yield branchmap
267 except TypeError:
267 except TypeError:
268 self._abort(error.ResponseError(_("unexpected response:"), d))
268 self._abort(error.ResponseError(_("unexpected response:"), d))
269
269
270 def branches(self, nodes):
270 def branches(self, nodes):
271 n = encodelist(nodes)
271 n = encodelist(nodes)
272 d = self._call("branches", nodes=n)
272 d = self._call("branches", nodes=n)
273 try:
273 try:
274 br = [tuple(decodelist(b)) for b in d.splitlines()]
274 br = [tuple(decodelist(b)) for b in d.splitlines()]
275 return br
275 return br
276 except ValueError:
276 except ValueError:
277 self._abort(error.ResponseError(_("unexpected response:"), d))
277 self._abort(error.ResponseError(_("unexpected response:"), d))
278
278
279 def between(self, pairs):
279 def between(self, pairs):
280 batch = 8 # avoid giant requests
280 batch = 8 # avoid giant requests
281 r = []
281 r = []
282 for i in xrange(0, len(pairs), batch):
282 for i in xrange(0, len(pairs), batch):
283 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
283 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
284 d = self._call("between", pairs=n)
284 d = self._call("between", pairs=n)
285 try:
285 try:
286 r.extend(l and decodelist(l) or [] for l in d.splitlines())
286 r.extend(l and decodelist(l) or [] for l in d.splitlines())
287 except ValueError:
287 except ValueError:
288 self._abort(error.ResponseError(_("unexpected response:"), d))
288 self._abort(error.ResponseError(_("unexpected response:"), d))
289 return r
289 return r
290
290
291 @batchable
291 @batchable
292 def pushkey(self, namespace, key, old, new):
292 def pushkey(self, namespace, key, old, new):
293 if not self.capable('pushkey'):
293 if not self.capable('pushkey'):
294 yield False, None
294 yield False, None
295 f = future()
295 f = future()
296 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
296 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 yield {'namespace': encoding.fromlocal(namespace),
297 yield {'namespace': encoding.fromlocal(namespace),
298 'key': encoding.fromlocal(key),
298 'key': encoding.fromlocal(key),
299 'old': encoding.fromlocal(old),
299 'old': encoding.fromlocal(old),
300 'new': encoding.fromlocal(new)}, f
300 'new': encoding.fromlocal(new)}, f
301 d = f.value
301 d = f.value
302 d, output = d.split('\n', 1)
302 d, output = d.split('\n', 1)
303 try:
303 try:
304 d = bool(int(d))
304 d = bool(int(d))
305 except ValueError:
305 except ValueError:
306 raise error.ResponseError(
306 raise error.ResponseError(
307 _('push failed (unexpected response):'), d)
307 _('push failed (unexpected response):'), d)
308 for l in output.splitlines(True):
308 for l in output.splitlines(True):
309 self.ui.status(_('remote: '), l)
309 self.ui.status(_('remote: '), l)
310 yield d
310 yield d
311
311
312 @batchable
312 @batchable
313 def listkeys(self, namespace):
313 def listkeys(self, namespace):
314 if not self.capable('pushkey'):
314 if not self.capable('pushkey'):
315 yield {}, None
315 yield {}, None
316 f = future()
316 f = future()
317 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
317 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
318 yield {'namespace': encoding.fromlocal(namespace)}, f
318 yield {'namespace': encoding.fromlocal(namespace)}, f
319 d = f.value
319 d = f.value
320 r = {}
320 yield pushkeymod.decodekeys(d)
321 for l in d.splitlines():
322 k, v = l.split('\t')
323 r[encoding.tolocal(k)] = encoding.tolocal(v)
324 yield r
325
321
326 def stream_out(self):
322 def stream_out(self):
327 return self._callstream('stream_out')
323 return self._callstream('stream_out')
328
324
329 def changegroup(self, nodes, kind):
325 def changegroup(self, nodes, kind):
330 n = encodelist(nodes)
326 n = encodelist(nodes)
331 f = self._callcompressable("changegroup", roots=n)
327 f = self._callcompressable("changegroup", roots=n)
332 return changegroupmod.unbundle10(f, 'UN')
328 return changegroupmod.unbundle10(f, 'UN')
333
329
334 def changegroupsubset(self, bases, heads, kind):
330 def changegroupsubset(self, bases, heads, kind):
335 self.requirecap('changegroupsubset', _('look up remote changes'))
331 self.requirecap('changegroupsubset', _('look up remote changes'))
336 bases = encodelist(bases)
332 bases = encodelist(bases)
337 heads = encodelist(heads)
333 heads = encodelist(heads)
338 f = self._callcompressable("changegroupsubset",
334 f = self._callcompressable("changegroupsubset",
339 bases=bases, heads=heads)
335 bases=bases, heads=heads)
340 return changegroupmod.unbundle10(f, 'UN')
336 return changegroupmod.unbundle10(f, 'UN')
341
337
342 def getbundle(self, source, **kwargs):
338 def getbundle(self, source, **kwargs):
343 self.requirecap('getbundle', _('look up remote changes'))
339 self.requirecap('getbundle', _('look up remote changes'))
344 opts = {}
340 opts = {}
345 for key, value in kwargs.iteritems():
341 for key, value in kwargs.iteritems():
346 if value is None:
342 if value is None:
347 continue
343 continue
348 keytype = gboptsmap.get(key)
344 keytype = gboptsmap.get(key)
349 if keytype is None:
345 if keytype is None:
350 assert False, 'unexpected'
346 assert False, 'unexpected'
351 elif keytype == 'nodes':
347 elif keytype == 'nodes':
352 value = encodelist(value)
348 value = encodelist(value)
353 elif keytype == 'csv':
349 elif keytype == 'csv':
354 value = ','.join(value)
350 value = ','.join(value)
355 elif keytype != 'plain':
351 elif keytype != 'plain':
356 raise KeyError('unknown getbundle option type %s'
352 raise KeyError('unknown getbundle option type %s'
357 % keytype)
353 % keytype)
358 opts[key] = value
354 opts[key] = value
359 f = self._callcompressable("getbundle", **opts)
355 f = self._callcompressable("getbundle", **opts)
360 bundlecaps = kwargs.get('bundlecaps')
356 bundlecaps = kwargs.get('bundlecaps')
361 if bundlecaps is not None and 'HG2X' in bundlecaps:
357 if bundlecaps is not None and 'HG2X' in bundlecaps:
362 return bundle2.unbundle20(self.ui, f)
358 return bundle2.unbundle20(self.ui, f)
363 else:
359 else:
364 return changegroupmod.unbundle10(f, 'UN')
360 return changegroupmod.unbundle10(f, 'UN')
365
361
366 def unbundle(self, cg, heads, source):
362 def unbundle(self, cg, heads, source):
367 '''Send cg (a readable file-like object representing the
363 '''Send cg (a readable file-like object representing the
368 changegroup to push, typically a chunkbuffer object) to the
364 changegroup to push, typically a chunkbuffer object) to the
369 remote server as a bundle.
365 remote server as a bundle.
370
366
371 When pushing a bundle10 stream, return an integer indicating the
367 When pushing a bundle10 stream, return an integer indicating the
372 result of the push (see localrepository.addchangegroup()).
368 result of the push (see localrepository.addchangegroup()).
373
369
374 When pushing a bundle20 stream, return a bundle20 stream.'''
370 When pushing a bundle20 stream, return a bundle20 stream.'''
375
371
376 if heads != ['force'] and self.capable('unbundlehash'):
372 if heads != ['force'] and self.capable('unbundlehash'):
377 heads = encodelist(['hashed',
373 heads = encodelist(['hashed',
378 util.sha1(''.join(sorted(heads))).digest()])
374 util.sha1(''.join(sorted(heads))).digest()])
379 else:
375 else:
380 heads = encodelist(heads)
376 heads = encodelist(heads)
381
377
382 if util.safehasattr(cg, 'deltaheader'):
378 if util.safehasattr(cg, 'deltaheader'):
383 # this a bundle10, do the old style call sequence
379 # this a bundle10, do the old style call sequence
384 ret, output = self._callpush("unbundle", cg, heads=heads)
380 ret, output = self._callpush("unbundle", cg, heads=heads)
385 if ret == "":
381 if ret == "":
386 raise error.ResponseError(
382 raise error.ResponseError(
387 _('push failed:'), output)
383 _('push failed:'), output)
388 try:
384 try:
389 ret = int(ret)
385 ret = int(ret)
390 except ValueError:
386 except ValueError:
391 raise error.ResponseError(
387 raise error.ResponseError(
392 _('push failed (unexpected response):'), ret)
388 _('push failed (unexpected response):'), ret)
393
389
394 for l in output.splitlines(True):
390 for l in output.splitlines(True):
395 self.ui.status(_('remote: '), l)
391 self.ui.status(_('remote: '), l)
396 else:
392 else:
397 # bundle2 push. Send a stream, fetch a stream.
393 # bundle2 push. Send a stream, fetch a stream.
398 stream = self._calltwowaystream('unbundle', cg, heads=heads)
394 stream = self._calltwowaystream('unbundle', cg, heads=heads)
399 ret = bundle2.unbundle20(self.ui, stream)
395 ret = bundle2.unbundle20(self.ui, stream)
400 return ret
396 return ret
401
397
402 def debugwireargs(self, one, two, three=None, four=None, five=None):
398 def debugwireargs(self, one, two, three=None, four=None, five=None):
403 # don't pass optional arguments left at their default value
399 # don't pass optional arguments left at their default value
404 opts = {}
400 opts = {}
405 if three is not None:
401 if three is not None:
406 opts['three'] = three
402 opts['three'] = three
407 if four is not None:
403 if four is not None:
408 opts['four'] = four
404 opts['four'] = four
409 return self._call('debugwireargs', one=one, two=two, **opts)
405 return self._call('debugwireargs', one=one, two=two, **opts)
410
406
411 def _call(self, cmd, **args):
407 def _call(self, cmd, **args):
412 """execute <cmd> on the server
408 """execute <cmd> on the server
413
409
414 The command is expected to return a simple string.
410 The command is expected to return a simple string.
415
411
416 returns the server reply as a string."""
412 returns the server reply as a string."""
417 raise NotImplementedError()
413 raise NotImplementedError()
418
414
419 def _callstream(self, cmd, **args):
415 def _callstream(self, cmd, **args):
420 """execute <cmd> on the server
416 """execute <cmd> on the server
421
417
422 The command is expected to return a stream.
418 The command is expected to return a stream.
423
419
424 returns the server reply as a file like object."""
420 returns the server reply as a file like object."""
425 raise NotImplementedError()
421 raise NotImplementedError()
426
422
427 def _callcompressable(self, cmd, **args):
423 def _callcompressable(self, cmd, **args):
428 """execute <cmd> on the server
424 """execute <cmd> on the server
429
425
430 The command is expected to return a stream.
426 The command is expected to return a stream.
431
427
432 The stream may have been compressed in some implementations. This
428 The stream may have been compressed in some implementations. This
433 function takes care of the decompression. This is the only difference
429 function takes care of the decompression. This is the only difference
434 with _callstream.
430 with _callstream.
435
431
436 returns the server reply as a file like object.
432 returns the server reply as a file like object.
437 """
433 """
438 raise NotImplementedError()
434 raise NotImplementedError()
439
435
440 def _callpush(self, cmd, fp, **args):
436 def _callpush(self, cmd, fp, **args):
441 """execute a <cmd> on server
437 """execute a <cmd> on server
442
438
443 The command is expected to be related to a push. Push has a special
439 The command is expected to be related to a push. Push has a special
444 return method.
440 return method.
445
441
446 returns the server reply as a (ret, output) tuple. ret is either
442 returns the server reply as a (ret, output) tuple. ret is either
447 empty (error) or a stringified int.
443 empty (error) or a stringified int.
448 """
444 """
449 raise NotImplementedError()
445 raise NotImplementedError()
450
446
451 def _calltwowaystream(self, cmd, fp, **args):
447 def _calltwowaystream(self, cmd, fp, **args):
452 """execute <cmd> on server
448 """execute <cmd> on server
453
449
454 The command will send a stream to the server and get a stream in reply.
450 The command will send a stream to the server and get a stream in reply.
455 """
451 """
456 raise NotImplementedError()
452 raise NotImplementedError()
457
453
458 def _abort(self, exception):
454 def _abort(self, exception):
459 """clearly abort the wire protocol connection and raise the exception
455 """clearly abort the wire protocol connection and raise the exception
460 """
456 """
461 raise NotImplementedError()
457 raise NotImplementedError()
462
458
463 # server side
459 # server side
464
460
465 # wire protocol command can either return a string or one of these classes.
461 # wire protocol command can either return a string or one of these classes.
466 class streamres(object):
462 class streamres(object):
467 """wireproto reply: binary stream
463 """wireproto reply: binary stream
468
464
469 The call was successful and the result is a stream.
465 The call was successful and the result is a stream.
470 Iterate on the `self.gen` attribute to retrieve chunks.
466 Iterate on the `self.gen` attribute to retrieve chunks.
471 """
467 """
472 def __init__(self, gen):
468 def __init__(self, gen):
473 self.gen = gen
469 self.gen = gen
474
470
475 class pushres(object):
471 class pushres(object):
476 """wireproto reply: success with simple integer return
472 """wireproto reply: success with simple integer return
477
473
478 The call was successful and returned an integer contained in `self.res`.
474 The call was successful and returned an integer contained in `self.res`.
479 """
475 """
480 def __init__(self, res):
476 def __init__(self, res):
481 self.res = res
477 self.res = res
482
478
483 class pusherr(object):
479 class pusherr(object):
484 """wireproto reply: failure
480 """wireproto reply: failure
485
481
486 The call failed. The `self.res` attribute contains the error message.
482 The call failed. The `self.res` attribute contains the error message.
487 """
483 """
488 def __init__(self, res):
484 def __init__(self, res):
489 self.res = res
485 self.res = res
490
486
491 class ooberror(object):
487 class ooberror(object):
492 """wireproto reply: failure of a batch of operation
488 """wireproto reply: failure of a batch of operation
493
489
494 Something failed during a batch call. The error message is stored in
490 Something failed during a batch call. The error message is stored in
495 `self.message`.
491 `self.message`.
496 """
492 """
497 def __init__(self, message):
493 def __init__(self, message):
498 self.message = message
494 self.message = message
499
495
500 def dispatch(repo, proto, command):
496 def dispatch(repo, proto, command):
501 repo = repo.filtered("served")
497 repo = repo.filtered("served")
502 func, spec = commands[command]
498 func, spec = commands[command]
503 args = proto.getargs(spec)
499 args = proto.getargs(spec)
504 return func(repo, proto, *args)
500 return func(repo, proto, *args)
505
501
506 def options(cmd, keys, others):
502 def options(cmd, keys, others):
507 opts = {}
503 opts = {}
508 for k in keys:
504 for k in keys:
509 if k in others:
505 if k in others:
510 opts[k] = others[k]
506 opts[k] = others[k]
511 del others[k]
507 del others[k]
512 if others:
508 if others:
513 sys.stderr.write("abort: %s got unexpected arguments %s\n"
509 sys.stderr.write("abort: %s got unexpected arguments %s\n"
514 % (cmd, ",".join(others)))
510 % (cmd, ",".join(others)))
515 return opts
511 return opts
516
512
517 # list of commands
513 # list of commands
518 commands = {}
514 commands = {}
519
515
520 def wireprotocommand(name, args=''):
516 def wireprotocommand(name, args=''):
521 """decorator for wire protocol command"""
517 """decorator for wire protocol command"""
522 def register(func):
518 def register(func):
523 commands[name] = (func, args)
519 commands[name] = (func, args)
524 return func
520 return func
525 return register
521 return register
526
522
527 @wireprotocommand('batch', 'cmds *')
523 @wireprotocommand('batch', 'cmds *')
528 def batch(repo, proto, cmds, others):
524 def batch(repo, proto, cmds, others):
529 repo = repo.filtered("served")
525 repo = repo.filtered("served")
530 res = []
526 res = []
531 for pair in cmds.split(';'):
527 for pair in cmds.split(';'):
532 op, args = pair.split(' ', 1)
528 op, args = pair.split(' ', 1)
533 vals = {}
529 vals = {}
534 for a in args.split(','):
530 for a in args.split(','):
535 if a:
531 if a:
536 n, v = a.split('=')
532 n, v = a.split('=')
537 vals[n] = unescapearg(v)
533 vals[n] = unescapearg(v)
538 func, spec = commands[op]
534 func, spec = commands[op]
539 if spec:
535 if spec:
540 keys = spec.split()
536 keys = spec.split()
541 data = {}
537 data = {}
542 for k in keys:
538 for k in keys:
543 if k == '*':
539 if k == '*':
544 star = {}
540 star = {}
545 for key in vals.keys():
541 for key in vals.keys():
546 if key not in keys:
542 if key not in keys:
547 star[key] = vals[key]
543 star[key] = vals[key]
548 data['*'] = star
544 data['*'] = star
549 else:
545 else:
550 data[k] = vals[k]
546 data[k] = vals[k]
551 result = func(repo, proto, *[data[k] for k in keys])
547 result = func(repo, proto, *[data[k] for k in keys])
552 else:
548 else:
553 result = func(repo, proto)
549 result = func(repo, proto)
554 if isinstance(result, ooberror):
550 if isinstance(result, ooberror):
555 return result
551 return result
556 res.append(escapearg(result))
552 res.append(escapearg(result))
557 return ';'.join(res)
553 return ';'.join(res)
558
554
559 @wireprotocommand('between', 'pairs')
555 @wireprotocommand('between', 'pairs')
560 def between(repo, proto, pairs):
556 def between(repo, proto, pairs):
561 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
557 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
562 r = []
558 r = []
563 for b in repo.between(pairs):
559 for b in repo.between(pairs):
564 r.append(encodelist(b) + "\n")
560 r.append(encodelist(b) + "\n")
565 return "".join(r)
561 return "".join(r)
566
562
567 @wireprotocommand('branchmap')
563 @wireprotocommand('branchmap')
568 def branchmap(repo, proto):
564 def branchmap(repo, proto):
569 branchmap = repo.branchmap()
565 branchmap = repo.branchmap()
570 heads = []
566 heads = []
571 for branch, nodes in branchmap.iteritems():
567 for branch, nodes in branchmap.iteritems():
572 branchname = urllib.quote(encoding.fromlocal(branch))
568 branchname = urllib.quote(encoding.fromlocal(branch))
573 branchnodes = encodelist(nodes)
569 branchnodes = encodelist(nodes)
574 heads.append('%s %s' % (branchname, branchnodes))
570 heads.append('%s %s' % (branchname, branchnodes))
575 return '\n'.join(heads)
571 return '\n'.join(heads)
576
572
577 @wireprotocommand('branches', 'nodes')
573 @wireprotocommand('branches', 'nodes')
578 def branches(repo, proto, nodes):
574 def branches(repo, proto, nodes):
579 nodes = decodelist(nodes)
575 nodes = decodelist(nodes)
580 r = []
576 r = []
581 for b in repo.branches(nodes):
577 for b in repo.branches(nodes):
582 r.append(encodelist(b) + "\n")
578 r.append(encodelist(b) + "\n")
583 return "".join(r)
579 return "".join(r)
584
580
585
581
586 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
582 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
587 'known', 'getbundle', 'unbundlehash', 'batch']
583 'known', 'getbundle', 'unbundlehash', 'batch']
588
584
589 def _capabilities(repo, proto):
585 def _capabilities(repo, proto):
590 """return a list of capabilities for a repo
586 """return a list of capabilities for a repo
591
587
592 This function exists to allow extensions to easily wrap capabilities
588 This function exists to allow extensions to easily wrap capabilities
593 computation
589 computation
594
590
595 - returns a lists: easy to alter
591 - returns a lists: easy to alter
596 - change done here will be propagated to both `capabilities` and `hello`
592 - change done here will be propagated to both `capabilities` and `hello`
597 command without any other action needed.
593 command without any other action needed.
598 """
594 """
599 # copy to prevent modification of the global list
595 # copy to prevent modification of the global list
600 caps = list(wireprotocaps)
596 caps = list(wireprotocaps)
601 if _allowstream(repo.ui):
597 if _allowstream(repo.ui):
602 if repo.ui.configbool('server', 'preferuncompressed', False):
598 if repo.ui.configbool('server', 'preferuncompressed', False):
603 caps.append('stream-preferred')
599 caps.append('stream-preferred')
604 requiredformats = repo.requirements & repo.supportedformats
600 requiredformats = repo.requirements & repo.supportedformats
605 # if our local revlogs are just revlogv1, add 'stream' cap
601 # if our local revlogs are just revlogv1, add 'stream' cap
606 if not requiredformats - set(('revlogv1',)):
602 if not requiredformats - set(('revlogv1',)):
607 caps.append('stream')
603 caps.append('stream')
608 # otherwise, add 'streamreqs' detailing our local revlog format
604 # otherwise, add 'streamreqs' detailing our local revlog format
609 else:
605 else:
610 caps.append('streamreqs=%s' % ','.join(requiredformats))
606 caps.append('streamreqs=%s' % ','.join(requiredformats))
611 if repo.ui.configbool('experimental', 'bundle2-exp', False):
607 if repo.ui.configbool('experimental', 'bundle2-exp', False):
612 capsblob = bundle2.encodecaps(repo.bundle2caps)
608 capsblob = bundle2.encodecaps(repo.bundle2caps)
613 caps.append('bundle2-exp=' + urllib.quote(capsblob))
609 caps.append('bundle2-exp=' + urllib.quote(capsblob))
614 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
610 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
615 caps.append('httpheader=1024')
611 caps.append('httpheader=1024')
616 return caps
612 return caps
617
613
618 # If you are writing an extension and consider wrapping this function. Wrap
614 # If you are writing an extension and consider wrapping this function. Wrap
619 # `_capabilities` instead.
615 # `_capabilities` instead.
620 @wireprotocommand('capabilities')
616 @wireprotocommand('capabilities')
621 def capabilities(repo, proto):
617 def capabilities(repo, proto):
622 return ' '.join(_capabilities(repo, proto))
618 return ' '.join(_capabilities(repo, proto))
623
619
624 @wireprotocommand('changegroup', 'roots')
620 @wireprotocommand('changegroup', 'roots')
625 def changegroup(repo, proto, roots):
621 def changegroup(repo, proto, roots):
626 nodes = decodelist(roots)
622 nodes = decodelist(roots)
627 cg = changegroupmod.changegroup(repo, nodes, 'serve')
623 cg = changegroupmod.changegroup(repo, nodes, 'serve')
628 return streamres(proto.groupchunks(cg))
624 return streamres(proto.groupchunks(cg))
629
625
630 @wireprotocommand('changegroupsubset', 'bases heads')
626 @wireprotocommand('changegroupsubset', 'bases heads')
631 def changegroupsubset(repo, proto, bases, heads):
627 def changegroupsubset(repo, proto, bases, heads):
632 bases = decodelist(bases)
628 bases = decodelist(bases)
633 heads = decodelist(heads)
629 heads = decodelist(heads)
634 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
630 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
635 return streamres(proto.groupchunks(cg))
631 return streamres(proto.groupchunks(cg))
636
632
637 @wireprotocommand('debugwireargs', 'one two *')
633 @wireprotocommand('debugwireargs', 'one two *')
638 def debugwireargs(repo, proto, one, two, others):
634 def debugwireargs(repo, proto, one, two, others):
639 # only accept optional args from the known set
635 # only accept optional args from the known set
640 opts = options('debugwireargs', ['three', 'four'], others)
636 opts = options('debugwireargs', ['three', 'four'], others)
641 return repo.debugwireargs(one, two, **opts)
637 return repo.debugwireargs(one, two, **opts)
642
638
643 # List of options accepted by getbundle.
639 # List of options accepted by getbundle.
644 #
640 #
645 # Meant to be extended by extensions. It is the extension's responsibility to
641 # Meant to be extended by extensions. It is the extension's responsibility to
646 # ensure such options are properly processed in exchange.getbundle.
642 # ensure such options are properly processed in exchange.getbundle.
647 gboptslist = ['heads', 'common', 'bundlecaps']
643 gboptslist = ['heads', 'common', 'bundlecaps']
648
644
649 @wireprotocommand('getbundle', '*')
645 @wireprotocommand('getbundle', '*')
650 def getbundle(repo, proto, others):
646 def getbundle(repo, proto, others):
651 opts = options('getbundle', gboptsmap.keys(), others)
647 opts = options('getbundle', gboptsmap.keys(), others)
652 for k, v in opts.iteritems():
648 for k, v in opts.iteritems():
653 keytype = gboptsmap[k]
649 keytype = gboptsmap[k]
654 if keytype == 'nodes':
650 if keytype == 'nodes':
655 opts[k] = decodelist(v)
651 opts[k] = decodelist(v)
656 elif keytype == 'csv':
652 elif keytype == 'csv':
657 opts[k] = set(v.split(','))
653 opts[k] = set(v.split(','))
658 elif keytype != 'plain':
654 elif keytype != 'plain':
659 raise KeyError('unknown getbundle option type %s'
655 raise KeyError('unknown getbundle option type %s'
660 % keytype)
656 % keytype)
661 cg = exchange.getbundle(repo, 'serve', **opts)
657 cg = exchange.getbundle(repo, 'serve', **opts)
662 return streamres(proto.groupchunks(cg))
658 return streamres(proto.groupchunks(cg))
663
659
664 @wireprotocommand('heads')
660 @wireprotocommand('heads')
665 def heads(repo, proto):
661 def heads(repo, proto):
666 h = repo.heads()
662 h = repo.heads()
667 return encodelist(h) + "\n"
663 return encodelist(h) + "\n"
668
664
669 @wireprotocommand('hello')
665 @wireprotocommand('hello')
670 def hello(repo, proto):
666 def hello(repo, proto):
671 '''the hello command returns a set of lines describing various
667 '''the hello command returns a set of lines describing various
672 interesting things about the server, in an RFC822-like format.
668 interesting things about the server, in an RFC822-like format.
673 Currently the only one defined is "capabilities", which
669 Currently the only one defined is "capabilities", which
674 consists of a line in the form:
670 consists of a line in the form:
675
671
676 capabilities: space separated list of tokens
672 capabilities: space separated list of tokens
677 '''
673 '''
678 return "capabilities: %s\n" % (capabilities(repo, proto))
674 return "capabilities: %s\n" % (capabilities(repo, proto))
679
675
680 @wireprotocommand('listkeys', 'namespace')
676 @wireprotocommand('listkeys', 'namespace')
681 def listkeys(repo, proto, namespace):
677 def listkeys(repo, proto, namespace):
682 d = repo.listkeys(encoding.tolocal(namespace)).items()
678 d = repo.listkeys(encoding.tolocal(namespace)).items()
683 return pushkeymod.encodekeys(d)
679 return pushkeymod.encodekeys(d)
684
680
685 @wireprotocommand('lookup', 'key')
681 @wireprotocommand('lookup', 'key')
686 def lookup(repo, proto, key):
682 def lookup(repo, proto, key):
687 try:
683 try:
688 k = encoding.tolocal(key)
684 k = encoding.tolocal(key)
689 c = repo[k]
685 c = repo[k]
690 r = c.hex()
686 r = c.hex()
691 success = 1
687 success = 1
692 except Exception, inst:
688 except Exception, inst:
693 r = str(inst)
689 r = str(inst)
694 success = 0
690 success = 0
695 return "%s %s\n" % (success, r)
691 return "%s %s\n" % (success, r)
696
692
697 @wireprotocommand('known', 'nodes *')
693 @wireprotocommand('known', 'nodes *')
698 def known(repo, proto, nodes, others):
694 def known(repo, proto, nodes, others):
699 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
695 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
700
696
701 @wireprotocommand('pushkey', 'namespace key old new')
697 @wireprotocommand('pushkey', 'namespace key old new')
702 def pushkey(repo, proto, namespace, key, old, new):
698 def pushkey(repo, proto, namespace, key, old, new):
703 # compatibility with pre-1.8 clients which were accidentally
699 # compatibility with pre-1.8 clients which were accidentally
704 # sending raw binary nodes rather than utf-8-encoded hex
700 # sending raw binary nodes rather than utf-8-encoded hex
705 if len(new) == 20 and new.encode('string-escape') != new:
701 if len(new) == 20 and new.encode('string-escape') != new:
706 # looks like it could be a binary node
702 # looks like it could be a binary node
707 try:
703 try:
708 new.decode('utf-8')
704 new.decode('utf-8')
709 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
705 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
710 except UnicodeDecodeError:
706 except UnicodeDecodeError:
711 pass # binary, leave unmodified
707 pass # binary, leave unmodified
712 else:
708 else:
713 new = encoding.tolocal(new) # normal path
709 new = encoding.tolocal(new) # normal path
714
710
715 if util.safehasattr(proto, 'restore'):
711 if util.safehasattr(proto, 'restore'):
716
712
717 proto.redirect()
713 proto.redirect()
718
714
719 try:
715 try:
720 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
716 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
721 encoding.tolocal(old), new) or False
717 encoding.tolocal(old), new) or False
722 except util.Abort:
718 except util.Abort:
723 r = False
719 r = False
724
720
725 output = proto.restore()
721 output = proto.restore()
726
722
727 return '%s\n%s' % (int(r), output)
723 return '%s\n%s' % (int(r), output)
728
724
729 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
725 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
730 encoding.tolocal(old), new)
726 encoding.tolocal(old), new)
731 return '%s\n' % int(r)
727 return '%s\n' % int(r)
732
728
733 def _allowstream(ui):
729 def _allowstream(ui):
734 return ui.configbool('server', 'uncompressed', True, untrusted=True)
730 return ui.configbool('server', 'uncompressed', True, untrusted=True)
735
731
736 def _walkstreamfiles(repo):
732 def _walkstreamfiles(repo):
737 # this is it's own function so extensions can override it
733 # this is it's own function so extensions can override it
738 return repo.store.walk()
734 return repo.store.walk()
739
735
740 @wireprotocommand('stream_out')
736 @wireprotocommand('stream_out')
741 def stream(repo, proto):
737 def stream(repo, proto):
742 '''If the server supports streaming clone, it advertises the "stream"
738 '''If the server supports streaming clone, it advertises the "stream"
743 capability with a value representing the version and flags of the repo
739 capability with a value representing the version and flags of the repo
744 it is serving. Client checks to see if it understands the format.
740 it is serving. Client checks to see if it understands the format.
745
741
746 The format is simple: the server writes out a line with the amount
742 The format is simple: the server writes out a line with the amount
747 of files, then the total amount of bytes to be transferred (separated
743 of files, then the total amount of bytes to be transferred (separated
748 by a space). Then, for each file, the server first writes the filename
744 by a space). Then, for each file, the server first writes the filename
749 and file size (separated by the null character), then the file contents.
745 and file size (separated by the null character), then the file contents.
750 '''
746 '''
751
747
752 if not _allowstream(repo.ui):
748 if not _allowstream(repo.ui):
753 return '1\n'
749 return '1\n'
754
750
755 entries = []
751 entries = []
756 total_bytes = 0
752 total_bytes = 0
757 try:
753 try:
758 # get consistent snapshot of repo, lock during scan
754 # get consistent snapshot of repo, lock during scan
759 lock = repo.lock()
755 lock = repo.lock()
760 try:
756 try:
761 repo.ui.debug('scanning\n')
757 repo.ui.debug('scanning\n')
762 for name, ename, size in _walkstreamfiles(repo):
758 for name, ename, size in _walkstreamfiles(repo):
763 if size:
759 if size:
764 entries.append((name, size))
760 entries.append((name, size))
765 total_bytes += size
761 total_bytes += size
766 finally:
762 finally:
767 lock.release()
763 lock.release()
768 except error.LockError:
764 except error.LockError:
769 return '2\n' # error: 2
765 return '2\n' # error: 2
770
766
771 def streamer(repo, entries, total):
767 def streamer(repo, entries, total):
772 '''stream out all metadata files in repository.'''
768 '''stream out all metadata files in repository.'''
773 yield '0\n' # success
769 yield '0\n' # success
774 repo.ui.debug('%d files, %d bytes to transfer\n' %
770 repo.ui.debug('%d files, %d bytes to transfer\n' %
775 (len(entries), total_bytes))
771 (len(entries), total_bytes))
776 yield '%d %d\n' % (len(entries), total_bytes)
772 yield '%d %d\n' % (len(entries), total_bytes)
777
773
778 sopener = repo.sopener
774 sopener = repo.sopener
779 oldaudit = sopener.mustaudit
775 oldaudit = sopener.mustaudit
780 debugflag = repo.ui.debugflag
776 debugflag = repo.ui.debugflag
781 sopener.mustaudit = False
777 sopener.mustaudit = False
782
778
783 try:
779 try:
784 for name, size in entries:
780 for name, size in entries:
785 if debugflag:
781 if debugflag:
786 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
782 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
787 # partially encode name over the wire for backwards compat
783 # partially encode name over the wire for backwards compat
788 yield '%s\0%d\n' % (store.encodedir(name), size)
784 yield '%s\0%d\n' % (store.encodedir(name), size)
789 if size <= 65536:
785 if size <= 65536:
790 fp = sopener(name)
786 fp = sopener(name)
791 try:
787 try:
792 data = fp.read(size)
788 data = fp.read(size)
793 finally:
789 finally:
794 fp.close()
790 fp.close()
795 yield data
791 yield data
796 else:
792 else:
797 for chunk in util.filechunkiter(sopener(name), limit=size):
793 for chunk in util.filechunkiter(sopener(name), limit=size):
798 yield chunk
794 yield chunk
799 # replace with "finally:" when support for python 2.4 has been dropped
795 # replace with "finally:" when support for python 2.4 has been dropped
800 except Exception:
796 except Exception:
801 sopener.mustaudit = oldaudit
797 sopener.mustaudit = oldaudit
802 raise
798 raise
803 sopener.mustaudit = oldaudit
799 sopener.mustaudit = oldaudit
804
800
805 return streamres(streamer(repo, entries, total_bytes))
801 return streamres(streamer(repo, entries, total_bytes))
806
802
807 @wireprotocommand('unbundle', 'heads')
803 @wireprotocommand('unbundle', 'heads')
808 def unbundle(repo, proto, heads):
804 def unbundle(repo, proto, heads):
809 their_heads = decodelist(heads)
805 their_heads = decodelist(heads)
810
806
811 try:
807 try:
812 proto.redirect()
808 proto.redirect()
813
809
814 exchange.check_heads(repo, their_heads, 'preparing changes')
810 exchange.check_heads(repo, their_heads, 'preparing changes')
815
811
816 # write bundle data to temporary file because it can be big
812 # write bundle data to temporary file because it can be big
817 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
813 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
818 fp = os.fdopen(fd, 'wb+')
814 fp = os.fdopen(fd, 'wb+')
819 r = 0
815 r = 0
820 try:
816 try:
821 proto.getfile(fp)
817 proto.getfile(fp)
822 fp.seek(0)
818 fp.seek(0)
823 gen = exchange.readbundle(repo.ui, fp, None)
819 gen = exchange.readbundle(repo.ui, fp, None)
824 r = exchange.unbundle(repo, gen, their_heads, 'serve',
820 r = exchange.unbundle(repo, gen, their_heads, 'serve',
825 proto._client())
821 proto._client())
826 if util.safehasattr(r, 'addpart'):
822 if util.safehasattr(r, 'addpart'):
827 # The return looks streameable, we are in the bundle2 case and
823 # The return looks streameable, we are in the bundle2 case and
828 # should return a stream.
824 # should return a stream.
829 return streamres(r.getchunks())
825 return streamres(r.getchunks())
830 return pushres(r)
826 return pushres(r)
831
827
832 finally:
828 finally:
833 fp.close()
829 fp.close()
834 os.unlink(tempname)
830 os.unlink(tempname)
835 except error.BundleValueError, exc:
831 except error.BundleValueError, exc:
836 bundler = bundle2.bundle20(repo.ui)
832 bundler = bundle2.bundle20(repo.ui)
837 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
833 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
838 if exc.parttype is not None:
834 if exc.parttype is not None:
839 errpart.addparam('parttype', exc.parttype)
835 errpart.addparam('parttype', exc.parttype)
840 if exc.params:
836 if exc.params:
841 errpart.addparam('params', '\0'.join(exc.params))
837 errpart.addparam('params', '\0'.join(exc.params))
842 return streamres(bundler.getchunks())
838 return streamres(bundler.getchunks())
843 except util.Abort, inst:
839 except util.Abort, inst:
844 # The old code we moved used sys.stderr directly.
840 # The old code we moved used sys.stderr directly.
845 # We did not change it to minimise code change.
841 # We did not change it to minimise code change.
846 # This need to be moved to something proper.
842 # This need to be moved to something proper.
847 # Feel free to do it.
843 # Feel free to do it.
848 if getattr(inst, 'duringunbundle2', False):
844 if getattr(inst, 'duringunbundle2', False):
849 bundler = bundle2.bundle20(repo.ui)
845 bundler = bundle2.bundle20(repo.ui)
850 manargs = [('message', str(inst))]
846 manargs = [('message', str(inst))]
851 advargs = []
847 advargs = []
852 if inst.hint is not None:
848 if inst.hint is not None:
853 advargs.append(('hint', inst.hint))
849 advargs.append(('hint', inst.hint))
854 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
850 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
855 manargs, advargs))
851 manargs, advargs))
856 return streamres(bundler.getchunks())
852 return streamres(bundler.getchunks())
857 else:
853 else:
858 sys.stderr.write("abort: %s\n" % inst)
854 sys.stderr.write("abort: %s\n" % inst)
859 return pushres(0)
855 return pushres(0)
860 except error.PushRaced, exc:
856 except error.PushRaced, exc:
861 if getattr(exc, 'duringunbundle2', False):
857 if getattr(exc, 'duringunbundle2', False):
862 bundler = bundle2.bundle20(repo.ui)
858 bundler = bundle2.bundle20(repo.ui)
863 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
859 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
864 return streamres(bundler.getchunks())
860 return streamres(bundler.getchunks())
865 else:
861 else:
866 return pusherr(str(exc))
862 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now