##// END OF EJS Templates
wireproto: remove unused 'store' import
Martin von Zweigbergk -
r25240:a415e94f default
parent child Browse files
Show More
@@ -1,833 +1,833 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, store, exchange
12 import peer, error, encoding, util, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 try:
175 try:
176 return sep.join(map(hex, l))
176 return sep.join(map(hex, l))
177 except TypeError:
177 except TypeError:
178 print l
178 print l
179 raise
179 raise
180
180
181 # batched call argument encoding
181 # batched call argument encoding
182
182
183 def escapearg(plain):
183 def escapearg(plain):
184 return (plain
184 return (plain
185 .replace(':', '::')
185 .replace(':', '::')
186 .replace(',', ':,')
186 .replace(',', ':,')
187 .replace(';', ':;')
187 .replace(';', ':;')
188 .replace('=', ':='))
188 .replace('=', ':='))
189
189
190 def unescapearg(escaped):
190 def unescapearg(escaped):
191 return (escaped
191 return (escaped
192 .replace(':=', '=')
192 .replace(':=', '=')
193 .replace(':;', ';')
193 .replace(':;', ';')
194 .replace(':,', ',')
194 .replace(':,', ',')
195 .replace('::', ':'))
195 .replace('::', ':'))
196
196
197 # mapping of options accepted by getbundle and their types
197 # mapping of options accepted by getbundle and their types
198 #
198 #
199 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # Meant to be extended by extensions. It is extensions responsibility to ensure
200 # such options are properly processed in exchange.getbundle.
200 # such options are properly processed in exchange.getbundle.
201 #
201 #
202 # supported types are:
202 # supported types are:
203 #
203 #
204 # :nodes: list of binary nodes
204 # :nodes: list of binary nodes
205 # :csv: list of comma-separated values
205 # :csv: list of comma-separated values
206 # :plain: string with no transformation needed.
206 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
207 gboptsmap = {'heads': 'nodes',
208 'common': 'nodes',
208 'common': 'nodes',
209 'obsmarkers': 'boolean',
209 'obsmarkers': 'boolean',
210 'bundlecaps': 'csv',
210 'bundlecaps': 'csv',
211 'listkeys': 'csv',
211 'listkeys': 'csv',
212 'cg': 'boolean'}
212 'cg': 'boolean'}
213
213
214 # client side
214 # client side
215
215
216 class wirepeer(peer.peerrepository):
216 class wirepeer(peer.peerrepository):
217
217
218 def batch(self):
218 def batch(self):
219 return remotebatch(self)
219 return remotebatch(self)
220 def _submitbatch(self, req):
220 def _submitbatch(self, req):
221 cmds = []
221 cmds = []
222 for op, argsdict in req:
222 for op, argsdict in req:
223 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
223 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
224 cmds.append('%s %s' % (op, args))
224 cmds.append('%s %s' % (op, args))
225 rsp = self._call("batch", cmds=';'.join(cmds))
225 rsp = self._call("batch", cmds=';'.join(cmds))
226 return rsp.split(';')
226 return rsp.split(';')
227 def _submitone(self, op, args):
227 def _submitone(self, op, args):
228 return self._call(op, **args)
228 return self._call(op, **args)
229
229
230 @batchable
230 @batchable
231 def lookup(self, key):
231 def lookup(self, key):
232 self.requirecap('lookup', _('look up remote revision'))
232 self.requirecap('lookup', _('look up remote revision'))
233 f = future()
233 f = future()
234 yield {'key': encoding.fromlocal(key)}, f
234 yield {'key': encoding.fromlocal(key)}, f
235 d = f.value
235 d = f.value
236 success, data = d[:-1].split(" ", 1)
236 success, data = d[:-1].split(" ", 1)
237 if int(success):
237 if int(success):
238 yield bin(data)
238 yield bin(data)
239 self._abort(error.RepoError(data))
239 self._abort(error.RepoError(data))
240
240
241 @batchable
241 @batchable
242 def heads(self):
242 def heads(self):
243 f = future()
243 f = future()
244 yield {}, f
244 yield {}, f
245 d = f.value
245 d = f.value
246 try:
246 try:
247 yield decodelist(d[:-1])
247 yield decodelist(d[:-1])
248 except ValueError:
248 except ValueError:
249 self._abort(error.ResponseError(_("unexpected response:"), d))
249 self._abort(error.ResponseError(_("unexpected response:"), d))
250
250
251 @batchable
251 @batchable
252 def known(self, nodes):
252 def known(self, nodes):
253 f = future()
253 f = future()
254 yield {'nodes': encodelist(nodes)}, f
254 yield {'nodes': encodelist(nodes)}, f
255 d = f.value
255 d = f.value
256 try:
256 try:
257 yield [bool(int(b)) for b in d]
257 yield [bool(int(b)) for b in d]
258 except ValueError:
258 except ValueError:
259 self._abort(error.ResponseError(_("unexpected response:"), d))
259 self._abort(error.ResponseError(_("unexpected response:"), d))
260
260
261 @batchable
261 @batchable
262 def branchmap(self):
262 def branchmap(self):
263 f = future()
263 f = future()
264 yield {}, f
264 yield {}, f
265 d = f.value
265 d = f.value
266 try:
266 try:
267 branchmap = {}
267 branchmap = {}
268 for branchpart in d.splitlines():
268 for branchpart in d.splitlines():
269 branchname, branchheads = branchpart.split(' ', 1)
269 branchname, branchheads = branchpart.split(' ', 1)
270 branchname = encoding.tolocal(urllib.unquote(branchname))
270 branchname = encoding.tolocal(urllib.unquote(branchname))
271 branchheads = decodelist(branchheads)
271 branchheads = decodelist(branchheads)
272 branchmap[branchname] = branchheads
272 branchmap[branchname] = branchheads
273 yield branchmap
273 yield branchmap
274 except TypeError:
274 except TypeError:
275 self._abort(error.ResponseError(_("unexpected response:"), d))
275 self._abort(error.ResponseError(_("unexpected response:"), d))
276
276
277 def branches(self, nodes):
277 def branches(self, nodes):
278 n = encodelist(nodes)
278 n = encodelist(nodes)
279 d = self._call("branches", nodes=n)
279 d = self._call("branches", nodes=n)
280 try:
280 try:
281 br = [tuple(decodelist(b)) for b in d.splitlines()]
281 br = [tuple(decodelist(b)) for b in d.splitlines()]
282 return br
282 return br
283 except ValueError:
283 except ValueError:
284 self._abort(error.ResponseError(_("unexpected response:"), d))
284 self._abort(error.ResponseError(_("unexpected response:"), d))
285
285
286 def between(self, pairs):
286 def between(self, pairs):
287 batch = 8 # avoid giant requests
287 batch = 8 # avoid giant requests
288 r = []
288 r = []
289 for i in xrange(0, len(pairs), batch):
289 for i in xrange(0, len(pairs), batch):
290 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
290 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
291 d = self._call("between", pairs=n)
291 d = self._call("between", pairs=n)
292 try:
292 try:
293 r.extend(l and decodelist(l) or [] for l in d.splitlines())
293 r.extend(l and decodelist(l) or [] for l in d.splitlines())
294 except ValueError:
294 except ValueError:
295 self._abort(error.ResponseError(_("unexpected response:"), d))
295 self._abort(error.ResponseError(_("unexpected response:"), d))
296 return r
296 return r
297
297
298 @batchable
298 @batchable
299 def pushkey(self, namespace, key, old, new):
299 def pushkey(self, namespace, key, old, new):
300 if not self.capable('pushkey'):
300 if not self.capable('pushkey'):
301 yield False, None
301 yield False, None
302 f = future()
302 f = future()
303 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
303 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
304 yield {'namespace': encoding.fromlocal(namespace),
304 yield {'namespace': encoding.fromlocal(namespace),
305 'key': encoding.fromlocal(key),
305 'key': encoding.fromlocal(key),
306 'old': encoding.fromlocal(old),
306 'old': encoding.fromlocal(old),
307 'new': encoding.fromlocal(new)}, f
307 'new': encoding.fromlocal(new)}, f
308 d = f.value
308 d = f.value
309 d, output = d.split('\n', 1)
309 d, output = d.split('\n', 1)
310 try:
310 try:
311 d = bool(int(d))
311 d = bool(int(d))
312 except ValueError:
312 except ValueError:
313 raise error.ResponseError(
313 raise error.ResponseError(
314 _('push failed (unexpected response):'), d)
314 _('push failed (unexpected response):'), d)
315 for l in output.splitlines(True):
315 for l in output.splitlines(True):
316 self.ui.status(_('remote: '), l)
316 self.ui.status(_('remote: '), l)
317 yield d
317 yield d
318
318
319 @batchable
319 @batchable
320 def listkeys(self, namespace):
320 def listkeys(self, namespace):
321 if not self.capable('pushkey'):
321 if not self.capable('pushkey'):
322 yield {}, None
322 yield {}, None
323 f = future()
323 f = future()
324 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
324 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
325 yield {'namespace': encoding.fromlocal(namespace)}, f
325 yield {'namespace': encoding.fromlocal(namespace)}, f
326 d = f.value
326 d = f.value
327 yield pushkeymod.decodekeys(d)
327 yield pushkeymod.decodekeys(d)
328
328
329 def stream_out(self):
329 def stream_out(self):
330 return self._callstream('stream_out')
330 return self._callstream('stream_out')
331
331
332 def changegroup(self, nodes, kind):
332 def changegroup(self, nodes, kind):
333 n = encodelist(nodes)
333 n = encodelist(nodes)
334 f = self._callcompressable("changegroup", roots=n)
334 f = self._callcompressable("changegroup", roots=n)
335 return changegroupmod.cg1unpacker(f, 'UN')
335 return changegroupmod.cg1unpacker(f, 'UN')
336
336
337 def changegroupsubset(self, bases, heads, kind):
337 def changegroupsubset(self, bases, heads, kind):
338 self.requirecap('changegroupsubset', _('look up remote changes'))
338 self.requirecap('changegroupsubset', _('look up remote changes'))
339 bases = encodelist(bases)
339 bases = encodelist(bases)
340 heads = encodelist(heads)
340 heads = encodelist(heads)
341 f = self._callcompressable("changegroupsubset",
341 f = self._callcompressable("changegroupsubset",
342 bases=bases, heads=heads)
342 bases=bases, heads=heads)
343 return changegroupmod.cg1unpacker(f, 'UN')
343 return changegroupmod.cg1unpacker(f, 'UN')
344
344
345 def getbundle(self, source, **kwargs):
345 def getbundle(self, source, **kwargs):
346 self.requirecap('getbundle', _('look up remote changes'))
346 self.requirecap('getbundle', _('look up remote changes'))
347 opts = {}
347 opts = {}
348 bundlecaps = kwargs.get('bundlecaps')
348 bundlecaps = kwargs.get('bundlecaps')
349 if bundlecaps is not None:
349 if bundlecaps is not None:
350 kwargs['bundlecaps'] = sorted(bundlecaps)
350 kwargs['bundlecaps'] = sorted(bundlecaps)
351 else:
351 else:
352 bundlecaps = () # kwargs could have it to None
352 bundlecaps = () # kwargs could have it to None
353 for key, value in kwargs.iteritems():
353 for key, value in kwargs.iteritems():
354 if value is None:
354 if value is None:
355 continue
355 continue
356 keytype = gboptsmap.get(key)
356 keytype = gboptsmap.get(key)
357 if keytype is None:
357 if keytype is None:
358 assert False, 'unexpected'
358 assert False, 'unexpected'
359 elif keytype == 'nodes':
359 elif keytype == 'nodes':
360 value = encodelist(value)
360 value = encodelist(value)
361 elif keytype == 'csv':
361 elif keytype == 'csv':
362 value = ','.join(value)
362 value = ','.join(value)
363 elif keytype == 'boolean':
363 elif keytype == 'boolean':
364 value = '%i' % bool(value)
364 value = '%i' % bool(value)
365 elif keytype != 'plain':
365 elif keytype != 'plain':
366 raise KeyError('unknown getbundle option type %s'
366 raise KeyError('unknown getbundle option type %s'
367 % keytype)
367 % keytype)
368 opts[key] = value
368 opts[key] = value
369 f = self._callcompressable("getbundle", **opts)
369 f = self._callcompressable("getbundle", **opts)
370 if any((cap.startswith('HG2') for cap in bundlecaps)):
370 if any((cap.startswith('HG2') for cap in bundlecaps)):
371 return bundle2.getunbundler(self.ui, f)
371 return bundle2.getunbundler(self.ui, f)
372 else:
372 else:
373 return changegroupmod.cg1unpacker(f, 'UN')
373 return changegroupmod.cg1unpacker(f, 'UN')
374
374
375 def unbundle(self, cg, heads, source):
375 def unbundle(self, cg, heads, source):
376 '''Send cg (a readable file-like object representing the
376 '''Send cg (a readable file-like object representing the
377 changegroup to push, typically a chunkbuffer object) to the
377 changegroup to push, typically a chunkbuffer object) to the
378 remote server as a bundle.
378 remote server as a bundle.
379
379
380 When pushing a bundle10 stream, return an integer indicating the
380 When pushing a bundle10 stream, return an integer indicating the
381 result of the push (see localrepository.addchangegroup()).
381 result of the push (see localrepository.addchangegroup()).
382
382
383 When pushing a bundle20 stream, return a bundle20 stream.'''
383 When pushing a bundle20 stream, return a bundle20 stream.'''
384
384
385 if heads != ['force'] and self.capable('unbundlehash'):
385 if heads != ['force'] and self.capable('unbundlehash'):
386 heads = encodelist(['hashed',
386 heads = encodelist(['hashed',
387 util.sha1(''.join(sorted(heads))).digest()])
387 util.sha1(''.join(sorted(heads))).digest()])
388 else:
388 else:
389 heads = encodelist(heads)
389 heads = encodelist(heads)
390
390
391 if util.safehasattr(cg, 'deltaheader'):
391 if util.safehasattr(cg, 'deltaheader'):
392 # this a bundle10, do the old style call sequence
392 # this a bundle10, do the old style call sequence
393 ret, output = self._callpush("unbundle", cg, heads=heads)
393 ret, output = self._callpush("unbundle", cg, heads=heads)
394 if ret == "":
394 if ret == "":
395 raise error.ResponseError(
395 raise error.ResponseError(
396 _('push failed:'), output)
396 _('push failed:'), output)
397 try:
397 try:
398 ret = int(ret)
398 ret = int(ret)
399 except ValueError:
399 except ValueError:
400 raise error.ResponseError(
400 raise error.ResponseError(
401 _('push failed (unexpected response):'), ret)
401 _('push failed (unexpected response):'), ret)
402
402
403 for l in output.splitlines(True):
403 for l in output.splitlines(True):
404 self.ui.status(_('remote: '), l)
404 self.ui.status(_('remote: '), l)
405 else:
405 else:
406 # bundle2 push. Send a stream, fetch a stream.
406 # bundle2 push. Send a stream, fetch a stream.
407 stream = self._calltwowaystream('unbundle', cg, heads=heads)
407 stream = self._calltwowaystream('unbundle', cg, heads=heads)
408 ret = bundle2.getunbundler(self.ui, stream)
408 ret = bundle2.getunbundler(self.ui, stream)
409 return ret
409 return ret
410
410
411 def debugwireargs(self, one, two, three=None, four=None, five=None):
411 def debugwireargs(self, one, two, three=None, four=None, five=None):
412 # don't pass optional arguments left at their default value
412 # don't pass optional arguments left at their default value
413 opts = {}
413 opts = {}
414 if three is not None:
414 if three is not None:
415 opts['three'] = three
415 opts['three'] = three
416 if four is not None:
416 if four is not None:
417 opts['four'] = four
417 opts['four'] = four
418 return self._call('debugwireargs', one=one, two=two, **opts)
418 return self._call('debugwireargs', one=one, two=two, **opts)
419
419
420 def _call(self, cmd, **args):
420 def _call(self, cmd, **args):
421 """execute <cmd> on the server
421 """execute <cmd> on the server
422
422
423 The command is expected to return a simple string.
423 The command is expected to return a simple string.
424
424
425 returns the server reply as a string."""
425 returns the server reply as a string."""
426 raise NotImplementedError()
426 raise NotImplementedError()
427
427
428 def _callstream(self, cmd, **args):
428 def _callstream(self, cmd, **args):
429 """execute <cmd> on the server
429 """execute <cmd> on the server
430
430
431 The command is expected to return a stream.
431 The command is expected to return a stream.
432
432
433 returns the server reply as a file like object."""
433 returns the server reply as a file like object."""
434 raise NotImplementedError()
434 raise NotImplementedError()
435
435
436 def _callcompressable(self, cmd, **args):
436 def _callcompressable(self, cmd, **args):
437 """execute <cmd> on the server
437 """execute <cmd> on the server
438
438
439 The command is expected to return a stream.
439 The command is expected to return a stream.
440
440
441 The stream may have been compressed in some implementations. This
441 The stream may have been compressed in some implementations. This
442 function takes care of the decompression. This is the only difference
442 function takes care of the decompression. This is the only difference
443 with _callstream.
443 with _callstream.
444
444
445 returns the server reply as a file like object.
445 returns the server reply as a file like object.
446 """
446 """
447 raise NotImplementedError()
447 raise NotImplementedError()
448
448
449 def _callpush(self, cmd, fp, **args):
449 def _callpush(self, cmd, fp, **args):
450 """execute a <cmd> on server
450 """execute a <cmd> on server
451
451
452 The command is expected to be related to a push. Push has a special
452 The command is expected to be related to a push. Push has a special
453 return method.
453 return method.
454
454
455 returns the server reply as a (ret, output) tuple. ret is either
455 returns the server reply as a (ret, output) tuple. ret is either
456 empty (error) or a stringified int.
456 empty (error) or a stringified int.
457 """
457 """
458 raise NotImplementedError()
458 raise NotImplementedError()
459
459
460 def _calltwowaystream(self, cmd, fp, **args):
460 def _calltwowaystream(self, cmd, fp, **args):
461 """execute <cmd> on server
461 """execute <cmd> on server
462
462
463 The command will send a stream to the server and get a stream in reply.
463 The command will send a stream to the server and get a stream in reply.
464 """
464 """
465 raise NotImplementedError()
465 raise NotImplementedError()
466
466
467 def _abort(self, exception):
467 def _abort(self, exception):
468 """clearly abort the wire protocol connection and raise the exception
468 """clearly abort the wire protocol connection and raise the exception
469 """
469 """
470 raise NotImplementedError()
470 raise NotImplementedError()
471
471
472 # server side
472 # server side
473
473
474 # wire protocol command can either return a string or one of these classes.
474 # wire protocol command can either return a string or one of these classes.
475 class streamres(object):
475 class streamres(object):
476 """wireproto reply: binary stream
476 """wireproto reply: binary stream
477
477
478 The call was successful and the result is a stream.
478 The call was successful and the result is a stream.
479 Iterate on the `self.gen` attribute to retrieve chunks.
479 Iterate on the `self.gen` attribute to retrieve chunks.
480 """
480 """
481 def __init__(self, gen):
481 def __init__(self, gen):
482 self.gen = gen
482 self.gen = gen
483
483
484 class pushres(object):
484 class pushres(object):
485 """wireproto reply: success with simple integer return
485 """wireproto reply: success with simple integer return
486
486
487 The call was successful and returned an integer contained in `self.res`.
487 The call was successful and returned an integer contained in `self.res`.
488 """
488 """
489 def __init__(self, res):
489 def __init__(self, res):
490 self.res = res
490 self.res = res
491
491
492 class pusherr(object):
492 class pusherr(object):
493 """wireproto reply: failure
493 """wireproto reply: failure
494
494
495 The call failed. The `self.res` attribute contains the error message.
495 The call failed. The `self.res` attribute contains the error message.
496 """
496 """
497 def __init__(self, res):
497 def __init__(self, res):
498 self.res = res
498 self.res = res
499
499
500 class ooberror(object):
500 class ooberror(object):
501 """wireproto reply: failure of a batch of operation
501 """wireproto reply: failure of a batch of operation
502
502
503 Something failed during a batch call. The error message is stored in
503 Something failed during a batch call. The error message is stored in
504 `self.message`.
504 `self.message`.
505 """
505 """
506 def __init__(self, message):
506 def __init__(self, message):
507 self.message = message
507 self.message = message
508
508
509 def dispatch(repo, proto, command):
509 def dispatch(repo, proto, command):
510 repo = repo.filtered("served")
510 repo = repo.filtered("served")
511 func, spec = commands[command]
511 func, spec = commands[command]
512 args = proto.getargs(spec)
512 args = proto.getargs(spec)
513 return func(repo, proto, *args)
513 return func(repo, proto, *args)
514
514
515 def options(cmd, keys, others):
515 def options(cmd, keys, others):
516 opts = {}
516 opts = {}
517 for k in keys:
517 for k in keys:
518 if k in others:
518 if k in others:
519 opts[k] = others[k]
519 opts[k] = others[k]
520 del others[k]
520 del others[k]
521 if others:
521 if others:
522 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
522 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
523 % (cmd, ",".join(others)))
523 % (cmd, ",".join(others)))
524 return opts
524 return opts
525
525
526 # list of commands
526 # list of commands
527 commands = {}
527 commands = {}
528
528
529 def wireprotocommand(name, args=''):
529 def wireprotocommand(name, args=''):
530 """decorator for wire protocol command"""
530 """decorator for wire protocol command"""
531 def register(func):
531 def register(func):
532 commands[name] = (func, args)
532 commands[name] = (func, args)
533 return func
533 return func
534 return register
534 return register
535
535
536 @wireprotocommand('batch', 'cmds *')
536 @wireprotocommand('batch', 'cmds *')
537 def batch(repo, proto, cmds, others):
537 def batch(repo, proto, cmds, others):
538 repo = repo.filtered("served")
538 repo = repo.filtered("served")
539 res = []
539 res = []
540 for pair in cmds.split(';'):
540 for pair in cmds.split(';'):
541 op, args = pair.split(' ', 1)
541 op, args = pair.split(' ', 1)
542 vals = {}
542 vals = {}
543 for a in args.split(','):
543 for a in args.split(','):
544 if a:
544 if a:
545 n, v = a.split('=')
545 n, v = a.split('=')
546 vals[n] = unescapearg(v)
546 vals[n] = unescapearg(v)
547 func, spec = commands[op]
547 func, spec = commands[op]
548 if spec:
548 if spec:
549 keys = spec.split()
549 keys = spec.split()
550 data = {}
550 data = {}
551 for k in keys:
551 for k in keys:
552 if k == '*':
552 if k == '*':
553 star = {}
553 star = {}
554 for key in vals.keys():
554 for key in vals.keys():
555 if key not in keys:
555 if key not in keys:
556 star[key] = vals[key]
556 star[key] = vals[key]
557 data['*'] = star
557 data['*'] = star
558 else:
558 else:
559 data[k] = vals[k]
559 data[k] = vals[k]
560 result = func(repo, proto, *[data[k] for k in keys])
560 result = func(repo, proto, *[data[k] for k in keys])
561 else:
561 else:
562 result = func(repo, proto)
562 result = func(repo, proto)
563 if isinstance(result, ooberror):
563 if isinstance(result, ooberror):
564 return result
564 return result
565 res.append(escapearg(result))
565 res.append(escapearg(result))
566 return ';'.join(res)
566 return ';'.join(res)
567
567
568 @wireprotocommand('between', 'pairs')
568 @wireprotocommand('between', 'pairs')
569 def between(repo, proto, pairs):
569 def between(repo, proto, pairs):
570 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
570 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
571 r = []
571 r = []
572 for b in repo.between(pairs):
572 for b in repo.between(pairs):
573 r.append(encodelist(b) + "\n")
573 r.append(encodelist(b) + "\n")
574 return "".join(r)
574 return "".join(r)
575
575
576 @wireprotocommand('branchmap')
576 @wireprotocommand('branchmap')
577 def branchmap(repo, proto):
577 def branchmap(repo, proto):
578 branchmap = repo.branchmap()
578 branchmap = repo.branchmap()
579 heads = []
579 heads = []
580 for branch, nodes in branchmap.iteritems():
580 for branch, nodes in branchmap.iteritems():
581 branchname = urllib.quote(encoding.fromlocal(branch))
581 branchname = urllib.quote(encoding.fromlocal(branch))
582 branchnodes = encodelist(nodes)
582 branchnodes = encodelist(nodes)
583 heads.append('%s %s' % (branchname, branchnodes))
583 heads.append('%s %s' % (branchname, branchnodes))
584 return '\n'.join(heads)
584 return '\n'.join(heads)
585
585
586 @wireprotocommand('branches', 'nodes')
586 @wireprotocommand('branches', 'nodes')
587 def branches(repo, proto, nodes):
587 def branches(repo, proto, nodes):
588 nodes = decodelist(nodes)
588 nodes = decodelist(nodes)
589 r = []
589 r = []
590 for b in repo.branches(nodes):
590 for b in repo.branches(nodes):
591 r.append(encodelist(b) + "\n")
591 r.append(encodelist(b) + "\n")
592 return "".join(r)
592 return "".join(r)
593
593
594
594
595 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
595 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
596 'known', 'getbundle', 'unbundlehash', 'batch']
596 'known', 'getbundle', 'unbundlehash', 'batch']
597
597
598 def _capabilities(repo, proto):
598 def _capabilities(repo, proto):
599 """return a list of capabilities for a repo
599 """return a list of capabilities for a repo
600
600
601 This function exists to allow extensions to easily wrap capabilities
601 This function exists to allow extensions to easily wrap capabilities
602 computation
602 computation
603
603
604 - returns a lists: easy to alter
604 - returns a lists: easy to alter
605 - change done here will be propagated to both `capabilities` and `hello`
605 - change done here will be propagated to both `capabilities` and `hello`
606 command without any other action needed.
606 command without any other action needed.
607 """
607 """
608 # copy to prevent modification of the global list
608 # copy to prevent modification of the global list
609 caps = list(wireprotocaps)
609 caps = list(wireprotocaps)
610 if _allowstream(repo.ui):
610 if _allowstream(repo.ui):
611 if repo.ui.configbool('server', 'preferuncompressed', False):
611 if repo.ui.configbool('server', 'preferuncompressed', False):
612 caps.append('stream-preferred')
612 caps.append('stream-preferred')
613 requiredformats = repo.requirements & repo.supportedformats
613 requiredformats = repo.requirements & repo.supportedformats
614 # if our local revlogs are just revlogv1, add 'stream' cap
614 # if our local revlogs are just revlogv1, add 'stream' cap
615 if not requiredformats - set(('revlogv1',)):
615 if not requiredformats - set(('revlogv1',)):
616 caps.append('stream')
616 caps.append('stream')
617 # otherwise, add 'streamreqs' detailing our local revlog format
617 # otherwise, add 'streamreqs' detailing our local revlog format
618 else:
618 else:
619 caps.append('streamreqs=%s' % ','.join(requiredformats))
619 caps.append('streamreqs=%s' % ','.join(requiredformats))
620 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
620 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
621 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
621 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
622 caps.append('bundle2=' + urllib.quote(capsblob))
622 caps.append('bundle2=' + urllib.quote(capsblob))
623 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
623 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
624 caps.append('httpheader=1024')
624 caps.append('httpheader=1024')
625 return caps
625 return caps
626
626
627 # If you are writing an extension and consider wrapping this function. Wrap
627 # If you are writing an extension and consider wrapping this function. Wrap
628 # `_capabilities` instead.
628 # `_capabilities` instead.
629 @wireprotocommand('capabilities')
629 @wireprotocommand('capabilities')
630 def capabilities(repo, proto):
630 def capabilities(repo, proto):
631 return ' '.join(_capabilities(repo, proto))
631 return ' '.join(_capabilities(repo, proto))
632
632
633 @wireprotocommand('changegroup', 'roots')
633 @wireprotocommand('changegroup', 'roots')
634 def changegroup(repo, proto, roots):
634 def changegroup(repo, proto, roots):
635 nodes = decodelist(roots)
635 nodes = decodelist(roots)
636 cg = changegroupmod.changegroup(repo, nodes, 'serve')
636 cg = changegroupmod.changegroup(repo, nodes, 'serve')
637 return streamres(proto.groupchunks(cg))
637 return streamres(proto.groupchunks(cg))
638
638
639 @wireprotocommand('changegroupsubset', 'bases heads')
639 @wireprotocommand('changegroupsubset', 'bases heads')
640 def changegroupsubset(repo, proto, bases, heads):
640 def changegroupsubset(repo, proto, bases, heads):
641 bases = decodelist(bases)
641 bases = decodelist(bases)
642 heads = decodelist(heads)
642 heads = decodelist(heads)
643 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
643 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
644 return streamres(proto.groupchunks(cg))
644 return streamres(proto.groupchunks(cg))
645
645
646 @wireprotocommand('debugwireargs', 'one two *')
646 @wireprotocommand('debugwireargs', 'one two *')
647 def debugwireargs(repo, proto, one, two, others):
647 def debugwireargs(repo, proto, one, two, others):
648 # only accept optional args from the known set
648 # only accept optional args from the known set
649 opts = options('debugwireargs', ['three', 'four'], others)
649 opts = options('debugwireargs', ['three', 'four'], others)
650 return repo.debugwireargs(one, two, **opts)
650 return repo.debugwireargs(one, two, **opts)
651
651
652 # List of options accepted by getbundle.
652 # List of options accepted by getbundle.
653 #
653 #
654 # Meant to be extended by extensions. It is the extension's responsibility to
654 # Meant to be extended by extensions. It is the extension's responsibility to
655 # ensure such options are properly processed in exchange.getbundle.
655 # ensure such options are properly processed in exchange.getbundle.
656 gboptslist = ['heads', 'common', 'bundlecaps']
656 gboptslist = ['heads', 'common', 'bundlecaps']
657
657
658 @wireprotocommand('getbundle', '*')
658 @wireprotocommand('getbundle', '*')
659 def getbundle(repo, proto, others):
659 def getbundle(repo, proto, others):
660 opts = options('getbundle', gboptsmap.keys(), others)
660 opts = options('getbundle', gboptsmap.keys(), others)
661 for k, v in opts.iteritems():
661 for k, v in opts.iteritems():
662 keytype = gboptsmap[k]
662 keytype = gboptsmap[k]
663 if keytype == 'nodes':
663 if keytype == 'nodes':
664 opts[k] = decodelist(v)
664 opts[k] = decodelist(v)
665 elif keytype == 'csv':
665 elif keytype == 'csv':
666 opts[k] = set(v.split(','))
666 opts[k] = set(v.split(','))
667 elif keytype == 'boolean':
667 elif keytype == 'boolean':
668 opts[k] = bool(v)
668 opts[k] = bool(v)
669 elif keytype != 'plain':
669 elif keytype != 'plain':
670 raise KeyError('unknown getbundle option type %s'
670 raise KeyError('unknown getbundle option type %s'
671 % keytype)
671 % keytype)
672 cg = exchange.getbundle(repo, 'serve', **opts)
672 cg = exchange.getbundle(repo, 'serve', **opts)
673 return streamres(proto.groupchunks(cg))
673 return streamres(proto.groupchunks(cg))
674
674
675 @wireprotocommand('heads')
675 @wireprotocommand('heads')
676 def heads(repo, proto):
676 def heads(repo, proto):
677 h = repo.heads()
677 h = repo.heads()
678 return encodelist(h) + "\n"
678 return encodelist(h) + "\n"
679
679
680 @wireprotocommand('hello')
680 @wireprotocommand('hello')
681 def hello(repo, proto):
681 def hello(repo, proto):
682 '''the hello command returns a set of lines describing various
682 '''the hello command returns a set of lines describing various
683 interesting things about the server, in an RFC822-like format.
683 interesting things about the server, in an RFC822-like format.
684 Currently the only one defined is "capabilities", which
684 Currently the only one defined is "capabilities", which
685 consists of a line in the form:
685 consists of a line in the form:
686
686
687 capabilities: space separated list of tokens
687 capabilities: space separated list of tokens
688 '''
688 '''
689 return "capabilities: %s\n" % (capabilities(repo, proto))
689 return "capabilities: %s\n" % (capabilities(repo, proto))
690
690
691 @wireprotocommand('listkeys', 'namespace')
691 @wireprotocommand('listkeys', 'namespace')
692 def listkeys(repo, proto, namespace):
692 def listkeys(repo, proto, namespace):
693 d = repo.listkeys(encoding.tolocal(namespace)).items()
693 d = repo.listkeys(encoding.tolocal(namespace)).items()
694 return pushkeymod.encodekeys(d)
694 return pushkeymod.encodekeys(d)
695
695
696 @wireprotocommand('lookup', 'key')
696 @wireprotocommand('lookup', 'key')
697 def lookup(repo, proto, key):
697 def lookup(repo, proto, key):
698 try:
698 try:
699 k = encoding.tolocal(key)
699 k = encoding.tolocal(key)
700 c = repo[k]
700 c = repo[k]
701 r = c.hex()
701 r = c.hex()
702 success = 1
702 success = 1
703 except Exception, inst:
703 except Exception, inst:
704 r = str(inst)
704 r = str(inst)
705 success = 0
705 success = 0
706 return "%s %s\n" % (success, r)
706 return "%s %s\n" % (success, r)
707
707
708 @wireprotocommand('known', 'nodes *')
708 @wireprotocommand('known', 'nodes *')
709 def known(repo, proto, nodes, others):
709 def known(repo, proto, nodes, others):
710 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
710 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
711
711
712 @wireprotocommand('pushkey', 'namespace key old new')
712 @wireprotocommand('pushkey', 'namespace key old new')
713 def pushkey(repo, proto, namespace, key, old, new):
713 def pushkey(repo, proto, namespace, key, old, new):
714 # compatibility with pre-1.8 clients which were accidentally
714 # compatibility with pre-1.8 clients which were accidentally
715 # sending raw binary nodes rather than utf-8-encoded hex
715 # sending raw binary nodes rather than utf-8-encoded hex
716 if len(new) == 20 and new.encode('string-escape') != new:
716 if len(new) == 20 and new.encode('string-escape') != new:
717 # looks like it could be a binary node
717 # looks like it could be a binary node
718 try:
718 try:
719 new.decode('utf-8')
719 new.decode('utf-8')
720 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
720 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
721 except UnicodeDecodeError:
721 except UnicodeDecodeError:
722 pass # binary, leave unmodified
722 pass # binary, leave unmodified
723 else:
723 else:
724 new = encoding.tolocal(new) # normal path
724 new = encoding.tolocal(new) # normal path
725
725
726 if util.safehasattr(proto, 'restore'):
726 if util.safehasattr(proto, 'restore'):
727
727
728 proto.redirect()
728 proto.redirect()
729
729
730 try:
730 try:
731 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
731 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
732 encoding.tolocal(old), new) or False
732 encoding.tolocal(old), new) or False
733 except util.Abort:
733 except util.Abort:
734 r = False
734 r = False
735
735
736 output = proto.restore()
736 output = proto.restore()
737
737
738 return '%s\n%s' % (int(r), output)
738 return '%s\n%s' % (int(r), output)
739
739
740 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
740 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
741 encoding.tolocal(old), new)
741 encoding.tolocal(old), new)
742 return '%s\n' % int(r)
742 return '%s\n' % int(r)
743
743
744 def _allowstream(ui):
744 def _allowstream(ui):
745 return ui.configbool('server', 'uncompressed', True, untrusted=True)
745 return ui.configbool('server', 'uncompressed', True, untrusted=True)
746
746
747 @wireprotocommand('stream_out')
747 @wireprotocommand('stream_out')
748 def stream(repo, proto):
748 def stream(repo, proto):
749 '''If the server supports streaming clone, it advertises the "stream"
749 '''If the server supports streaming clone, it advertises the "stream"
750 capability with a value representing the version and flags of the repo
750 capability with a value representing the version and flags of the repo
751 it is serving. Client checks to see if it understands the format.
751 it is serving. Client checks to see if it understands the format.
752 '''
752 '''
753 if not _allowstream(repo.ui):
753 if not _allowstream(repo.ui):
754 return '1\n'
754 return '1\n'
755
755
756 def getstream(it):
756 def getstream(it):
757 yield '0\n'
757 yield '0\n'
758 for chunk in it:
758 for chunk in it:
759 yield chunk
759 yield chunk
760
760
761 try:
761 try:
762 # LockError may be raised before the first result is yielded. Don't
762 # LockError may be raised before the first result is yielded. Don't
763 # emit output until we're sure we got the lock successfully.
763 # emit output until we're sure we got the lock successfully.
764 it = exchange.generatestreamclone(repo)
764 it = exchange.generatestreamclone(repo)
765 return streamres(getstream(it))
765 return streamres(getstream(it))
766 except error.LockError:
766 except error.LockError:
767 return '2\n'
767 return '2\n'
768
768
769 @wireprotocommand('unbundle', 'heads')
769 @wireprotocommand('unbundle', 'heads')
770 def unbundle(repo, proto, heads):
770 def unbundle(repo, proto, heads):
771 their_heads = decodelist(heads)
771 their_heads = decodelist(heads)
772
772
773 try:
773 try:
774 proto.redirect()
774 proto.redirect()
775
775
776 exchange.check_heads(repo, their_heads, 'preparing changes')
776 exchange.check_heads(repo, their_heads, 'preparing changes')
777
777
778 # write bundle data to temporary file because it can be big
778 # write bundle data to temporary file because it can be big
779 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
779 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
780 fp = os.fdopen(fd, 'wb+')
780 fp = os.fdopen(fd, 'wb+')
781 r = 0
781 r = 0
782 try:
782 try:
783 proto.getfile(fp)
783 proto.getfile(fp)
784 fp.seek(0)
784 fp.seek(0)
785 gen = exchange.readbundle(repo.ui, fp, None)
785 gen = exchange.readbundle(repo.ui, fp, None)
786 r = exchange.unbundle(repo, gen, their_heads, 'serve',
786 r = exchange.unbundle(repo, gen, their_heads, 'serve',
787 proto._client())
787 proto._client())
788 if util.safehasattr(r, 'addpart'):
788 if util.safehasattr(r, 'addpart'):
789 # The return looks streamable, we are in the bundle2 case and
789 # The return looks streamable, we are in the bundle2 case and
790 # should return a stream.
790 # should return a stream.
791 return streamres(r.getchunks())
791 return streamres(r.getchunks())
792 return pushres(r)
792 return pushres(r)
793
793
794 finally:
794 finally:
795 fp.close()
795 fp.close()
796 os.unlink(tempname)
796 os.unlink(tempname)
797
797
798 except (error.BundleValueError, util.Abort, error.PushRaced), exc:
798 except (error.BundleValueError, util.Abort, error.PushRaced), exc:
799 # handle non-bundle2 case first
799 # handle non-bundle2 case first
800 if not getattr(exc, 'duringunbundle2', False):
800 if not getattr(exc, 'duringunbundle2', False):
801 try:
801 try:
802 raise
802 raise
803 except util.Abort:
803 except util.Abort:
804 # The old code we moved used sys.stderr directly.
804 # The old code we moved used sys.stderr directly.
805 # We did not change it to minimise code change.
805 # We did not change it to minimise code change.
806 # This need to be moved to something proper.
806 # This need to be moved to something proper.
807 # Feel free to do it.
807 # Feel free to do it.
808 sys.stderr.write("abort: %s\n" % exc)
808 sys.stderr.write("abort: %s\n" % exc)
809 return pushres(0)
809 return pushres(0)
810 except error.PushRaced:
810 except error.PushRaced:
811 return pusherr(str(exc))
811 return pusherr(str(exc))
812
812
813 bundler = bundle2.bundle20(repo.ui)
813 bundler = bundle2.bundle20(repo.ui)
814 for out in getattr(exc, '_bundle2salvagedoutput', ()):
814 for out in getattr(exc, '_bundle2salvagedoutput', ()):
815 bundler.addpart(out)
815 bundler.addpart(out)
816 try:
816 try:
817 raise
817 raise
818 except error.BundleValueError, exc:
818 except error.BundleValueError, exc:
819 errpart = bundler.newpart('error:unsupportedcontent')
819 errpart = bundler.newpart('error:unsupportedcontent')
820 if exc.parttype is not None:
820 if exc.parttype is not None:
821 errpart.addparam('parttype', exc.parttype)
821 errpart.addparam('parttype', exc.parttype)
822 if exc.params:
822 if exc.params:
823 errpart.addparam('params', '\0'.join(exc.params))
823 errpart.addparam('params', '\0'.join(exc.params))
824 except util.Abort, exc:
824 except util.Abort, exc:
825 manargs = [('message', str(exc))]
825 manargs = [('message', str(exc))]
826 advargs = []
826 advargs = []
827 if exc.hint is not None:
827 if exc.hint is not None:
828 advargs.append(('hint', exc.hint))
828 advargs.append(('hint', exc.hint))
829 bundler.addpart(bundle2.bundlepart('error:abort',
829 bundler.addpart(bundle2.bundlepart('error:abort',
830 manargs, advargs))
830 manargs, advargs))
831 except error.PushRaced, exc:
831 except error.PushRaced, exc:
832 bundler.newpart('error:pushraced', [('message', str(exc))])
832 bundler.newpart('error:pushraced', [('message', str(exc))])
833 return streamres(bundler.getchunks())
833 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now