##// END OF EJS Templates
py3: fix loop over byte string in wireprotov1peer...
Yuya Nishihara -
r38350:aa9dd805 default
parent child Browse files
Show More
@@ -1,507 +1,511
1 1 test-abort-checkin.t
2 2 test-add.t
3 3 test-addremove-similar.t
4 4 test-addremove.t
5 5 test-amend-subrepo.t
6 6 test-amend.t
7 7 test-ancestor.py
8 8 test-annotate.py
9 9 test-annotate.t
10 10 test-archive-symlinks.t
11 11 test-atomictempfile.py
12 12 test-audit-path.t
13 13 test-audit-subrepo.t
14 14 test-automv.t
15 15 test-backout.t
16 16 test-backwards-remove.t
17 17 test-basic.t
18 18 test-bheads.t
19 19 test-bisect.t
20 20 test-bisect2.t
21 21 test-bisect3.t
22 22 test-blackbox.t
23 23 test-bookmarks-current.t
24 24 test-bookmarks-merge.t
25 25 test-bookmarks-rebase.t
26 26 test-bookmarks-strip.t
27 27 test-bookmarks.t
28 28 test-branch-change.t
29 29 test-branch-option.t
30 30 test-branch-tag-confict.t
31 31 test-branches.t
32 32 test-bundle-phases.t
33 33 test-bundle-type.t
34 34 test-bundle-vs-outgoing.t
35 35 test-bundle2-multiple-changegroups.t
36 36 test-cappedreader.py
37 37 test-casecollision.t
38 38 test-cat.t
39 39 test-cbor.py
40 40 test-censor.t
41 41 test-changelog-exec.t
42 42 test-check-commit.t
43 43 test-check-execute.t
44 44 test-check-interfaces.py
45 45 test-check-module-imports.t
46 46 test-check-pyflakes.t
47 47 test-check-pylint.t
48 48 test-check-shbang.t
49 49 test-children.t
50 50 test-clone-cgi.t
51 51 test-clone-pull-corruption.t
52 52 test-clone-r.t
53 53 test-clone-update-order.t
54 54 test-command-template.t
55 55 test-commit-amend.t
56 56 test-commit-interactive.t
57 57 test-commit-multiple.t
58 58 test-commit-unresolved.t
59 59 test-commit.t
60 60 test-committer.t
61 61 test-completion.t
62 62 test-config-env.py
63 63 test-config.t
64 64 test-conflict.t
65 65 test-confused-revert.t
66 66 test-context.py
67 67 test-contrib-check-code.t
68 68 test-contrib-check-commit.t
69 69 test-convert-authormap.t
70 70 test-convert-clonebranches.t
71 71 test-convert-cvs-branch.t
72 72 test-convert-cvs-detectmerge.t
73 73 test-convert-cvs-synthetic.t
74 74 test-convert-cvs.t
75 75 test-convert-cvsnt-mergepoints.t
76 76 test-convert-datesort.t
77 77 test-convert-filemap.t
78 78 test-convert-hg-sink.t
79 79 test-convert-hg-source.t
80 80 test-convert-hg-startrev.t
81 81 test-convert-splicemap.t
82 82 test-convert-tagsbranch-topology.t
83 83 test-copy-move-merge.t
84 84 test-copy.t
85 85 test-copytrace-heuristics.t
86 86 test-debugbuilddag.t
87 87 test-debugbundle.t
88 88 test-debugextensions.t
89 89 test-debugindexdot.t
90 90 test-debugrename.t
91 91 test-default-push.t
92 92 test-diff-binary-file.t
93 93 test-diff-change.t
94 94 test-diff-copy-depth.t
95 95 test-diff-hashes.t
96 96 test-diff-ignore-whitespace.t
97 97 test-diff-indent-heuristic.t
98 98 test-diff-issue2761.t
99 99 test-diff-newlines.t
100 100 test-diff-reverse.t
101 101 test-diff-subdir.t
102 102 test-diff-unified.t
103 103 test-diff-upgrade.t
104 104 test-diffdir.t
105 105 test-diffstat.t
106 106 test-directaccess.t
107 107 test-dirstate-backup.t
108 108 test-dirstate-nonnormalset.t
109 109 test-dirstate.t
110 110 test-dispatch.py
111 111 test-doctest.py
112 112 test-double-merge.t
113 113 test-drawdag.t
114 114 test-duplicateoptions.py
115 115 test-editor-filename.t
116 116 test-empty-dir.t
117 117 test-empty-file.t
118 118 test-empty-group.t
119 119 test-empty.t
120 120 test-encode.t
121 121 test-encoding-func.py
122 122 test-encoding.t
123 123 test-eol-add.t
124 124 test-eol-clone.t
125 125 test-eol-hook.t
126 126 test-eol-tag.t
127 127 test-eol-update.t
128 128 test-excessive-merge.t
129 129 test-exchange-obsmarkers-case-A1.t
130 130 test-exchange-obsmarkers-case-A2.t
131 131 test-exchange-obsmarkers-case-A3.t
132 132 test-exchange-obsmarkers-case-A4.t
133 133 test-exchange-obsmarkers-case-A5.t
134 134 test-exchange-obsmarkers-case-A6.t
135 135 test-exchange-obsmarkers-case-A7.t
136 136 test-exchange-obsmarkers-case-B1.t
137 137 test-exchange-obsmarkers-case-B2.t
138 138 test-exchange-obsmarkers-case-B3.t
139 139 test-exchange-obsmarkers-case-B4.t
140 140 test-exchange-obsmarkers-case-B5.t
141 141 test-exchange-obsmarkers-case-B6.t
142 142 test-exchange-obsmarkers-case-B7.t
143 143 test-exchange-obsmarkers-case-C1.t
144 144 test-exchange-obsmarkers-case-C2.t
145 145 test-exchange-obsmarkers-case-C3.t
146 146 test-exchange-obsmarkers-case-C4.t
147 147 test-exchange-obsmarkers-case-D1.t
148 148 test-exchange-obsmarkers-case-D2.t
149 149 test-exchange-obsmarkers-case-D3.t
150 150 test-exchange-obsmarkers-case-D4.t
151 151 test-execute-bit.t
152 152 test-export.t
153 153 test-extdata.t
154 154 test-extdiff.t
155 155 test-extensions-afterloaded.t
156 156 test-extensions-wrapfunction.py
157 157 test-extra-filelog-entry.t
158 158 test-filebranch.t
159 159 test-filecache.py
160 160 test-filelog.py
161 161 test-fileset-generated.t
162 162 test-fileset.t
163 163 test-fix-topology.t
164 164 test-flags.t
165 165 test-generaldelta.t
166 166 test-getbundle.t
167 167 test-git-export.t
168 168 test-glog-topological.t
169 169 test-gpg.t
170 170 test-graft.t
171 171 test-hg-parseurl.py
172 172 test-hghave.t
173 173 test-hgignore.t
174 174 test-hgk.t
175 175 test-hgrc.t
176 176 test-hgweb-bundle.t
177 177 test-hgweb-descend-empties.t
178 178 test-hgweb-empty.t
179 179 test-hgweb-removed.t
180 180 test-hgwebdir-paths.py
181 181 test-hgwebdirsym.t
182 182 test-histedit-arguments.t
183 183 test-histedit-base.t
184 184 test-histedit-bookmark-motion.t
185 185 test-histedit-commute.t
186 186 test-histedit-drop.t
187 187 test-histedit-edit.t
188 188 test-histedit-fold-non-commute.t
189 189 test-histedit-fold.t
190 190 test-histedit-no-change.t
191 191 test-histedit-non-commute-abort.t
192 192 test-histedit-non-commute.t
193 193 test-histedit-obsolete.t
194 194 test-histedit-outgoing.t
195 195 test-histedit-templates.t
196 196 test-http-branchmap.t
197 197 test-http-bundle1.t
198 198 test-http-clone-r.t
199 199 test-http.t
200 200 test-hybridencode.py
201 201 test-identify.t
202 202 test-import-merge.t
203 203 test-import-unknown.t
204 204 test-import.t
205 205 test-imports-checker.t
206 206 test-incoming-outgoing.t
207 207 test-inherit-mode.t
208 test-init.t
208 209 test-issue1089.t
209 210 test-issue1102.t
210 211 test-issue1175.t
211 212 test-issue1306.t
212 213 test-issue1438.t
213 214 test-issue1502.t
214 215 test-issue1802.t
215 216 test-issue1877.t
216 217 test-issue1993.t
217 218 test-issue2137.t
218 219 test-issue3084.t
219 220 test-issue4074.t
220 221 test-issue522.t
221 222 test-issue586.t
222 223 test-issue612.t
223 224 test-issue619.t
224 225 test-issue660.t
225 226 test-issue672.t
226 227 test-issue842.t
227 228 test-journal-exists.t
228 229 test-journal-share.t
229 230 test-journal.t
231 test-known.t
230 232 test-largefiles-cache.t
231 233 test-largefiles-misc.t
232 234 test-largefiles-small-disk.t
233 235 test-largefiles-update.t
234 236 test-largefiles.t
235 237 test-lfs-largefiles.t
236 238 test-lfs-pointer.py
237 239 test-linerange.py
238 240 test-locate.t
239 241 test-lock-badness.t
240 242 test-log-linerange.t
241 243 test-log.t
242 244 test-logexchange.t
243 245 test-lrucachedict.py
244 246 test-mactext.t
245 247 test-mailmap.t
246 248 test-manifest-merging.t
247 249 test-manifest.py
248 250 test-manifest.t
249 251 test-match.py
250 252 test-mdiff.py
251 253 test-merge-changedelete.t
252 254 test-merge-closedheads.t
253 255 test-merge-commit.t
254 256 test-merge-criss-cross.t
255 257 test-merge-default.t
256 258 test-merge-force.t
257 259 test-merge-halt.t
258 260 test-merge-internal-tools-pattern.t
259 261 test-merge-local.t
260 262 test-merge-remove.t
261 263 test-merge-revert.t
262 264 test-merge-revert2.t
263 265 test-merge-subrepos.t
264 266 test-merge-symlinks.t
265 267 test-merge-tools.t
266 268 test-merge-types.t
267 269 test-merge1.t
268 270 test-merge10.t
269 271 test-merge2.t
270 272 test-merge4.t
271 273 test-merge5.t
272 274 test-merge6.t
273 275 test-merge7.t
274 276 test-merge8.t
275 277 test-merge9.t
276 278 test-minifileset.py
277 279 test-minirst.py
278 280 test-mq-git.t
279 281 test-mq-header-date.t
280 282 test-mq-header-from.t
281 283 test-mq-merge.t
282 284 test-mq-pull-from-bundle.t
283 285 test-mq-qclone-http.t
284 286 test-mq-qdelete.t
285 287 test-mq-qdiff.t
286 288 test-mq-qfold.t
287 289 test-mq-qgoto.t
288 290 test-mq-qimport-fail-cleanup.t
289 291 test-mq-qnew.t
290 292 test-mq-qpush-exact.t
291 293 test-mq-qqueue.t
292 294 test-mq-qrefresh-interactive.t
293 295 test-mq-qrefresh-replace-log-message.t
294 296 test-mq-qrefresh.t
295 297 test-mq-qrename.t
296 298 test-mq-qsave.t
297 299 test-mq-safety.t
298 300 test-mq-subrepo.t
299 301 test-mq-symlinks.t
300 302 test-mv-cp-st-diff.t
301 303 test-narrow-archive.t
302 304 test-narrow-clone-no-ellipsis.t
303 305 test-narrow-clone-non-narrow-server.t
304 306 test-narrow-clone-nonlinear.t
305 307 test-narrow-clone.t
306 308 test-narrow-commit.t
307 309 test-narrow-copies.t
308 310 test-narrow-debugcommands.t
309 311 test-narrow-debugrebuilddirstate.t
310 312 test-narrow-exchange-merges.t
311 313 test-narrow-exchange.t
312 314 test-narrow-expanddirstate.t
313 315 test-narrow-merge.t
314 316 test-narrow-patch.t
315 317 test-narrow-patterns.t
316 318 test-narrow-pull.t
317 319 test-narrow-rebase.t
318 320 test-narrow-shallow-merges.t
319 321 test-narrow-shallow.t
320 322 test-narrow-strip.t
321 323 test-narrow-update.t
322 324 test-nested-repo.t
323 325 test-newbranch.t
324 326 test-obshistory.t
325 327 test-obsmarker-template.t
326 328 test-obsmarkers-effectflag.t
327 329 test-obsolete-bundle-strip.t
328 330 test-obsolete-changeset-exchange.t
329 331 test-obsolete-checkheads.t
330 332 test-obsolete-distributed.t
331 333 test-obsolete-tag-cache.t
332 334 test-pager.t
333 335 test-parents.t
334 336 test-parseindex2.py
335 337 test-pathconflicts-merge.t
336 338 test-pathconflicts-update.t
337 339 test-pathencode.py
338 340 test-pending.t
339 341 test-permissions.t
340 342 test-phases.t
341 343 test-pull-branch.t
342 344 test-pull-http.t
343 345 test-pull-permission.t
344 346 test-pull-pull-corruption.t
345 347 test-pull-r.t
346 348 test-pull-update.t
347 349 test-pull.t
348 350 test-purge.t
349 351 test-push-checkheads-partial-C1.t
350 352 test-push-checkheads-partial-C2.t
351 353 test-push-checkheads-partial-C3.t
352 354 test-push-checkheads-partial-C4.t
353 355 test-push-checkheads-pruned-B1.t
354 356 test-push-checkheads-pruned-B2.t
355 357 test-push-checkheads-pruned-B3.t
356 358 test-push-checkheads-pruned-B4.t
357 359 test-push-checkheads-pruned-B5.t
358 360 test-push-checkheads-pruned-B6.t
359 361 test-push-checkheads-pruned-B7.t
360 362 test-push-checkheads-pruned-B8.t
361 363 test-push-checkheads-superceed-A1.t
362 364 test-push-checkheads-superceed-A2.t
363 365 test-push-checkheads-superceed-A3.t
364 366 test-push-checkheads-superceed-A4.t
365 367 test-push-checkheads-superceed-A5.t
366 368 test-push-checkheads-superceed-A6.t
367 369 test-push-checkheads-superceed-A7.t
368 370 test-push-checkheads-superceed-A8.t
369 371 test-push-checkheads-unpushed-D1.t
370 372 test-push-checkheads-unpushed-D2.t
371 373 test-push-checkheads-unpushed-D3.t
372 374 test-push-checkheads-unpushed-D4.t
373 375 test-push-checkheads-unpushed-D5.t
374 376 test-push-checkheads-unpushed-D6.t
375 377 test-push-checkheads-unpushed-D7.t
376 378 test-push-http.t
377 379 test-push-warn.t
378 380 test-push.t
379 381 test-pushvars.t
380 382 test-rebase-abort.t
381 383 test-rebase-base-flag.t
382 384 test-rebase-bookmarks.t
383 385 test-rebase-brute-force.t
384 386 test-rebase-cache.t
385 387 test-rebase-check-restore.t
386 388 test-rebase-collapse.t
387 389 test-rebase-conflicts.t
388 390 test-rebase-dest.t
389 391 test-rebase-detach.t
390 392 test-rebase-emptycommit.t
391 393 test-rebase-inmemory.t
392 394 test-rebase-interruptions.t
393 395 test-rebase-issue-noparam-single-rev.t
394 396 test-rebase-legacy.t
395 397 test-rebase-mq-skip.t
396 398 test-rebase-mq.t
397 399 test-rebase-named-branches.t
398 400 test-rebase-newancestor.t
399 401 test-rebase-obsolete.t
400 402 test-rebase-parameters.t
401 403 test-rebase-partial.t
402 404 test-rebase-pull.t
403 405 test-rebase-rename.t
404 406 test-rebase-scenario-global.t
405 407 test-rebase-templates.t
406 408 test-rebase-transaction.t
407 409 test-rebuildstate.t
408 410 test-record.t
409 411 test-relink.t
410 412 test-remove.t
411 413 test-rename-after-merge.t
412 414 test-rename-dir-merge.t
413 415 test-rename-merge1.t
414 416 test-rename.t
415 417 test-repair-strip.t
416 418 test-repo-compengines.t
417 419 test-resolve.t
418 420 test-revert-flags.t
419 421 test-revert-interactive.t
420 422 test-revert-unknown.t
421 423 test-revlog-ancestry.py
422 424 test-revlog-group-emptyiter.t
423 425 test-revlog-mmapindex.t
424 426 test-revlog-packentry.t
425 427 test-revlog-raw.py
426 428 test-revset-dirstate-parents.t
427 429 test-revset-legacy-lookup.t
428 430 test-revset-outgoing.t
429 431 test-rollback.t
430 432 test-run-tests.py
431 433 test-run-tests.t
432 434 test-schemes.t
433 435 test-serve.t
434 436 test-setdiscovery.t
435 437 test-share.t
436 438 test-shelve.t
437 439 test-show-stack.t
438 440 test-show-work.t
439 441 test-show.t
440 442 test-simple-update.t
441 443 test-simplekeyvaluefile.py
442 444 test-simplemerge.py
443 445 test-single-head.t
444 446 test-sparse-clear.t
445 447 test-sparse-clone.t
446 448 test-sparse-import.t
447 449 test-sparse-merges.t
448 450 test-sparse-profiles.t
449 451 test-sparse-requirement.t
450 452 test-sparse-verbose-json.t
451 453 test-sparse.t
452 454 test-split.t
455 test-ssh-bundle1.t
453 456 test-ssh-clone-r.t
454 457 test-ssh-proto-unbundle.t
455 458 test-ssh-proto.t
459 test-ssh.t
456 460 test-sshserver.py
457 461 test-stack.t
458 462 test-status-inprocess.py
459 463 test-status-rev.t
460 464 test-status-terse.t
461 465 test-strict.t
462 466 test-strip-cross.t
463 467 test-strip.t
464 468 test-subrepo-deep-nested-change.t
465 469 test-subrepo-missing.t
466 470 test-subrepo-paths.t
467 471 test-subrepo-recursion.t
468 472 test-subrepo-relative-path.t
469 473 test-subrepo.t
470 474 test-symlink-os-yes-fs-no.py
471 475 test-symlink-placeholder.t
472 476 test-symlinks.t
473 477 test-tag.t
474 478 test-tags.t
475 479 test-template-engine.t
476 480 test-template-filters.t
477 481 test-treemanifest.t
478 482 test-ui-color.py
479 483 test-ui-config.py
480 484 test-ui-verbosity.py
481 485 test-unamend.t
482 486 test-uncommit.t
483 487 test-unified-test.t
484 488 test-unionrepo.t
485 489 test-unrelated-pull.t
486 490 test-up-local-change.t
487 491 test-update-branches.t
488 492 test-update-dest.t
489 493 test-update-issue1456.t
490 494 test-update-names.t
491 495 test-update-reverse.t
492 496 test-upgrade-repo.t
493 497 test-url-download.t
494 498 test-url-rev.t
495 499 test-url.py
496 500 test-username-newline.t
497 501 test-verify.t
498 502 test-walk.t
499 503 test-walkrepo.py
500 504 test-websub.t
501 505 test-win32text.t
502 506 test-wireproto-clientreactor.py
503 507 test-wireproto-framing.py
504 508 test-wireproto-serverreactor.py
505 509 test-wireproto.py
506 510 test-wsgirequest.py
507 511 test-xdg.t
@@ -1,620 +1,620
1 1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import sys
12 12 import weakref
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 )
18 18 from . import (
19 19 bundle2,
20 20 changegroup as changegroupmod,
21 21 encoding,
22 22 error,
23 23 pushkey as pushkeymod,
24 24 pycompat,
25 25 repository,
26 26 util,
27 27 wireprototypes,
28 28 )
29 29 from .utils import (
30 30 interfaceutil,
31 31 )
32 32
33 33 urlreq = util.urlreq
34 34
35 35 def batchable(f):
36 36 '''annotation for batchable methods
37 37
38 38 Such methods must implement a coroutine as follows:
39 39
40 40 @batchable
41 41 def sample(self, one, two=None):
42 42 # Build list of encoded arguments suitable for your wire protocol:
43 43 encargs = [('one', encode(one),), ('two', encode(two),)]
44 44 # Create future for injection of encoded result:
45 45 encresref = future()
46 46 # Return encoded arguments and future:
47 47 yield encargs, encresref
48 48 # Assuming the future to be filled with the result from the batched
49 49 # request now. Decode it:
50 50 yield decode(encresref.value)
51 51
52 52 The decorator returns a function which wraps this coroutine as a plain
53 53 method, but adds the original method as an attribute called "batchable",
54 54 which is used by remotebatch to split the call into separate encoding and
55 55 decoding phases.
56 56 '''
57 57 def plain(*args, **opts):
58 58 batchable = f(*args, **opts)
59 59 encargsorres, encresref = next(batchable)
60 60 if not encresref:
61 61 return encargsorres # a local result in this case
62 62 self = args[0]
63 63 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
64 64 encresref.set(self._submitone(cmd, encargsorres))
65 65 return next(batchable)
66 66 setattr(plain, 'batchable', f)
67 67 return plain
68 68
69 69 class future(object):
70 70 '''placeholder for a value to be set later'''
71 71 def set(self, value):
72 72 if util.safehasattr(self, 'value'):
73 73 raise error.RepoError("future is already set")
74 74 self.value = value
75 75
76 76 def encodebatchcmds(req):
77 77 """Return a ``cmds`` argument value for the ``batch`` command."""
78 78 escapearg = wireprototypes.escapebatcharg
79 79
80 80 cmds = []
81 81 for op, argsdict in req:
82 82 # Old servers didn't properly unescape argument names. So prevent
83 83 # the sending of argument names that may not be decoded properly by
84 84 # servers.
85 85 assert all(escapearg(k) == k for k in argsdict)
86 86
87 87 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
88 88 for k, v in argsdict.iteritems())
89 89 cmds.append('%s %s' % (op, args))
90 90
91 91 return ';'.join(cmds)
92 92
93 93 class unsentfuture(pycompat.futures.Future):
94 94 """A Future variation to represent an unsent command.
95 95
96 96 Because we buffer commands and don't submit them immediately, calling
97 97 ``result()`` on an unsent future could deadlock. Futures for buffered
98 98 commands are represented by this type, which wraps ``result()`` to
99 99 call ``sendcommands()``.
100 100 """
101 101
102 102 def result(self, timeout=None):
103 103 if self.done():
104 104 return pycompat.futures.Future.result(self, timeout)
105 105
106 106 self._peerexecutor.sendcommands()
107 107
108 108 # This looks like it will infinitely recurse. However,
109 109 # sendcommands() should modify __class__. This call serves as a check
110 110 # on that.
111 111 return self.result(timeout)
112 112
113 113 @interfaceutil.implementer(repository.ipeercommandexecutor)
114 114 class peerexecutor(object):
115 115 def __init__(self, peer):
116 116 self._peer = peer
117 117 self._sent = False
118 118 self._closed = False
119 119 self._calls = []
120 120 self._futures = weakref.WeakSet()
121 121 self._responseexecutor = None
122 122 self._responsef = None
123 123
124 124 def __enter__(self):
125 125 return self
126 126
127 127 def __exit__(self, exctype, excvalee, exctb):
128 128 self.close()
129 129
130 130 def callcommand(self, command, args):
131 131 if self._sent:
132 132 raise error.ProgrammingError('callcommand() cannot be used '
133 133 'after commands are sent')
134 134
135 135 if self._closed:
136 136 raise error.ProgrammingError('callcommand() cannot be used '
137 137 'after close()')
138 138
139 139 # Commands are dispatched through methods on the peer.
140 140 fn = getattr(self._peer, pycompat.sysstr(command), None)
141 141
142 142 if not fn:
143 143 raise error.ProgrammingError(
144 144 'cannot call command %s: method of same name not available '
145 145 'on peer' % command)
146 146
147 147 # Commands are either batchable or they aren't. If a command
148 148 # isn't batchable, we send it immediately because the executor
149 149 # can no longer accept new commands after a non-batchable command.
150 150 # If a command is batchable, we queue it for later. But we have
151 151 # to account for the case of a non-batchable command arriving after
152 152 # a batchable one and refuse to service it.
153 153
154 154 def addcall():
155 155 f = pycompat.futures.Future()
156 156 self._futures.add(f)
157 157 self._calls.append((command, args, fn, f))
158 158 return f
159 159
160 160 if getattr(fn, 'batchable', False):
161 161 f = addcall()
162 162
163 163 # But since we don't issue it immediately, we wrap its result()
164 164 # to trigger sending so we avoid deadlocks.
165 165 f.__class__ = unsentfuture
166 166 f._peerexecutor = self
167 167 else:
168 168 if self._calls:
169 169 raise error.ProgrammingError(
170 170 '%s is not batchable and cannot be called on a command '
171 171 'executor along with other commands' % command)
172 172
173 173 f = addcall()
174 174
175 175 # Non-batchable commands can never coexist with another command
176 176 # in this executor. So send the command immediately.
177 177 self.sendcommands()
178 178
179 179 return f
180 180
181 181 def sendcommands(self):
182 182 if self._sent:
183 183 return
184 184
185 185 if not self._calls:
186 186 return
187 187
188 188 self._sent = True
189 189
190 190 # Unhack any future types so caller seens a clean type and to break
191 191 # cycle between us and futures.
192 192 for f in self._futures:
193 193 if isinstance(f, unsentfuture):
194 194 f.__class__ = pycompat.futures.Future
195 195 f._peerexecutor = None
196 196
197 197 calls = self._calls
198 198 # Mainly to destroy references to futures.
199 199 self._calls = None
200 200
201 201 # Simple case of a single command. We call it synchronously.
202 202 if len(calls) == 1:
203 203 command, args, fn, f = calls[0]
204 204
205 205 # Future was cancelled. Ignore it.
206 206 if not f.set_running_or_notify_cancel():
207 207 return
208 208
209 209 try:
210 210 result = fn(**pycompat.strkwargs(args))
211 211 except Exception:
212 212 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
213 213 else:
214 214 f.set_result(result)
215 215
216 216 return
217 217
218 218 # Batch commands are a bit harder. First, we have to deal with the
219 219 # @batchable coroutine. That's a bit annoying. Furthermore, we also
220 220 # need to preserve streaming. i.e. it should be possible for the
221 221 # futures to resolve as data is coming in off the wire without having
222 222 # to wait for the final byte of the final response. We do this by
223 223 # spinning up a thread to read the responses.
224 224
225 225 requests = []
226 226 states = []
227 227
228 228 for command, args, fn, f in calls:
229 229 # Future was cancelled. Ignore it.
230 230 if not f.set_running_or_notify_cancel():
231 231 continue
232 232
233 233 try:
234 234 batchable = fn.batchable(fn.__self__,
235 235 **pycompat.strkwargs(args))
236 236 except Exception:
237 237 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
238 238 return
239 239
240 240 # Encoded arguments and future holding remote result.
241 241 try:
242 242 encodedargs, fremote = next(batchable)
243 243 except Exception:
244 244 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
245 245 return
246 246
247 247 requests.append((command, encodedargs))
248 248 states.append((command, f, batchable, fremote))
249 249
250 250 if not requests:
251 251 return
252 252
253 253 # This will emit responses in order they were executed.
254 254 wireresults = self._peer._submitbatch(requests)
255 255
256 256 # The use of a thread pool executor here is a bit weird for something
257 257 # that only spins up a single thread. However, thread management is
258 258 # hard and it is easy to encounter race conditions, deadlocks, etc.
259 259 # concurrent.futures already solves these problems and its thread pool
260 260 # executor has minimal overhead. So we use it.
261 261 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
262 262 self._responsef = self._responseexecutor.submit(self._readbatchresponse,
263 263 states, wireresults)
264 264
265 265 def close(self):
266 266 self.sendcommands()
267 267
268 268 if self._closed:
269 269 return
270 270
271 271 self._closed = True
272 272
273 273 if not self._responsef:
274 274 return
275 275
276 276 # We need to wait on our in-flight response and then shut down the
277 277 # executor once we have a result.
278 278 try:
279 279 self._responsef.result()
280 280 finally:
281 281 self._responseexecutor.shutdown(wait=True)
282 282 self._responsef = None
283 283 self._responseexecutor = None
284 284
285 285 # If any of our futures are still in progress, mark them as
286 286 # errored. Otherwise a result() could wait indefinitely.
287 287 for f in self._futures:
288 288 if not f.done():
289 289 f.set_exception(error.ResponseError(
290 290 _('unfulfilled batch command response')))
291 291
292 292 self._futures = None
293 293
294 294 def _readbatchresponse(self, states, wireresults):
295 295 # Executes in a thread to read data off the wire.
296 296
297 297 for command, f, batchable, fremote in states:
298 298 # Grab raw result off the wire and teach the internal future
299 299 # about it.
300 300 remoteresult = next(wireresults)
301 301 fremote.set(remoteresult)
302 302
303 303 # And ask the coroutine to decode that value.
304 304 try:
305 305 result = next(batchable)
306 306 except Exception:
307 307 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
308 308 else:
309 309 f.set_result(result)
310 310
311 311 @interfaceutil.implementer(repository.ipeercommands,
312 312 repository.ipeerlegacycommands)
313 313 class wirepeer(repository.peer):
314 314 """Client-side interface for communicating with a peer repository.
315 315
316 316 Methods commonly call wire protocol commands of the same name.
317 317
318 318 See also httppeer.py and sshpeer.py for protocol-specific
319 319 implementations of this interface.
320 320 """
321 321 def commandexecutor(self):
322 322 return peerexecutor(self)
323 323
324 324 # Begin of ipeercommands interface.
325 325
326 326 def clonebundles(self):
327 327 self.requirecap('clonebundles', _('clone bundles'))
328 328 return self._call('clonebundles')
329 329
330 330 @batchable
331 331 def lookup(self, key):
332 332 self.requirecap('lookup', _('look up remote revision'))
333 333 f = future()
334 334 yield {'key': encoding.fromlocal(key)}, f
335 335 d = f.value
336 336 success, data = d[:-1].split(" ", 1)
337 337 if int(success):
338 338 yield bin(data)
339 339 else:
340 340 self._abort(error.RepoError(data))
341 341
342 342 @batchable
343 343 def heads(self):
344 344 f = future()
345 345 yield {}, f
346 346 d = f.value
347 347 try:
348 348 yield wireprototypes.decodelist(d[:-1])
349 349 except ValueError:
350 350 self._abort(error.ResponseError(_("unexpected response:"), d))
351 351
352 352 @batchable
353 353 def known(self, nodes):
354 354 f = future()
355 355 yield {'nodes': wireprototypes.encodelist(nodes)}, f
356 356 d = f.value
357 357 try:
358 yield [bool(int(b)) for b in d]
358 yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
359 359 except ValueError:
360 360 self._abort(error.ResponseError(_("unexpected response:"), d))
361 361
362 362 @batchable
363 363 def branchmap(self):
364 364 f = future()
365 365 yield {}, f
366 366 d = f.value
367 367 try:
368 368 branchmap = {}
369 369 for branchpart in d.splitlines():
370 370 branchname, branchheads = branchpart.split(' ', 1)
371 371 branchname = encoding.tolocal(urlreq.unquote(branchname))
372 372 branchheads = wireprototypes.decodelist(branchheads)
373 373 branchmap[branchname] = branchheads
374 374 yield branchmap
375 375 except TypeError:
376 376 self._abort(error.ResponseError(_("unexpected response:"), d))
377 377
378 378 @batchable
379 379 def listkeys(self, namespace):
380 380 if not self.capable('pushkey'):
381 381 yield {}, None
382 382 f = future()
383 383 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
384 384 yield {'namespace': encoding.fromlocal(namespace)}, f
385 385 d = f.value
386 386 self.ui.debug('received listkey for "%s": %i bytes\n'
387 387 % (namespace, len(d)))
388 388 yield pushkeymod.decodekeys(d)
389 389
390 390 @batchable
391 391 def pushkey(self, namespace, key, old, new):
392 392 if not self.capable('pushkey'):
393 393 yield False, None
394 394 f = future()
395 395 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
396 396 yield {'namespace': encoding.fromlocal(namespace),
397 397 'key': encoding.fromlocal(key),
398 398 'old': encoding.fromlocal(old),
399 399 'new': encoding.fromlocal(new)}, f
400 400 d = f.value
401 401 d, output = d.split('\n', 1)
402 402 try:
403 403 d = bool(int(d))
404 404 except ValueError:
405 405 raise error.ResponseError(
406 406 _('push failed (unexpected response):'), d)
407 407 for l in output.splitlines(True):
408 408 self.ui.status(_('remote: '), l)
409 409 yield d
410 410
411 411 def stream_out(self):
412 412 return self._callstream('stream_out')
413 413
414 414 def getbundle(self, source, **kwargs):
415 415 kwargs = pycompat.byteskwargs(kwargs)
416 416 self.requirecap('getbundle', _('look up remote changes'))
417 417 opts = {}
418 418 bundlecaps = kwargs.get('bundlecaps') or set()
419 419 for key, value in kwargs.iteritems():
420 420 if value is None:
421 421 continue
422 422 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
423 423 if keytype is None:
424 424 raise error.ProgrammingError(
425 425 'Unexpectedly None keytype for key %s' % key)
426 426 elif keytype == 'nodes':
427 427 value = wireprototypes.encodelist(value)
428 428 elif keytype == 'csv':
429 429 value = ','.join(value)
430 430 elif keytype == 'scsv':
431 431 value = ','.join(sorted(value))
432 432 elif keytype == 'boolean':
433 433 value = '%i' % bool(value)
434 434 elif keytype != 'plain':
435 435 raise KeyError('unknown getbundle option type %s'
436 436 % keytype)
437 437 opts[key] = value
438 438 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
439 439 if any((cap.startswith('HG2') for cap in bundlecaps)):
440 440 return bundle2.getunbundler(self.ui, f)
441 441 else:
442 442 return changegroupmod.cg1unpacker(f, 'UN')
443 443
444 444 def unbundle(self, bundle, heads, url):
445 445 '''Send cg (a readable file-like object representing the
446 446 changegroup to push, typically a chunkbuffer object) to the
447 447 remote server as a bundle.
448 448
449 449 When pushing a bundle10 stream, return an integer indicating the
450 450 result of the push (see changegroup.apply()).
451 451
452 452 When pushing a bundle20 stream, return a bundle20 stream.
453 453
454 454 `url` is the url the client thinks it's pushing to, which is
455 455 visible to hooks.
456 456 '''
457 457
458 458 if heads != ['force'] and self.capable('unbundlehash'):
459 459 heads = wireprototypes.encodelist(
460 460 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
461 461 else:
462 462 heads = wireprototypes.encodelist(heads)
463 463
464 464 if util.safehasattr(bundle, 'deltaheader'):
465 465 # this a bundle10, do the old style call sequence
466 466 ret, output = self._callpush("unbundle", bundle, heads=heads)
467 467 if ret == "":
468 468 raise error.ResponseError(
469 469 _('push failed:'), output)
470 470 try:
471 471 ret = int(ret)
472 472 except ValueError:
473 473 raise error.ResponseError(
474 474 _('push failed (unexpected response):'), ret)
475 475
476 476 for l in output.splitlines(True):
477 477 self.ui.status(_('remote: '), l)
478 478 else:
479 479 # bundle2 push. Send a stream, fetch a stream.
480 480 stream = self._calltwowaystream('unbundle', bundle, heads=heads)
481 481 ret = bundle2.getunbundler(self.ui, stream)
482 482 return ret
483 483
484 484 # End of ipeercommands interface.
485 485
486 486 # Begin of ipeerlegacycommands interface.
487 487
488 488 def branches(self, nodes):
489 489 n = wireprototypes.encodelist(nodes)
490 490 d = self._call("branches", nodes=n)
491 491 try:
492 492 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
493 493 return br
494 494 except ValueError:
495 495 self._abort(error.ResponseError(_("unexpected response:"), d))
496 496
497 497 def between(self, pairs):
498 498 batch = 8 # avoid giant requests
499 499 r = []
500 500 for i in xrange(0, len(pairs), batch):
501 501 n = " ".join([wireprototypes.encodelist(p, '-')
502 502 for p in pairs[i:i + batch]])
503 503 d = self._call("between", pairs=n)
504 504 try:
505 505 r.extend(l and wireprototypes.decodelist(l) or []
506 506 for l in d.splitlines())
507 507 except ValueError:
508 508 self._abort(error.ResponseError(_("unexpected response:"), d))
509 509 return r
510 510
511 511 def changegroup(self, nodes, source):
512 512 n = wireprototypes.encodelist(nodes)
513 513 f = self._callcompressable("changegroup", roots=n)
514 514 return changegroupmod.cg1unpacker(f, 'UN')
515 515
516 516 def changegroupsubset(self, bases, heads, source):
517 517 self.requirecap('changegroupsubset', _('look up remote changes'))
518 518 bases = wireprototypes.encodelist(bases)
519 519 heads = wireprototypes.encodelist(heads)
520 520 f = self._callcompressable("changegroupsubset",
521 521 bases=bases, heads=heads)
522 522 return changegroupmod.cg1unpacker(f, 'UN')
523 523
524 524 # End of ipeerlegacycommands interface.
525 525
526 526 def _submitbatch(self, req):
527 527 """run batch request <req> on the server
528 528
529 529 Returns an iterator of the raw responses from the server.
530 530 """
531 531 ui = self.ui
532 532 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
533 533 ui.debug('devel-peer-request: batched-content\n')
534 534 for op, args in req:
535 535 msg = 'devel-peer-request: - %s (%d arguments)\n'
536 536 ui.debug(msg % (op, len(args)))
537 537
538 538 unescapearg = wireprototypes.unescapebatcharg
539 539
540 540 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
541 541 chunk = rsp.read(1024)
542 542 work = [chunk]
543 543 while chunk:
544 544 while ';' not in chunk and chunk:
545 545 chunk = rsp.read(1024)
546 546 work.append(chunk)
547 547 merged = ''.join(work)
548 548 while ';' in merged:
549 549 one, merged = merged.split(';', 1)
550 550 yield unescapearg(one)
551 551 chunk = rsp.read(1024)
552 552 work = [merged, chunk]
553 553 yield unescapearg(''.join(work))
554 554
555 555 def _submitone(self, op, args):
556 556 return self._call(op, **pycompat.strkwargs(args))
557 557
558 558 def debugwireargs(self, one, two, three=None, four=None, five=None):
559 559 # don't pass optional arguments left at their default value
560 560 opts = {}
561 561 if three is not None:
562 562 opts[r'three'] = three
563 563 if four is not None:
564 564 opts[r'four'] = four
565 565 return self._call('debugwireargs', one=one, two=two, **opts)
566 566
567 567 def _call(self, cmd, **args):
568 568 """execute <cmd> on the server
569 569
570 570 The command is expected to return a simple string.
571 571
572 572 returns the server reply as a string."""
573 573 raise NotImplementedError()
574 574
575 575 def _callstream(self, cmd, **args):
576 576 """execute <cmd> on the server
577 577
578 578 The command is expected to return a stream. Note that if the
579 579 command doesn't return a stream, _callstream behaves
580 580 differently for ssh and http peers.
581 581
582 582 returns the server reply as a file like object.
583 583 """
584 584 raise NotImplementedError()
585 585
586 586 def _callcompressable(self, cmd, **args):
587 587 """execute <cmd> on the server
588 588
589 589 The command is expected to return a stream.
590 590
591 591 The stream may have been compressed in some implementations. This
592 592 function takes care of the decompression. This is the only difference
593 593 with _callstream.
594 594
595 595 returns the server reply as a file like object.
596 596 """
597 597 raise NotImplementedError()
598 598
599 599 def _callpush(self, cmd, fp, **args):
600 600 """execute a <cmd> on server
601 601
602 602 The command is expected to be related to a push. Push has a special
603 603 return method.
604 604
605 605 returns the server reply as a (ret, output) tuple. ret is either
606 606 empty (error) or a stringified int.
607 607 """
608 608 raise NotImplementedError()
609 609
610 610 def _calltwowaystream(self, cmd, fp, **args):
611 611 """execute <cmd> on server
612 612
613 613 The command will send a stream to the server and get a stream in reply.
614 614 """
615 615 raise NotImplementedError()
616 616
617 617 def _abort(self, exception):
618 618 """clearly abort the wire protocol connection and raise the exception
619 619 """
620 620 raise NotImplementedError()
General Comments 0
You need to be logged in to leave comments. Login now