##// END OF EJS Templates
wireproto: use decorator for the ubundle command
Pierre-Yves David -
r20923:d771641b default
parent child Browse files
Show More
@@ -1,801 +1,798 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod
11 import changegroup as changegroupmod
12 import peer, error, encoding, util, store
12 import peer, error, encoding, util, store
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 return sep.join(map(hex, l))
175 return sep.join(map(hex, l))
176
176
177 # batched call argument encoding
177 # batched call argument encoding
178
178
179 def escapearg(plain):
179 def escapearg(plain):
180 return (plain
180 return (plain
181 .replace(':', '::')
181 .replace(':', '::')
182 .replace(',', ':,')
182 .replace(',', ':,')
183 .replace(';', ':;')
183 .replace(';', ':;')
184 .replace('=', ':='))
184 .replace('=', ':='))
185
185
186 def unescapearg(escaped):
186 def unescapearg(escaped):
187 return (escaped
187 return (escaped
188 .replace(':=', '=')
188 .replace(':=', '=')
189 .replace(':;', ';')
189 .replace(':;', ';')
190 .replace(':,', ',')
190 .replace(':,', ',')
191 .replace('::', ':'))
191 .replace('::', ':'))
192
192
193 # client side
193 # client side
194
194
195 class wirepeer(peer.peerrepository):
195 class wirepeer(peer.peerrepository):
196
196
197 def batch(self):
197 def batch(self):
198 return remotebatch(self)
198 return remotebatch(self)
199 def _submitbatch(self, req):
199 def _submitbatch(self, req):
200 cmds = []
200 cmds = []
201 for op, argsdict in req:
201 for op, argsdict in req:
202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 cmds.append('%s %s' % (op, args))
203 cmds.append('%s %s' % (op, args))
204 rsp = self._call("batch", cmds=';'.join(cmds))
204 rsp = self._call("batch", cmds=';'.join(cmds))
205 return rsp.split(';')
205 return rsp.split(';')
206 def _submitone(self, op, args):
206 def _submitone(self, op, args):
207 return self._call(op, **args)
207 return self._call(op, **args)
208
208
209 @batchable
209 @batchable
210 def lookup(self, key):
210 def lookup(self, key):
211 self.requirecap('lookup', _('look up remote revision'))
211 self.requirecap('lookup', _('look up remote revision'))
212 f = future()
212 f = future()
213 yield {'key': encoding.fromlocal(key)}, f
213 yield {'key': encoding.fromlocal(key)}, f
214 d = f.value
214 d = f.value
215 success, data = d[:-1].split(" ", 1)
215 success, data = d[:-1].split(" ", 1)
216 if int(success):
216 if int(success):
217 yield bin(data)
217 yield bin(data)
218 self._abort(error.RepoError(data))
218 self._abort(error.RepoError(data))
219
219
220 @batchable
220 @batchable
221 def heads(self):
221 def heads(self):
222 f = future()
222 f = future()
223 yield {}, f
223 yield {}, f
224 d = f.value
224 d = f.value
225 try:
225 try:
226 yield decodelist(d[:-1])
226 yield decodelist(d[:-1])
227 except ValueError:
227 except ValueError:
228 self._abort(error.ResponseError(_("unexpected response:"), d))
228 self._abort(error.ResponseError(_("unexpected response:"), d))
229
229
230 @batchable
230 @batchable
231 def known(self, nodes):
231 def known(self, nodes):
232 f = future()
232 f = future()
233 yield {'nodes': encodelist(nodes)}, f
233 yield {'nodes': encodelist(nodes)}, f
234 d = f.value
234 d = f.value
235 try:
235 try:
236 yield [bool(int(f)) for f in d]
236 yield [bool(int(f)) for f in d]
237 except ValueError:
237 except ValueError:
238 self._abort(error.ResponseError(_("unexpected response:"), d))
238 self._abort(error.ResponseError(_("unexpected response:"), d))
239
239
240 @batchable
240 @batchable
241 def branchmap(self):
241 def branchmap(self):
242 f = future()
242 f = future()
243 yield {}, f
243 yield {}, f
244 d = f.value
244 d = f.value
245 try:
245 try:
246 branchmap = {}
246 branchmap = {}
247 for branchpart in d.splitlines():
247 for branchpart in d.splitlines():
248 branchname, branchheads = branchpart.split(' ', 1)
248 branchname, branchheads = branchpart.split(' ', 1)
249 branchname = encoding.tolocal(urllib.unquote(branchname))
249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 branchheads = decodelist(branchheads)
250 branchheads = decodelist(branchheads)
251 branchmap[branchname] = branchheads
251 branchmap[branchname] = branchheads
252 yield branchmap
252 yield branchmap
253 except TypeError:
253 except TypeError:
254 self._abort(error.ResponseError(_("unexpected response:"), d))
254 self._abort(error.ResponseError(_("unexpected response:"), d))
255
255
256 def branches(self, nodes):
256 def branches(self, nodes):
257 n = encodelist(nodes)
257 n = encodelist(nodes)
258 d = self._call("branches", nodes=n)
258 d = self._call("branches", nodes=n)
259 try:
259 try:
260 br = [tuple(decodelist(b)) for b in d.splitlines()]
260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 return br
261 return br
262 except ValueError:
262 except ValueError:
263 self._abort(error.ResponseError(_("unexpected response:"), d))
263 self._abort(error.ResponseError(_("unexpected response:"), d))
264
264
265 def between(self, pairs):
265 def between(self, pairs):
266 batch = 8 # avoid giant requests
266 batch = 8 # avoid giant requests
267 r = []
267 r = []
268 for i in xrange(0, len(pairs), batch):
268 for i in xrange(0, len(pairs), batch):
269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 d = self._call("between", pairs=n)
270 d = self._call("between", pairs=n)
271 try:
271 try:
272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 except ValueError:
273 except ValueError:
274 self._abort(error.ResponseError(_("unexpected response:"), d))
274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 return r
275 return r
276
276
277 @batchable
277 @batchable
278 def pushkey(self, namespace, key, old, new):
278 def pushkey(self, namespace, key, old, new):
279 if not self.capable('pushkey'):
279 if not self.capable('pushkey'):
280 yield False, None
280 yield False, None
281 f = future()
281 f = future()
282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 yield {'namespace': encoding.fromlocal(namespace),
283 yield {'namespace': encoding.fromlocal(namespace),
284 'key': encoding.fromlocal(key),
284 'key': encoding.fromlocal(key),
285 'old': encoding.fromlocal(old),
285 'old': encoding.fromlocal(old),
286 'new': encoding.fromlocal(new)}, f
286 'new': encoding.fromlocal(new)}, f
287 d = f.value
287 d = f.value
288 d, output = d.split('\n', 1)
288 d, output = d.split('\n', 1)
289 try:
289 try:
290 d = bool(int(d))
290 d = bool(int(d))
291 except ValueError:
291 except ValueError:
292 raise error.ResponseError(
292 raise error.ResponseError(
293 _('push failed (unexpected response):'), d)
293 _('push failed (unexpected response):'), d)
294 for l in output.splitlines(True):
294 for l in output.splitlines(True):
295 self.ui.status(_('remote: '), l)
295 self.ui.status(_('remote: '), l)
296 yield d
296 yield d
297
297
298 @batchable
298 @batchable
299 def listkeys(self, namespace):
299 def listkeys(self, namespace):
300 if not self.capable('pushkey'):
300 if not self.capable('pushkey'):
301 yield {}, None
301 yield {}, None
302 f = future()
302 f = future()
303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 yield {'namespace': encoding.fromlocal(namespace)}, f
304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 d = f.value
305 d = f.value
306 r = {}
306 r = {}
307 for l in d.splitlines():
307 for l in d.splitlines():
308 k, v = l.split('\t')
308 k, v = l.split('\t')
309 r[encoding.tolocal(k)] = encoding.tolocal(v)
309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 yield r
310 yield r
311
311
312 def stream_out(self):
312 def stream_out(self):
313 return self._callstream('stream_out')
313 return self._callstream('stream_out')
314
314
315 def changegroup(self, nodes, kind):
315 def changegroup(self, nodes, kind):
316 n = encodelist(nodes)
316 n = encodelist(nodes)
317 f = self._callcompressable("changegroup", roots=n)
317 f = self._callcompressable("changegroup", roots=n)
318 return changegroupmod.unbundle10(f, 'UN')
318 return changegroupmod.unbundle10(f, 'UN')
319
319
320 def changegroupsubset(self, bases, heads, kind):
320 def changegroupsubset(self, bases, heads, kind):
321 self.requirecap('changegroupsubset', _('look up remote changes'))
321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 bases = encodelist(bases)
322 bases = encodelist(bases)
323 heads = encodelist(heads)
323 heads = encodelist(heads)
324 f = self._callcompressable("changegroupsubset",
324 f = self._callcompressable("changegroupsubset",
325 bases=bases, heads=heads)
325 bases=bases, heads=heads)
326 return changegroupmod.unbundle10(f, 'UN')
326 return changegroupmod.unbundle10(f, 'UN')
327
327
328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
328 def getbundle(self, source, heads=None, common=None, bundlecaps=None):
329 self.requirecap('getbundle', _('look up remote changes'))
329 self.requirecap('getbundle', _('look up remote changes'))
330 opts = {}
330 opts = {}
331 if heads is not None:
331 if heads is not None:
332 opts['heads'] = encodelist(heads)
332 opts['heads'] = encodelist(heads)
333 if common is not None:
333 if common is not None:
334 opts['common'] = encodelist(common)
334 opts['common'] = encodelist(common)
335 if bundlecaps is not None:
335 if bundlecaps is not None:
336 opts['bundlecaps'] = ','.join(bundlecaps)
336 opts['bundlecaps'] = ','.join(bundlecaps)
337 f = self._callcompressable("getbundle", **opts)
337 f = self._callcompressable("getbundle", **opts)
338 return changegroupmod.unbundle10(f, 'UN')
338 return changegroupmod.unbundle10(f, 'UN')
339
339
340 def unbundle(self, cg, heads, source):
340 def unbundle(self, cg, heads, source):
341 '''Send cg (a readable file-like object representing the
341 '''Send cg (a readable file-like object representing the
342 changegroup to push, typically a chunkbuffer object) to the
342 changegroup to push, typically a chunkbuffer object) to the
343 remote server as a bundle. Return an integer indicating the
343 remote server as a bundle. Return an integer indicating the
344 result of the push (see localrepository.addchangegroup()).'''
344 result of the push (see localrepository.addchangegroup()).'''
345
345
346 if heads != ['force'] and self.capable('unbundlehash'):
346 if heads != ['force'] and self.capable('unbundlehash'):
347 heads = encodelist(['hashed',
347 heads = encodelist(['hashed',
348 util.sha1(''.join(sorted(heads))).digest()])
348 util.sha1(''.join(sorted(heads))).digest()])
349 else:
349 else:
350 heads = encodelist(heads)
350 heads = encodelist(heads)
351
351
352 ret, output = self._callpush("unbundle", cg, heads=heads)
352 ret, output = self._callpush("unbundle", cg, heads=heads)
353 if ret == "":
353 if ret == "":
354 raise error.ResponseError(
354 raise error.ResponseError(
355 _('push failed:'), output)
355 _('push failed:'), output)
356 try:
356 try:
357 ret = int(ret)
357 ret = int(ret)
358 except ValueError:
358 except ValueError:
359 raise error.ResponseError(
359 raise error.ResponseError(
360 _('push failed (unexpected response):'), ret)
360 _('push failed (unexpected response):'), ret)
361
361
362 for l in output.splitlines(True):
362 for l in output.splitlines(True):
363 self.ui.status(_('remote: '), l)
363 self.ui.status(_('remote: '), l)
364 return ret
364 return ret
365
365
366 def debugwireargs(self, one, two, three=None, four=None, five=None):
366 def debugwireargs(self, one, two, three=None, four=None, five=None):
367 # don't pass optional arguments left at their default value
367 # don't pass optional arguments left at their default value
368 opts = {}
368 opts = {}
369 if three is not None:
369 if three is not None:
370 opts['three'] = three
370 opts['three'] = three
371 if four is not None:
371 if four is not None:
372 opts['four'] = four
372 opts['four'] = four
373 return self._call('debugwireargs', one=one, two=two, **opts)
373 return self._call('debugwireargs', one=one, two=two, **opts)
374
374
375 def _call(self, cmd, **args):
375 def _call(self, cmd, **args):
376 """execute <cmd> on the server
376 """execute <cmd> on the server
377
377
378 The command is expected to return a simple string.
378 The command is expected to return a simple string.
379
379
380 returns the server reply as a string."""
380 returns the server reply as a string."""
381 raise NotImplementedError()
381 raise NotImplementedError()
382
382
383 def _callstream(self, cmd, **args):
383 def _callstream(self, cmd, **args):
384 """execute <cmd> on the server
384 """execute <cmd> on the server
385
385
386 The command is expected to return a stream.
386 The command is expected to return a stream.
387
387
388 returns the server reply as a file like object."""
388 returns the server reply as a file like object."""
389 raise NotImplementedError()
389 raise NotImplementedError()
390
390
391 def _callcompressable(self, cmd, **args):
391 def _callcompressable(self, cmd, **args):
392 """execute <cmd> on the server
392 """execute <cmd> on the server
393
393
394 The command is expected to return a stream.
394 The command is expected to return a stream.
395
395
396 The stream may have been compressed in some implementaitons. This
396 The stream may have been compressed in some implementaitons. This
397 function takes care of the decompression. This is the only difference
397 function takes care of the decompression. This is the only difference
398 with _callstream.
398 with _callstream.
399
399
400 returns the server reply as a file like object.
400 returns the server reply as a file like object.
401 """
401 """
402 raise NotImplementedError()
402 raise NotImplementedError()
403
403
404 def _callpush(self, cmd, fp, **args):
404 def _callpush(self, cmd, fp, **args):
405 """execute a <cmd> on server
405 """execute a <cmd> on server
406
406
407 The command is expected to be related to a push. Push has a special
407 The command is expected to be related to a push. Push has a special
408 return method.
408 return method.
409
409
410 returns the server reply as a (ret, output) tuple. ret is either
410 returns the server reply as a (ret, output) tuple. ret is either
411 empty (error) or a stringified int.
411 empty (error) or a stringified int.
412 """
412 """
413 raise NotImplementedError()
413 raise NotImplementedError()
414
414
415 def _abort(self, exception):
415 def _abort(self, exception):
416 """clearly abort the wire protocol connection and raise the exception
416 """clearly abort the wire protocol connection and raise the exception
417 """
417 """
418 raise NotImplementedError()
418 raise NotImplementedError()
419
419
420 # server side
420 # server side
421
421
422 # wire protocol command can either return a string or one of these classes.
422 # wire protocol command can either return a string or one of these classes.
423 class streamres(object):
423 class streamres(object):
424 """wireproto reply: binary stream
424 """wireproto reply: binary stream
425
425
426 The call was successful and the result is a stream.
426 The call was successful and the result is a stream.
427 Iterate on the `self.gen` attribute to retrieve chunks.
427 Iterate on the `self.gen` attribute to retrieve chunks.
428 """
428 """
429 def __init__(self, gen):
429 def __init__(self, gen):
430 self.gen = gen
430 self.gen = gen
431
431
432 class pushres(object):
432 class pushres(object):
433 """wireproto reply: success with simple integer return
433 """wireproto reply: success with simple integer return
434
434
435 The call was successful and returned an integer contained in `self.res`.
435 The call was successful and returned an integer contained in `self.res`.
436 """
436 """
437 def __init__(self, res):
437 def __init__(self, res):
438 self.res = res
438 self.res = res
439
439
440 class pusherr(object):
440 class pusherr(object):
441 """wireproto reply: failure
441 """wireproto reply: failure
442
442
443 The call failed. The `self.res` attribute contains the error message.
443 The call failed. The `self.res` attribute contains the error message.
444 """
444 """
445 def __init__(self, res):
445 def __init__(self, res):
446 self.res = res
446 self.res = res
447
447
448 class ooberror(object):
448 class ooberror(object):
449 """wireproto reply: failure of a batch of operation
449 """wireproto reply: failure of a batch of operation
450
450
451 Something failed during a batch call. The error message is stored in
451 Something failed during a batch call. The error message is stored in
452 `self.message`.
452 `self.message`.
453 """
453 """
454 def __init__(self, message):
454 def __init__(self, message):
455 self.message = message
455 self.message = message
456
456
457 def dispatch(repo, proto, command):
457 def dispatch(repo, proto, command):
458 repo = repo.filtered("served")
458 repo = repo.filtered("served")
459 func, spec = commands[command]
459 func, spec = commands[command]
460 args = proto.getargs(spec)
460 args = proto.getargs(spec)
461 return func(repo, proto, *args)
461 return func(repo, proto, *args)
462
462
463 def options(cmd, keys, others):
463 def options(cmd, keys, others):
464 opts = {}
464 opts = {}
465 for k in keys:
465 for k in keys:
466 if k in others:
466 if k in others:
467 opts[k] = others[k]
467 opts[k] = others[k]
468 del others[k]
468 del others[k]
469 if others:
469 if others:
470 sys.stderr.write("abort: %s got unexpected arguments %s\n"
470 sys.stderr.write("abort: %s got unexpected arguments %s\n"
471 % (cmd, ",".join(others)))
471 % (cmd, ",".join(others)))
472 return opts
472 return opts
473
473
474 # list of commands
474 # list of commands
475 commands = {}
475 commands = {}
476
476
477 def wireprotocommand(name, args=''):
477 def wireprotocommand(name, args=''):
478 """decorator for wireprotocol command"""
478 """decorator for wireprotocol command"""
479 def register(func):
479 def register(func):
480 commands[name] = (func, args)
480 commands[name] = (func, args)
481 return func
481 return func
482 return register
482 return register
483
483
484 @wireprotocommand('batch', 'cmds *')
484 @wireprotocommand('batch', 'cmds *')
485 def batch(repo, proto, cmds, others):
485 def batch(repo, proto, cmds, others):
486 repo = repo.filtered("served")
486 repo = repo.filtered("served")
487 res = []
487 res = []
488 for pair in cmds.split(';'):
488 for pair in cmds.split(';'):
489 op, args = pair.split(' ', 1)
489 op, args = pair.split(' ', 1)
490 vals = {}
490 vals = {}
491 for a in args.split(','):
491 for a in args.split(','):
492 if a:
492 if a:
493 n, v = a.split('=')
493 n, v = a.split('=')
494 vals[n] = unescapearg(v)
494 vals[n] = unescapearg(v)
495 func, spec = commands[op]
495 func, spec = commands[op]
496 if spec:
496 if spec:
497 keys = spec.split()
497 keys = spec.split()
498 data = {}
498 data = {}
499 for k in keys:
499 for k in keys:
500 if k == '*':
500 if k == '*':
501 star = {}
501 star = {}
502 for key in vals.keys():
502 for key in vals.keys():
503 if key not in keys:
503 if key not in keys:
504 star[key] = vals[key]
504 star[key] = vals[key]
505 data['*'] = star
505 data['*'] = star
506 else:
506 else:
507 data[k] = vals[k]
507 data[k] = vals[k]
508 result = func(repo, proto, *[data[k] for k in keys])
508 result = func(repo, proto, *[data[k] for k in keys])
509 else:
509 else:
510 result = func(repo, proto)
510 result = func(repo, proto)
511 if isinstance(result, ooberror):
511 if isinstance(result, ooberror):
512 return result
512 return result
513 res.append(escapearg(result))
513 res.append(escapearg(result))
514 return ';'.join(res)
514 return ';'.join(res)
515
515
516 @wireprotocommand('between', 'pairs')
516 @wireprotocommand('between', 'pairs')
517 def between(repo, proto, pairs):
517 def between(repo, proto, pairs):
518 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
518 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
519 r = []
519 r = []
520 for b in repo.between(pairs):
520 for b in repo.between(pairs):
521 r.append(encodelist(b) + "\n")
521 r.append(encodelist(b) + "\n")
522 return "".join(r)
522 return "".join(r)
523
523
524 @wireprotocommand('branchmap')
524 @wireprotocommand('branchmap')
525 def branchmap(repo, proto):
525 def branchmap(repo, proto):
526 branchmap = repo.branchmap()
526 branchmap = repo.branchmap()
527 heads = []
527 heads = []
528 for branch, nodes in branchmap.iteritems():
528 for branch, nodes in branchmap.iteritems():
529 branchname = urllib.quote(encoding.fromlocal(branch))
529 branchname = urllib.quote(encoding.fromlocal(branch))
530 branchnodes = encodelist(nodes)
530 branchnodes = encodelist(nodes)
531 heads.append('%s %s' % (branchname, branchnodes))
531 heads.append('%s %s' % (branchname, branchnodes))
532 return '\n'.join(heads)
532 return '\n'.join(heads)
533
533
534 @wireprotocommand('branches', 'nodes')
534 @wireprotocommand('branches', 'nodes')
535 def branches(repo, proto, nodes):
535 def branches(repo, proto, nodes):
536 nodes = decodelist(nodes)
536 nodes = decodelist(nodes)
537 r = []
537 r = []
538 for b in repo.branches(nodes):
538 for b in repo.branches(nodes):
539 r.append(encodelist(b) + "\n")
539 r.append(encodelist(b) + "\n")
540 return "".join(r)
540 return "".join(r)
541
541
542
542
543 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
543 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
544 'known', 'getbundle', 'unbundlehash', 'batch']
544 'known', 'getbundle', 'unbundlehash', 'batch']
545
545
546 def _capabilities(repo, proto):
546 def _capabilities(repo, proto):
547 """return a list of capabilities for a repo
547 """return a list of capabilities for a repo
548
548
549 This function exists to allow extensions to easily wrap capabilities
549 This function exists to allow extensions to easily wrap capabilities
550 computation
550 computation
551
551
552 - returns a lists: easy to alter
552 - returns a lists: easy to alter
553 - change done here will be propagated to both `capabilities` and `hello`
553 - change done here will be propagated to both `capabilities` and `hello`
554 command without any other effort. without any other action needed.
554 command without any other effort. without any other action needed.
555 """
555 """
556 # copy to prevent modification of the global list
556 # copy to prevent modification of the global list
557 caps = list(wireprotocaps)
557 caps = list(wireprotocaps)
558 if _allowstream(repo.ui):
558 if _allowstream(repo.ui):
559 if repo.ui.configbool('server', 'preferuncompressed', False):
559 if repo.ui.configbool('server', 'preferuncompressed', False):
560 caps.append('stream-preferred')
560 caps.append('stream-preferred')
561 requiredformats = repo.requirements & repo.supportedformats
561 requiredformats = repo.requirements & repo.supportedformats
562 # if our local revlogs are just revlogv1, add 'stream' cap
562 # if our local revlogs are just revlogv1, add 'stream' cap
563 if not requiredformats - set(('revlogv1',)):
563 if not requiredformats - set(('revlogv1',)):
564 caps.append('stream')
564 caps.append('stream')
565 # otherwise, add 'streamreqs' detailing our local revlog format
565 # otherwise, add 'streamreqs' detailing our local revlog format
566 else:
566 else:
567 caps.append('streamreqs=%s' % ','.join(requiredformats))
567 caps.append('streamreqs=%s' % ','.join(requiredformats))
568 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
568 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
569 caps.append('httpheader=1024')
569 caps.append('httpheader=1024')
570 return caps
570 return caps
571
571
572 # If you are writting and extension and consider wrapping this function. Wrap
572 # If you are writting and extension and consider wrapping this function. Wrap
573 # `_capabilities` instead.
573 # `_capabilities` instead.
574 @wireprotocommand('capabilities')
574 @wireprotocommand('capabilities')
575 def capabilities(repo, proto):
575 def capabilities(repo, proto):
576 return ' '.join(_capabilities(repo, proto))
576 return ' '.join(_capabilities(repo, proto))
577
577
578 @wireprotocommand('changegroup', 'roots')
578 @wireprotocommand('changegroup', 'roots')
579 def changegroup(repo, proto, roots):
579 def changegroup(repo, proto, roots):
580 nodes = decodelist(roots)
580 nodes = decodelist(roots)
581 cg = repo.changegroup(nodes, 'serve')
581 cg = repo.changegroup(nodes, 'serve')
582 return streamres(proto.groupchunks(cg))
582 return streamres(proto.groupchunks(cg))
583
583
584 @wireprotocommand('changegroupsubset', 'bases heads')
584 @wireprotocommand('changegroupsubset', 'bases heads')
585 def changegroupsubset(repo, proto, bases, heads):
585 def changegroupsubset(repo, proto, bases, heads):
586 bases = decodelist(bases)
586 bases = decodelist(bases)
587 heads = decodelist(heads)
587 heads = decodelist(heads)
588 cg = repo.changegroupsubset(bases, heads, 'serve')
588 cg = repo.changegroupsubset(bases, heads, 'serve')
589 return streamres(proto.groupchunks(cg))
589 return streamres(proto.groupchunks(cg))
590
590
591 @wireprotocommand('debugwireargs', 'one two *')
591 @wireprotocommand('debugwireargs', 'one two *')
592 def debugwireargs(repo, proto, one, two, others):
592 def debugwireargs(repo, proto, one, two, others):
593 # only accept optional args from the known set
593 # only accept optional args from the known set
594 opts = options('debugwireargs', ['three', 'four'], others)
594 opts = options('debugwireargs', ['three', 'four'], others)
595 return repo.debugwireargs(one, two, **opts)
595 return repo.debugwireargs(one, two, **opts)
596
596
597 @wireprotocommand('getbundle', '*')
597 @wireprotocommand('getbundle', '*')
598 def getbundle(repo, proto, others):
598 def getbundle(repo, proto, others):
599 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
599 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
600 for k, v in opts.iteritems():
600 for k, v in opts.iteritems():
601 if k in ('heads', 'common'):
601 if k in ('heads', 'common'):
602 opts[k] = decodelist(v)
602 opts[k] = decodelist(v)
603 elif k == 'bundlecaps':
603 elif k == 'bundlecaps':
604 opts[k] = set(v.split(','))
604 opts[k] = set(v.split(','))
605 cg = repo.getbundle('serve', **opts)
605 cg = repo.getbundle('serve', **opts)
606 return streamres(proto.groupchunks(cg))
606 return streamres(proto.groupchunks(cg))
607
607
608 @wireprotocommand('heads')
608 @wireprotocommand('heads')
609 def heads(repo, proto):
609 def heads(repo, proto):
610 h = repo.heads()
610 h = repo.heads()
611 return encodelist(h) + "\n"
611 return encodelist(h) + "\n"
612
612
613 @wireprotocommand('hello')
613 @wireprotocommand('hello')
614 def hello(repo, proto):
614 def hello(repo, proto):
615 '''the hello command returns a set of lines describing various
615 '''the hello command returns a set of lines describing various
616 interesting things about the server, in an RFC822-like format.
616 interesting things about the server, in an RFC822-like format.
617 Currently the only one defined is "capabilities", which
617 Currently the only one defined is "capabilities", which
618 consists of a line in the form:
618 consists of a line in the form:
619
619
620 capabilities: space separated list of tokens
620 capabilities: space separated list of tokens
621 '''
621 '''
622 return "capabilities: %s\n" % (capabilities(repo, proto))
622 return "capabilities: %s\n" % (capabilities(repo, proto))
623
623
624 @wireprotocommand('listkeys', 'namespace')
624 @wireprotocommand('listkeys', 'namespace')
625 def listkeys(repo, proto, namespace):
625 def listkeys(repo, proto, namespace):
626 d = repo.listkeys(encoding.tolocal(namespace)).items()
626 d = repo.listkeys(encoding.tolocal(namespace)).items()
627 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
627 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
628 for k, v in d])
628 for k, v in d])
629 return t
629 return t
630
630
631 @wireprotocommand('lookup', 'key')
631 @wireprotocommand('lookup', 'key')
632 def lookup(repo, proto, key):
632 def lookup(repo, proto, key):
633 try:
633 try:
634 k = encoding.tolocal(key)
634 k = encoding.tolocal(key)
635 c = repo[k]
635 c = repo[k]
636 r = c.hex()
636 r = c.hex()
637 success = 1
637 success = 1
638 except Exception, inst:
638 except Exception, inst:
639 r = str(inst)
639 r = str(inst)
640 success = 0
640 success = 0
641 return "%s %s\n" % (success, r)
641 return "%s %s\n" % (success, r)
642
642
643 @wireprotocommand('known', 'nodes *')
643 @wireprotocommand('known', 'nodes *')
644 def known(repo, proto, nodes, others):
644 def known(repo, proto, nodes, others):
645 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
645 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
646
646
647 @wireprotocommand('pushkey', 'namespace key old new')
647 @wireprotocommand('pushkey', 'namespace key old new')
648 def pushkey(repo, proto, namespace, key, old, new):
648 def pushkey(repo, proto, namespace, key, old, new):
649 # compatibility with pre-1.8 clients which were accidentally
649 # compatibility with pre-1.8 clients which were accidentally
650 # sending raw binary nodes rather than utf-8-encoded hex
650 # sending raw binary nodes rather than utf-8-encoded hex
651 if len(new) == 20 and new.encode('string-escape') != new:
651 if len(new) == 20 and new.encode('string-escape') != new:
652 # looks like it could be a binary node
652 # looks like it could be a binary node
653 try:
653 try:
654 new.decode('utf-8')
654 new.decode('utf-8')
655 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
655 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
656 except UnicodeDecodeError:
656 except UnicodeDecodeError:
657 pass # binary, leave unmodified
657 pass # binary, leave unmodified
658 else:
658 else:
659 new = encoding.tolocal(new) # normal path
659 new = encoding.tolocal(new) # normal path
660
660
661 if util.safehasattr(proto, 'restore'):
661 if util.safehasattr(proto, 'restore'):
662
662
663 proto.redirect()
663 proto.redirect()
664
664
665 try:
665 try:
666 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
666 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
667 encoding.tolocal(old), new) or False
667 encoding.tolocal(old), new) or False
668 except util.Abort:
668 except util.Abort:
669 r = False
669 r = False
670
670
671 output = proto.restore()
671 output = proto.restore()
672
672
673 return '%s\n%s' % (int(r), output)
673 return '%s\n%s' % (int(r), output)
674
674
675 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
675 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
676 encoding.tolocal(old), new)
676 encoding.tolocal(old), new)
677 return '%s\n' % int(r)
677 return '%s\n' % int(r)
678
678
679 def _allowstream(ui):
679 def _allowstream(ui):
680 return ui.configbool('server', 'uncompressed', True, untrusted=True)
680 return ui.configbool('server', 'uncompressed', True, untrusted=True)
681
681
682 def _walkstreamfiles(repo):
682 def _walkstreamfiles(repo):
683 # this is it's own function so extensions can override it
683 # this is it's own function so extensions can override it
684 return repo.store.walk()
684 return repo.store.walk()
685
685
686 @wireprotocommand('stream_out')
686 @wireprotocommand('stream_out')
687 def stream(repo, proto):
687 def stream(repo, proto):
688 '''If the server supports streaming clone, it advertises the "stream"
688 '''If the server supports streaming clone, it advertises the "stream"
689 capability with a value representing the version and flags of the repo
689 capability with a value representing the version and flags of the repo
690 it is serving. Client checks to see if it understands the format.
690 it is serving. Client checks to see if it understands the format.
691
691
692 The format is simple: the server writes out a line with the amount
692 The format is simple: the server writes out a line with the amount
693 of files, then the total amount of bytes to be transferred (separated
693 of files, then the total amount of bytes to be transferred (separated
694 by a space). Then, for each file, the server first writes the filename
694 by a space). Then, for each file, the server first writes the filename
695 and filesize (separated by the null character), then the file contents.
695 and filesize (separated by the null character), then the file contents.
696 '''
696 '''
697
697
698 if not _allowstream(repo.ui):
698 if not _allowstream(repo.ui):
699 return '1\n'
699 return '1\n'
700
700
701 entries = []
701 entries = []
702 total_bytes = 0
702 total_bytes = 0
703 try:
703 try:
704 # get consistent snapshot of repo, lock during scan
704 # get consistent snapshot of repo, lock during scan
705 lock = repo.lock()
705 lock = repo.lock()
706 try:
706 try:
707 repo.ui.debug('scanning\n')
707 repo.ui.debug('scanning\n')
708 for name, ename, size in _walkstreamfiles(repo):
708 for name, ename, size in _walkstreamfiles(repo):
709 if size:
709 if size:
710 entries.append((name, size))
710 entries.append((name, size))
711 total_bytes += size
711 total_bytes += size
712 finally:
712 finally:
713 lock.release()
713 lock.release()
714 except error.LockError:
714 except error.LockError:
715 return '2\n' # error: 2
715 return '2\n' # error: 2
716
716
717 def streamer(repo, entries, total):
717 def streamer(repo, entries, total):
718 '''stream out all metadata files in repository.'''
718 '''stream out all metadata files in repository.'''
719 yield '0\n' # success
719 yield '0\n' # success
720 repo.ui.debug('%d files, %d bytes to transfer\n' %
720 repo.ui.debug('%d files, %d bytes to transfer\n' %
721 (len(entries), total_bytes))
721 (len(entries), total_bytes))
722 yield '%d %d\n' % (len(entries), total_bytes)
722 yield '%d %d\n' % (len(entries), total_bytes)
723
723
724 sopener = repo.sopener
724 sopener = repo.sopener
725 oldaudit = sopener.mustaudit
725 oldaudit = sopener.mustaudit
726 debugflag = repo.ui.debugflag
726 debugflag = repo.ui.debugflag
727 sopener.mustaudit = False
727 sopener.mustaudit = False
728
728
729 try:
729 try:
730 for name, size in entries:
730 for name, size in entries:
731 if debugflag:
731 if debugflag:
732 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
732 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
733 # partially encode name over the wire for backwards compat
733 # partially encode name over the wire for backwards compat
734 yield '%s\0%d\n' % (store.encodedir(name), size)
734 yield '%s\0%d\n' % (store.encodedir(name), size)
735 if size <= 65536:
735 if size <= 65536:
736 fp = sopener(name)
736 fp = sopener(name)
737 try:
737 try:
738 data = fp.read(size)
738 data = fp.read(size)
739 finally:
739 finally:
740 fp.close()
740 fp.close()
741 yield data
741 yield data
742 else:
742 else:
743 for chunk in util.filechunkiter(sopener(name), limit=size):
743 for chunk in util.filechunkiter(sopener(name), limit=size):
744 yield chunk
744 yield chunk
745 # replace with "finally:" when support for python 2.4 has been dropped
745 # replace with "finally:" when support for python 2.4 has been dropped
746 except Exception:
746 except Exception:
747 sopener.mustaudit = oldaudit
747 sopener.mustaudit = oldaudit
748 raise
748 raise
749 sopener.mustaudit = oldaudit
749 sopener.mustaudit = oldaudit
750
750
751 return streamres(streamer(repo, entries, total_bytes))
751 return streamres(streamer(repo, entries, total_bytes))
752
752
753 @wireprotocommand('unbundle', 'heads')
753 def unbundle(repo, proto, heads):
754 def unbundle(repo, proto, heads):
754 their_heads = decodelist(heads)
755 their_heads = decodelist(heads)
755
756
756 def check_heads():
757 def check_heads():
757 heads = repo.heads()
758 heads = repo.heads()
758 heads_hash = util.sha1(''.join(sorted(heads))).digest()
759 heads_hash = util.sha1(''.join(sorted(heads))).digest()
759 return (their_heads == ['force'] or their_heads == heads or
760 return (their_heads == ['force'] or their_heads == heads or
760 their_heads == ['hashed', heads_hash])
761 their_heads == ['hashed', heads_hash])
761
762
762 proto.redirect()
763 proto.redirect()
763
764
764 # fail early if possible
765 # fail early if possible
765 if not check_heads():
766 if not check_heads():
766 return pusherr('repository changed while preparing changes - '
767 return pusherr('repository changed while preparing changes - '
767 'please try again')
768 'please try again')
768
769
769 # write bundle data to temporary file because it can be big
770 # write bundle data to temporary file because it can be big
770 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
771 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
771 fp = os.fdopen(fd, 'wb+')
772 fp = os.fdopen(fd, 'wb+')
772 r = 0
773 r = 0
773 try:
774 try:
774 proto.getfile(fp)
775 proto.getfile(fp)
775 lock = repo.lock()
776 lock = repo.lock()
776 try:
777 try:
777 if not check_heads():
778 if not check_heads():
778 # someone else committed/pushed/unbundled while we
779 # someone else committed/pushed/unbundled while we
779 # were transferring data
780 # were transferring data
780 return pusherr('repository changed while uploading changes - '
781 return pusherr('repository changed while uploading changes - '
781 'please try again')
782 'please try again')
782
783
783 # push can proceed
784 # push can proceed
784 fp.seek(0)
785 fp.seek(0)
785 gen = changegroupmod.readbundle(fp, None)
786 gen = changegroupmod.readbundle(fp, None)
786
787
787 try:
788 try:
788 r = repo.addchangegroup(gen, 'serve', proto._client())
789 r = repo.addchangegroup(gen, 'serve', proto._client())
789 except util.Abort, inst:
790 except util.Abort, inst:
790 sys.stderr.write("abort: %s\n" % inst)
791 sys.stderr.write("abort: %s\n" % inst)
791 finally:
792 finally:
792 lock.release()
793 lock.release()
793 return pushres(r)
794 return pushres(r)
794
795
795 finally:
796 finally:
796 fp.close()
797 fp.close()
797 os.unlink(tempname)
798 os.unlink(tempname)
798
799 commands.update({
800 'unbundle': (unbundle, 'heads'),
801 })
General Comments 0
You need to be logged in to leave comments. Login now