##// END OF EJS Templates
wireproto: make wirepeer look-before-you-leap on batching...
Augie Fackler -
r25913:fa14ba7b default
parent child Browse files
Show More
@@ -1,792 +1,795 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 import urllib, tempfile, os, sys
9 9 from i18n import _
10 10 from node import bin, hex
11 11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 12 import peer, error, encoding, util, exchange
13 13
14 14
15 15 class abstractserverproto(object):
16 16 """abstract class that summarizes the protocol API
17 17
18 18 Used as reference and documentation.
19 19 """
20 20
21 21 def getargs(self, args):
22 22 """return the value for arguments in <args>
23 23
24 24 returns a list of values (same order as <args>)"""
25 25 raise NotImplementedError()
26 26
27 27 def getfile(self, fp):
28 28 """write the whole content of a file into a file like object
29 29
30 30 The file is in the form::
31 31
32 32 (<chunk-size>\n<chunk>)+0\n
33 33
34 34 chunk size is the ascii version of the int.
35 35 """
36 36 raise NotImplementedError()
37 37
38 38 def redirect(self):
39 39 """may setup interception for stdout and stderr
40 40
41 41 See also the `restore` method."""
42 42 raise NotImplementedError()
43 43
44 44 # If the `redirect` function does install interception, the `restore`
45 45 # function MUST be defined. If interception is not used, this function
46 46 # MUST NOT be defined.
47 47 #
48 48 # left commented here on purpose
49 49 #
50 50 #def restore(self):
51 51 # """reinstall previous stdout and stderr and return intercepted stdout
52 52 # """
53 53 # raise NotImplementedError()
54 54
55 55 def groupchunks(self, cg):
56 56 """return 4096 chunks from a changegroup object
57 57
58 58 Some protocols may have compressed the contents."""
59 59 raise NotImplementedError()
60 60
61 61 class remotebatch(peer.batcher):
62 62 '''batches the queued calls; uses as few roundtrips as possible'''
63 63 def __init__(self, remote):
64 64 '''remote must support _submitbatch(encbatch) and
65 65 _submitone(op, encargs)'''
66 66 peer.batcher.__init__(self)
67 67 self.remote = remote
68 68 def submit(self):
69 69 req, rsp = [], []
70 70 for name, args, opts, resref in self.calls:
71 71 mtd = getattr(self.remote, name)
72 72 batchablefn = getattr(mtd, 'batchable', None)
73 73 if batchablefn is not None:
74 74 batchable = batchablefn(mtd.im_self, *args, **opts)
75 75 encargsorres, encresref = batchable.next()
76 76 if encresref:
77 77 req.append((name, encargsorres,))
78 78 rsp.append((batchable, encresref, resref,))
79 79 else:
80 80 resref.set(encargsorres)
81 81 else:
82 82 if req:
83 83 self._submitreq(req, rsp)
84 84 req, rsp = [], []
85 85 resref.set(mtd(*args, **opts))
86 86 if req:
87 87 self._submitreq(req, rsp)
88 88 def _submitreq(self, req, rsp):
89 89 encresults = self.remote._submitbatch(req)
90 90 for encres, r in zip(encresults, rsp):
91 91 batchable, encresref, resref = r
92 92 encresref.set(encres)
93 93 resref.set(batchable.next())
94 94
95 95 # Forward a couple of names from peer to make wireproto interactions
96 96 # slightly more sensible.
97 97 batchable = peer.batchable
98 98 future = peer.future
99 99
100 100 # list of nodes encoding / decoding
101 101
102 102 def decodelist(l, sep=' '):
103 103 if l:
104 104 return map(bin, l.split(sep))
105 105 return []
106 106
107 107 def encodelist(l, sep=' '):
108 108 try:
109 109 return sep.join(map(hex, l))
110 110 except TypeError:
111 111 raise
112 112
113 113 # batched call argument encoding
114 114
115 115 def escapearg(plain):
116 116 return (plain
117 117 .replace(':', ':c')
118 118 .replace(',', ':o')
119 119 .replace(';', ':s')
120 120 .replace('=', ':e'))
121 121
122 122 def unescapearg(escaped):
123 123 return (escaped
124 124 .replace(':e', '=')
125 125 .replace(':s', ';')
126 126 .replace(':o', ',')
127 127 .replace(':c', ':'))
128 128
129 129 # mapping of options accepted by getbundle and their types
130 130 #
131 131 # Meant to be extended by extensions. It is extensions responsibility to ensure
132 132 # such options are properly processed in exchange.getbundle.
133 133 #
134 134 # supported types are:
135 135 #
136 136 # :nodes: list of binary nodes
137 137 # :csv: list of comma-separated values
138 138 # :scsv: list of comma-separated values return as set
139 139 # :plain: string with no transformation needed.
140 140 gboptsmap = {'heads': 'nodes',
141 141 'common': 'nodes',
142 142 'obsmarkers': 'boolean',
143 143 'bundlecaps': 'scsv',
144 144 'listkeys': 'csv',
145 145 'cg': 'boolean'}
146 146
147 147 # client side
148 148
149 149 class wirepeer(peer.peerrepository):
150 150
151 151 def batch(self):
152 return remotebatch(self)
152 if self.capable('batch'):
153 return remotebatch(self)
154 else:
155 return peer.localbatch(self)
153 156 def _submitbatch(self, req):
154 157 cmds = []
155 158 for op, argsdict in req:
156 159 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
157 160 for k, v in argsdict.iteritems())
158 161 cmds.append('%s %s' % (op, args))
159 162 rsp = self._call("batch", cmds=';'.join(cmds))
160 163 return [unescapearg(r) for r in rsp.split(';')]
161 164 def _submitone(self, op, args):
162 165 return self._call(op, **args)
163 166
164 167 @batchable
165 168 def lookup(self, key):
166 169 self.requirecap('lookup', _('look up remote revision'))
167 170 f = future()
168 171 yield {'key': encoding.fromlocal(key)}, f
169 172 d = f.value
170 173 success, data = d[:-1].split(" ", 1)
171 174 if int(success):
172 175 yield bin(data)
173 176 self._abort(error.RepoError(data))
174 177
175 178 @batchable
176 179 def heads(self):
177 180 f = future()
178 181 yield {}, f
179 182 d = f.value
180 183 try:
181 184 yield decodelist(d[:-1])
182 185 except ValueError:
183 186 self._abort(error.ResponseError(_("unexpected response:"), d))
184 187
185 188 @batchable
186 189 def known(self, nodes):
187 190 f = future()
188 191 yield {'nodes': encodelist(nodes)}, f
189 192 d = f.value
190 193 try:
191 194 yield [bool(int(b)) for b in d]
192 195 except ValueError:
193 196 self._abort(error.ResponseError(_("unexpected response:"), d))
194 197
195 198 @batchable
196 199 def branchmap(self):
197 200 f = future()
198 201 yield {}, f
199 202 d = f.value
200 203 try:
201 204 branchmap = {}
202 205 for branchpart in d.splitlines():
203 206 branchname, branchheads = branchpart.split(' ', 1)
204 207 branchname = encoding.tolocal(urllib.unquote(branchname))
205 208 branchheads = decodelist(branchheads)
206 209 branchmap[branchname] = branchheads
207 210 yield branchmap
208 211 except TypeError:
209 212 self._abort(error.ResponseError(_("unexpected response:"), d))
210 213
211 214 def branches(self, nodes):
212 215 n = encodelist(nodes)
213 216 d = self._call("branches", nodes=n)
214 217 try:
215 218 br = [tuple(decodelist(b)) for b in d.splitlines()]
216 219 return br
217 220 except ValueError:
218 221 self._abort(error.ResponseError(_("unexpected response:"), d))
219 222
220 223 def between(self, pairs):
221 224 batch = 8 # avoid giant requests
222 225 r = []
223 226 for i in xrange(0, len(pairs), batch):
224 227 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
225 228 d = self._call("between", pairs=n)
226 229 try:
227 230 r.extend(l and decodelist(l) or [] for l in d.splitlines())
228 231 except ValueError:
229 232 self._abort(error.ResponseError(_("unexpected response:"), d))
230 233 return r
231 234
232 235 @batchable
233 236 def pushkey(self, namespace, key, old, new):
234 237 if not self.capable('pushkey'):
235 238 yield False, None
236 239 f = future()
237 240 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
238 241 yield {'namespace': encoding.fromlocal(namespace),
239 242 'key': encoding.fromlocal(key),
240 243 'old': encoding.fromlocal(old),
241 244 'new': encoding.fromlocal(new)}, f
242 245 d = f.value
243 246 d, output = d.split('\n', 1)
244 247 try:
245 248 d = bool(int(d))
246 249 except ValueError:
247 250 raise error.ResponseError(
248 251 _('push failed (unexpected response):'), d)
249 252 for l in output.splitlines(True):
250 253 self.ui.status(_('remote: '), l)
251 254 yield d
252 255
253 256 @batchable
254 257 def listkeys(self, namespace):
255 258 if not self.capable('pushkey'):
256 259 yield {}, None
257 260 f = future()
258 261 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
259 262 yield {'namespace': encoding.fromlocal(namespace)}, f
260 263 d = f.value
261 264 self.ui.debug('received listkey for "%s": %i bytes\n'
262 265 % (namespace, len(d)))
263 266 yield pushkeymod.decodekeys(d)
264 267
265 268 def stream_out(self):
266 269 return self._callstream('stream_out')
267 270
268 271 def changegroup(self, nodes, kind):
269 272 n = encodelist(nodes)
270 273 f = self._callcompressable("changegroup", roots=n)
271 274 return changegroupmod.cg1unpacker(f, 'UN')
272 275
273 276 def changegroupsubset(self, bases, heads, kind):
274 277 self.requirecap('changegroupsubset', _('look up remote changes'))
275 278 bases = encodelist(bases)
276 279 heads = encodelist(heads)
277 280 f = self._callcompressable("changegroupsubset",
278 281 bases=bases, heads=heads)
279 282 return changegroupmod.cg1unpacker(f, 'UN')
280 283
281 284 def getbundle(self, source, **kwargs):
282 285 self.requirecap('getbundle', _('look up remote changes'))
283 286 opts = {}
284 287 bundlecaps = kwargs.get('bundlecaps')
285 288 if bundlecaps is not None:
286 289 kwargs['bundlecaps'] = sorted(bundlecaps)
287 290 else:
288 291 bundlecaps = () # kwargs could have it to None
289 292 for key, value in kwargs.iteritems():
290 293 if value is None:
291 294 continue
292 295 keytype = gboptsmap.get(key)
293 296 if keytype is None:
294 297 assert False, 'unexpected'
295 298 elif keytype == 'nodes':
296 299 value = encodelist(value)
297 300 elif keytype in ('csv', 'scsv'):
298 301 value = ','.join(value)
299 302 elif keytype == 'boolean':
300 303 value = '%i' % bool(value)
301 304 elif keytype != 'plain':
302 305 raise KeyError('unknown getbundle option type %s'
303 306 % keytype)
304 307 opts[key] = value
305 308 f = self._callcompressable("getbundle", **opts)
306 309 if any((cap.startswith('HG2') for cap in bundlecaps)):
307 310 return bundle2.getunbundler(self.ui, f)
308 311 else:
309 312 return changegroupmod.cg1unpacker(f, 'UN')
310 313
311 314 def unbundle(self, cg, heads, source):
312 315 '''Send cg (a readable file-like object representing the
313 316 changegroup to push, typically a chunkbuffer object) to the
314 317 remote server as a bundle.
315 318
316 319 When pushing a bundle10 stream, return an integer indicating the
317 320 result of the push (see localrepository.addchangegroup()).
318 321
319 322 When pushing a bundle20 stream, return a bundle20 stream.'''
320 323
321 324 if heads != ['force'] and self.capable('unbundlehash'):
322 325 heads = encodelist(['hashed',
323 326 util.sha1(''.join(sorted(heads))).digest()])
324 327 else:
325 328 heads = encodelist(heads)
326 329
327 330 if util.safehasattr(cg, 'deltaheader'):
328 331 # this a bundle10, do the old style call sequence
329 332 ret, output = self._callpush("unbundle", cg, heads=heads)
330 333 if ret == "":
331 334 raise error.ResponseError(
332 335 _('push failed:'), output)
333 336 try:
334 337 ret = int(ret)
335 338 except ValueError:
336 339 raise error.ResponseError(
337 340 _('push failed (unexpected response):'), ret)
338 341
339 342 for l in output.splitlines(True):
340 343 self.ui.status(_('remote: '), l)
341 344 else:
342 345 # bundle2 push. Send a stream, fetch a stream.
343 346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
344 347 ret = bundle2.getunbundler(self.ui, stream)
345 348 return ret
346 349
347 350 def debugwireargs(self, one, two, three=None, four=None, five=None):
348 351 # don't pass optional arguments left at their default value
349 352 opts = {}
350 353 if three is not None:
351 354 opts['three'] = three
352 355 if four is not None:
353 356 opts['four'] = four
354 357 return self._call('debugwireargs', one=one, two=two, **opts)
355 358
356 359 def _call(self, cmd, **args):
357 360 """execute <cmd> on the server
358 361
359 362 The command is expected to return a simple string.
360 363
361 364 returns the server reply as a string."""
362 365 raise NotImplementedError()
363 366
364 367 def _callstream(self, cmd, **args):
365 368 """execute <cmd> on the server
366 369
367 370 The command is expected to return a stream.
368 371
369 372 returns the server reply as a file like object."""
370 373 raise NotImplementedError()
371 374
372 375 def _callcompressable(self, cmd, **args):
373 376 """execute <cmd> on the server
374 377
375 378 The command is expected to return a stream.
376 379
377 380 The stream may have been compressed in some implementations. This
378 381 function takes care of the decompression. This is the only difference
379 382 with _callstream.
380 383
381 384 returns the server reply as a file like object.
382 385 """
383 386 raise NotImplementedError()
384 387
385 388 def _callpush(self, cmd, fp, **args):
386 389 """execute a <cmd> on server
387 390
388 391 The command is expected to be related to a push. Push has a special
389 392 return method.
390 393
391 394 returns the server reply as a (ret, output) tuple. ret is either
392 395 empty (error) or a stringified int.
393 396 """
394 397 raise NotImplementedError()
395 398
396 399 def _calltwowaystream(self, cmd, fp, **args):
397 400 """execute <cmd> on server
398 401
399 402 The command will send a stream to the server and get a stream in reply.
400 403 """
401 404 raise NotImplementedError()
402 405
403 406 def _abort(self, exception):
404 407 """clearly abort the wire protocol connection and raise the exception
405 408 """
406 409 raise NotImplementedError()
407 410
408 411 # server side
409 412
410 413 # wire protocol command can either return a string or one of these classes.
411 414 class streamres(object):
412 415 """wireproto reply: binary stream
413 416
414 417 The call was successful and the result is a stream.
415 418 Iterate on the `self.gen` attribute to retrieve chunks.
416 419 """
417 420 def __init__(self, gen):
418 421 self.gen = gen
419 422
420 423 class pushres(object):
421 424 """wireproto reply: success with simple integer return
422 425
423 426 The call was successful and returned an integer contained in `self.res`.
424 427 """
425 428 def __init__(self, res):
426 429 self.res = res
427 430
428 431 class pusherr(object):
429 432 """wireproto reply: failure
430 433
431 434 The call failed. The `self.res` attribute contains the error message.
432 435 """
433 436 def __init__(self, res):
434 437 self.res = res
435 438
436 439 class ooberror(object):
437 440 """wireproto reply: failure of a batch of operation
438 441
439 442 Something failed during a batch call. The error message is stored in
440 443 `self.message`.
441 444 """
442 445 def __init__(self, message):
443 446 self.message = message
444 447
445 448 def dispatch(repo, proto, command):
446 449 repo = repo.filtered("served")
447 450 func, spec = commands[command]
448 451 args = proto.getargs(spec)
449 452 return func(repo, proto, *args)
450 453
451 454 def options(cmd, keys, others):
452 455 opts = {}
453 456 for k in keys:
454 457 if k in others:
455 458 opts[k] = others[k]
456 459 del others[k]
457 460 if others:
458 461 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
459 462 % (cmd, ",".join(others)))
460 463 return opts
461 464
462 465 # list of commands
463 466 commands = {}
464 467
465 468 def wireprotocommand(name, args=''):
466 469 """decorator for wire protocol command"""
467 470 def register(func):
468 471 commands[name] = (func, args)
469 472 return func
470 473 return register
471 474
472 475 @wireprotocommand('batch', 'cmds *')
473 476 def batch(repo, proto, cmds, others):
474 477 repo = repo.filtered("served")
475 478 res = []
476 479 for pair in cmds.split(';'):
477 480 op, args = pair.split(' ', 1)
478 481 vals = {}
479 482 for a in args.split(','):
480 483 if a:
481 484 n, v = a.split('=')
482 485 vals[n] = unescapearg(v)
483 486 func, spec = commands[op]
484 487 if spec:
485 488 keys = spec.split()
486 489 data = {}
487 490 for k in keys:
488 491 if k == '*':
489 492 star = {}
490 493 for key in vals.keys():
491 494 if key not in keys:
492 495 star[key] = vals[key]
493 496 data['*'] = star
494 497 else:
495 498 data[k] = vals[k]
496 499 result = func(repo, proto, *[data[k] for k in keys])
497 500 else:
498 501 result = func(repo, proto)
499 502 if isinstance(result, ooberror):
500 503 return result
501 504 res.append(escapearg(result))
502 505 return ';'.join(res)
503 506
504 507 @wireprotocommand('between', 'pairs')
505 508 def between(repo, proto, pairs):
506 509 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
507 510 r = []
508 511 for b in repo.between(pairs):
509 512 r.append(encodelist(b) + "\n")
510 513 return "".join(r)
511 514
512 515 @wireprotocommand('branchmap')
513 516 def branchmap(repo, proto):
514 517 branchmap = repo.branchmap()
515 518 heads = []
516 519 for branch, nodes in branchmap.iteritems():
517 520 branchname = urllib.quote(encoding.fromlocal(branch))
518 521 branchnodes = encodelist(nodes)
519 522 heads.append('%s %s' % (branchname, branchnodes))
520 523 return '\n'.join(heads)
521 524
522 525 @wireprotocommand('branches', 'nodes')
523 526 def branches(repo, proto, nodes):
524 527 nodes = decodelist(nodes)
525 528 r = []
526 529 for b in repo.branches(nodes):
527 530 r.append(encodelist(b) + "\n")
528 531 return "".join(r)
529 532
530 533
531 534 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
532 535 'known', 'getbundle', 'unbundlehash', 'batch']
533 536
534 537 def _capabilities(repo, proto):
535 538 """return a list of capabilities for a repo
536 539
537 540 This function exists to allow extensions to easily wrap capabilities
538 541 computation
539 542
540 543 - returns a lists: easy to alter
541 544 - change done here will be propagated to both `capabilities` and `hello`
542 545 command without any other action needed.
543 546 """
544 547 # copy to prevent modification of the global list
545 548 caps = list(wireprotocaps)
546 549 if _allowstream(repo.ui):
547 550 if repo.ui.configbool('server', 'preferuncompressed', False):
548 551 caps.append('stream-preferred')
549 552 requiredformats = repo.requirements & repo.supportedformats
550 553 # if our local revlogs are just revlogv1, add 'stream' cap
551 554 if not requiredformats - set(('revlogv1',)):
552 555 caps.append('stream')
553 556 # otherwise, add 'streamreqs' detailing our local revlog format
554 557 else:
555 558 caps.append('streamreqs=%s' % ','.join(requiredformats))
556 559 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
557 560 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
558 561 caps.append('bundle2=' + urllib.quote(capsblob))
559 562 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
560 563 caps.append(
561 564 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
562 565 return caps
563 566
564 567 # If you are writing an extension and consider wrapping this function. Wrap
565 568 # `_capabilities` instead.
566 569 @wireprotocommand('capabilities')
567 570 def capabilities(repo, proto):
568 571 return ' '.join(_capabilities(repo, proto))
569 572
570 573 @wireprotocommand('changegroup', 'roots')
571 574 def changegroup(repo, proto, roots):
572 575 nodes = decodelist(roots)
573 576 cg = changegroupmod.changegroup(repo, nodes, 'serve')
574 577 return streamres(proto.groupchunks(cg))
575 578
576 579 @wireprotocommand('changegroupsubset', 'bases heads')
577 580 def changegroupsubset(repo, proto, bases, heads):
578 581 bases = decodelist(bases)
579 582 heads = decodelist(heads)
580 583 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
581 584 return streamres(proto.groupchunks(cg))
582 585
583 586 @wireprotocommand('debugwireargs', 'one two *')
584 587 def debugwireargs(repo, proto, one, two, others):
585 588 # only accept optional args from the known set
586 589 opts = options('debugwireargs', ['three', 'four'], others)
587 590 return repo.debugwireargs(one, two, **opts)
588 591
589 592 # List of options accepted by getbundle.
590 593 #
591 594 # Meant to be extended by extensions. It is the extension's responsibility to
592 595 # ensure such options are properly processed in exchange.getbundle.
593 596 gboptslist = ['heads', 'common', 'bundlecaps']
594 597
595 598 @wireprotocommand('getbundle', '*')
596 599 def getbundle(repo, proto, others):
597 600 opts = options('getbundle', gboptsmap.keys(), others)
598 601 for k, v in opts.iteritems():
599 602 keytype = gboptsmap[k]
600 603 if keytype == 'nodes':
601 604 opts[k] = decodelist(v)
602 605 elif keytype == 'csv':
603 606 opts[k] = list(v.split(','))
604 607 elif keytype == 'scsv':
605 608 opts[k] = set(v.split(','))
606 609 elif keytype == 'boolean':
607 610 opts[k] = bool(v)
608 611 elif keytype != 'plain':
609 612 raise KeyError('unknown getbundle option type %s'
610 613 % keytype)
611 614 cg = exchange.getbundle(repo, 'serve', **opts)
612 615 return streamres(proto.groupchunks(cg))
613 616
614 617 @wireprotocommand('heads')
615 618 def heads(repo, proto):
616 619 h = repo.heads()
617 620 return encodelist(h) + "\n"
618 621
619 622 @wireprotocommand('hello')
620 623 def hello(repo, proto):
621 624 '''the hello command returns a set of lines describing various
622 625 interesting things about the server, in an RFC822-like format.
623 626 Currently the only one defined is "capabilities", which
624 627 consists of a line in the form:
625 628
626 629 capabilities: space separated list of tokens
627 630 '''
628 631 return "capabilities: %s\n" % (capabilities(repo, proto))
629 632
630 633 @wireprotocommand('listkeys', 'namespace')
631 634 def listkeys(repo, proto, namespace):
632 635 d = repo.listkeys(encoding.tolocal(namespace)).items()
633 636 return pushkeymod.encodekeys(d)
634 637
635 638 @wireprotocommand('lookup', 'key')
636 639 def lookup(repo, proto, key):
637 640 try:
638 641 k = encoding.tolocal(key)
639 642 c = repo[k]
640 643 r = c.hex()
641 644 success = 1
642 645 except Exception as inst:
643 646 r = str(inst)
644 647 success = 0
645 648 return "%s %s\n" % (success, r)
646 649
647 650 @wireprotocommand('known', 'nodes *')
648 651 def known(repo, proto, nodes, others):
649 652 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
650 653
651 654 @wireprotocommand('pushkey', 'namespace key old new')
652 655 def pushkey(repo, proto, namespace, key, old, new):
653 656 # compatibility with pre-1.8 clients which were accidentally
654 657 # sending raw binary nodes rather than utf-8-encoded hex
655 658 if len(new) == 20 and new.encode('string-escape') != new:
656 659 # looks like it could be a binary node
657 660 try:
658 661 new.decode('utf-8')
659 662 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
660 663 except UnicodeDecodeError:
661 664 pass # binary, leave unmodified
662 665 else:
663 666 new = encoding.tolocal(new) # normal path
664 667
665 668 if util.safehasattr(proto, 'restore'):
666 669
667 670 proto.redirect()
668 671
669 672 try:
670 673 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
671 674 encoding.tolocal(old), new) or False
672 675 except util.Abort:
673 676 r = False
674 677
675 678 output = proto.restore()
676 679
677 680 return '%s\n%s' % (int(r), output)
678 681
679 682 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
680 683 encoding.tolocal(old), new)
681 684 return '%s\n' % int(r)
682 685
683 686 def _allowstream(ui):
684 687 return ui.configbool('server', 'uncompressed', True, untrusted=True)
685 688
686 689 @wireprotocommand('stream_out')
687 690 def stream(repo, proto):
688 691 '''If the server supports streaming clone, it advertises the "stream"
689 692 capability with a value representing the version and flags of the repo
690 693 it is serving. Client checks to see if it understands the format.
691 694 '''
692 695 if not _allowstream(repo.ui):
693 696 return '1\n'
694 697
695 698 def getstream(it):
696 699 yield '0\n'
697 700 for chunk in it:
698 701 yield chunk
699 702
700 703 try:
701 704 # LockError may be raised before the first result is yielded. Don't
702 705 # emit output until we're sure we got the lock successfully.
703 706 it = exchange.generatestreamclone(repo)
704 707 return streamres(getstream(it))
705 708 except error.LockError:
706 709 return '2\n'
707 710
708 711 @wireprotocommand('unbundle', 'heads')
709 712 def unbundle(repo, proto, heads):
710 713 their_heads = decodelist(heads)
711 714
712 715 try:
713 716 proto.redirect()
714 717
715 718 exchange.check_heads(repo, their_heads, 'preparing changes')
716 719
717 720 # write bundle data to temporary file because it can be big
718 721 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
719 722 fp = os.fdopen(fd, 'wb+')
720 723 r = 0
721 724 try:
722 725 proto.getfile(fp)
723 726 fp.seek(0)
724 727 gen = exchange.readbundle(repo.ui, fp, None)
725 728 r = exchange.unbundle(repo, gen, their_heads, 'serve',
726 729 proto._client())
727 730 if util.safehasattr(r, 'addpart'):
728 731 # The return looks streamable, we are in the bundle2 case and
729 732 # should return a stream.
730 733 return streamres(r.getchunks())
731 734 return pushres(r)
732 735
733 736 finally:
734 737 fp.close()
735 738 os.unlink(tempname)
736 739
737 740 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
738 741 # handle non-bundle2 case first
739 742 if not getattr(exc, 'duringunbundle2', False):
740 743 try:
741 744 raise
742 745 except util.Abort:
743 746 # The old code we moved used sys.stderr directly.
744 747 # We did not change it to minimise code change.
745 748 # This need to be moved to something proper.
746 749 # Feel free to do it.
747 750 sys.stderr.write("abort: %s\n" % exc)
748 751 return pushres(0)
749 752 except error.PushRaced:
750 753 return pusherr(str(exc))
751 754
752 755 bundler = bundle2.bundle20(repo.ui)
753 756 for out in getattr(exc, '_bundle2salvagedoutput', ()):
754 757 bundler.addpart(out)
755 758 try:
756 759 try:
757 760 raise
758 761 except error.PushkeyFailed as exc:
759 762 # check client caps
760 763 remotecaps = getattr(exc, '_replycaps', None)
761 764 if (remotecaps is not None
762 765 and 'pushkey' not in remotecaps.get('error', ())):
763 766 # no support remote side, fallback to Abort handler.
764 767 raise
765 768 part = bundler.newpart('error:pushkey')
766 769 part.addparam('in-reply-to', exc.partid)
767 770 if exc.namespace is not None:
768 771 part.addparam('namespace', exc.namespace, mandatory=False)
769 772 if exc.key is not None:
770 773 part.addparam('key', exc.key, mandatory=False)
771 774 if exc.new is not None:
772 775 part.addparam('new', exc.new, mandatory=False)
773 776 if exc.old is not None:
774 777 part.addparam('old', exc.old, mandatory=False)
775 778 if exc.ret is not None:
776 779 part.addparam('ret', exc.ret, mandatory=False)
777 780 except error.BundleValueError as exc:
778 781 errpart = bundler.newpart('error:unsupportedcontent')
779 782 if exc.parttype is not None:
780 783 errpart.addparam('parttype', exc.parttype)
781 784 if exc.params:
782 785 errpart.addparam('params', '\0'.join(exc.params))
783 786 except util.Abort as exc:
784 787 manargs = [('message', str(exc))]
785 788 advargs = []
786 789 if exc.hint is not None:
787 790 advargs.append(('hint', exc.hint))
788 791 bundler.addpart(bundle2.bundlepart('error:abort',
789 792 manargs, advargs))
790 793 except error.PushRaced as exc:
791 794 bundler.newpart('error:pushraced', [('message', str(exc))])
792 795 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now