##// END OF EJS Templates
wireproto: remove a debug print...
Pierre-Yves David -
r25711:26579a91 default
parent child Browse files
Show More
@@ -1,860 +1,859 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, exchange
12 import peer, error, encoding, util, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 try:
175 try:
176 return sep.join(map(hex, l))
176 return sep.join(map(hex, l))
177 except TypeError:
177 except TypeError:
178 print l
179 raise
178 raise
180
179
181 # batched call argument encoding
180 # batched call argument encoding
182
181
183 def escapearg(plain):
182 def escapearg(plain):
184 return (plain
183 return (plain
185 .replace(':', ':c')
184 .replace(':', ':c')
186 .replace(',', ':o')
185 .replace(',', ':o')
187 .replace(';', ':s')
186 .replace(';', ':s')
188 .replace('=', ':e'))
187 .replace('=', ':e'))
189
188
190 def unescapearg(escaped):
189 def unescapearg(escaped):
191 return (escaped
190 return (escaped
192 .replace(':e', '=')
191 .replace(':e', '=')
193 .replace(':s', ';')
192 .replace(':s', ';')
194 .replace(':o', ',')
193 .replace(':o', ',')
195 .replace(':c', ':'))
194 .replace(':c', ':'))
196
195
197 # mapping of options accepted by getbundle and their types
196 # mapping of options accepted by getbundle and their types
198 #
197 #
199 # Meant to be extended by extensions. It is extensions responsibility to ensure
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
200 # such options are properly processed in exchange.getbundle.
199 # such options are properly processed in exchange.getbundle.
201 #
200 #
202 # supported types are:
201 # supported types are:
203 #
202 #
204 # :nodes: list of binary nodes
203 # :nodes: list of binary nodes
205 # :csv: list of comma-separated values
204 # :csv: list of comma-separated values
206 # :scsv: list of comma-separated values return as set
205 # :scsv: list of comma-separated values return as set
207 # :plain: string with no transformation needed.
206 # :plain: string with no transformation needed.
208 gboptsmap = {'heads': 'nodes',
207 gboptsmap = {'heads': 'nodes',
209 'common': 'nodes',
208 'common': 'nodes',
210 'obsmarkers': 'boolean',
209 'obsmarkers': 'boolean',
211 'bundlecaps': 'scsv',
210 'bundlecaps': 'scsv',
212 'listkeys': 'csv',
211 'listkeys': 'csv',
213 'cg': 'boolean'}
212 'cg': 'boolean'}
214
213
215 # client side
214 # client side
216
215
217 class wirepeer(peer.peerrepository):
216 class wirepeer(peer.peerrepository):
218
217
219 def batch(self):
218 def batch(self):
220 return remotebatch(self)
219 return remotebatch(self)
221 def _submitbatch(self, req):
220 def _submitbatch(self, req):
222 cmds = []
221 cmds = []
223 for op, argsdict in req:
222 for op, argsdict in req:
224 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
223 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
225 for k, v in argsdict.iteritems())
224 for k, v in argsdict.iteritems())
226 cmds.append('%s %s' % (op, args))
225 cmds.append('%s %s' % (op, args))
227 rsp = self._call("batch", cmds=';'.join(cmds))
226 rsp = self._call("batch", cmds=';'.join(cmds))
228 return [unescapearg(r) for r in rsp.split(';')]
227 return [unescapearg(r) for r in rsp.split(';')]
229 def _submitone(self, op, args):
228 def _submitone(self, op, args):
230 return self._call(op, **args)
229 return self._call(op, **args)
231
230
232 @batchable
231 @batchable
233 def lookup(self, key):
232 def lookup(self, key):
234 self.requirecap('lookup', _('look up remote revision'))
233 self.requirecap('lookup', _('look up remote revision'))
235 f = future()
234 f = future()
236 yield {'key': encoding.fromlocal(key)}, f
235 yield {'key': encoding.fromlocal(key)}, f
237 d = f.value
236 d = f.value
238 success, data = d[:-1].split(" ", 1)
237 success, data = d[:-1].split(" ", 1)
239 if int(success):
238 if int(success):
240 yield bin(data)
239 yield bin(data)
241 self._abort(error.RepoError(data))
240 self._abort(error.RepoError(data))
242
241
243 @batchable
242 @batchable
244 def heads(self):
243 def heads(self):
245 f = future()
244 f = future()
246 yield {}, f
245 yield {}, f
247 d = f.value
246 d = f.value
248 try:
247 try:
249 yield decodelist(d[:-1])
248 yield decodelist(d[:-1])
250 except ValueError:
249 except ValueError:
251 self._abort(error.ResponseError(_("unexpected response:"), d))
250 self._abort(error.ResponseError(_("unexpected response:"), d))
252
251
253 @batchable
252 @batchable
254 def known(self, nodes):
253 def known(self, nodes):
255 f = future()
254 f = future()
256 yield {'nodes': encodelist(nodes)}, f
255 yield {'nodes': encodelist(nodes)}, f
257 d = f.value
256 d = f.value
258 try:
257 try:
259 yield [bool(int(b)) for b in d]
258 yield [bool(int(b)) for b in d]
260 except ValueError:
259 except ValueError:
261 self._abort(error.ResponseError(_("unexpected response:"), d))
260 self._abort(error.ResponseError(_("unexpected response:"), d))
262
261
263 @batchable
262 @batchable
264 def branchmap(self):
263 def branchmap(self):
265 f = future()
264 f = future()
266 yield {}, f
265 yield {}, f
267 d = f.value
266 d = f.value
268 try:
267 try:
269 branchmap = {}
268 branchmap = {}
270 for branchpart in d.splitlines():
269 for branchpart in d.splitlines():
271 branchname, branchheads = branchpart.split(' ', 1)
270 branchname, branchheads = branchpart.split(' ', 1)
272 branchname = encoding.tolocal(urllib.unquote(branchname))
271 branchname = encoding.tolocal(urllib.unquote(branchname))
273 branchheads = decodelist(branchheads)
272 branchheads = decodelist(branchheads)
274 branchmap[branchname] = branchheads
273 branchmap[branchname] = branchheads
275 yield branchmap
274 yield branchmap
276 except TypeError:
275 except TypeError:
277 self._abort(error.ResponseError(_("unexpected response:"), d))
276 self._abort(error.ResponseError(_("unexpected response:"), d))
278
277
279 def branches(self, nodes):
278 def branches(self, nodes):
280 n = encodelist(nodes)
279 n = encodelist(nodes)
281 d = self._call("branches", nodes=n)
280 d = self._call("branches", nodes=n)
282 try:
281 try:
283 br = [tuple(decodelist(b)) for b in d.splitlines()]
282 br = [tuple(decodelist(b)) for b in d.splitlines()]
284 return br
283 return br
285 except ValueError:
284 except ValueError:
286 self._abort(error.ResponseError(_("unexpected response:"), d))
285 self._abort(error.ResponseError(_("unexpected response:"), d))
287
286
288 def between(self, pairs):
287 def between(self, pairs):
289 batch = 8 # avoid giant requests
288 batch = 8 # avoid giant requests
290 r = []
289 r = []
291 for i in xrange(0, len(pairs), batch):
290 for i in xrange(0, len(pairs), batch):
292 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
291 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
293 d = self._call("between", pairs=n)
292 d = self._call("between", pairs=n)
294 try:
293 try:
295 r.extend(l and decodelist(l) or [] for l in d.splitlines())
294 r.extend(l and decodelist(l) or [] for l in d.splitlines())
296 except ValueError:
295 except ValueError:
297 self._abort(error.ResponseError(_("unexpected response:"), d))
296 self._abort(error.ResponseError(_("unexpected response:"), d))
298 return r
297 return r
299
298
300 @batchable
299 @batchable
301 def pushkey(self, namespace, key, old, new):
300 def pushkey(self, namespace, key, old, new):
302 if not self.capable('pushkey'):
301 if not self.capable('pushkey'):
303 yield False, None
302 yield False, None
304 f = future()
303 f = future()
305 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
304 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
306 yield {'namespace': encoding.fromlocal(namespace),
305 yield {'namespace': encoding.fromlocal(namespace),
307 'key': encoding.fromlocal(key),
306 'key': encoding.fromlocal(key),
308 'old': encoding.fromlocal(old),
307 'old': encoding.fromlocal(old),
309 'new': encoding.fromlocal(new)}, f
308 'new': encoding.fromlocal(new)}, f
310 d = f.value
309 d = f.value
311 d, output = d.split('\n', 1)
310 d, output = d.split('\n', 1)
312 try:
311 try:
313 d = bool(int(d))
312 d = bool(int(d))
314 except ValueError:
313 except ValueError:
315 raise error.ResponseError(
314 raise error.ResponseError(
316 _('push failed (unexpected response):'), d)
315 _('push failed (unexpected response):'), d)
317 for l in output.splitlines(True):
316 for l in output.splitlines(True):
318 self.ui.status(_('remote: '), l)
317 self.ui.status(_('remote: '), l)
319 yield d
318 yield d
320
319
321 @batchable
320 @batchable
322 def listkeys(self, namespace):
321 def listkeys(self, namespace):
323 if not self.capable('pushkey'):
322 if not self.capable('pushkey'):
324 yield {}, None
323 yield {}, None
325 f = future()
324 f = future()
326 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
325 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
327 yield {'namespace': encoding.fromlocal(namespace)}, f
326 yield {'namespace': encoding.fromlocal(namespace)}, f
328 d = f.value
327 d = f.value
329 self.ui.debug('received listkey for "%s": %i bytes\n'
328 self.ui.debug('received listkey for "%s": %i bytes\n'
330 % (namespace, len(d)))
329 % (namespace, len(d)))
331 yield pushkeymod.decodekeys(d)
330 yield pushkeymod.decodekeys(d)
332
331
333 def stream_out(self):
332 def stream_out(self):
334 return self._callstream('stream_out')
333 return self._callstream('stream_out')
335
334
336 def changegroup(self, nodes, kind):
335 def changegroup(self, nodes, kind):
337 n = encodelist(nodes)
336 n = encodelist(nodes)
338 f = self._callcompressable("changegroup", roots=n)
337 f = self._callcompressable("changegroup", roots=n)
339 return changegroupmod.cg1unpacker(f, 'UN')
338 return changegroupmod.cg1unpacker(f, 'UN')
340
339
341 def changegroupsubset(self, bases, heads, kind):
340 def changegroupsubset(self, bases, heads, kind):
342 self.requirecap('changegroupsubset', _('look up remote changes'))
341 self.requirecap('changegroupsubset', _('look up remote changes'))
343 bases = encodelist(bases)
342 bases = encodelist(bases)
344 heads = encodelist(heads)
343 heads = encodelist(heads)
345 f = self._callcompressable("changegroupsubset",
344 f = self._callcompressable("changegroupsubset",
346 bases=bases, heads=heads)
345 bases=bases, heads=heads)
347 return changegroupmod.cg1unpacker(f, 'UN')
346 return changegroupmod.cg1unpacker(f, 'UN')
348
347
349 def getbundle(self, source, **kwargs):
348 def getbundle(self, source, **kwargs):
350 self.requirecap('getbundle', _('look up remote changes'))
349 self.requirecap('getbundle', _('look up remote changes'))
351 opts = {}
350 opts = {}
352 bundlecaps = kwargs.get('bundlecaps')
351 bundlecaps = kwargs.get('bundlecaps')
353 if bundlecaps is not None:
352 if bundlecaps is not None:
354 kwargs['bundlecaps'] = sorted(bundlecaps)
353 kwargs['bundlecaps'] = sorted(bundlecaps)
355 else:
354 else:
356 bundlecaps = () # kwargs could have it to None
355 bundlecaps = () # kwargs could have it to None
357 for key, value in kwargs.iteritems():
356 for key, value in kwargs.iteritems():
358 if value is None:
357 if value is None:
359 continue
358 continue
360 keytype = gboptsmap.get(key)
359 keytype = gboptsmap.get(key)
361 if keytype is None:
360 if keytype is None:
362 assert False, 'unexpected'
361 assert False, 'unexpected'
363 elif keytype == 'nodes':
362 elif keytype == 'nodes':
364 value = encodelist(value)
363 value = encodelist(value)
365 elif keytype in ('csv', 'scsv'):
364 elif keytype in ('csv', 'scsv'):
366 value = ','.join(value)
365 value = ','.join(value)
367 elif keytype == 'boolean':
366 elif keytype == 'boolean':
368 value = '%i' % bool(value)
367 value = '%i' % bool(value)
369 elif keytype != 'plain':
368 elif keytype != 'plain':
370 raise KeyError('unknown getbundle option type %s'
369 raise KeyError('unknown getbundle option type %s'
371 % keytype)
370 % keytype)
372 opts[key] = value
371 opts[key] = value
373 f = self._callcompressable("getbundle", **opts)
372 f = self._callcompressable("getbundle", **opts)
374 if any((cap.startswith('HG2') for cap in bundlecaps)):
373 if any((cap.startswith('HG2') for cap in bundlecaps)):
375 return bundle2.getunbundler(self.ui, f)
374 return bundle2.getunbundler(self.ui, f)
376 else:
375 else:
377 return changegroupmod.cg1unpacker(f, 'UN')
376 return changegroupmod.cg1unpacker(f, 'UN')
378
377
379 def unbundle(self, cg, heads, source):
378 def unbundle(self, cg, heads, source):
380 '''Send cg (a readable file-like object representing the
379 '''Send cg (a readable file-like object representing the
381 changegroup to push, typically a chunkbuffer object) to the
380 changegroup to push, typically a chunkbuffer object) to the
382 remote server as a bundle.
381 remote server as a bundle.
383
382
384 When pushing a bundle10 stream, return an integer indicating the
383 When pushing a bundle10 stream, return an integer indicating the
385 result of the push (see localrepository.addchangegroup()).
384 result of the push (see localrepository.addchangegroup()).
386
385
387 When pushing a bundle20 stream, return a bundle20 stream.'''
386 When pushing a bundle20 stream, return a bundle20 stream.'''
388
387
389 if heads != ['force'] and self.capable('unbundlehash'):
388 if heads != ['force'] and self.capable('unbundlehash'):
390 heads = encodelist(['hashed',
389 heads = encodelist(['hashed',
391 util.sha1(''.join(sorted(heads))).digest()])
390 util.sha1(''.join(sorted(heads))).digest()])
392 else:
391 else:
393 heads = encodelist(heads)
392 heads = encodelist(heads)
394
393
395 if util.safehasattr(cg, 'deltaheader'):
394 if util.safehasattr(cg, 'deltaheader'):
396 # this a bundle10, do the old style call sequence
395 # this a bundle10, do the old style call sequence
397 ret, output = self._callpush("unbundle", cg, heads=heads)
396 ret, output = self._callpush("unbundle", cg, heads=heads)
398 if ret == "":
397 if ret == "":
399 raise error.ResponseError(
398 raise error.ResponseError(
400 _('push failed:'), output)
399 _('push failed:'), output)
401 try:
400 try:
402 ret = int(ret)
401 ret = int(ret)
403 except ValueError:
402 except ValueError:
404 raise error.ResponseError(
403 raise error.ResponseError(
405 _('push failed (unexpected response):'), ret)
404 _('push failed (unexpected response):'), ret)
406
405
407 for l in output.splitlines(True):
406 for l in output.splitlines(True):
408 self.ui.status(_('remote: '), l)
407 self.ui.status(_('remote: '), l)
409 else:
408 else:
410 # bundle2 push. Send a stream, fetch a stream.
409 # bundle2 push. Send a stream, fetch a stream.
411 stream = self._calltwowaystream('unbundle', cg, heads=heads)
410 stream = self._calltwowaystream('unbundle', cg, heads=heads)
412 ret = bundle2.getunbundler(self.ui, stream)
411 ret = bundle2.getunbundler(self.ui, stream)
413 return ret
412 return ret
414
413
415 def debugwireargs(self, one, two, three=None, four=None, five=None):
414 def debugwireargs(self, one, two, three=None, four=None, five=None):
416 # don't pass optional arguments left at their default value
415 # don't pass optional arguments left at their default value
417 opts = {}
416 opts = {}
418 if three is not None:
417 if three is not None:
419 opts['three'] = three
418 opts['three'] = three
420 if four is not None:
419 if four is not None:
421 opts['four'] = four
420 opts['four'] = four
422 return self._call('debugwireargs', one=one, two=two, **opts)
421 return self._call('debugwireargs', one=one, two=two, **opts)
423
422
424 def _call(self, cmd, **args):
423 def _call(self, cmd, **args):
425 """execute <cmd> on the server
424 """execute <cmd> on the server
426
425
427 The command is expected to return a simple string.
426 The command is expected to return a simple string.
428
427
429 returns the server reply as a string."""
428 returns the server reply as a string."""
430 raise NotImplementedError()
429 raise NotImplementedError()
431
430
432 def _callstream(self, cmd, **args):
431 def _callstream(self, cmd, **args):
433 """execute <cmd> on the server
432 """execute <cmd> on the server
434
433
435 The command is expected to return a stream.
434 The command is expected to return a stream.
436
435
437 returns the server reply as a file like object."""
436 returns the server reply as a file like object."""
438 raise NotImplementedError()
437 raise NotImplementedError()
439
438
440 def _callcompressable(self, cmd, **args):
439 def _callcompressable(self, cmd, **args):
441 """execute <cmd> on the server
440 """execute <cmd> on the server
442
441
443 The command is expected to return a stream.
442 The command is expected to return a stream.
444
443
445 The stream may have been compressed in some implementations. This
444 The stream may have been compressed in some implementations. This
446 function takes care of the decompression. This is the only difference
445 function takes care of the decompression. This is the only difference
447 with _callstream.
446 with _callstream.
448
447
449 returns the server reply as a file like object.
448 returns the server reply as a file like object.
450 """
449 """
451 raise NotImplementedError()
450 raise NotImplementedError()
452
451
453 def _callpush(self, cmd, fp, **args):
452 def _callpush(self, cmd, fp, **args):
454 """execute a <cmd> on server
453 """execute a <cmd> on server
455
454
456 The command is expected to be related to a push. Push has a special
455 The command is expected to be related to a push. Push has a special
457 return method.
456 return method.
458
457
459 returns the server reply as a (ret, output) tuple. ret is either
458 returns the server reply as a (ret, output) tuple. ret is either
460 empty (error) or a stringified int.
459 empty (error) or a stringified int.
461 """
460 """
462 raise NotImplementedError()
461 raise NotImplementedError()
463
462
464 def _calltwowaystream(self, cmd, fp, **args):
463 def _calltwowaystream(self, cmd, fp, **args):
465 """execute <cmd> on server
464 """execute <cmd> on server
466
465
467 The command will send a stream to the server and get a stream in reply.
466 The command will send a stream to the server and get a stream in reply.
468 """
467 """
469 raise NotImplementedError()
468 raise NotImplementedError()
470
469
471 def _abort(self, exception):
470 def _abort(self, exception):
472 """clearly abort the wire protocol connection and raise the exception
471 """clearly abort the wire protocol connection and raise the exception
473 """
472 """
474 raise NotImplementedError()
473 raise NotImplementedError()
475
474
476 # server side
475 # server side
477
476
478 # wire protocol command can either return a string or one of these classes.
477 # wire protocol command can either return a string or one of these classes.
479 class streamres(object):
478 class streamres(object):
480 """wireproto reply: binary stream
479 """wireproto reply: binary stream
481
480
482 The call was successful and the result is a stream.
481 The call was successful and the result is a stream.
483 Iterate on the `self.gen` attribute to retrieve chunks.
482 Iterate on the `self.gen` attribute to retrieve chunks.
484 """
483 """
485 def __init__(self, gen):
484 def __init__(self, gen):
486 self.gen = gen
485 self.gen = gen
487
486
488 class pushres(object):
487 class pushres(object):
489 """wireproto reply: success with simple integer return
488 """wireproto reply: success with simple integer return
490
489
491 The call was successful and returned an integer contained in `self.res`.
490 The call was successful and returned an integer contained in `self.res`.
492 """
491 """
493 def __init__(self, res):
492 def __init__(self, res):
494 self.res = res
493 self.res = res
495
494
496 class pusherr(object):
495 class pusherr(object):
497 """wireproto reply: failure
496 """wireproto reply: failure
498
497
499 The call failed. The `self.res` attribute contains the error message.
498 The call failed. The `self.res` attribute contains the error message.
500 """
499 """
501 def __init__(self, res):
500 def __init__(self, res):
502 self.res = res
501 self.res = res
503
502
504 class ooberror(object):
503 class ooberror(object):
505 """wireproto reply: failure of a batch of operation
504 """wireproto reply: failure of a batch of operation
506
505
507 Something failed during a batch call. The error message is stored in
506 Something failed during a batch call. The error message is stored in
508 `self.message`.
507 `self.message`.
509 """
508 """
510 def __init__(self, message):
509 def __init__(self, message):
511 self.message = message
510 self.message = message
512
511
513 def dispatch(repo, proto, command):
512 def dispatch(repo, proto, command):
514 repo = repo.filtered("served")
513 repo = repo.filtered("served")
515 func, spec = commands[command]
514 func, spec = commands[command]
516 args = proto.getargs(spec)
515 args = proto.getargs(spec)
517 return func(repo, proto, *args)
516 return func(repo, proto, *args)
518
517
519 def options(cmd, keys, others):
518 def options(cmd, keys, others):
520 opts = {}
519 opts = {}
521 for k in keys:
520 for k in keys:
522 if k in others:
521 if k in others:
523 opts[k] = others[k]
522 opts[k] = others[k]
524 del others[k]
523 del others[k]
525 if others:
524 if others:
526 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
525 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
527 % (cmd, ",".join(others)))
526 % (cmd, ",".join(others)))
528 return opts
527 return opts
529
528
530 # list of commands
529 # list of commands
531 commands = {}
530 commands = {}
532
531
533 def wireprotocommand(name, args=''):
532 def wireprotocommand(name, args=''):
534 """decorator for wire protocol command"""
533 """decorator for wire protocol command"""
535 def register(func):
534 def register(func):
536 commands[name] = (func, args)
535 commands[name] = (func, args)
537 return func
536 return func
538 return register
537 return register
539
538
540 @wireprotocommand('batch', 'cmds *')
539 @wireprotocommand('batch', 'cmds *')
541 def batch(repo, proto, cmds, others):
540 def batch(repo, proto, cmds, others):
542 repo = repo.filtered("served")
541 repo = repo.filtered("served")
543 res = []
542 res = []
544 for pair in cmds.split(';'):
543 for pair in cmds.split(';'):
545 op, args = pair.split(' ', 1)
544 op, args = pair.split(' ', 1)
546 vals = {}
545 vals = {}
547 for a in args.split(','):
546 for a in args.split(','):
548 if a:
547 if a:
549 n, v = a.split('=')
548 n, v = a.split('=')
550 vals[n] = unescapearg(v)
549 vals[n] = unescapearg(v)
551 func, spec = commands[op]
550 func, spec = commands[op]
552 if spec:
551 if spec:
553 keys = spec.split()
552 keys = spec.split()
554 data = {}
553 data = {}
555 for k in keys:
554 for k in keys:
556 if k == '*':
555 if k == '*':
557 star = {}
556 star = {}
558 for key in vals.keys():
557 for key in vals.keys():
559 if key not in keys:
558 if key not in keys:
560 star[key] = vals[key]
559 star[key] = vals[key]
561 data['*'] = star
560 data['*'] = star
562 else:
561 else:
563 data[k] = vals[k]
562 data[k] = vals[k]
564 result = func(repo, proto, *[data[k] for k in keys])
563 result = func(repo, proto, *[data[k] for k in keys])
565 else:
564 else:
566 result = func(repo, proto)
565 result = func(repo, proto)
567 if isinstance(result, ooberror):
566 if isinstance(result, ooberror):
568 return result
567 return result
569 res.append(escapearg(result))
568 res.append(escapearg(result))
570 return ';'.join(res)
569 return ';'.join(res)
571
570
572 @wireprotocommand('between', 'pairs')
571 @wireprotocommand('between', 'pairs')
573 def between(repo, proto, pairs):
572 def between(repo, proto, pairs):
574 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
573 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
575 r = []
574 r = []
576 for b in repo.between(pairs):
575 for b in repo.between(pairs):
577 r.append(encodelist(b) + "\n")
576 r.append(encodelist(b) + "\n")
578 return "".join(r)
577 return "".join(r)
579
578
580 @wireprotocommand('branchmap')
579 @wireprotocommand('branchmap')
581 def branchmap(repo, proto):
580 def branchmap(repo, proto):
582 branchmap = repo.branchmap()
581 branchmap = repo.branchmap()
583 heads = []
582 heads = []
584 for branch, nodes in branchmap.iteritems():
583 for branch, nodes in branchmap.iteritems():
585 branchname = urllib.quote(encoding.fromlocal(branch))
584 branchname = urllib.quote(encoding.fromlocal(branch))
586 branchnodes = encodelist(nodes)
585 branchnodes = encodelist(nodes)
587 heads.append('%s %s' % (branchname, branchnodes))
586 heads.append('%s %s' % (branchname, branchnodes))
588 return '\n'.join(heads)
587 return '\n'.join(heads)
589
588
590 @wireprotocommand('branches', 'nodes')
589 @wireprotocommand('branches', 'nodes')
591 def branches(repo, proto, nodes):
590 def branches(repo, proto, nodes):
592 nodes = decodelist(nodes)
591 nodes = decodelist(nodes)
593 r = []
592 r = []
594 for b in repo.branches(nodes):
593 for b in repo.branches(nodes):
595 r.append(encodelist(b) + "\n")
594 r.append(encodelist(b) + "\n")
596 return "".join(r)
595 return "".join(r)
597
596
598
597
599 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
598 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
600 'known', 'getbundle', 'unbundlehash', 'batch']
599 'known', 'getbundle', 'unbundlehash', 'batch']
601
600
602 def _capabilities(repo, proto):
601 def _capabilities(repo, proto):
603 """return a list of capabilities for a repo
602 """return a list of capabilities for a repo
604
603
605 This function exists to allow extensions to easily wrap capabilities
604 This function exists to allow extensions to easily wrap capabilities
606 computation
605 computation
607
606
608 - returns a lists: easy to alter
607 - returns a lists: easy to alter
609 - change done here will be propagated to both `capabilities` and `hello`
608 - change done here will be propagated to both `capabilities` and `hello`
610 command without any other action needed.
609 command without any other action needed.
611 """
610 """
612 # copy to prevent modification of the global list
611 # copy to prevent modification of the global list
613 caps = list(wireprotocaps)
612 caps = list(wireprotocaps)
614 if _allowstream(repo.ui):
613 if _allowstream(repo.ui):
615 if repo.ui.configbool('server', 'preferuncompressed', False):
614 if repo.ui.configbool('server', 'preferuncompressed', False):
616 caps.append('stream-preferred')
615 caps.append('stream-preferred')
617 requiredformats = repo.requirements & repo.supportedformats
616 requiredformats = repo.requirements & repo.supportedformats
618 # if our local revlogs are just revlogv1, add 'stream' cap
617 # if our local revlogs are just revlogv1, add 'stream' cap
619 if not requiredformats - set(('revlogv1',)):
618 if not requiredformats - set(('revlogv1',)):
620 caps.append('stream')
619 caps.append('stream')
621 # otherwise, add 'streamreqs' detailing our local revlog format
620 # otherwise, add 'streamreqs' detailing our local revlog format
622 else:
621 else:
623 caps.append('streamreqs=%s' % ','.join(requiredformats))
622 caps.append('streamreqs=%s' % ','.join(requiredformats))
624 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
623 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
625 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
624 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
626 caps.append('bundle2=' + urllib.quote(capsblob))
625 caps.append('bundle2=' + urllib.quote(capsblob))
627 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
626 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
628 caps.append(
627 caps.append(
629 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
628 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
630 return caps
629 return caps
631
630
632 # If you are writing an extension and consider wrapping this function. Wrap
631 # If you are writing an extension and consider wrapping this function. Wrap
633 # `_capabilities` instead.
632 # `_capabilities` instead.
634 @wireprotocommand('capabilities')
633 @wireprotocommand('capabilities')
635 def capabilities(repo, proto):
634 def capabilities(repo, proto):
636 return ' '.join(_capabilities(repo, proto))
635 return ' '.join(_capabilities(repo, proto))
637
636
638 @wireprotocommand('changegroup', 'roots')
637 @wireprotocommand('changegroup', 'roots')
639 def changegroup(repo, proto, roots):
638 def changegroup(repo, proto, roots):
640 nodes = decodelist(roots)
639 nodes = decodelist(roots)
641 cg = changegroupmod.changegroup(repo, nodes, 'serve')
640 cg = changegroupmod.changegroup(repo, nodes, 'serve')
642 return streamres(proto.groupchunks(cg))
641 return streamres(proto.groupchunks(cg))
643
642
644 @wireprotocommand('changegroupsubset', 'bases heads')
643 @wireprotocommand('changegroupsubset', 'bases heads')
645 def changegroupsubset(repo, proto, bases, heads):
644 def changegroupsubset(repo, proto, bases, heads):
646 bases = decodelist(bases)
645 bases = decodelist(bases)
647 heads = decodelist(heads)
646 heads = decodelist(heads)
648 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
647 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
649 return streamres(proto.groupchunks(cg))
648 return streamres(proto.groupchunks(cg))
650
649
651 @wireprotocommand('debugwireargs', 'one two *')
650 @wireprotocommand('debugwireargs', 'one two *')
652 def debugwireargs(repo, proto, one, two, others):
651 def debugwireargs(repo, proto, one, two, others):
653 # only accept optional args from the known set
652 # only accept optional args from the known set
654 opts = options('debugwireargs', ['three', 'four'], others)
653 opts = options('debugwireargs', ['three', 'four'], others)
655 return repo.debugwireargs(one, two, **opts)
654 return repo.debugwireargs(one, two, **opts)
656
655
657 # List of options accepted by getbundle.
656 # List of options accepted by getbundle.
658 #
657 #
659 # Meant to be extended by extensions. It is the extension's responsibility to
658 # Meant to be extended by extensions. It is the extension's responsibility to
660 # ensure such options are properly processed in exchange.getbundle.
659 # ensure such options are properly processed in exchange.getbundle.
661 gboptslist = ['heads', 'common', 'bundlecaps']
660 gboptslist = ['heads', 'common', 'bundlecaps']
662
661
663 @wireprotocommand('getbundle', '*')
662 @wireprotocommand('getbundle', '*')
664 def getbundle(repo, proto, others):
663 def getbundle(repo, proto, others):
665 opts = options('getbundle', gboptsmap.keys(), others)
664 opts = options('getbundle', gboptsmap.keys(), others)
666 for k, v in opts.iteritems():
665 for k, v in opts.iteritems():
667 keytype = gboptsmap[k]
666 keytype = gboptsmap[k]
668 if keytype == 'nodes':
667 if keytype == 'nodes':
669 opts[k] = decodelist(v)
668 opts[k] = decodelist(v)
670 elif keytype == 'csv':
669 elif keytype == 'csv':
671 opts[k] = list(v.split(','))
670 opts[k] = list(v.split(','))
672 elif keytype == 'scsv':
671 elif keytype == 'scsv':
673 opts[k] = set(v.split(','))
672 opts[k] = set(v.split(','))
674 elif keytype == 'boolean':
673 elif keytype == 'boolean':
675 opts[k] = bool(v)
674 opts[k] = bool(v)
676 elif keytype != 'plain':
675 elif keytype != 'plain':
677 raise KeyError('unknown getbundle option type %s'
676 raise KeyError('unknown getbundle option type %s'
678 % keytype)
677 % keytype)
679 cg = exchange.getbundle(repo, 'serve', **opts)
678 cg = exchange.getbundle(repo, 'serve', **opts)
680 return streamres(proto.groupchunks(cg))
679 return streamres(proto.groupchunks(cg))
681
680
682 @wireprotocommand('heads')
681 @wireprotocommand('heads')
683 def heads(repo, proto):
682 def heads(repo, proto):
684 h = repo.heads()
683 h = repo.heads()
685 return encodelist(h) + "\n"
684 return encodelist(h) + "\n"
686
685
687 @wireprotocommand('hello')
686 @wireprotocommand('hello')
688 def hello(repo, proto):
687 def hello(repo, proto):
689 '''the hello command returns a set of lines describing various
688 '''the hello command returns a set of lines describing various
690 interesting things about the server, in an RFC822-like format.
689 interesting things about the server, in an RFC822-like format.
691 Currently the only one defined is "capabilities", which
690 Currently the only one defined is "capabilities", which
692 consists of a line in the form:
691 consists of a line in the form:
693
692
694 capabilities: space separated list of tokens
693 capabilities: space separated list of tokens
695 '''
694 '''
696 return "capabilities: %s\n" % (capabilities(repo, proto))
695 return "capabilities: %s\n" % (capabilities(repo, proto))
697
696
698 @wireprotocommand('listkeys', 'namespace')
697 @wireprotocommand('listkeys', 'namespace')
699 def listkeys(repo, proto, namespace):
698 def listkeys(repo, proto, namespace):
700 d = repo.listkeys(encoding.tolocal(namespace)).items()
699 d = repo.listkeys(encoding.tolocal(namespace)).items()
701 return pushkeymod.encodekeys(d)
700 return pushkeymod.encodekeys(d)
702
701
703 @wireprotocommand('lookup', 'key')
702 @wireprotocommand('lookup', 'key')
704 def lookup(repo, proto, key):
703 def lookup(repo, proto, key):
705 try:
704 try:
706 k = encoding.tolocal(key)
705 k = encoding.tolocal(key)
707 c = repo[k]
706 c = repo[k]
708 r = c.hex()
707 r = c.hex()
709 success = 1
708 success = 1
710 except Exception as inst:
709 except Exception as inst:
711 r = str(inst)
710 r = str(inst)
712 success = 0
711 success = 0
713 return "%s %s\n" % (success, r)
712 return "%s %s\n" % (success, r)
714
713
715 @wireprotocommand('known', 'nodes *')
714 @wireprotocommand('known', 'nodes *')
716 def known(repo, proto, nodes, others):
715 def known(repo, proto, nodes, others):
717 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
716 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
718
717
719 @wireprotocommand('pushkey', 'namespace key old new')
718 @wireprotocommand('pushkey', 'namespace key old new')
720 def pushkey(repo, proto, namespace, key, old, new):
719 def pushkey(repo, proto, namespace, key, old, new):
721 # compatibility with pre-1.8 clients which were accidentally
720 # compatibility with pre-1.8 clients which were accidentally
722 # sending raw binary nodes rather than utf-8-encoded hex
721 # sending raw binary nodes rather than utf-8-encoded hex
723 if len(new) == 20 and new.encode('string-escape') != new:
722 if len(new) == 20 and new.encode('string-escape') != new:
724 # looks like it could be a binary node
723 # looks like it could be a binary node
725 try:
724 try:
726 new.decode('utf-8')
725 new.decode('utf-8')
727 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
726 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
728 except UnicodeDecodeError:
727 except UnicodeDecodeError:
729 pass # binary, leave unmodified
728 pass # binary, leave unmodified
730 else:
729 else:
731 new = encoding.tolocal(new) # normal path
730 new = encoding.tolocal(new) # normal path
732
731
733 if util.safehasattr(proto, 'restore'):
732 if util.safehasattr(proto, 'restore'):
734
733
735 proto.redirect()
734 proto.redirect()
736
735
737 try:
736 try:
738 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
737 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
739 encoding.tolocal(old), new) or False
738 encoding.tolocal(old), new) or False
740 except util.Abort:
739 except util.Abort:
741 r = False
740 r = False
742
741
743 output = proto.restore()
742 output = proto.restore()
744
743
745 return '%s\n%s' % (int(r), output)
744 return '%s\n%s' % (int(r), output)
746
745
747 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
746 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
748 encoding.tolocal(old), new)
747 encoding.tolocal(old), new)
749 return '%s\n' % int(r)
748 return '%s\n' % int(r)
750
749
751 def _allowstream(ui):
750 def _allowstream(ui):
752 return ui.configbool('server', 'uncompressed', True, untrusted=True)
751 return ui.configbool('server', 'uncompressed', True, untrusted=True)
753
752
754 @wireprotocommand('stream_out')
753 @wireprotocommand('stream_out')
755 def stream(repo, proto):
754 def stream(repo, proto):
756 '''If the server supports streaming clone, it advertises the "stream"
755 '''If the server supports streaming clone, it advertises the "stream"
757 capability with a value representing the version and flags of the repo
756 capability with a value representing the version and flags of the repo
758 it is serving. Client checks to see if it understands the format.
757 it is serving. Client checks to see if it understands the format.
759 '''
758 '''
760 if not _allowstream(repo.ui):
759 if not _allowstream(repo.ui):
761 return '1\n'
760 return '1\n'
762
761
763 def getstream(it):
762 def getstream(it):
764 yield '0\n'
763 yield '0\n'
765 for chunk in it:
764 for chunk in it:
766 yield chunk
765 yield chunk
767
766
768 try:
767 try:
769 # LockError may be raised before the first result is yielded. Don't
768 # LockError may be raised before the first result is yielded. Don't
770 # emit output until we're sure we got the lock successfully.
769 # emit output until we're sure we got the lock successfully.
771 it = exchange.generatestreamclone(repo)
770 it = exchange.generatestreamclone(repo)
772 return streamres(getstream(it))
771 return streamres(getstream(it))
773 except error.LockError:
772 except error.LockError:
774 return '2\n'
773 return '2\n'
775
774
776 @wireprotocommand('unbundle', 'heads')
775 @wireprotocommand('unbundle', 'heads')
777 def unbundle(repo, proto, heads):
776 def unbundle(repo, proto, heads):
778 their_heads = decodelist(heads)
777 their_heads = decodelist(heads)
779
778
780 try:
779 try:
781 proto.redirect()
780 proto.redirect()
782
781
783 exchange.check_heads(repo, their_heads, 'preparing changes')
782 exchange.check_heads(repo, their_heads, 'preparing changes')
784
783
785 # write bundle data to temporary file because it can be big
784 # write bundle data to temporary file because it can be big
786 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
785 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
787 fp = os.fdopen(fd, 'wb+')
786 fp = os.fdopen(fd, 'wb+')
788 r = 0
787 r = 0
789 try:
788 try:
790 proto.getfile(fp)
789 proto.getfile(fp)
791 fp.seek(0)
790 fp.seek(0)
792 gen = exchange.readbundle(repo.ui, fp, None)
791 gen = exchange.readbundle(repo.ui, fp, None)
793 r = exchange.unbundle(repo, gen, their_heads, 'serve',
792 r = exchange.unbundle(repo, gen, their_heads, 'serve',
794 proto._client())
793 proto._client())
795 if util.safehasattr(r, 'addpart'):
794 if util.safehasattr(r, 'addpart'):
796 # The return looks streamable, we are in the bundle2 case and
795 # The return looks streamable, we are in the bundle2 case and
797 # should return a stream.
796 # should return a stream.
798 return streamres(r.getchunks())
797 return streamres(r.getchunks())
799 return pushres(r)
798 return pushres(r)
800
799
801 finally:
800 finally:
802 fp.close()
801 fp.close()
803 os.unlink(tempname)
802 os.unlink(tempname)
804
803
805 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
804 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
806 # handle non-bundle2 case first
805 # handle non-bundle2 case first
807 if not getattr(exc, 'duringunbundle2', False):
806 if not getattr(exc, 'duringunbundle2', False):
808 try:
807 try:
809 raise
808 raise
810 except util.Abort:
809 except util.Abort:
811 # The old code we moved used sys.stderr directly.
810 # The old code we moved used sys.stderr directly.
812 # We did not change it to minimise code change.
811 # We did not change it to minimise code change.
813 # This need to be moved to something proper.
812 # This need to be moved to something proper.
814 # Feel free to do it.
813 # Feel free to do it.
815 sys.stderr.write("abort: %s\n" % exc)
814 sys.stderr.write("abort: %s\n" % exc)
816 return pushres(0)
815 return pushres(0)
817 except error.PushRaced:
816 except error.PushRaced:
818 return pusherr(str(exc))
817 return pusherr(str(exc))
819
818
820 bundler = bundle2.bundle20(repo.ui)
819 bundler = bundle2.bundle20(repo.ui)
821 for out in getattr(exc, '_bundle2salvagedoutput', ()):
820 for out in getattr(exc, '_bundle2salvagedoutput', ()):
822 bundler.addpart(out)
821 bundler.addpart(out)
823 try:
822 try:
824 try:
823 try:
825 raise
824 raise
826 except error.PushkeyFailed as exc:
825 except error.PushkeyFailed as exc:
827 # check client caps
826 # check client caps
828 remotecaps = getattr(exc, '_replycaps', None)
827 remotecaps = getattr(exc, '_replycaps', None)
829 if (remotecaps is not None
828 if (remotecaps is not None
830 and 'pushkey' not in remotecaps.get('error', ())):
829 and 'pushkey' not in remotecaps.get('error', ())):
831 # no support remote side, fallback to Abort handler.
830 # no support remote side, fallback to Abort handler.
832 raise
831 raise
833 part = bundler.newpart('error:pushkey')
832 part = bundler.newpart('error:pushkey')
834 part.addparam('in-reply-to', exc.partid)
833 part.addparam('in-reply-to', exc.partid)
835 if exc.namespace is not None:
834 if exc.namespace is not None:
836 part.addparam('namespace', exc.namespace, mandatory=False)
835 part.addparam('namespace', exc.namespace, mandatory=False)
837 if exc.key is not None:
836 if exc.key is not None:
838 part.addparam('key', exc.key, mandatory=False)
837 part.addparam('key', exc.key, mandatory=False)
839 if exc.new is not None:
838 if exc.new is not None:
840 part.addparam('new', exc.new, mandatory=False)
839 part.addparam('new', exc.new, mandatory=False)
841 if exc.old is not None:
840 if exc.old is not None:
842 part.addparam('old', exc.old, mandatory=False)
841 part.addparam('old', exc.old, mandatory=False)
843 if exc.ret is not None:
842 if exc.ret is not None:
844 part.addparam('ret', exc.ret, mandatory=False)
843 part.addparam('ret', exc.ret, mandatory=False)
845 except error.BundleValueError as exc:
844 except error.BundleValueError as exc:
846 errpart = bundler.newpart('error:unsupportedcontent')
845 errpart = bundler.newpart('error:unsupportedcontent')
847 if exc.parttype is not None:
846 if exc.parttype is not None:
848 errpart.addparam('parttype', exc.parttype)
847 errpart.addparam('parttype', exc.parttype)
849 if exc.params:
848 if exc.params:
850 errpart.addparam('params', '\0'.join(exc.params))
849 errpart.addparam('params', '\0'.join(exc.params))
851 except util.Abort as exc:
850 except util.Abort as exc:
852 manargs = [('message', str(exc))]
851 manargs = [('message', str(exc))]
853 advargs = []
852 advargs = []
854 if exc.hint is not None:
853 if exc.hint is not None:
855 advargs.append(('hint', exc.hint))
854 advargs.append(('hint', exc.hint))
856 bundler.addpart(bundle2.bundlepart('error:abort',
855 bundler.addpart(bundle2.bundlepart('error:abort',
857 manargs, advargs))
856 manargs, advargs))
858 except error.PushRaced as exc:
857 except error.PushRaced as exc:
859 bundler.newpart('error:pushraced', [('message', str(exc))])
858 bundler.newpart('error:pushraced', [('message', str(exc))])
860 return streamres(bundler.getchunks())
859 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now