##// END OF EJS Templates
batching: migrate basic noop batching into peer.peer...
Augie Fackler -
r25912:cbbdd085 default
parent child Browse files
Show More
@@ -1,46 +1,122
1 1 # peer.py - repository base classes for mercurial
2 2 #
3 3 # Copyright 2005, 2006 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from i18n import _
10 10 import error
11 import util
12
13 # abstract batching support
14
15 class future(object):
16 '''placeholder for a value to be set later'''
17 def set(self, value):
18 if util.safehasattr(self, 'value'):
19 raise error.RepoError("future is already set")
20 self.value = value
21
22 class batcher(object):
23 '''base class for batches of commands submittable in a single request
24
25 All methods invoked on instances of this class are simply queued and
26 return a a future for the result. Once you call submit(), all the queued
27 calls are performed and the results set in their respective futures.
28 '''
29 def __init__(self):
30 self.calls = []
31 def __getattr__(self, name):
32 def call(*args, **opts):
33 resref = future()
34 self.calls.append((name, args, opts, resref,))
35 return resref
36 return call
37 def submit(self):
38 pass
39
40 class localbatch(batcher):
41 '''performs the queued calls directly'''
42 def __init__(self, local):
43 batcher.__init__(self)
44 self.local = local
45 def submit(self):
46 for name, args, opts, resref in self.calls:
47 resref.set(getattr(self.local, name)(*args, **opts))
48
49 def batchable(f):
50 '''annotation for batchable methods
51
52 Such methods must implement a coroutine as follows:
53
54 @batchable
55 def sample(self, one, two=None):
56 # Handle locally computable results first:
57 if not one:
58 yield "a local result", None
59 # Build list of encoded arguments suitable for your wire protocol:
60 encargs = [('one', encode(one),), ('two', encode(two),)]
61 # Create future for injection of encoded result:
62 encresref = future()
63 # Return encoded arguments and future:
64 yield encargs, encresref
65 # Assuming the future to be filled with the result from the batched
66 # request now. Decode it:
67 yield decode(encresref.value)
68
69 The decorator returns a function which wraps this coroutine as a plain
70 method, but adds the original method as an attribute called "batchable",
71 which is used by remotebatch to split the call into separate encoding and
72 decoding phases.
73 '''
74 def plain(*args, **opts):
75 batchable = f(*args, **opts)
76 encargsorres, encresref = batchable.next()
77 if not encresref:
78 return encargsorres # a local result in this case
79 self = args[0]
80 encresref.set(self._submitone(f.func_name, encargsorres))
81 return batchable.next()
82 setattr(plain, 'batchable', f)
83 return plain
11 84
12 85 class peerrepository(object):
13 86
87 def batch(self):
88 return localbatch(self)
89
14 90 def capable(self, name):
15 91 '''tell whether repo supports named capability.
16 92 return False if not supported.
17 93 if boolean capability, return True.
18 94 if string capability, return string.'''
19 95 caps = self._capabilities()
20 96 if name in caps:
21 97 return True
22 98 name_eq = name + '='
23 99 for cap in caps:
24 100 if cap.startswith(name_eq):
25 101 return cap[len(name_eq):]
26 102 return False
27 103
28 104 def requirecap(self, name, purpose):
29 105 '''raise an exception if the given capability is not present'''
30 106 if not self.capable(name):
31 107 raise error.CapabilityError(
32 108 _('cannot %s; remote repository does not '
33 109 'support the %r capability') % (purpose, name))
34 110
35 111 def local(self):
36 112 '''return peer as a localrepo, or None'''
37 113 return None
38 114
39 115 def peer(self):
40 116 return self
41 117
42 118 def canpush(self):
43 119 return True
44 120
45 121 def close(self):
46 122 pass
@@ -1,859 +1,792
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 12 import peer, error, encoding, util, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 # abstract batching support
62
63 class future(object):
64 '''placeholder for a value to be set later'''
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
68 self.value = value
69
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
72
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
76 '''
77 def __init__(self):
78 self.calls = []
79 def __getattr__(self, name):
80 def call(*args, **opts):
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
83 return resref
84 return call
85 def submit(self):
86 pass
87
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
90 def __init__(self, local):
91 batcher.__init__(self)
92 self.local = local
93 def submit(self):
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
96
97 class remotebatch(batcher):
61 class remotebatch(peer.batcher):
98 62 '''batches the queued calls; uses as few roundtrips as possible'''
99 63 def __init__(self, remote):
100 64 '''remote must support _submitbatch(encbatch) and
101 65 _submitone(op, encargs)'''
102 batcher.__init__(self)
66 peer.batcher.__init__(self)
103 67 self.remote = remote
104 68 def submit(self):
105 69 req, rsp = [], []
106 70 for name, args, opts, resref in self.calls:
107 71 mtd = getattr(self.remote, name)
108 72 batchablefn = getattr(mtd, 'batchable', None)
109 73 if batchablefn is not None:
110 74 batchable = batchablefn(mtd.im_self, *args, **opts)
111 75 encargsorres, encresref = batchable.next()
112 76 if encresref:
113 77 req.append((name, encargsorres,))
114 78 rsp.append((batchable, encresref, resref,))
115 79 else:
116 80 resref.set(encargsorres)
117 81 else:
118 82 if req:
119 83 self._submitreq(req, rsp)
120 84 req, rsp = [], []
121 85 resref.set(mtd(*args, **opts))
122 86 if req:
123 87 self._submitreq(req, rsp)
124 88 def _submitreq(self, req, rsp):
125 89 encresults = self.remote._submitbatch(req)
126 90 for encres, r in zip(encresults, rsp):
127 91 batchable, encresref, resref = r
128 92 encresref.set(encres)
129 93 resref.set(batchable.next())
130 94
131 def batchable(f):
132 '''annotation for batchable methods
133
134 Such methods must implement a coroutine as follows:
135
136 @batchable
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
139 if not one:
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
144 encresref = future()
145 # Return encoded arguments and future:
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
149 yield decode(encresref.value)
150
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
155 '''
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
159 if not encresref:
160 return encargsorres # a local result in this case
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
164 setattr(plain, 'batchable', f)
165 return plain
95 # Forward a couple of names from peer to make wireproto interactions
96 # slightly more sensible.
97 batchable = peer.batchable
98 future = peer.future
166 99
167 100 # list of nodes encoding / decoding
168 101
169 102 def decodelist(l, sep=' '):
170 103 if l:
171 104 return map(bin, l.split(sep))
172 105 return []
173 106
174 107 def encodelist(l, sep=' '):
175 108 try:
176 109 return sep.join(map(hex, l))
177 110 except TypeError:
178 111 raise
179 112
180 113 # batched call argument encoding
181 114
182 115 def escapearg(plain):
183 116 return (plain
184 117 .replace(':', ':c')
185 118 .replace(',', ':o')
186 119 .replace(';', ':s')
187 120 .replace('=', ':e'))
188 121
189 122 def unescapearg(escaped):
190 123 return (escaped
191 124 .replace(':e', '=')
192 125 .replace(':s', ';')
193 126 .replace(':o', ',')
194 127 .replace(':c', ':'))
195 128
196 129 # mapping of options accepted by getbundle and their types
197 130 #
198 131 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 132 # such options are properly processed in exchange.getbundle.
200 133 #
201 134 # supported types are:
202 135 #
203 136 # :nodes: list of binary nodes
204 137 # :csv: list of comma-separated values
205 138 # :scsv: list of comma-separated values return as set
206 139 # :plain: string with no transformation needed.
207 140 gboptsmap = {'heads': 'nodes',
208 141 'common': 'nodes',
209 142 'obsmarkers': 'boolean',
210 143 'bundlecaps': 'scsv',
211 144 'listkeys': 'csv',
212 145 'cg': 'boolean'}
213 146
214 147 # client side
215 148
216 149 class wirepeer(peer.peerrepository):
217 150
218 151 def batch(self):
219 152 return remotebatch(self)
220 153 def _submitbatch(self, req):
221 154 cmds = []
222 155 for op, argsdict in req:
223 156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
224 157 for k, v in argsdict.iteritems())
225 158 cmds.append('%s %s' % (op, args))
226 159 rsp = self._call("batch", cmds=';'.join(cmds))
227 160 return [unescapearg(r) for r in rsp.split(';')]
228 161 def _submitone(self, op, args):
229 162 return self._call(op, **args)
230 163
231 164 @batchable
232 165 def lookup(self, key):
233 166 self.requirecap('lookup', _('look up remote revision'))
234 167 f = future()
235 168 yield {'key': encoding.fromlocal(key)}, f
236 169 d = f.value
237 170 success, data = d[:-1].split(" ", 1)
238 171 if int(success):
239 172 yield bin(data)
240 173 self._abort(error.RepoError(data))
241 174
242 175 @batchable
243 176 def heads(self):
244 177 f = future()
245 178 yield {}, f
246 179 d = f.value
247 180 try:
248 181 yield decodelist(d[:-1])
249 182 except ValueError:
250 183 self._abort(error.ResponseError(_("unexpected response:"), d))
251 184
252 185 @batchable
253 186 def known(self, nodes):
254 187 f = future()
255 188 yield {'nodes': encodelist(nodes)}, f
256 189 d = f.value
257 190 try:
258 191 yield [bool(int(b)) for b in d]
259 192 except ValueError:
260 193 self._abort(error.ResponseError(_("unexpected response:"), d))
261 194
262 195 @batchable
263 196 def branchmap(self):
264 197 f = future()
265 198 yield {}, f
266 199 d = f.value
267 200 try:
268 201 branchmap = {}
269 202 for branchpart in d.splitlines():
270 203 branchname, branchheads = branchpart.split(' ', 1)
271 204 branchname = encoding.tolocal(urllib.unquote(branchname))
272 205 branchheads = decodelist(branchheads)
273 206 branchmap[branchname] = branchheads
274 207 yield branchmap
275 208 except TypeError:
276 209 self._abort(error.ResponseError(_("unexpected response:"), d))
277 210
278 211 def branches(self, nodes):
279 212 n = encodelist(nodes)
280 213 d = self._call("branches", nodes=n)
281 214 try:
282 215 br = [tuple(decodelist(b)) for b in d.splitlines()]
283 216 return br
284 217 except ValueError:
285 218 self._abort(error.ResponseError(_("unexpected response:"), d))
286 219
287 220 def between(self, pairs):
288 221 batch = 8 # avoid giant requests
289 222 r = []
290 223 for i in xrange(0, len(pairs), batch):
291 224 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
292 225 d = self._call("between", pairs=n)
293 226 try:
294 227 r.extend(l and decodelist(l) or [] for l in d.splitlines())
295 228 except ValueError:
296 229 self._abort(error.ResponseError(_("unexpected response:"), d))
297 230 return r
298 231
299 232 @batchable
300 233 def pushkey(self, namespace, key, old, new):
301 234 if not self.capable('pushkey'):
302 235 yield False, None
303 236 f = future()
304 237 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
305 238 yield {'namespace': encoding.fromlocal(namespace),
306 239 'key': encoding.fromlocal(key),
307 240 'old': encoding.fromlocal(old),
308 241 'new': encoding.fromlocal(new)}, f
309 242 d = f.value
310 243 d, output = d.split('\n', 1)
311 244 try:
312 245 d = bool(int(d))
313 246 except ValueError:
314 247 raise error.ResponseError(
315 248 _('push failed (unexpected response):'), d)
316 249 for l in output.splitlines(True):
317 250 self.ui.status(_('remote: '), l)
318 251 yield d
319 252
320 253 @batchable
321 254 def listkeys(self, namespace):
322 255 if not self.capable('pushkey'):
323 256 yield {}, None
324 257 f = future()
325 258 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
326 259 yield {'namespace': encoding.fromlocal(namespace)}, f
327 260 d = f.value
328 261 self.ui.debug('received listkey for "%s": %i bytes\n'
329 262 % (namespace, len(d)))
330 263 yield pushkeymod.decodekeys(d)
331 264
332 265 def stream_out(self):
333 266 return self._callstream('stream_out')
334 267
335 268 def changegroup(self, nodes, kind):
336 269 n = encodelist(nodes)
337 270 f = self._callcompressable("changegroup", roots=n)
338 271 return changegroupmod.cg1unpacker(f, 'UN')
339 272
340 273 def changegroupsubset(self, bases, heads, kind):
341 274 self.requirecap('changegroupsubset', _('look up remote changes'))
342 275 bases = encodelist(bases)
343 276 heads = encodelist(heads)
344 277 f = self._callcompressable("changegroupsubset",
345 278 bases=bases, heads=heads)
346 279 return changegroupmod.cg1unpacker(f, 'UN')
347 280
348 281 def getbundle(self, source, **kwargs):
349 282 self.requirecap('getbundle', _('look up remote changes'))
350 283 opts = {}
351 284 bundlecaps = kwargs.get('bundlecaps')
352 285 if bundlecaps is not None:
353 286 kwargs['bundlecaps'] = sorted(bundlecaps)
354 287 else:
355 288 bundlecaps = () # kwargs could have it to None
356 289 for key, value in kwargs.iteritems():
357 290 if value is None:
358 291 continue
359 292 keytype = gboptsmap.get(key)
360 293 if keytype is None:
361 294 assert False, 'unexpected'
362 295 elif keytype == 'nodes':
363 296 value = encodelist(value)
364 297 elif keytype in ('csv', 'scsv'):
365 298 value = ','.join(value)
366 299 elif keytype == 'boolean':
367 300 value = '%i' % bool(value)
368 301 elif keytype != 'plain':
369 302 raise KeyError('unknown getbundle option type %s'
370 303 % keytype)
371 304 opts[key] = value
372 305 f = self._callcompressable("getbundle", **opts)
373 306 if any((cap.startswith('HG2') for cap in bundlecaps)):
374 307 return bundle2.getunbundler(self.ui, f)
375 308 else:
376 309 return changegroupmod.cg1unpacker(f, 'UN')
377 310
378 311 def unbundle(self, cg, heads, source):
379 312 '''Send cg (a readable file-like object representing the
380 313 changegroup to push, typically a chunkbuffer object) to the
381 314 remote server as a bundle.
382 315
383 316 When pushing a bundle10 stream, return an integer indicating the
384 317 result of the push (see localrepository.addchangegroup()).
385 318
386 319 When pushing a bundle20 stream, return a bundle20 stream.'''
387 320
388 321 if heads != ['force'] and self.capable('unbundlehash'):
389 322 heads = encodelist(['hashed',
390 323 util.sha1(''.join(sorted(heads))).digest()])
391 324 else:
392 325 heads = encodelist(heads)
393 326
394 327 if util.safehasattr(cg, 'deltaheader'):
395 328 # this a bundle10, do the old style call sequence
396 329 ret, output = self._callpush("unbundle", cg, heads=heads)
397 330 if ret == "":
398 331 raise error.ResponseError(
399 332 _('push failed:'), output)
400 333 try:
401 334 ret = int(ret)
402 335 except ValueError:
403 336 raise error.ResponseError(
404 337 _('push failed (unexpected response):'), ret)
405 338
406 339 for l in output.splitlines(True):
407 340 self.ui.status(_('remote: '), l)
408 341 else:
409 342 # bundle2 push. Send a stream, fetch a stream.
410 343 stream = self._calltwowaystream('unbundle', cg, heads=heads)
411 344 ret = bundle2.getunbundler(self.ui, stream)
412 345 return ret
413 346
414 347 def debugwireargs(self, one, two, three=None, four=None, five=None):
415 348 # don't pass optional arguments left at their default value
416 349 opts = {}
417 350 if three is not None:
418 351 opts['three'] = three
419 352 if four is not None:
420 353 opts['four'] = four
421 354 return self._call('debugwireargs', one=one, two=two, **opts)
422 355
423 356 def _call(self, cmd, **args):
424 357 """execute <cmd> on the server
425 358
426 359 The command is expected to return a simple string.
427 360
428 361 returns the server reply as a string."""
429 362 raise NotImplementedError()
430 363
431 364 def _callstream(self, cmd, **args):
432 365 """execute <cmd> on the server
433 366
434 367 The command is expected to return a stream.
435 368
436 369 returns the server reply as a file like object."""
437 370 raise NotImplementedError()
438 371
439 372 def _callcompressable(self, cmd, **args):
440 373 """execute <cmd> on the server
441 374
442 375 The command is expected to return a stream.
443 376
444 377 The stream may have been compressed in some implementations. This
445 378 function takes care of the decompression. This is the only difference
446 379 with _callstream.
447 380
448 381 returns the server reply as a file like object.
449 382 """
450 383 raise NotImplementedError()
451 384
452 385 def _callpush(self, cmd, fp, **args):
453 386 """execute a <cmd> on server
454 387
455 388 The command is expected to be related to a push. Push has a special
456 389 return method.
457 390
458 391 returns the server reply as a (ret, output) tuple. ret is either
459 392 empty (error) or a stringified int.
460 393 """
461 394 raise NotImplementedError()
462 395
463 396 def _calltwowaystream(self, cmd, fp, **args):
464 397 """execute <cmd> on server
465 398
466 399 The command will send a stream to the server and get a stream in reply.
467 400 """
468 401 raise NotImplementedError()
469 402
470 403 def _abort(self, exception):
471 404 """clearly abort the wire protocol connection and raise the exception
472 405 """
473 406 raise NotImplementedError()
474 407
475 408 # server side
476 409
477 410 # wire protocol command can either return a string or one of these classes.
478 411 class streamres(object):
479 412 """wireproto reply: binary stream
480 413
481 414 The call was successful and the result is a stream.
482 415 Iterate on the `self.gen` attribute to retrieve chunks.
483 416 """
484 417 def __init__(self, gen):
485 418 self.gen = gen
486 419
487 420 class pushres(object):
488 421 """wireproto reply: success with simple integer return
489 422
490 423 The call was successful and returned an integer contained in `self.res`.
491 424 """
492 425 def __init__(self, res):
493 426 self.res = res
494 427
495 428 class pusherr(object):
496 429 """wireproto reply: failure
497 430
498 431 The call failed. The `self.res` attribute contains the error message.
499 432 """
500 433 def __init__(self, res):
501 434 self.res = res
502 435
503 436 class ooberror(object):
504 437 """wireproto reply: failure of a batch of operation
505 438
506 439 Something failed during a batch call. The error message is stored in
507 440 `self.message`.
508 441 """
509 442 def __init__(self, message):
510 443 self.message = message
511 444
512 445 def dispatch(repo, proto, command):
513 446 repo = repo.filtered("served")
514 447 func, spec = commands[command]
515 448 args = proto.getargs(spec)
516 449 return func(repo, proto, *args)
517 450
518 451 def options(cmd, keys, others):
519 452 opts = {}
520 453 for k in keys:
521 454 if k in others:
522 455 opts[k] = others[k]
523 456 del others[k]
524 457 if others:
525 458 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
526 459 % (cmd, ",".join(others)))
527 460 return opts
528 461
529 462 # list of commands
530 463 commands = {}
531 464
532 465 def wireprotocommand(name, args=''):
533 466 """decorator for wire protocol command"""
534 467 def register(func):
535 468 commands[name] = (func, args)
536 469 return func
537 470 return register
538 471
539 472 @wireprotocommand('batch', 'cmds *')
540 473 def batch(repo, proto, cmds, others):
541 474 repo = repo.filtered("served")
542 475 res = []
543 476 for pair in cmds.split(';'):
544 477 op, args = pair.split(' ', 1)
545 478 vals = {}
546 479 for a in args.split(','):
547 480 if a:
548 481 n, v = a.split('=')
549 482 vals[n] = unescapearg(v)
550 483 func, spec = commands[op]
551 484 if spec:
552 485 keys = spec.split()
553 486 data = {}
554 487 for k in keys:
555 488 if k == '*':
556 489 star = {}
557 490 for key in vals.keys():
558 491 if key not in keys:
559 492 star[key] = vals[key]
560 493 data['*'] = star
561 494 else:
562 495 data[k] = vals[k]
563 496 result = func(repo, proto, *[data[k] for k in keys])
564 497 else:
565 498 result = func(repo, proto)
566 499 if isinstance(result, ooberror):
567 500 return result
568 501 res.append(escapearg(result))
569 502 return ';'.join(res)
570 503
571 504 @wireprotocommand('between', 'pairs')
572 505 def between(repo, proto, pairs):
573 506 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
574 507 r = []
575 508 for b in repo.between(pairs):
576 509 r.append(encodelist(b) + "\n")
577 510 return "".join(r)
578 511
579 512 @wireprotocommand('branchmap')
580 513 def branchmap(repo, proto):
581 514 branchmap = repo.branchmap()
582 515 heads = []
583 516 for branch, nodes in branchmap.iteritems():
584 517 branchname = urllib.quote(encoding.fromlocal(branch))
585 518 branchnodes = encodelist(nodes)
586 519 heads.append('%s %s' % (branchname, branchnodes))
587 520 return '\n'.join(heads)
588 521
589 522 @wireprotocommand('branches', 'nodes')
590 523 def branches(repo, proto, nodes):
591 524 nodes = decodelist(nodes)
592 525 r = []
593 526 for b in repo.branches(nodes):
594 527 r.append(encodelist(b) + "\n")
595 528 return "".join(r)
596 529
597 530
598 531 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
599 532 'known', 'getbundle', 'unbundlehash', 'batch']
600 533
601 534 def _capabilities(repo, proto):
602 535 """return a list of capabilities for a repo
603 536
604 537 This function exists to allow extensions to easily wrap capabilities
605 538 computation
606 539
607 540 - returns a lists: easy to alter
608 541 - change done here will be propagated to both `capabilities` and `hello`
609 542 command without any other action needed.
610 543 """
611 544 # copy to prevent modification of the global list
612 545 caps = list(wireprotocaps)
613 546 if _allowstream(repo.ui):
614 547 if repo.ui.configbool('server', 'preferuncompressed', False):
615 548 caps.append('stream-preferred')
616 549 requiredformats = repo.requirements & repo.supportedformats
617 550 # if our local revlogs are just revlogv1, add 'stream' cap
618 551 if not requiredformats - set(('revlogv1',)):
619 552 caps.append('stream')
620 553 # otherwise, add 'streamreqs' detailing our local revlog format
621 554 else:
622 555 caps.append('streamreqs=%s' % ','.join(requiredformats))
623 556 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
624 557 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
625 558 caps.append('bundle2=' + urllib.quote(capsblob))
626 559 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
627 560 caps.append(
628 561 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
629 562 return caps
630 563
631 564 # If you are writing an extension and consider wrapping this function. Wrap
632 565 # `_capabilities` instead.
633 566 @wireprotocommand('capabilities')
634 567 def capabilities(repo, proto):
635 568 return ' '.join(_capabilities(repo, proto))
636 569
637 570 @wireprotocommand('changegroup', 'roots')
638 571 def changegroup(repo, proto, roots):
639 572 nodes = decodelist(roots)
640 573 cg = changegroupmod.changegroup(repo, nodes, 'serve')
641 574 return streamres(proto.groupchunks(cg))
642 575
643 576 @wireprotocommand('changegroupsubset', 'bases heads')
644 577 def changegroupsubset(repo, proto, bases, heads):
645 578 bases = decodelist(bases)
646 579 heads = decodelist(heads)
647 580 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
648 581 return streamres(proto.groupchunks(cg))
649 582
650 583 @wireprotocommand('debugwireargs', 'one two *')
651 584 def debugwireargs(repo, proto, one, two, others):
652 585 # only accept optional args from the known set
653 586 opts = options('debugwireargs', ['three', 'four'], others)
654 587 return repo.debugwireargs(one, two, **opts)
655 588
656 589 # List of options accepted by getbundle.
657 590 #
658 591 # Meant to be extended by extensions. It is the extension's responsibility to
659 592 # ensure such options are properly processed in exchange.getbundle.
660 593 gboptslist = ['heads', 'common', 'bundlecaps']
661 594
662 595 @wireprotocommand('getbundle', '*')
663 596 def getbundle(repo, proto, others):
664 597 opts = options('getbundle', gboptsmap.keys(), others)
665 598 for k, v in opts.iteritems():
666 599 keytype = gboptsmap[k]
667 600 if keytype == 'nodes':
668 601 opts[k] = decodelist(v)
669 602 elif keytype == 'csv':
670 603 opts[k] = list(v.split(','))
671 604 elif keytype == 'scsv':
672 605 opts[k] = set(v.split(','))
673 606 elif keytype == 'boolean':
674 607 opts[k] = bool(v)
675 608 elif keytype != 'plain':
676 609 raise KeyError('unknown getbundle option type %s'
677 610 % keytype)
678 611 cg = exchange.getbundle(repo, 'serve', **opts)
679 612 return streamres(proto.groupchunks(cg))
680 613
681 614 @wireprotocommand('heads')
682 615 def heads(repo, proto):
683 616 h = repo.heads()
684 617 return encodelist(h) + "\n"
685 618
686 619 @wireprotocommand('hello')
687 620 def hello(repo, proto):
688 621 '''the hello command returns a set of lines describing various
689 622 interesting things about the server, in an RFC822-like format.
690 623 Currently the only one defined is "capabilities", which
691 624 consists of a line in the form:
692 625
693 626 capabilities: space separated list of tokens
694 627 '''
695 628 return "capabilities: %s\n" % (capabilities(repo, proto))
696 629
697 630 @wireprotocommand('listkeys', 'namespace')
698 631 def listkeys(repo, proto, namespace):
699 632 d = repo.listkeys(encoding.tolocal(namespace)).items()
700 633 return pushkeymod.encodekeys(d)
701 634
702 635 @wireprotocommand('lookup', 'key')
703 636 def lookup(repo, proto, key):
704 637 try:
705 638 k = encoding.tolocal(key)
706 639 c = repo[k]
707 640 r = c.hex()
708 641 success = 1
709 642 except Exception as inst:
710 643 r = str(inst)
711 644 success = 0
712 645 return "%s %s\n" % (success, r)
713 646
714 647 @wireprotocommand('known', 'nodes *')
715 648 def known(repo, proto, nodes, others):
716 649 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
717 650
718 651 @wireprotocommand('pushkey', 'namespace key old new')
719 652 def pushkey(repo, proto, namespace, key, old, new):
720 653 # compatibility with pre-1.8 clients which were accidentally
721 654 # sending raw binary nodes rather than utf-8-encoded hex
722 655 if len(new) == 20 and new.encode('string-escape') != new:
723 656 # looks like it could be a binary node
724 657 try:
725 658 new.decode('utf-8')
726 659 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
727 660 except UnicodeDecodeError:
728 661 pass # binary, leave unmodified
729 662 else:
730 663 new = encoding.tolocal(new) # normal path
731 664
732 665 if util.safehasattr(proto, 'restore'):
733 666
734 667 proto.redirect()
735 668
736 669 try:
737 670 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
738 671 encoding.tolocal(old), new) or False
739 672 except util.Abort:
740 673 r = False
741 674
742 675 output = proto.restore()
743 676
744 677 return '%s\n%s' % (int(r), output)
745 678
746 679 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
747 680 encoding.tolocal(old), new)
748 681 return '%s\n' % int(r)
749 682
750 683 def _allowstream(ui):
751 684 return ui.configbool('server', 'uncompressed', True, untrusted=True)
752 685
753 686 @wireprotocommand('stream_out')
754 687 def stream(repo, proto):
755 688 '''If the server supports streaming clone, it advertises the "stream"
756 689 capability with a value representing the version and flags of the repo
757 690 it is serving. Client checks to see if it understands the format.
758 691 '''
759 692 if not _allowstream(repo.ui):
760 693 return '1\n'
761 694
762 695 def getstream(it):
763 696 yield '0\n'
764 697 for chunk in it:
765 698 yield chunk
766 699
767 700 try:
768 701 # LockError may be raised before the first result is yielded. Don't
769 702 # emit output until we're sure we got the lock successfully.
770 703 it = exchange.generatestreamclone(repo)
771 704 return streamres(getstream(it))
772 705 except error.LockError:
773 706 return '2\n'
774 707
775 708 @wireprotocommand('unbundle', 'heads')
776 709 def unbundle(repo, proto, heads):
777 710 their_heads = decodelist(heads)
778 711
779 712 try:
780 713 proto.redirect()
781 714
782 715 exchange.check_heads(repo, their_heads, 'preparing changes')
783 716
784 717 # write bundle data to temporary file because it can be big
785 718 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
786 719 fp = os.fdopen(fd, 'wb+')
787 720 r = 0
788 721 try:
789 722 proto.getfile(fp)
790 723 fp.seek(0)
791 724 gen = exchange.readbundle(repo.ui, fp, None)
792 725 r = exchange.unbundle(repo, gen, their_heads, 'serve',
793 726 proto._client())
794 727 if util.safehasattr(r, 'addpart'):
795 728 # The return looks streamable, we are in the bundle2 case and
796 729 # should return a stream.
797 730 return streamres(r.getchunks())
798 731 return pushres(r)
799 732
800 733 finally:
801 734 fp.close()
802 735 os.unlink(tempname)
803 736
804 737 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
805 738 # handle non-bundle2 case first
806 739 if not getattr(exc, 'duringunbundle2', False):
807 740 try:
808 741 raise
809 742 except util.Abort:
810 743 # The old code we moved used sys.stderr directly.
811 744 # We did not change it to minimise code change.
812 745 # This need to be moved to something proper.
813 746 # Feel free to do it.
814 747 sys.stderr.write("abort: %s\n" % exc)
815 748 return pushres(0)
816 749 except error.PushRaced:
817 750 return pusherr(str(exc))
818 751
819 752 bundler = bundle2.bundle20(repo.ui)
820 753 for out in getattr(exc, '_bundle2salvagedoutput', ()):
821 754 bundler.addpart(out)
822 755 try:
823 756 try:
824 757 raise
825 758 except error.PushkeyFailed as exc:
826 759 # check client caps
827 760 remotecaps = getattr(exc, '_replycaps', None)
828 761 if (remotecaps is not None
829 762 and 'pushkey' not in remotecaps.get('error', ())):
830 763 # no support remote side, fallback to Abort handler.
831 764 raise
832 765 part = bundler.newpart('error:pushkey')
833 766 part.addparam('in-reply-to', exc.partid)
834 767 if exc.namespace is not None:
835 768 part.addparam('namespace', exc.namespace, mandatory=False)
836 769 if exc.key is not None:
837 770 part.addparam('key', exc.key, mandatory=False)
838 771 if exc.new is not None:
839 772 part.addparam('new', exc.new, mandatory=False)
840 773 if exc.old is not None:
841 774 part.addparam('old', exc.old, mandatory=False)
842 775 if exc.ret is not None:
843 776 part.addparam('ret', exc.ret, mandatory=False)
844 777 except error.BundleValueError as exc:
845 778 errpart = bundler.newpart('error:unsupportedcontent')
846 779 if exc.parttype is not None:
847 780 errpart.addparam('parttype', exc.parttype)
848 781 if exc.params:
849 782 errpart.addparam('params', '\0'.join(exc.params))
850 783 except util.Abort as exc:
851 784 manargs = [('message', str(exc))]
852 785 advargs = []
853 786 if exc.hint is not None:
854 787 advargs.append(('hint', exc.hint))
855 788 bundler.addpart(bundle2.bundlepart('error:abort',
856 789 manargs, advargs))
857 790 except error.PushRaced as exc:
858 791 bundler.newpart('error:pushraced', [('message', str(exc))])
859 792 return streamres(bundler.getchunks())
@@ -1,175 +1,176
1 1 # test-batching.py - tests for transparent command batching
2 2 #
3 3 # Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 from mercurial.wireproto import localbatch, remotebatch, batchable, future
8 from mercurial.peer import localbatch, batchable, future
9 from mercurial.wireproto import remotebatch
9 10
10 11 # equivalent of repo.repository
11 12 class thing(object):
12 13 def hello(self):
13 14 return "Ready."
14 15
15 16 # equivalent of localrepo.localrepository
16 17 class localthing(thing):
17 18 def foo(self, one, two=None):
18 19 if one:
19 20 return "%s and %s" % (one, two,)
20 21 return "Nope"
21 22 def bar(self, b, a):
22 23 return "%s und %s" % (b, a,)
23 24 def greet(self, name=None):
24 25 return "Hello, %s" % name
25 26 def batch(self):
26 27 '''Support for local batching.'''
27 28 return localbatch(self)
28 29
29 30 # usage of "thing" interface
30 31 def use(it):
31 32
32 33 # Direct call to base method shared between client and server.
33 34 print it.hello()
34 35
35 36 # Direct calls to proxied methods. They cause individual roundtrips.
36 37 print it.foo("Un", two="Deux")
37 38 print it.bar("Eins", "Zwei")
38 39
39 40 # Batched call to a couple of (possibly proxied) methods.
40 41 batch = it.batch()
41 42 # The calls return futures to eventually hold results.
42 43 foo = batch.foo(one="One", two="Two")
43 44 foo2 = batch.foo(None)
44 45 bar = batch.bar("Eins", "Zwei")
45 46 # We can call non-batchable proxy methods, but the break the current batch
46 47 # request and cause additional roundtrips.
47 48 greet = batch.greet(name="John Smith")
48 49 # We can also add local methods into the mix, but they break the batch too.
49 50 hello = batch.hello()
50 51 bar2 = batch.bar(b="Uno", a="Due")
51 52 # Only now are all the calls executed in sequence, with as few roundtrips
52 53 # as possible.
53 54 batch.submit()
54 55 # After the call to submit, the futures actually contain values.
55 56 print foo.value
56 57 print foo2.value
57 58 print bar.value
58 59 print greet.value
59 60 print hello.value
60 61 print bar2.value
61 62
62 63 # local usage
63 64 mylocal = localthing()
64 65 print
65 66 print "== Local"
66 67 use(mylocal)
67 68
68 69 # demo remoting; mimicks what wireproto and HTTP/SSH do
69 70
70 71 # shared
71 72
72 73 def escapearg(plain):
73 74 return (plain
74 75 .replace(':', '::')
75 76 .replace(',', ':,')
76 77 .replace(';', ':;')
77 78 .replace('=', ':='))
78 79 def unescapearg(escaped):
79 80 return (escaped
80 81 .replace(':=', '=')
81 82 .replace(':;', ';')
82 83 .replace(':,', ',')
83 84 .replace('::', ':'))
84 85
85 86 # server side
86 87
87 88 # equivalent of wireproto's global functions
88 89 class server(object):
89 90 def __init__(self, local):
90 91 self.local = local
91 92 def _call(self, name, args):
92 93 args = dict(arg.split('=', 1) for arg in args)
93 94 return getattr(self, name)(**args)
94 95 def perform(self, req):
95 96 print "REQ:", req
96 97 name, args = req.split('?', 1)
97 98 args = args.split('&')
98 99 vals = dict(arg.split('=', 1) for arg in args)
99 100 res = getattr(self, name)(**vals)
100 101 print " ->", res
101 102 return res
102 103 def batch(self, cmds):
103 104 res = []
104 105 for pair in cmds.split(';'):
105 106 name, args = pair.split(':', 1)
106 107 vals = {}
107 108 for a in args.split(','):
108 109 if a:
109 110 n, v = a.split('=')
110 111 vals[n] = unescapearg(v)
111 112 res.append(escapearg(getattr(self, name)(**vals)))
112 113 return ';'.join(res)
113 114 def foo(self, one, two):
114 115 return mangle(self.local.foo(unmangle(one), unmangle(two)))
115 116 def bar(self, b, a):
116 117 return mangle(self.local.bar(unmangle(b), unmangle(a)))
117 118 def greet(self, name):
118 119 return mangle(self.local.greet(unmangle(name)))
119 120 myserver = server(mylocal)
120 121
121 122 # local side
122 123
123 124 # equivalent of wireproto.encode/decodelist, that is, type-specific marshalling
124 125 # here we just transform the strings a bit to check we're properly en-/decoding
125 126 def mangle(s):
126 127 return ''.join(chr(ord(c) + 1) for c in s)
127 128 def unmangle(s):
128 129 return ''.join(chr(ord(c) - 1) for c in s)
129 130
130 131 # equivalent of wireproto.wirerepository and something like http's wire format
131 132 class remotething(thing):
132 133 def __init__(self, server):
133 134 self.server = server
134 135 def _submitone(self, name, args):
135 136 req = name + '?' + '&'.join(['%s=%s' % (n, v) for n, v in args])
136 137 return self.server.perform(req)
137 138 def _submitbatch(self, cmds):
138 139 req = []
139 140 for name, args in cmds:
140 141 args = ','.join(n + '=' + escapearg(v) for n, v in args)
141 142 req.append(name + ':' + args)
142 143 req = ';'.join(req)
143 144 res = self._submitone('batch', [('cmds', req,)])
144 145 return res.split(';')
145 146
146 147 def batch(self):
147 148 return remotebatch(self)
148 149
149 150 @batchable
150 151 def foo(self, one, two=None):
151 152 if not one:
152 153 yield "Nope", None
153 154 encargs = [('one', mangle(one),), ('two', mangle(two),)]
154 155 encresref = future()
155 156 yield encargs, encresref
156 157 yield unmangle(encresref.value)
157 158
158 159 @batchable
159 160 def bar(self, b, a):
160 161 encresref = future()
161 162 yield [('b', mangle(b),), ('a', mangle(a),)], encresref
162 163 yield unmangle(encresref.value)
163 164
164 165 # greet is coded directly. It therefore does not support batching. If it
165 166 # does appear in a batch, the batch is split around greet, and the call to
166 167 # greet is done in its own roundtrip.
167 168 def greet(self, name=None):
168 169 return unmangle(self._submitone('greet', [('name', mangle(name),)]))
169 170
170 171 # demo remote usage
171 172
172 173 myproxy = remotething(myserver)
173 174 print
174 175 print "== Remote"
175 176 use(myproxy)
@@ -1,48 +1,52
1 1 from mercurial import wireproto
2 2
3 3 class proto(object):
4 4 def __init__(self, args):
5 5 self.args = args
6 6 def getargs(self, spec):
7 7 args = self.args
8 8 args.setdefault('*', {})
9 9 names = spec.split()
10 10 return [args[n] for n in names]
11 11
12 12 class clientpeer(wireproto.wirepeer):
13 13 def __init__(self, serverrepo):
14 14 self.serverrepo = serverrepo
15
16 def _capabilities(self):
17 return ['batch']
18
15 19 def _call(self, cmd, **args):
16 20 return wireproto.dispatch(self.serverrepo, proto(args), cmd)
17 21
18 22 @wireproto.batchable
19 23 def greet(self, name):
20 24 f = wireproto.future()
21 25 yield {'name': mangle(name)}, f
22 26 yield unmangle(f.value)
23 27
24 28 class serverrepo(object):
25 29 def greet(self, name):
26 30 return "Hello, " + name
27 31
28 32 def filtered(self, name):
29 33 return self
30 34
31 35 def mangle(s):
32 36 return ''.join(chr(ord(c) + 1) for c in s)
33 37 def unmangle(s):
34 38 return ''.join(chr(ord(c) - 1) for c in s)
35 39
36 40 def greet(repo, proto, name):
37 41 return mangle(repo.greet(unmangle(name)))
38 42
39 43 wireproto.commands['greet'] = (greet, 'name',)
40 44
41 45 srv = serverrepo()
42 46 clt = clientpeer(srv)
43 47
44 48 print clt.greet("Foobar")
45 49 b = clt.batch()
46 50 fs = [b.greet(s) for s in ["Fo, =;:<o", "Bar"]]
47 51 b.submit()
48 52 print [f.value for f in fs]
General Comments 0
You need to be logged in to leave comments. Login now