##// END OF EJS Templates
wireprotov1peer: update all rpcs to use the new batchable scheme...
Valentin Gatien-Baron -
r48672:f4f9b0e0 stable draft
parent child Browse files
Show More
@@ -1,263 +1,261 b''
1 # Copyright 2016-present Facebook. All Rights Reserved.
1 # Copyright 2016-present Facebook. All Rights Reserved.
2 #
2 #
3 # protocol: logic for a server providing fastannotate support
3 # protocol: logic for a server providing fastannotate support
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import contextlib
9 import contextlib
10 import os
10 import os
11
11
12 from mercurial.i18n import _
12 from mercurial.i18n import _
13 from mercurial.pycompat import open
13 from mercurial.pycompat import open
14 from mercurial import (
14 from mercurial import (
15 error,
15 error,
16 extensions,
16 extensions,
17 hg,
17 hg,
18 pycompat,
18 pycompat,
19 util,
19 util,
20 wireprotov1peer,
20 wireprotov1peer,
21 wireprotov1server,
21 wireprotov1server,
22 )
22 )
23 from mercurial.utils import (
23 from mercurial.utils import (
24 urlutil,
24 urlutil,
25 )
25 )
26 from . import context
26 from . import context
27
27
28 # common
28 # common
29
29
30
30
31 def _getmaster(ui):
31 def _getmaster(ui):
32 """get the mainbranch, and enforce it is set"""
32 """get the mainbranch, and enforce it is set"""
33 master = ui.config(b'fastannotate', b'mainbranch')
33 master = ui.config(b'fastannotate', b'mainbranch')
34 if not master:
34 if not master:
35 raise error.Abort(
35 raise error.Abort(
36 _(
36 _(
37 b'fastannotate.mainbranch is required '
37 b'fastannotate.mainbranch is required '
38 b'for both the client and the server'
38 b'for both the client and the server'
39 )
39 )
40 )
40 )
41 return master
41 return master
42
42
43
43
44 # server-side
44 # server-side
45
45
46
46
47 def _capabilities(orig, repo, proto):
47 def _capabilities(orig, repo, proto):
48 result = orig(repo, proto)
48 result = orig(repo, proto)
49 result.append(b'getannotate')
49 result.append(b'getannotate')
50 return result
50 return result
51
51
52
52
53 def _getannotate(repo, proto, path, lastnode):
53 def _getannotate(repo, proto, path, lastnode):
54 # output:
54 # output:
55 # FILE := vfspath + '\0' + str(size) + '\0' + content
55 # FILE := vfspath + '\0' + str(size) + '\0' + content
56 # OUTPUT := '' | FILE + OUTPUT
56 # OUTPUT := '' | FILE + OUTPUT
57 result = b''
57 result = b''
58 buildondemand = repo.ui.configbool(
58 buildondemand = repo.ui.configbool(
59 b'fastannotate', b'serverbuildondemand', True
59 b'fastannotate', b'serverbuildondemand', True
60 )
60 )
61 with context.annotatecontext(repo, path) as actx:
61 with context.annotatecontext(repo, path) as actx:
62 if buildondemand:
62 if buildondemand:
63 # update before responding to the client
63 # update before responding to the client
64 master = _getmaster(repo.ui)
64 master = _getmaster(repo.ui)
65 try:
65 try:
66 if not actx.isuptodate(master):
66 if not actx.isuptodate(master):
67 actx.annotate(master, master)
67 actx.annotate(master, master)
68 except Exception:
68 except Exception:
69 # non-fast-forward move or corrupted. rebuild automically.
69 # non-fast-forward move or corrupted. rebuild automically.
70 actx.rebuild()
70 actx.rebuild()
71 try:
71 try:
72 actx.annotate(master, master)
72 actx.annotate(master, master)
73 except Exception:
73 except Exception:
74 actx.rebuild() # delete files
74 actx.rebuild() # delete files
75 finally:
75 finally:
76 # although the "with" context will also do a close/flush, we
76 # although the "with" context will also do a close/flush, we
77 # need to do it early so we can send the correct respond to
77 # need to do it early so we can send the correct respond to
78 # client.
78 # client.
79 actx.close()
79 actx.close()
80 # send back the full content of revmap and linelog, in the future we
80 # send back the full content of revmap and linelog, in the future we
81 # may want to do some rsync-like fancy updating.
81 # may want to do some rsync-like fancy updating.
82 # the lastnode check is not necessary if the client and the server
82 # the lastnode check is not necessary if the client and the server
83 # agree where the main branch is.
83 # agree where the main branch is.
84 if actx.lastnode != lastnode:
84 if actx.lastnode != lastnode:
85 for p in [actx.revmappath, actx.linelogpath]:
85 for p in [actx.revmappath, actx.linelogpath]:
86 if not os.path.exists(p):
86 if not os.path.exists(p):
87 continue
87 continue
88 with open(p, b'rb') as f:
88 with open(p, b'rb') as f:
89 content = f.read()
89 content = f.read()
90 vfsbaselen = len(repo.vfs.base + b'/')
90 vfsbaselen = len(repo.vfs.base + b'/')
91 relpath = p[vfsbaselen:]
91 relpath = p[vfsbaselen:]
92 result += b'%s\0%d\0%s' % (relpath, len(content), content)
92 result += b'%s\0%d\0%s' % (relpath, len(content), content)
93 return result
93 return result
94
94
95
95
96 def _registerwireprotocommand():
96 def _registerwireprotocommand():
97 if b'getannotate' in wireprotov1server.commands:
97 if b'getannotate' in wireprotov1server.commands:
98 return
98 return
99 wireprotov1server.wireprotocommand(b'getannotate', b'path lastnode')(
99 wireprotov1server.wireprotocommand(b'getannotate', b'path lastnode')(
100 _getannotate
100 _getannotate
101 )
101 )
102
102
103
103
104 def serveruisetup(ui):
104 def serveruisetup(ui):
105 _registerwireprotocommand()
105 _registerwireprotocommand()
106 extensions.wrapfunction(wireprotov1server, b'_capabilities', _capabilities)
106 extensions.wrapfunction(wireprotov1server, b'_capabilities', _capabilities)
107
107
108
108
109 # client-side
109 # client-side
110
110
111
111
112 def _parseresponse(payload):
112 def _parseresponse(payload):
113 result = {}
113 result = {}
114 i = 0
114 i = 0
115 l = len(payload) - 1
115 l = len(payload) - 1
116 state = 0 # 0: vfspath, 1: size
116 state = 0 # 0: vfspath, 1: size
117 vfspath = size = b''
117 vfspath = size = b''
118 while i < l:
118 while i < l:
119 ch = payload[i : i + 1]
119 ch = payload[i : i + 1]
120 if ch == b'\0':
120 if ch == b'\0':
121 if state == 1:
121 if state == 1:
122 result[vfspath] = payload[i + 1 : i + 1 + int(size)]
122 result[vfspath] = payload[i + 1 : i + 1 + int(size)]
123 i += int(size)
123 i += int(size)
124 state = 0
124 state = 0
125 vfspath = size = b''
125 vfspath = size = b''
126 elif state == 0:
126 elif state == 0:
127 state = 1
127 state = 1
128 else:
128 else:
129 if state == 1:
129 if state == 1:
130 size += ch
130 size += ch
131 elif state == 0:
131 elif state == 0:
132 vfspath += ch
132 vfspath += ch
133 i += 1
133 i += 1
134 return result
134 return result
135
135
136
136
137 def peersetup(ui, peer):
137 def peersetup(ui, peer):
138 class fastannotatepeer(peer.__class__):
138 class fastannotatepeer(peer.__class__):
139 @wireprotov1peer.batchable
139 @wireprotov1peer.batchable
140 def getannotate(self, path, lastnode=None):
140 def getannotate(self, path, lastnode=None):
141 if not self.capable(b'getannotate'):
141 if not self.capable(b'getannotate'):
142 ui.warn(_(b'remote peer cannot provide annotate cache\n'))
142 ui.warn(_(b'remote peer cannot provide annotate cache\n'))
143 yield None, None
143 return None, None
144 else:
144 else:
145 args = {b'path': path, b'lastnode': lastnode or b''}
145 args = {b'path': path, b'lastnode': lastnode or b''}
146 f = wireprotov1peer.future()
146 return args, _parseresponse
147 yield args, f
148 yield _parseresponse(f.value)
149
147
150 peer.__class__ = fastannotatepeer
148 peer.__class__ = fastannotatepeer
151
149
152
150
153 @contextlib.contextmanager
151 @contextlib.contextmanager
154 def annotatepeer(repo):
152 def annotatepeer(repo):
155 ui = repo.ui
153 ui = repo.ui
156
154
157 remotedest = ui.config(b'fastannotate', b'remotepath', b'default')
155 remotedest = ui.config(b'fastannotate', b'remotepath', b'default')
158 r = urlutil.get_unique_pull_path(b'fastannotate', repo, ui, remotedest)
156 r = urlutil.get_unique_pull_path(b'fastannotate', repo, ui, remotedest)
159 remotepath = r[0]
157 remotepath = r[0]
160 peer = hg.peer(ui, {}, remotepath)
158 peer = hg.peer(ui, {}, remotepath)
161
159
162 try:
160 try:
163 yield peer
161 yield peer
164 finally:
162 finally:
165 peer.close()
163 peer.close()
166
164
167
165
168 def clientfetch(repo, paths, lastnodemap=None, peer=None):
166 def clientfetch(repo, paths, lastnodemap=None, peer=None):
169 """download annotate cache from the server for paths"""
167 """download annotate cache from the server for paths"""
170 if not paths:
168 if not paths:
171 return
169 return
172
170
173 if peer is None:
171 if peer is None:
174 with annotatepeer(repo) as peer:
172 with annotatepeer(repo) as peer:
175 return clientfetch(repo, paths, lastnodemap, peer)
173 return clientfetch(repo, paths, lastnodemap, peer)
176
174
177 if lastnodemap is None:
175 if lastnodemap is None:
178 lastnodemap = {}
176 lastnodemap = {}
179
177
180 ui = repo.ui
178 ui = repo.ui
181 results = []
179 results = []
182 with peer.commandexecutor() as batcher:
180 with peer.commandexecutor() as batcher:
183 ui.debug(b'fastannotate: requesting %d files\n' % len(paths))
181 ui.debug(b'fastannotate: requesting %d files\n' % len(paths))
184 for p in paths:
182 for p in paths:
185 results.append(
183 results.append(
186 batcher.callcommand(
184 batcher.callcommand(
187 b'getannotate',
185 b'getannotate',
188 {b'path': p, b'lastnode': lastnodemap.get(p)},
186 {b'path': p, b'lastnode': lastnodemap.get(p)},
189 )
187 )
190 )
188 )
191
189
192 for result in results:
190 for result in results:
193 r = result.result()
191 r = result.result()
194 # TODO: pconvert these paths on the server?
192 # TODO: pconvert these paths on the server?
195 r = {util.pconvert(p): v for p, v in pycompat.iteritems(r)}
193 r = {util.pconvert(p): v for p, v in pycompat.iteritems(r)}
196 for path in sorted(r):
194 for path in sorted(r):
197 # ignore malicious paths
195 # ignore malicious paths
198 if not path.startswith(b'fastannotate/') or b'/../' in (
196 if not path.startswith(b'fastannotate/') or b'/../' in (
199 path + b'/'
197 path + b'/'
200 ):
198 ):
201 ui.debug(
199 ui.debug(
202 b'fastannotate: ignored malicious path %s\n' % path
200 b'fastannotate: ignored malicious path %s\n' % path
203 )
201 )
204 continue
202 continue
205 content = r[path]
203 content = r[path]
206 if ui.debugflag:
204 if ui.debugflag:
207 ui.debug(
205 ui.debug(
208 b'fastannotate: writing %d bytes to %s\n'
206 b'fastannotate: writing %d bytes to %s\n'
209 % (len(content), path)
207 % (len(content), path)
210 )
208 )
211 repo.vfs.makedirs(os.path.dirname(path))
209 repo.vfs.makedirs(os.path.dirname(path))
212 with repo.vfs(path, b'wb') as f:
210 with repo.vfs(path, b'wb') as f:
213 f.write(content)
211 f.write(content)
214
212
215
213
216 def _filterfetchpaths(repo, paths):
214 def _filterfetchpaths(repo, paths):
217 """return a subset of paths whose history is long and need to fetch linelog
215 """return a subset of paths whose history is long and need to fetch linelog
218 from the server. works with remotefilelog and non-remotefilelog repos.
216 from the server. works with remotefilelog and non-remotefilelog repos.
219 """
217 """
220 threshold = repo.ui.configint(b'fastannotate', b'clientfetchthreshold', 10)
218 threshold = repo.ui.configint(b'fastannotate', b'clientfetchthreshold', 10)
221 if threshold <= 0:
219 if threshold <= 0:
222 return paths
220 return paths
223
221
224 result = []
222 result = []
225 for path in paths:
223 for path in paths:
226 try:
224 try:
227 if len(repo.file(path)) >= threshold:
225 if len(repo.file(path)) >= threshold:
228 result.append(path)
226 result.append(path)
229 except Exception: # file not found etc.
227 except Exception: # file not found etc.
230 result.append(path)
228 result.append(path)
231
229
232 return result
230 return result
233
231
234
232
235 def localreposetup(ui, repo):
233 def localreposetup(ui, repo):
236 class fastannotaterepo(repo.__class__):
234 class fastannotaterepo(repo.__class__):
237 def prefetchfastannotate(self, paths, peer=None):
235 def prefetchfastannotate(self, paths, peer=None):
238 master = _getmaster(self.ui)
236 master = _getmaster(self.ui)
239 needupdatepaths = []
237 needupdatepaths = []
240 lastnodemap = {}
238 lastnodemap = {}
241 try:
239 try:
242 for path in _filterfetchpaths(self, paths):
240 for path in _filterfetchpaths(self, paths):
243 with context.annotatecontext(self, path) as actx:
241 with context.annotatecontext(self, path) as actx:
244 if not actx.isuptodate(master, strict=False):
242 if not actx.isuptodate(master, strict=False):
245 needupdatepaths.append(path)
243 needupdatepaths.append(path)
246 lastnodemap[path] = actx.lastnode
244 lastnodemap[path] = actx.lastnode
247 if needupdatepaths:
245 if needupdatepaths:
248 clientfetch(self, needupdatepaths, lastnodemap, peer)
246 clientfetch(self, needupdatepaths, lastnodemap, peer)
249 except Exception as ex:
247 except Exception as ex:
250 # could be directory not writable or so, not fatal
248 # could be directory not writable or so, not fatal
251 self.ui.debug(b'fastannotate: prefetch failed: %r\n' % ex)
249 self.ui.debug(b'fastannotate: prefetch failed: %r\n' % ex)
252
250
253 repo.__class__ = fastannotaterepo
251 repo.__class__ = fastannotaterepo
254
252
255
253
256 def clientreposetup(ui, repo):
254 def clientreposetup(ui, repo):
257 _registerwireprotocommand()
255 _registerwireprotocommand()
258 if repo.local():
256 if repo.local():
259 localreposetup(ui, repo)
257 localreposetup(ui, repo)
260 # TODO: this mutates global state, but only if at least one repo
258 # TODO: this mutates global state, but only if at least one repo
261 # has the extension enabled. This is probably bad for hgweb.
259 # has the extension enabled. This is probably bad for hgweb.
262 if peersetup not in hg.wirepeersetupfuncs:
260 if peersetup not in hg.wirepeersetupfuncs:
263 hg.wirepeersetupfuncs.append(peersetup)
261 hg.wirepeersetupfuncs.append(peersetup)
@@ -1,1389 +1,1390 b''
1 # Infinite push
1 # Infinite push
2 #
2 #
3 # Copyright 2016 Facebook, Inc.
3 # Copyright 2016 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """ store some pushes in a remote blob store on the server (EXPERIMENTAL)
7 """ store some pushes in a remote blob store on the server (EXPERIMENTAL)
8
8
9 IMPORTANT: if you use this extension, please contact
9 IMPORTANT: if you use this extension, please contact
10 mercurial-devel@mercurial-scm.org ASAP. This extension is believed to
10 mercurial-devel@mercurial-scm.org ASAP. This extension is believed to
11 be unused and barring learning of users of this functionality, we will
11 be unused and barring learning of users of this functionality, we will
12 delete this code at the end of 2020.
12 delete this code at the end of 2020.
13
13
14 [infinitepush]
14 [infinitepush]
15 # Server-side and client-side option. Pattern of the infinitepush bookmark
15 # Server-side and client-side option. Pattern of the infinitepush bookmark
16 branchpattern = PATTERN
16 branchpattern = PATTERN
17
17
18 # Server or client
18 # Server or client
19 server = False
19 server = False
20
20
21 # Server-side option. Possible values: 'disk' or 'sql'. Fails if not set
21 # Server-side option. Possible values: 'disk' or 'sql'. Fails if not set
22 indextype = disk
22 indextype = disk
23
23
24 # Server-side option. Used only if indextype=sql.
24 # Server-side option. Used only if indextype=sql.
25 # Format: 'IP:PORT:DB_NAME:USER:PASSWORD'
25 # Format: 'IP:PORT:DB_NAME:USER:PASSWORD'
26 sqlhost = IP:PORT:DB_NAME:USER:PASSWORD
26 sqlhost = IP:PORT:DB_NAME:USER:PASSWORD
27
27
28 # Server-side option. Used only if indextype=disk.
28 # Server-side option. Used only if indextype=disk.
29 # Filesystem path to the index store
29 # Filesystem path to the index store
30 indexpath = PATH
30 indexpath = PATH
31
31
32 # Server-side option. Possible values: 'disk' or 'external'
32 # Server-side option. Possible values: 'disk' or 'external'
33 # Fails if not set
33 # Fails if not set
34 storetype = disk
34 storetype = disk
35
35
36 # Server-side option.
36 # Server-side option.
37 # Path to the binary that will save bundle to the bundlestore
37 # Path to the binary that will save bundle to the bundlestore
38 # Formatted cmd line will be passed to it (see `put_args`)
38 # Formatted cmd line will be passed to it (see `put_args`)
39 put_binary = put
39 put_binary = put
40
40
41 # Serser-side option. Used only if storetype=external.
41 # Serser-side option. Used only if storetype=external.
42 # Format cmd-line string for put binary. Placeholder: {filename}
42 # Format cmd-line string for put binary. Placeholder: {filename}
43 put_args = {filename}
43 put_args = {filename}
44
44
45 # Server-side option.
45 # Server-side option.
46 # Path to the binary that get bundle from the bundlestore.
46 # Path to the binary that get bundle from the bundlestore.
47 # Formatted cmd line will be passed to it (see `get_args`)
47 # Formatted cmd line will be passed to it (see `get_args`)
48 get_binary = get
48 get_binary = get
49
49
50 # Serser-side option. Used only if storetype=external.
50 # Serser-side option. Used only if storetype=external.
51 # Format cmd-line string for get binary. Placeholders: {filename} {handle}
51 # Format cmd-line string for get binary. Placeholders: {filename} {handle}
52 get_args = {filename} {handle}
52 get_args = {filename} {handle}
53
53
54 # Server-side option
54 # Server-side option
55 logfile = FIlE
55 logfile = FIlE
56
56
57 # Server-side option
57 # Server-side option
58 loglevel = DEBUG
58 loglevel = DEBUG
59
59
60 # Server-side option. Used only if indextype=sql.
60 # Server-side option. Used only if indextype=sql.
61 # Sets mysql wait_timeout option.
61 # Sets mysql wait_timeout option.
62 waittimeout = 300
62 waittimeout = 300
63
63
64 # Server-side option. Used only if indextype=sql.
64 # Server-side option. Used only if indextype=sql.
65 # Sets mysql innodb_lock_wait_timeout option.
65 # Sets mysql innodb_lock_wait_timeout option.
66 locktimeout = 120
66 locktimeout = 120
67
67
68 # Server-side option. Used only if indextype=sql.
68 # Server-side option. Used only if indextype=sql.
69 # Name of the repository
69 # Name of the repository
70 reponame = ''
70 reponame = ''
71
71
72 # Client-side option. Used by --list-remote option. List of remote scratch
72 # Client-side option. Used by --list-remote option. List of remote scratch
73 # patterns to list if no patterns are specified.
73 # patterns to list if no patterns are specified.
74 defaultremotepatterns = ['*']
74 defaultremotepatterns = ['*']
75
75
76 # Instructs infinitepush to forward all received bundle2 parts to the
76 # Instructs infinitepush to forward all received bundle2 parts to the
77 # bundle for storage. Defaults to False.
77 # bundle for storage. Defaults to False.
78 storeallparts = True
78 storeallparts = True
79
79
80 # routes each incoming push to the bundlestore. defaults to False
80 # routes each incoming push to the bundlestore. defaults to False
81 pushtobundlestore = True
81 pushtobundlestore = True
82
82
83 [remotenames]
83 [remotenames]
84 # Client-side option
84 # Client-side option
85 # This option should be set only if remotenames extension is enabled.
85 # This option should be set only if remotenames extension is enabled.
86 # Whether remote bookmarks are tracked by remotenames extension.
86 # Whether remote bookmarks are tracked by remotenames extension.
87 bookmarks = True
87 bookmarks = True
88 """
88 """
89
89
90 from __future__ import absolute_import
90 from __future__ import absolute_import
91
91
92 import collections
92 import collections
93 import contextlib
93 import contextlib
94 import errno
94 import errno
95 import functools
95 import functools
96 import logging
96 import logging
97 import os
97 import os
98 import random
98 import random
99 import re
99 import re
100 import socket
100 import socket
101 import subprocess
101 import subprocess
102 import time
102 import time
103
103
104 from mercurial.node import (
104 from mercurial.node import (
105 bin,
105 bin,
106 hex,
106 hex,
107 )
107 )
108
108
109 from mercurial.i18n import _
109 from mercurial.i18n import _
110
110
111 from mercurial.pycompat import (
111 from mercurial.pycompat import (
112 getattr,
112 getattr,
113 open,
113 open,
114 )
114 )
115
115
116 from mercurial.utils import (
116 from mercurial.utils import (
117 procutil,
117 procutil,
118 stringutil,
118 stringutil,
119 urlutil,
119 urlutil,
120 )
120 )
121
121
122 from mercurial import (
122 from mercurial import (
123 bundle2,
123 bundle2,
124 changegroup,
124 changegroup,
125 commands,
125 commands,
126 discovery,
126 discovery,
127 encoding,
127 encoding,
128 error,
128 error,
129 exchange,
129 exchange,
130 extensions,
130 extensions,
131 hg,
131 hg,
132 localrepo,
132 localrepo,
133 phases,
133 phases,
134 pushkey,
134 pushkey,
135 pycompat,
135 pycompat,
136 registrar,
136 registrar,
137 util,
137 util,
138 wireprototypes,
138 wireprototypes,
139 wireprotov1peer,
139 wireprotov1peer,
140 wireprotov1server,
140 wireprotov1server,
141 )
141 )
142
142
143 from . import (
143 from . import (
144 bundleparts,
144 bundleparts,
145 common,
145 common,
146 )
146 )
147
147
148 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
148 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
149 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
149 # extensions which SHIP WITH MERCURIAL. Non-mainline extensions should
150 # be specifying the version(s) of Mercurial they are tested with, or
150 # be specifying the version(s) of Mercurial they are tested with, or
151 # leave the attribute unspecified.
151 # leave the attribute unspecified.
152 testedwith = b'ships-with-hg-core'
152 testedwith = b'ships-with-hg-core'
153
153
154 configtable = {}
154 configtable = {}
155 configitem = registrar.configitem(configtable)
155 configitem = registrar.configitem(configtable)
156
156
157 configitem(
157 configitem(
158 b'infinitepush',
158 b'infinitepush',
159 b'server',
159 b'server',
160 default=False,
160 default=False,
161 )
161 )
162 configitem(
162 configitem(
163 b'infinitepush',
163 b'infinitepush',
164 b'storetype',
164 b'storetype',
165 default=b'',
165 default=b'',
166 )
166 )
167 configitem(
167 configitem(
168 b'infinitepush',
168 b'infinitepush',
169 b'indextype',
169 b'indextype',
170 default=b'',
170 default=b'',
171 )
171 )
172 configitem(
172 configitem(
173 b'infinitepush',
173 b'infinitepush',
174 b'indexpath',
174 b'indexpath',
175 default=b'',
175 default=b'',
176 )
176 )
177 configitem(
177 configitem(
178 b'infinitepush',
178 b'infinitepush',
179 b'storeallparts',
179 b'storeallparts',
180 default=False,
180 default=False,
181 )
181 )
182 configitem(
182 configitem(
183 b'infinitepush',
183 b'infinitepush',
184 b'reponame',
184 b'reponame',
185 default=b'',
185 default=b'',
186 )
186 )
187 configitem(
187 configitem(
188 b'scratchbranch',
188 b'scratchbranch',
189 b'storepath',
189 b'storepath',
190 default=b'',
190 default=b'',
191 )
191 )
192 configitem(
192 configitem(
193 b'infinitepush',
193 b'infinitepush',
194 b'branchpattern',
194 b'branchpattern',
195 default=b'',
195 default=b'',
196 )
196 )
197 configitem(
197 configitem(
198 b'infinitepush',
198 b'infinitepush',
199 b'pushtobundlestore',
199 b'pushtobundlestore',
200 default=False,
200 default=False,
201 )
201 )
202 configitem(
202 configitem(
203 b'experimental',
203 b'experimental',
204 b'server-bundlestore-bookmark',
204 b'server-bundlestore-bookmark',
205 default=b'',
205 default=b'',
206 )
206 )
207 configitem(
207 configitem(
208 b'experimental',
208 b'experimental',
209 b'infinitepush-scratchpush',
209 b'infinitepush-scratchpush',
210 default=False,
210 default=False,
211 )
211 )
212
212
213 experimental = b'experimental'
213 experimental = b'experimental'
214 configbookmark = b'server-bundlestore-bookmark'
214 configbookmark = b'server-bundlestore-bookmark'
215 configscratchpush = b'infinitepush-scratchpush'
215 configscratchpush = b'infinitepush-scratchpush'
216
216
217 scratchbranchparttype = bundleparts.scratchbranchparttype
217 scratchbranchparttype = bundleparts.scratchbranchparttype
218 revsetpredicate = registrar.revsetpredicate()
218 revsetpredicate = registrar.revsetpredicate()
219 templatekeyword = registrar.templatekeyword()
219 templatekeyword = registrar.templatekeyword()
220 _scratchbranchmatcher = lambda x: False
220 _scratchbranchmatcher = lambda x: False
221 _maybehash = re.compile('^[a-f0-9]+$').search
221 _maybehash = re.compile('^[a-f0-9]+$').search
222
222
223
223
224 def _buildexternalbundlestore(ui):
224 def _buildexternalbundlestore(ui):
225 put_args = ui.configlist(b'infinitepush', b'put_args', [])
225 put_args = ui.configlist(b'infinitepush', b'put_args', [])
226 put_binary = ui.config(b'infinitepush', b'put_binary')
226 put_binary = ui.config(b'infinitepush', b'put_binary')
227 if not put_binary:
227 if not put_binary:
228 raise error.Abort(b'put binary is not specified')
228 raise error.Abort(b'put binary is not specified')
229 get_args = ui.configlist(b'infinitepush', b'get_args', [])
229 get_args = ui.configlist(b'infinitepush', b'get_args', [])
230 get_binary = ui.config(b'infinitepush', b'get_binary')
230 get_binary = ui.config(b'infinitepush', b'get_binary')
231 if not get_binary:
231 if not get_binary:
232 raise error.Abort(b'get binary is not specified')
232 raise error.Abort(b'get binary is not specified')
233 from . import store
233 from . import store
234
234
235 return store.externalbundlestore(put_binary, put_args, get_binary, get_args)
235 return store.externalbundlestore(put_binary, put_args, get_binary, get_args)
236
236
237
237
238 def _buildsqlindex(ui):
238 def _buildsqlindex(ui):
239 sqlhost = ui.config(b'infinitepush', b'sqlhost')
239 sqlhost = ui.config(b'infinitepush', b'sqlhost')
240 if not sqlhost:
240 if not sqlhost:
241 raise error.Abort(_(b'please set infinitepush.sqlhost'))
241 raise error.Abort(_(b'please set infinitepush.sqlhost'))
242 host, port, db, user, password = sqlhost.split(b':')
242 host, port, db, user, password = sqlhost.split(b':')
243 reponame = ui.config(b'infinitepush', b'reponame')
243 reponame = ui.config(b'infinitepush', b'reponame')
244 if not reponame:
244 if not reponame:
245 raise error.Abort(_(b'please set infinitepush.reponame'))
245 raise error.Abort(_(b'please set infinitepush.reponame'))
246
246
247 logfile = ui.config(b'infinitepush', b'logfile', b'')
247 logfile = ui.config(b'infinitepush', b'logfile', b'')
248 waittimeout = ui.configint(b'infinitepush', b'waittimeout', 300)
248 waittimeout = ui.configint(b'infinitepush', b'waittimeout', 300)
249 locktimeout = ui.configint(b'infinitepush', b'locktimeout', 120)
249 locktimeout = ui.configint(b'infinitepush', b'locktimeout', 120)
250 from . import sqlindexapi
250 from . import sqlindexapi
251
251
252 return sqlindexapi.sqlindexapi(
252 return sqlindexapi.sqlindexapi(
253 reponame,
253 reponame,
254 host,
254 host,
255 port,
255 port,
256 db,
256 db,
257 user,
257 user,
258 password,
258 password,
259 logfile,
259 logfile,
260 _getloglevel(ui),
260 _getloglevel(ui),
261 waittimeout=waittimeout,
261 waittimeout=waittimeout,
262 locktimeout=locktimeout,
262 locktimeout=locktimeout,
263 )
263 )
264
264
265
265
266 def _getloglevel(ui):
266 def _getloglevel(ui):
267 loglevel = ui.config(b'infinitepush', b'loglevel', b'DEBUG')
267 loglevel = ui.config(b'infinitepush', b'loglevel', b'DEBUG')
268 numeric_loglevel = getattr(logging, loglevel.upper(), None)
268 numeric_loglevel = getattr(logging, loglevel.upper(), None)
269 if not isinstance(numeric_loglevel, int):
269 if not isinstance(numeric_loglevel, int):
270 raise error.Abort(_(b'invalid log level %s') % loglevel)
270 raise error.Abort(_(b'invalid log level %s') % loglevel)
271 return numeric_loglevel
271 return numeric_loglevel
272
272
273
273
274 def _tryhoist(ui, remotebookmark):
274 def _tryhoist(ui, remotebookmark):
275 """returns a bookmarks with hoisted part removed
275 """returns a bookmarks with hoisted part removed
276
276
277 Remotenames extension has a 'hoist' config that allows to use remote
277 Remotenames extension has a 'hoist' config that allows to use remote
278 bookmarks without specifying remote path. For example, 'hg update master'
278 bookmarks without specifying remote path. For example, 'hg update master'
279 works as well as 'hg update remote/master'. We want to allow the same in
279 works as well as 'hg update remote/master'. We want to allow the same in
280 infinitepush.
280 infinitepush.
281 """
281 """
282
282
283 if common.isremotebooksenabled(ui):
283 if common.isremotebooksenabled(ui):
284 hoist = ui.config(b'remotenames', b'hoistedpeer') + b'/'
284 hoist = ui.config(b'remotenames', b'hoistedpeer') + b'/'
285 if remotebookmark.startswith(hoist):
285 if remotebookmark.startswith(hoist):
286 return remotebookmark[len(hoist) :]
286 return remotebookmark[len(hoist) :]
287 return remotebookmark
287 return remotebookmark
288
288
289
289
290 class bundlestore(object):
290 class bundlestore(object):
291 def __init__(self, repo):
291 def __init__(self, repo):
292 self._repo = repo
292 self._repo = repo
293 storetype = self._repo.ui.config(b'infinitepush', b'storetype')
293 storetype = self._repo.ui.config(b'infinitepush', b'storetype')
294 if storetype == b'disk':
294 if storetype == b'disk':
295 from . import store
295 from . import store
296
296
297 self.store = store.filebundlestore(self._repo.ui, self._repo)
297 self.store = store.filebundlestore(self._repo.ui, self._repo)
298 elif storetype == b'external':
298 elif storetype == b'external':
299 self.store = _buildexternalbundlestore(self._repo.ui)
299 self.store = _buildexternalbundlestore(self._repo.ui)
300 else:
300 else:
301 raise error.Abort(
301 raise error.Abort(
302 _(b'unknown infinitepush store type specified %s') % storetype
302 _(b'unknown infinitepush store type specified %s') % storetype
303 )
303 )
304
304
305 indextype = self._repo.ui.config(b'infinitepush', b'indextype')
305 indextype = self._repo.ui.config(b'infinitepush', b'indextype')
306 if indextype == b'disk':
306 if indextype == b'disk':
307 from . import fileindexapi
307 from . import fileindexapi
308
308
309 self.index = fileindexapi.fileindexapi(self._repo)
309 self.index = fileindexapi.fileindexapi(self._repo)
310 elif indextype == b'sql':
310 elif indextype == b'sql':
311 self.index = _buildsqlindex(self._repo.ui)
311 self.index = _buildsqlindex(self._repo.ui)
312 else:
312 else:
313 raise error.Abort(
313 raise error.Abort(
314 _(b'unknown infinitepush index type specified %s') % indextype
314 _(b'unknown infinitepush index type specified %s') % indextype
315 )
315 )
316
316
317
317
318 def _isserver(ui):
318 def _isserver(ui):
319 return ui.configbool(b'infinitepush', b'server')
319 return ui.configbool(b'infinitepush', b'server')
320
320
321
321
322 def reposetup(ui, repo):
322 def reposetup(ui, repo):
323 if _isserver(ui) and repo.local():
323 if _isserver(ui) and repo.local():
324 repo.bundlestore = bundlestore(repo)
324 repo.bundlestore = bundlestore(repo)
325
325
326
326
327 def extsetup(ui):
327 def extsetup(ui):
328 commonsetup(ui)
328 commonsetup(ui)
329 if _isserver(ui):
329 if _isserver(ui):
330 serverextsetup(ui)
330 serverextsetup(ui)
331 else:
331 else:
332 clientextsetup(ui)
332 clientextsetup(ui)
333
333
334
334
335 def commonsetup(ui):
335 def commonsetup(ui):
336 wireprotov1server.commands[b'listkeyspatterns'] = (
336 wireprotov1server.commands[b'listkeyspatterns'] = (
337 wireprotolistkeyspatterns,
337 wireprotolistkeyspatterns,
338 b'namespace patterns',
338 b'namespace patterns',
339 )
339 )
340 scratchbranchpat = ui.config(b'infinitepush', b'branchpattern')
340 scratchbranchpat = ui.config(b'infinitepush', b'branchpattern')
341 if scratchbranchpat:
341 if scratchbranchpat:
342 global _scratchbranchmatcher
342 global _scratchbranchmatcher
343 kind, pat, _scratchbranchmatcher = stringutil.stringmatcher(
343 kind, pat, _scratchbranchmatcher = stringutil.stringmatcher(
344 scratchbranchpat
344 scratchbranchpat
345 )
345 )
346
346
347
347
348 def serverextsetup(ui):
348 def serverextsetup(ui):
349 origpushkeyhandler = bundle2.parthandlermapping[b'pushkey']
349 origpushkeyhandler = bundle2.parthandlermapping[b'pushkey']
350
350
351 def newpushkeyhandler(*args, **kwargs):
351 def newpushkeyhandler(*args, **kwargs):
352 bundle2pushkey(origpushkeyhandler, *args, **kwargs)
352 bundle2pushkey(origpushkeyhandler, *args, **kwargs)
353
353
354 newpushkeyhandler.params = origpushkeyhandler.params
354 newpushkeyhandler.params = origpushkeyhandler.params
355 bundle2.parthandlermapping[b'pushkey'] = newpushkeyhandler
355 bundle2.parthandlermapping[b'pushkey'] = newpushkeyhandler
356
356
357 orighandlephasehandler = bundle2.parthandlermapping[b'phase-heads']
357 orighandlephasehandler = bundle2.parthandlermapping[b'phase-heads']
358 newphaseheadshandler = lambda *args, **kwargs: bundle2handlephases(
358 newphaseheadshandler = lambda *args, **kwargs: bundle2handlephases(
359 orighandlephasehandler, *args, **kwargs
359 orighandlephasehandler, *args, **kwargs
360 )
360 )
361 newphaseheadshandler.params = orighandlephasehandler.params
361 newphaseheadshandler.params = orighandlephasehandler.params
362 bundle2.parthandlermapping[b'phase-heads'] = newphaseheadshandler
362 bundle2.parthandlermapping[b'phase-heads'] = newphaseheadshandler
363
363
364 extensions.wrapfunction(
364 extensions.wrapfunction(
365 localrepo.localrepository, b'listkeys', localrepolistkeys
365 localrepo.localrepository, b'listkeys', localrepolistkeys
366 )
366 )
367 wireprotov1server.commands[b'lookup'] = (
367 wireprotov1server.commands[b'lookup'] = (
368 _lookupwrap(wireprotov1server.commands[b'lookup'][0]),
368 _lookupwrap(wireprotov1server.commands[b'lookup'][0]),
369 b'key',
369 b'key',
370 )
370 )
371 extensions.wrapfunction(exchange, b'getbundlechunks', getbundlechunks)
371 extensions.wrapfunction(exchange, b'getbundlechunks', getbundlechunks)
372
372
373 extensions.wrapfunction(bundle2, b'processparts', processparts)
373 extensions.wrapfunction(bundle2, b'processparts', processparts)
374
374
375
375
376 def clientextsetup(ui):
376 def clientextsetup(ui):
377 entry = extensions.wrapcommand(commands.table, b'push', _push)
377 entry = extensions.wrapcommand(commands.table, b'push', _push)
378
378
379 entry[1].append(
379 entry[1].append(
380 (
380 (
381 b'',
381 b'',
382 b'bundle-store',
382 b'bundle-store',
383 None,
383 None,
384 _(b'force push to go to bundle store (EXPERIMENTAL)'),
384 _(b'force push to go to bundle store (EXPERIMENTAL)'),
385 )
385 )
386 )
386 )
387
387
388 extensions.wrapcommand(commands.table, b'pull', _pull)
388 extensions.wrapcommand(commands.table, b'pull', _pull)
389
389
390 extensions.wrapfunction(discovery, b'checkheads', _checkheads)
390 extensions.wrapfunction(discovery, b'checkheads', _checkheads)
391
391
392 wireprotov1peer.wirepeer.listkeyspatterns = listkeyspatterns
392 wireprotov1peer.wirepeer.listkeyspatterns = listkeyspatterns
393
393
394 partorder = exchange.b2partsgenorder
394 partorder = exchange.b2partsgenorder
395 index = partorder.index(b'changeset')
395 index = partorder.index(b'changeset')
396 partorder.insert(
396 partorder.insert(
397 index, partorder.pop(partorder.index(scratchbranchparttype))
397 index, partorder.pop(partorder.index(scratchbranchparttype))
398 )
398 )
399
399
400
400
401 def _checkheads(orig, pushop):
401 def _checkheads(orig, pushop):
402 if pushop.ui.configbool(experimental, configscratchpush, False):
402 if pushop.ui.configbool(experimental, configscratchpush, False):
403 return
403 return
404 return orig(pushop)
404 return orig(pushop)
405
405
406
406
407 def wireprotolistkeyspatterns(repo, proto, namespace, patterns):
407 def wireprotolistkeyspatterns(repo, proto, namespace, patterns):
408 patterns = wireprototypes.decodelist(patterns)
408 patterns = wireprototypes.decodelist(patterns)
409 d = pycompat.iteritems(repo.listkeys(encoding.tolocal(namespace), patterns))
409 d = pycompat.iteritems(repo.listkeys(encoding.tolocal(namespace), patterns))
410 return pushkey.encodekeys(d)
410 return pushkey.encodekeys(d)
411
411
412
412
413 def localrepolistkeys(orig, self, namespace, patterns=None):
413 def localrepolistkeys(orig, self, namespace, patterns=None):
414 if namespace == b'bookmarks' and patterns:
414 if namespace == b'bookmarks' and patterns:
415 index = self.bundlestore.index
415 index = self.bundlestore.index
416 results = {}
416 results = {}
417 bookmarks = orig(self, namespace)
417 bookmarks = orig(self, namespace)
418 for pattern in patterns:
418 for pattern in patterns:
419 results.update(index.getbookmarks(pattern))
419 results.update(index.getbookmarks(pattern))
420 if pattern.endswith(b'*'):
420 if pattern.endswith(b'*'):
421 pattern = b're:^' + pattern[:-1] + b'.*'
421 pattern = b're:^' + pattern[:-1] + b'.*'
422 kind, pat, matcher = stringutil.stringmatcher(pattern)
422 kind, pat, matcher = stringutil.stringmatcher(pattern)
423 for bookmark, node in pycompat.iteritems(bookmarks):
423 for bookmark, node in pycompat.iteritems(bookmarks):
424 if matcher(bookmark):
424 if matcher(bookmark):
425 results[bookmark] = node
425 results[bookmark] = node
426 return results
426 return results
427 else:
427 else:
428 return orig(self, namespace)
428 return orig(self, namespace)
429
429
430
430
431 @wireprotov1peer.batchable
431 @wireprotov1peer.batchable
432 def listkeyspatterns(self, namespace, patterns):
432 def listkeyspatterns(self, namespace, patterns):
433 if not self.capable(b'pushkey'):
433 if not self.capable(b'pushkey'):
434 yield {}, None
434 return {}, None
435 f = wireprotov1peer.future()
436 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
435 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
437 yield {
436
438 b'namespace': encoding.fromlocal(namespace),
437 def decode(d):
439 b'patterns': wireprototypes.encodelist(patterns),
440 }, f
441 d = f.value
442 self.ui.debug(
438 self.ui.debug(
443 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
439 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
444 )
440 )
445 yield pushkey.decodekeys(d)
441 return pushkey.decodekeys(d)
442
443 return {
444 b'namespace': encoding.fromlocal(namespace),
445 b'patterns': wireprototypes.encodelist(patterns),
446 }, decode
446
447
447
448
448 def _readbundlerevs(bundlerepo):
449 def _readbundlerevs(bundlerepo):
449 return list(bundlerepo.revs(b'bundle()'))
450 return list(bundlerepo.revs(b'bundle()'))
450
451
451
452
452 def _includefilelogstobundle(bundlecaps, bundlerepo, bundlerevs, ui):
453 def _includefilelogstobundle(bundlecaps, bundlerepo, bundlerevs, ui):
453 """Tells remotefilelog to include all changed files to the changegroup
454 """Tells remotefilelog to include all changed files to the changegroup
454
455
455 By default remotefilelog doesn't include file content to the changegroup.
456 By default remotefilelog doesn't include file content to the changegroup.
456 But we need to include it if we are fetching from bundlestore.
457 But we need to include it if we are fetching from bundlestore.
457 """
458 """
458 changedfiles = set()
459 changedfiles = set()
459 cl = bundlerepo.changelog
460 cl = bundlerepo.changelog
460 for r in bundlerevs:
461 for r in bundlerevs:
461 # [3] means changed files
462 # [3] means changed files
462 changedfiles.update(cl.read(r)[3])
463 changedfiles.update(cl.read(r)[3])
463 if not changedfiles:
464 if not changedfiles:
464 return bundlecaps
465 return bundlecaps
465
466
466 changedfiles = b'\0'.join(changedfiles)
467 changedfiles = b'\0'.join(changedfiles)
467 newcaps = []
468 newcaps = []
468 appended = False
469 appended = False
469 for cap in bundlecaps or []:
470 for cap in bundlecaps or []:
470 if cap.startswith(b'excludepattern='):
471 if cap.startswith(b'excludepattern='):
471 newcaps.append(b'\0'.join((cap, changedfiles)))
472 newcaps.append(b'\0'.join((cap, changedfiles)))
472 appended = True
473 appended = True
473 else:
474 else:
474 newcaps.append(cap)
475 newcaps.append(cap)
475 if not appended:
476 if not appended:
476 # Not found excludepattern cap. Just append it
477 # Not found excludepattern cap. Just append it
477 newcaps.append(b'excludepattern=' + changedfiles)
478 newcaps.append(b'excludepattern=' + changedfiles)
478
479
479 return newcaps
480 return newcaps
480
481
481
482
482 def _rebundle(bundlerepo, bundleroots, unknownhead):
483 def _rebundle(bundlerepo, bundleroots, unknownhead):
483 """
484 """
484 Bundle may include more revision then user requested. For example,
485 Bundle may include more revision then user requested. For example,
485 if user asks for revision but bundle also consists its descendants.
486 if user asks for revision but bundle also consists its descendants.
486 This function will filter out all revision that user is not requested.
487 This function will filter out all revision that user is not requested.
487 """
488 """
488 parts = []
489 parts = []
489
490
490 version = b'02'
491 version = b'02'
491 outgoing = discovery.outgoing(
492 outgoing = discovery.outgoing(
492 bundlerepo, commonheads=bundleroots, ancestorsof=[unknownhead]
493 bundlerepo, commonheads=bundleroots, ancestorsof=[unknownhead]
493 )
494 )
494 cgstream = changegroup.makestream(bundlerepo, outgoing, version, b'pull')
495 cgstream = changegroup.makestream(bundlerepo, outgoing, version, b'pull')
495 cgstream = util.chunkbuffer(cgstream).read()
496 cgstream = util.chunkbuffer(cgstream).read()
496 cgpart = bundle2.bundlepart(b'changegroup', data=cgstream)
497 cgpart = bundle2.bundlepart(b'changegroup', data=cgstream)
497 cgpart.addparam(b'version', version)
498 cgpart.addparam(b'version', version)
498 parts.append(cgpart)
499 parts.append(cgpart)
499
500
500 return parts
501 return parts
501
502
502
503
503 def _getbundleroots(oldrepo, bundlerepo, bundlerevs):
504 def _getbundleroots(oldrepo, bundlerepo, bundlerevs):
504 cl = bundlerepo.changelog
505 cl = bundlerepo.changelog
505 bundleroots = []
506 bundleroots = []
506 for rev in bundlerevs:
507 for rev in bundlerevs:
507 node = cl.node(rev)
508 node = cl.node(rev)
508 parents = cl.parents(node)
509 parents = cl.parents(node)
509 for parent in parents:
510 for parent in parents:
510 # include all revs that exist in the main repo
511 # include all revs that exist in the main repo
511 # to make sure that bundle may apply client-side
512 # to make sure that bundle may apply client-side
512 if parent in oldrepo:
513 if parent in oldrepo:
513 bundleroots.append(parent)
514 bundleroots.append(parent)
514 return bundleroots
515 return bundleroots
515
516
516
517
517 def _needsrebundling(head, bundlerepo):
518 def _needsrebundling(head, bundlerepo):
518 bundleheads = list(bundlerepo.revs(b'heads(bundle())'))
519 bundleheads = list(bundlerepo.revs(b'heads(bundle())'))
519 return not (
520 return not (
520 len(bundleheads) == 1 and bundlerepo[bundleheads[0]].node() == head
521 len(bundleheads) == 1 and bundlerepo[bundleheads[0]].node() == head
521 )
522 )
522
523
523
524
524 def _generateoutputparts(head, bundlerepo, bundleroots, bundlefile):
525 def _generateoutputparts(head, bundlerepo, bundleroots, bundlefile):
525 """generates bundle that will be send to the user
526 """generates bundle that will be send to the user
526
527
527 returns tuple with raw bundle string and bundle type
528 returns tuple with raw bundle string and bundle type
528 """
529 """
529 parts = []
530 parts = []
530 if not _needsrebundling(head, bundlerepo):
531 if not _needsrebundling(head, bundlerepo):
531 with util.posixfile(bundlefile, b"rb") as f:
532 with util.posixfile(bundlefile, b"rb") as f:
532 unbundler = exchange.readbundle(bundlerepo.ui, f, bundlefile)
533 unbundler = exchange.readbundle(bundlerepo.ui, f, bundlefile)
533 if isinstance(unbundler, changegroup.cg1unpacker):
534 if isinstance(unbundler, changegroup.cg1unpacker):
534 part = bundle2.bundlepart(
535 part = bundle2.bundlepart(
535 b'changegroup', data=unbundler._stream.read()
536 b'changegroup', data=unbundler._stream.read()
536 )
537 )
537 part.addparam(b'version', b'01')
538 part.addparam(b'version', b'01')
538 parts.append(part)
539 parts.append(part)
539 elif isinstance(unbundler, bundle2.unbundle20):
540 elif isinstance(unbundler, bundle2.unbundle20):
540 haschangegroup = False
541 haschangegroup = False
541 for part in unbundler.iterparts():
542 for part in unbundler.iterparts():
542 if part.type == b'changegroup':
543 if part.type == b'changegroup':
543 haschangegroup = True
544 haschangegroup = True
544 newpart = bundle2.bundlepart(part.type, data=part.read())
545 newpart = bundle2.bundlepart(part.type, data=part.read())
545 for key, value in pycompat.iteritems(part.params):
546 for key, value in pycompat.iteritems(part.params):
546 newpart.addparam(key, value)
547 newpart.addparam(key, value)
547 parts.append(newpart)
548 parts.append(newpart)
548
549
549 if not haschangegroup:
550 if not haschangegroup:
550 raise error.Abort(
551 raise error.Abort(
551 b'unexpected bundle without changegroup part, '
552 b'unexpected bundle without changegroup part, '
552 + b'head: %s' % hex(head),
553 + b'head: %s' % hex(head),
553 hint=b'report to administrator',
554 hint=b'report to administrator',
554 )
555 )
555 else:
556 else:
556 raise error.Abort(b'unknown bundle type')
557 raise error.Abort(b'unknown bundle type')
557 else:
558 else:
558 parts = _rebundle(bundlerepo, bundleroots, head)
559 parts = _rebundle(bundlerepo, bundleroots, head)
559
560
560 return parts
561 return parts
561
562
562
563
563 def getbundlechunks(orig, repo, source, heads=None, bundlecaps=None, **kwargs):
564 def getbundlechunks(orig, repo, source, heads=None, bundlecaps=None, **kwargs):
564 heads = heads or []
565 heads = heads or []
565 # newheads are parents of roots of scratch bundles that were requested
566 # newheads are parents of roots of scratch bundles that were requested
566 newphases = {}
567 newphases = {}
567 scratchbundles = []
568 scratchbundles = []
568 newheads = []
569 newheads = []
569 scratchheads = []
570 scratchheads = []
570 nodestobundle = {}
571 nodestobundle = {}
571 allbundlestocleanup = []
572 allbundlestocleanup = []
572 try:
573 try:
573 for head in heads:
574 for head in heads:
574 if not repo.changelog.index.has_node(head):
575 if not repo.changelog.index.has_node(head):
575 if head not in nodestobundle:
576 if head not in nodestobundle:
576 newbundlefile = common.downloadbundle(repo, head)
577 newbundlefile = common.downloadbundle(repo, head)
577 bundlepath = b"bundle:%s+%s" % (repo.root, newbundlefile)
578 bundlepath = b"bundle:%s+%s" % (repo.root, newbundlefile)
578 bundlerepo = hg.repository(repo.ui, bundlepath)
579 bundlerepo = hg.repository(repo.ui, bundlepath)
579
580
580 allbundlestocleanup.append((bundlerepo, newbundlefile))
581 allbundlestocleanup.append((bundlerepo, newbundlefile))
581 bundlerevs = set(_readbundlerevs(bundlerepo))
582 bundlerevs = set(_readbundlerevs(bundlerepo))
582 bundlecaps = _includefilelogstobundle(
583 bundlecaps = _includefilelogstobundle(
583 bundlecaps, bundlerepo, bundlerevs, repo.ui
584 bundlecaps, bundlerepo, bundlerevs, repo.ui
584 )
585 )
585 cl = bundlerepo.changelog
586 cl = bundlerepo.changelog
586 bundleroots = _getbundleroots(repo, bundlerepo, bundlerevs)
587 bundleroots = _getbundleroots(repo, bundlerepo, bundlerevs)
587 for rev in bundlerevs:
588 for rev in bundlerevs:
588 node = cl.node(rev)
589 node = cl.node(rev)
589 newphases[hex(node)] = str(phases.draft)
590 newphases[hex(node)] = str(phases.draft)
590 nodestobundle[node] = (
591 nodestobundle[node] = (
591 bundlerepo,
592 bundlerepo,
592 bundleroots,
593 bundleroots,
593 newbundlefile,
594 newbundlefile,
594 )
595 )
595
596
596 scratchbundles.append(
597 scratchbundles.append(
597 _generateoutputparts(head, *nodestobundle[head])
598 _generateoutputparts(head, *nodestobundle[head])
598 )
599 )
599 newheads.extend(bundleroots)
600 newheads.extend(bundleroots)
600 scratchheads.append(head)
601 scratchheads.append(head)
601 finally:
602 finally:
602 for bundlerepo, bundlefile in allbundlestocleanup:
603 for bundlerepo, bundlefile in allbundlestocleanup:
603 bundlerepo.close()
604 bundlerepo.close()
604 try:
605 try:
605 os.unlink(bundlefile)
606 os.unlink(bundlefile)
606 except (IOError, OSError):
607 except (IOError, OSError):
607 # if we can't cleanup the file then just ignore the error,
608 # if we can't cleanup the file then just ignore the error,
608 # no need to fail
609 # no need to fail
609 pass
610 pass
610
611
611 pullfrombundlestore = bool(scratchbundles)
612 pullfrombundlestore = bool(scratchbundles)
612 wrappedchangegrouppart = False
613 wrappedchangegrouppart = False
613 wrappedlistkeys = False
614 wrappedlistkeys = False
614 oldchangegrouppart = exchange.getbundle2partsmapping[b'changegroup']
615 oldchangegrouppart = exchange.getbundle2partsmapping[b'changegroup']
615 try:
616 try:
616
617
617 def _changegrouppart(bundler, *args, **kwargs):
618 def _changegrouppart(bundler, *args, **kwargs):
618 # Order is important here. First add non-scratch part
619 # Order is important here. First add non-scratch part
619 # and only then add parts with scratch bundles because
620 # and only then add parts with scratch bundles because
620 # non-scratch part contains parents of roots of scratch bundles.
621 # non-scratch part contains parents of roots of scratch bundles.
621 result = oldchangegrouppart(bundler, *args, **kwargs)
622 result = oldchangegrouppart(bundler, *args, **kwargs)
622 for bundle in scratchbundles:
623 for bundle in scratchbundles:
623 for part in bundle:
624 for part in bundle:
624 bundler.addpart(part)
625 bundler.addpart(part)
625 return result
626 return result
626
627
627 exchange.getbundle2partsmapping[b'changegroup'] = _changegrouppart
628 exchange.getbundle2partsmapping[b'changegroup'] = _changegrouppart
628 wrappedchangegrouppart = True
629 wrappedchangegrouppart = True
629
630
630 def _listkeys(orig, self, namespace):
631 def _listkeys(orig, self, namespace):
631 origvalues = orig(self, namespace)
632 origvalues = orig(self, namespace)
632 if namespace == b'phases' and pullfrombundlestore:
633 if namespace == b'phases' and pullfrombundlestore:
633 if origvalues.get(b'publishing') == b'True':
634 if origvalues.get(b'publishing') == b'True':
634 # Make repo non-publishing to preserve draft phase
635 # Make repo non-publishing to preserve draft phase
635 del origvalues[b'publishing']
636 del origvalues[b'publishing']
636 origvalues.update(newphases)
637 origvalues.update(newphases)
637 return origvalues
638 return origvalues
638
639
639 extensions.wrapfunction(
640 extensions.wrapfunction(
640 localrepo.localrepository, b'listkeys', _listkeys
641 localrepo.localrepository, b'listkeys', _listkeys
641 )
642 )
642 wrappedlistkeys = True
643 wrappedlistkeys = True
643 heads = list((set(newheads) | set(heads)) - set(scratchheads))
644 heads = list((set(newheads) | set(heads)) - set(scratchheads))
644 result = orig(
645 result = orig(
645 repo, source, heads=heads, bundlecaps=bundlecaps, **kwargs
646 repo, source, heads=heads, bundlecaps=bundlecaps, **kwargs
646 )
647 )
647 finally:
648 finally:
648 if wrappedchangegrouppart:
649 if wrappedchangegrouppart:
649 exchange.getbundle2partsmapping[b'changegroup'] = oldchangegrouppart
650 exchange.getbundle2partsmapping[b'changegroup'] = oldchangegrouppart
650 if wrappedlistkeys:
651 if wrappedlistkeys:
651 extensions.unwrapfunction(
652 extensions.unwrapfunction(
652 localrepo.localrepository, b'listkeys', _listkeys
653 localrepo.localrepository, b'listkeys', _listkeys
653 )
654 )
654 return result
655 return result
655
656
656
657
657 def _lookupwrap(orig):
658 def _lookupwrap(orig):
658 def _lookup(repo, proto, key):
659 def _lookup(repo, proto, key):
659 localkey = encoding.tolocal(key)
660 localkey = encoding.tolocal(key)
660
661
661 if isinstance(localkey, str) and _scratchbranchmatcher(localkey):
662 if isinstance(localkey, str) and _scratchbranchmatcher(localkey):
662 scratchnode = repo.bundlestore.index.getnode(localkey)
663 scratchnode = repo.bundlestore.index.getnode(localkey)
663 if scratchnode:
664 if scratchnode:
664 return b"%d %s\n" % (1, scratchnode)
665 return b"%d %s\n" % (1, scratchnode)
665 else:
666 else:
666 return b"%d %s\n" % (
667 return b"%d %s\n" % (
667 0,
668 0,
668 b'scratch branch %s not found' % localkey,
669 b'scratch branch %s not found' % localkey,
669 )
670 )
670 else:
671 else:
671 try:
672 try:
672 r = hex(repo.lookup(localkey))
673 r = hex(repo.lookup(localkey))
673 return b"%d %s\n" % (1, r)
674 return b"%d %s\n" % (1, r)
674 except Exception as inst:
675 except Exception as inst:
675 if repo.bundlestore.index.getbundle(localkey):
676 if repo.bundlestore.index.getbundle(localkey):
676 return b"%d %s\n" % (1, localkey)
677 return b"%d %s\n" % (1, localkey)
677 else:
678 else:
678 r = stringutil.forcebytestr(inst)
679 r = stringutil.forcebytestr(inst)
679 return b"%d %s\n" % (0, r)
680 return b"%d %s\n" % (0, r)
680
681
681 return _lookup
682 return _lookup
682
683
683
684
684 def _pull(orig, ui, repo, source=b"default", **opts):
685 def _pull(orig, ui, repo, source=b"default", **opts):
685 opts = pycompat.byteskwargs(opts)
686 opts = pycompat.byteskwargs(opts)
686 # Copy paste from `pull` command
687 # Copy paste from `pull` command
687 source, branches = urlutil.get_unique_pull_path(
688 source, branches = urlutil.get_unique_pull_path(
688 b"infinite-push's pull",
689 b"infinite-push's pull",
689 repo,
690 repo,
690 ui,
691 ui,
691 source,
692 source,
692 default_branches=opts.get(b'branch'),
693 default_branches=opts.get(b'branch'),
693 )
694 )
694
695
695 scratchbookmarks = {}
696 scratchbookmarks = {}
696 unfi = repo.unfiltered()
697 unfi = repo.unfiltered()
697 unknownnodes = []
698 unknownnodes = []
698 for rev in opts.get(b'rev', []):
699 for rev in opts.get(b'rev', []):
699 if rev not in unfi:
700 if rev not in unfi:
700 unknownnodes.append(rev)
701 unknownnodes.append(rev)
701 if opts.get(b'bookmark'):
702 if opts.get(b'bookmark'):
702 bookmarks = []
703 bookmarks = []
703 revs = opts.get(b'rev') or []
704 revs = opts.get(b'rev') or []
704 for bookmark in opts.get(b'bookmark'):
705 for bookmark in opts.get(b'bookmark'):
705 if _scratchbranchmatcher(bookmark):
706 if _scratchbranchmatcher(bookmark):
706 # rev is not known yet
707 # rev is not known yet
707 # it will be fetched with listkeyspatterns next
708 # it will be fetched with listkeyspatterns next
708 scratchbookmarks[bookmark] = b'REVTOFETCH'
709 scratchbookmarks[bookmark] = b'REVTOFETCH'
709 else:
710 else:
710 bookmarks.append(bookmark)
711 bookmarks.append(bookmark)
711
712
712 if scratchbookmarks:
713 if scratchbookmarks:
713 other = hg.peer(repo, opts, source)
714 other = hg.peer(repo, opts, source)
714 try:
715 try:
715 fetchedbookmarks = other.listkeyspatterns(
716 fetchedbookmarks = other.listkeyspatterns(
716 b'bookmarks', patterns=scratchbookmarks
717 b'bookmarks', patterns=scratchbookmarks
717 )
718 )
718 for bookmark in scratchbookmarks:
719 for bookmark in scratchbookmarks:
719 if bookmark not in fetchedbookmarks:
720 if bookmark not in fetchedbookmarks:
720 raise error.Abort(
721 raise error.Abort(
721 b'remote bookmark %s not found!' % bookmark
722 b'remote bookmark %s not found!' % bookmark
722 )
723 )
723 scratchbookmarks[bookmark] = fetchedbookmarks[bookmark]
724 scratchbookmarks[bookmark] = fetchedbookmarks[bookmark]
724 revs.append(fetchedbookmarks[bookmark])
725 revs.append(fetchedbookmarks[bookmark])
725 finally:
726 finally:
726 other.close()
727 other.close()
727 opts[b'bookmark'] = bookmarks
728 opts[b'bookmark'] = bookmarks
728 opts[b'rev'] = revs
729 opts[b'rev'] = revs
729
730
730 if scratchbookmarks or unknownnodes:
731 if scratchbookmarks or unknownnodes:
731 # Set anyincoming to True
732 # Set anyincoming to True
732 extensions.wrapfunction(
733 extensions.wrapfunction(
733 discovery, b'findcommonincoming', _findcommonincoming
734 discovery, b'findcommonincoming', _findcommonincoming
734 )
735 )
735 try:
736 try:
736 # Remote scratch bookmarks will be deleted because remotenames doesn't
737 # Remote scratch bookmarks will be deleted because remotenames doesn't
737 # know about them. Let's save it before pull and restore after
738 # know about them. Let's save it before pull and restore after
738 remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, source)
739 remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, source)
739 result = orig(ui, repo, source, **pycompat.strkwargs(opts))
740 result = orig(ui, repo, source, **pycompat.strkwargs(opts))
740 # TODO(stash): race condition is possible
741 # TODO(stash): race condition is possible
741 # if scratch bookmarks was updated right after orig.
742 # if scratch bookmarks was updated right after orig.
742 # But that's unlikely and shouldn't be harmful.
743 # But that's unlikely and shouldn't be harmful.
743 if common.isremotebooksenabled(ui):
744 if common.isremotebooksenabled(ui):
744 remotescratchbookmarks.update(scratchbookmarks)
745 remotescratchbookmarks.update(scratchbookmarks)
745 _saveremotebookmarks(repo, remotescratchbookmarks, source)
746 _saveremotebookmarks(repo, remotescratchbookmarks, source)
746 else:
747 else:
747 _savelocalbookmarks(repo, scratchbookmarks)
748 _savelocalbookmarks(repo, scratchbookmarks)
748 return result
749 return result
749 finally:
750 finally:
750 if scratchbookmarks:
751 if scratchbookmarks:
751 extensions.unwrapfunction(discovery, b'findcommonincoming')
752 extensions.unwrapfunction(discovery, b'findcommonincoming')
752
753
753
754
754 def _readscratchremotebookmarks(ui, repo, other):
755 def _readscratchremotebookmarks(ui, repo, other):
755 if common.isremotebooksenabled(ui):
756 if common.isremotebooksenabled(ui):
756 remotenamesext = extensions.find(b'remotenames')
757 remotenamesext = extensions.find(b'remotenames')
757 remotepath = remotenamesext.activepath(repo.ui, other)
758 remotepath = remotenamesext.activepath(repo.ui, other)
758 result = {}
759 result = {}
759 # Let's refresh remotenames to make sure we have it up to date
760 # Let's refresh remotenames to make sure we have it up to date
760 # Seems that `repo.names['remotebookmarks']` may return stale bookmarks
761 # Seems that `repo.names['remotebookmarks']` may return stale bookmarks
761 # and it results in deleting scratch bookmarks. Our best guess how to
762 # and it results in deleting scratch bookmarks. Our best guess how to
762 # fix it is to use `clearnames()`
763 # fix it is to use `clearnames()`
763 repo._remotenames.clearnames()
764 repo._remotenames.clearnames()
764 for remotebookmark in repo.names[b'remotebookmarks'].listnames(repo):
765 for remotebookmark in repo.names[b'remotebookmarks'].listnames(repo):
765 path, bookname = remotenamesext.splitremotename(remotebookmark)
766 path, bookname = remotenamesext.splitremotename(remotebookmark)
766 if path == remotepath and _scratchbranchmatcher(bookname):
767 if path == remotepath and _scratchbranchmatcher(bookname):
767 nodes = repo.names[b'remotebookmarks'].nodes(
768 nodes = repo.names[b'remotebookmarks'].nodes(
768 repo, remotebookmark
769 repo, remotebookmark
769 )
770 )
770 if nodes:
771 if nodes:
771 result[bookname] = hex(nodes[0])
772 result[bookname] = hex(nodes[0])
772 return result
773 return result
773 else:
774 else:
774 return {}
775 return {}
775
776
776
777
777 def _saveremotebookmarks(repo, newbookmarks, remote):
778 def _saveremotebookmarks(repo, newbookmarks, remote):
778 remotenamesext = extensions.find(b'remotenames')
779 remotenamesext = extensions.find(b'remotenames')
779 remotepath = remotenamesext.activepath(repo.ui, remote)
780 remotepath = remotenamesext.activepath(repo.ui, remote)
780 branches = collections.defaultdict(list)
781 branches = collections.defaultdict(list)
781 bookmarks = {}
782 bookmarks = {}
782 remotenames = remotenamesext.readremotenames(repo)
783 remotenames = remotenamesext.readremotenames(repo)
783 for hexnode, nametype, remote, rname in remotenames:
784 for hexnode, nametype, remote, rname in remotenames:
784 if remote != remotepath:
785 if remote != remotepath:
785 continue
786 continue
786 if nametype == b'bookmarks':
787 if nametype == b'bookmarks':
787 if rname in newbookmarks:
788 if rname in newbookmarks:
788 # It's possible if we have a normal bookmark that matches
789 # It's possible if we have a normal bookmark that matches
789 # scratch branch pattern. In this case just use the current
790 # scratch branch pattern. In this case just use the current
790 # bookmark node
791 # bookmark node
791 del newbookmarks[rname]
792 del newbookmarks[rname]
792 bookmarks[rname] = hexnode
793 bookmarks[rname] = hexnode
793 elif nametype == b'branches':
794 elif nametype == b'branches':
794 # saveremotenames expects 20 byte binary nodes for branches
795 # saveremotenames expects 20 byte binary nodes for branches
795 branches[rname].append(bin(hexnode))
796 branches[rname].append(bin(hexnode))
796
797
797 for bookmark, hexnode in pycompat.iteritems(newbookmarks):
798 for bookmark, hexnode in pycompat.iteritems(newbookmarks):
798 bookmarks[bookmark] = hexnode
799 bookmarks[bookmark] = hexnode
799 remotenamesext.saveremotenames(repo, remotepath, branches, bookmarks)
800 remotenamesext.saveremotenames(repo, remotepath, branches, bookmarks)
800
801
801
802
802 def _savelocalbookmarks(repo, bookmarks):
803 def _savelocalbookmarks(repo, bookmarks):
803 if not bookmarks:
804 if not bookmarks:
804 return
805 return
805 with repo.wlock(), repo.lock(), repo.transaction(b'bookmark') as tr:
806 with repo.wlock(), repo.lock(), repo.transaction(b'bookmark') as tr:
806 changes = []
807 changes = []
807 for scratchbook, node in pycompat.iteritems(bookmarks):
808 for scratchbook, node in pycompat.iteritems(bookmarks):
808 changectx = repo[node]
809 changectx = repo[node]
809 changes.append((scratchbook, changectx.node()))
810 changes.append((scratchbook, changectx.node()))
810 repo._bookmarks.applychanges(repo, tr, changes)
811 repo._bookmarks.applychanges(repo, tr, changes)
811
812
812
813
813 def _findcommonincoming(orig, *args, **kwargs):
814 def _findcommonincoming(orig, *args, **kwargs):
814 common, inc, remoteheads = orig(*args, **kwargs)
815 common, inc, remoteheads = orig(*args, **kwargs)
815 return common, True, remoteheads
816 return common, True, remoteheads
816
817
817
818
818 def _push(orig, ui, repo, *dests, **opts):
819 def _push(orig, ui, repo, *dests, **opts):
819 opts = pycompat.byteskwargs(opts)
820 opts = pycompat.byteskwargs(opts)
820 bookmark = opts.get(b'bookmark')
821 bookmark = opts.get(b'bookmark')
821 # we only support pushing one infinitepush bookmark at once
822 # we only support pushing one infinitepush bookmark at once
822 if len(bookmark) == 1:
823 if len(bookmark) == 1:
823 bookmark = bookmark[0]
824 bookmark = bookmark[0]
824 else:
825 else:
825 bookmark = b''
826 bookmark = b''
826
827
827 oldphasemove = None
828 oldphasemove = None
828 overrides = {(experimental, configbookmark): bookmark}
829 overrides = {(experimental, configbookmark): bookmark}
829
830
830 with ui.configoverride(overrides, b'infinitepush'):
831 with ui.configoverride(overrides, b'infinitepush'):
831 scratchpush = opts.get(b'bundle_store')
832 scratchpush = opts.get(b'bundle_store')
832 if _scratchbranchmatcher(bookmark):
833 if _scratchbranchmatcher(bookmark):
833 scratchpush = True
834 scratchpush = True
834 # bundle2 can be sent back after push (for example, bundle2
835 # bundle2 can be sent back after push (for example, bundle2
835 # containing `pushkey` part to update bookmarks)
836 # containing `pushkey` part to update bookmarks)
836 ui.setconfig(experimental, b'bundle2.pushback', True)
837 ui.setconfig(experimental, b'bundle2.pushback', True)
837
838
838 if scratchpush:
839 if scratchpush:
839 # this is an infinitepush, we don't want the bookmark to be applied
840 # this is an infinitepush, we don't want the bookmark to be applied
840 # rather that should be stored in the bundlestore
841 # rather that should be stored in the bundlestore
841 opts[b'bookmark'] = []
842 opts[b'bookmark'] = []
842 ui.setconfig(experimental, configscratchpush, True)
843 ui.setconfig(experimental, configscratchpush, True)
843 oldphasemove = extensions.wrapfunction(
844 oldphasemove = extensions.wrapfunction(
844 exchange, b'_localphasemove', _phasemove
845 exchange, b'_localphasemove', _phasemove
845 )
846 )
846
847
847 paths = list(urlutil.get_push_paths(repo, ui, dests))
848 paths = list(urlutil.get_push_paths(repo, ui, dests))
848 if len(paths) > 1:
849 if len(paths) > 1:
849 msg = _(b'cannot push to multiple path with infinitepush')
850 msg = _(b'cannot push to multiple path with infinitepush')
850 raise error.Abort(msg)
851 raise error.Abort(msg)
851
852
852 path = paths[0]
853 path = paths[0]
853 destpath = path.pushloc or path.loc
854 destpath = path.pushloc or path.loc
854 # Remote scratch bookmarks will be deleted because remotenames doesn't
855 # Remote scratch bookmarks will be deleted because remotenames doesn't
855 # know about them. Let's save it before push and restore after
856 # know about them. Let's save it before push and restore after
856 remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, destpath)
857 remotescratchbookmarks = _readscratchremotebookmarks(ui, repo, destpath)
857 result = orig(ui, repo, *dests, **pycompat.strkwargs(opts))
858 result = orig(ui, repo, *dests, **pycompat.strkwargs(opts))
858 if common.isremotebooksenabled(ui):
859 if common.isremotebooksenabled(ui):
859 if bookmark and scratchpush:
860 if bookmark and scratchpush:
860 other = hg.peer(repo, opts, destpath)
861 other = hg.peer(repo, opts, destpath)
861 try:
862 try:
862 fetchedbookmarks = other.listkeyspatterns(
863 fetchedbookmarks = other.listkeyspatterns(
863 b'bookmarks', patterns=[bookmark]
864 b'bookmarks', patterns=[bookmark]
864 )
865 )
865 remotescratchbookmarks.update(fetchedbookmarks)
866 remotescratchbookmarks.update(fetchedbookmarks)
866 finally:
867 finally:
867 other.close()
868 other.close()
868 _saveremotebookmarks(repo, remotescratchbookmarks, destpath)
869 _saveremotebookmarks(repo, remotescratchbookmarks, destpath)
869 if oldphasemove:
870 if oldphasemove:
870 exchange._localphasemove = oldphasemove
871 exchange._localphasemove = oldphasemove
871 return result
872 return result
872
873
873
874
874 def _deleteinfinitepushbookmarks(ui, repo, path, names):
875 def _deleteinfinitepushbookmarks(ui, repo, path, names):
875 """Prune remote names by removing the bookmarks we don't want anymore,
876 """Prune remote names by removing the bookmarks we don't want anymore,
876 then writing the result back to disk
877 then writing the result back to disk
877 """
878 """
878 remotenamesext = extensions.find(b'remotenames')
879 remotenamesext = extensions.find(b'remotenames')
879
880
880 # remotename format is:
881 # remotename format is:
881 # (node, nametype ("branches" or "bookmarks"), remote, name)
882 # (node, nametype ("branches" or "bookmarks"), remote, name)
882 nametype_idx = 1
883 nametype_idx = 1
883 remote_idx = 2
884 remote_idx = 2
884 name_idx = 3
885 name_idx = 3
885 remotenames = [
886 remotenames = [
886 remotename
887 remotename
887 for remotename in remotenamesext.readremotenames(repo)
888 for remotename in remotenamesext.readremotenames(repo)
888 if remotename[remote_idx] == path
889 if remotename[remote_idx] == path
889 ]
890 ]
890 remote_bm_names = [
891 remote_bm_names = [
891 remotename[name_idx]
892 remotename[name_idx]
892 for remotename in remotenames
893 for remotename in remotenames
893 if remotename[nametype_idx] == b"bookmarks"
894 if remotename[nametype_idx] == b"bookmarks"
894 ]
895 ]
895
896
896 for name in names:
897 for name in names:
897 if name not in remote_bm_names:
898 if name not in remote_bm_names:
898 raise error.Abort(
899 raise error.Abort(
899 _(
900 _(
900 b"infinitepush bookmark '{}' does not exist "
901 b"infinitepush bookmark '{}' does not exist "
901 b"in path '{}'"
902 b"in path '{}'"
902 ).format(name, path)
903 ).format(name, path)
903 )
904 )
904
905
905 bookmarks = {}
906 bookmarks = {}
906 branches = collections.defaultdict(list)
907 branches = collections.defaultdict(list)
907 for node, nametype, remote, name in remotenames:
908 for node, nametype, remote, name in remotenames:
908 if nametype == b"bookmarks" and name not in names:
909 if nametype == b"bookmarks" and name not in names:
909 bookmarks[name] = node
910 bookmarks[name] = node
910 elif nametype == b"branches":
911 elif nametype == b"branches":
911 # saveremotenames wants binary nodes for branches
912 # saveremotenames wants binary nodes for branches
912 branches[name].append(bin(node))
913 branches[name].append(bin(node))
913
914
914 remotenamesext.saveremotenames(repo, path, branches, bookmarks)
915 remotenamesext.saveremotenames(repo, path, branches, bookmarks)
915
916
916
917
917 def _phasemove(orig, pushop, nodes, phase=phases.public):
918 def _phasemove(orig, pushop, nodes, phase=phases.public):
918 """prevent commits from being marked public
919 """prevent commits from being marked public
919
920
920 Since these are going to a scratch branch, they aren't really being
921 Since these are going to a scratch branch, they aren't really being
921 published."""
922 published."""
922
923
923 if phase != phases.public:
924 if phase != phases.public:
924 orig(pushop, nodes, phase)
925 orig(pushop, nodes, phase)
925
926
926
927
927 @exchange.b2partsgenerator(scratchbranchparttype)
928 @exchange.b2partsgenerator(scratchbranchparttype)
928 def partgen(pushop, bundler):
929 def partgen(pushop, bundler):
929 bookmark = pushop.ui.config(experimental, configbookmark)
930 bookmark = pushop.ui.config(experimental, configbookmark)
930 scratchpush = pushop.ui.configbool(experimental, configscratchpush)
931 scratchpush = pushop.ui.configbool(experimental, configscratchpush)
931 if b'changesets' in pushop.stepsdone or not scratchpush:
932 if b'changesets' in pushop.stepsdone or not scratchpush:
932 return
933 return
933
934
934 if scratchbranchparttype not in bundle2.bundle2caps(pushop.remote):
935 if scratchbranchparttype not in bundle2.bundle2caps(pushop.remote):
935 return
936 return
936
937
937 pushop.stepsdone.add(b'changesets')
938 pushop.stepsdone.add(b'changesets')
938 if not pushop.outgoing.missing:
939 if not pushop.outgoing.missing:
939 pushop.ui.status(_(b'no changes found\n'))
940 pushop.ui.status(_(b'no changes found\n'))
940 pushop.cgresult = 0
941 pushop.cgresult = 0
941 return
942 return
942
943
943 # This parameter tells the server that the following bundle is an
944 # This parameter tells the server that the following bundle is an
944 # infinitepush. This let's it switch the part processing to our infinitepush
945 # infinitepush. This let's it switch the part processing to our infinitepush
945 # code path.
946 # code path.
946 bundler.addparam(b"infinitepush", b"True")
947 bundler.addparam(b"infinitepush", b"True")
947
948
948 scratchparts = bundleparts.getscratchbranchparts(
949 scratchparts = bundleparts.getscratchbranchparts(
949 pushop.repo, pushop.remote, pushop.outgoing, pushop.ui, bookmark
950 pushop.repo, pushop.remote, pushop.outgoing, pushop.ui, bookmark
950 )
951 )
951
952
952 for scratchpart in scratchparts:
953 for scratchpart in scratchparts:
953 bundler.addpart(scratchpart)
954 bundler.addpart(scratchpart)
954
955
955 def handlereply(op):
956 def handlereply(op):
956 # server either succeeds or aborts; no code to read
957 # server either succeeds or aborts; no code to read
957 pushop.cgresult = 1
958 pushop.cgresult = 1
958
959
959 return handlereply
960 return handlereply
960
961
961
962
962 bundle2.capabilities[bundleparts.scratchbranchparttype] = ()
963 bundle2.capabilities[bundleparts.scratchbranchparttype] = ()
963
964
964
965
965 def _getrevs(bundle, oldnode, force, bookmark):
966 def _getrevs(bundle, oldnode, force, bookmark):
966 b'extracts and validates the revs to be imported'
967 b'extracts and validates the revs to be imported'
967 revs = [bundle[r] for r in bundle.revs(b'sort(bundle())')]
968 revs = [bundle[r] for r in bundle.revs(b'sort(bundle())')]
968
969
969 # new bookmark
970 # new bookmark
970 if oldnode is None:
971 if oldnode is None:
971 return revs
972 return revs
972
973
973 # Fast forward update
974 # Fast forward update
974 if oldnode in bundle and list(bundle.set(b'bundle() & %s::', oldnode)):
975 if oldnode in bundle and list(bundle.set(b'bundle() & %s::', oldnode)):
975 return revs
976 return revs
976
977
977 return revs
978 return revs
978
979
979
980
980 @contextlib.contextmanager
981 @contextlib.contextmanager
981 def logservicecall(logger, service, **kwargs):
982 def logservicecall(logger, service, **kwargs):
982 start = time.time()
983 start = time.time()
983 logger(service, eventtype=b'start', **kwargs)
984 logger(service, eventtype=b'start', **kwargs)
984 try:
985 try:
985 yield
986 yield
986 logger(
987 logger(
987 service,
988 service,
988 eventtype=b'success',
989 eventtype=b'success',
989 elapsedms=(time.time() - start) * 1000,
990 elapsedms=(time.time() - start) * 1000,
990 **kwargs
991 **kwargs
991 )
992 )
992 except Exception as e:
993 except Exception as e:
993 logger(
994 logger(
994 service,
995 service,
995 eventtype=b'failure',
996 eventtype=b'failure',
996 elapsedms=(time.time() - start) * 1000,
997 elapsedms=(time.time() - start) * 1000,
997 errormsg=stringutil.forcebytestr(e),
998 errormsg=stringutil.forcebytestr(e),
998 **kwargs
999 **kwargs
999 )
1000 )
1000 raise
1001 raise
1001
1002
1002
1003
1003 def _getorcreateinfinitepushlogger(op):
1004 def _getorcreateinfinitepushlogger(op):
1004 logger = op.records[b'infinitepushlogger']
1005 logger = op.records[b'infinitepushlogger']
1005 if not logger:
1006 if not logger:
1006 ui = op.repo.ui
1007 ui = op.repo.ui
1007 try:
1008 try:
1008 username = procutil.getuser()
1009 username = procutil.getuser()
1009 except Exception:
1010 except Exception:
1010 username = b'unknown'
1011 username = b'unknown'
1011 # Generate random request id to be able to find all logged entries
1012 # Generate random request id to be able to find all logged entries
1012 # for the same request. Since requestid is pseudo-generated it may
1013 # for the same request. Since requestid is pseudo-generated it may
1013 # not be unique, but we assume that (hostname, username, requestid)
1014 # not be unique, but we assume that (hostname, username, requestid)
1014 # is unique.
1015 # is unique.
1015 random.seed()
1016 random.seed()
1016 requestid = random.randint(0, 2000000000)
1017 requestid = random.randint(0, 2000000000)
1017 hostname = socket.gethostname()
1018 hostname = socket.gethostname()
1018 logger = functools.partial(
1019 logger = functools.partial(
1019 ui.log,
1020 ui.log,
1020 b'infinitepush',
1021 b'infinitepush',
1021 user=username,
1022 user=username,
1022 requestid=requestid,
1023 requestid=requestid,
1023 hostname=hostname,
1024 hostname=hostname,
1024 reponame=ui.config(b'infinitepush', b'reponame'),
1025 reponame=ui.config(b'infinitepush', b'reponame'),
1025 )
1026 )
1026 op.records.add(b'infinitepushlogger', logger)
1027 op.records.add(b'infinitepushlogger', logger)
1027 else:
1028 else:
1028 logger = logger[0]
1029 logger = logger[0]
1029 return logger
1030 return logger
1030
1031
1031
1032
1032 def storetobundlestore(orig, repo, op, unbundler):
1033 def storetobundlestore(orig, repo, op, unbundler):
1033 """stores the incoming bundle coming from push command to the bundlestore
1034 """stores the incoming bundle coming from push command to the bundlestore
1034 instead of applying on the revlogs"""
1035 instead of applying on the revlogs"""
1035
1036
1036 repo.ui.status(_(b"storing changesets on the bundlestore\n"))
1037 repo.ui.status(_(b"storing changesets on the bundlestore\n"))
1037 bundler = bundle2.bundle20(repo.ui)
1038 bundler = bundle2.bundle20(repo.ui)
1038
1039
1039 # processing each part and storing it in bundler
1040 # processing each part and storing it in bundler
1040 with bundle2.partiterator(repo, op, unbundler) as parts:
1041 with bundle2.partiterator(repo, op, unbundler) as parts:
1041 for part in parts:
1042 for part in parts:
1042 bundlepart = None
1043 bundlepart = None
1043 if part.type == b'replycaps':
1044 if part.type == b'replycaps':
1044 # This configures the current operation to allow reply parts.
1045 # This configures the current operation to allow reply parts.
1045 bundle2._processpart(op, part)
1046 bundle2._processpart(op, part)
1046 else:
1047 else:
1047 bundlepart = bundle2.bundlepart(part.type, data=part.read())
1048 bundlepart = bundle2.bundlepart(part.type, data=part.read())
1048 for key, value in pycompat.iteritems(part.params):
1049 for key, value in pycompat.iteritems(part.params):
1049 bundlepart.addparam(key, value)
1050 bundlepart.addparam(key, value)
1050
1051
1051 # Certain parts require a response
1052 # Certain parts require a response
1052 if part.type in (b'pushkey', b'changegroup'):
1053 if part.type in (b'pushkey', b'changegroup'):
1053 if op.reply is not None:
1054 if op.reply is not None:
1054 rpart = op.reply.newpart(b'reply:%s' % part.type)
1055 rpart = op.reply.newpart(b'reply:%s' % part.type)
1055 rpart.addparam(
1056 rpart.addparam(
1056 b'in-reply-to', b'%d' % part.id, mandatory=False
1057 b'in-reply-to', b'%d' % part.id, mandatory=False
1057 )
1058 )
1058 rpart.addparam(b'return', b'1', mandatory=False)
1059 rpart.addparam(b'return', b'1', mandatory=False)
1059
1060
1060 op.records.add(
1061 op.records.add(
1061 part.type,
1062 part.type,
1062 {
1063 {
1063 b'return': 1,
1064 b'return': 1,
1064 },
1065 },
1065 )
1066 )
1066 if bundlepart:
1067 if bundlepart:
1067 bundler.addpart(bundlepart)
1068 bundler.addpart(bundlepart)
1068
1069
1069 # storing the bundle in the bundlestore
1070 # storing the bundle in the bundlestore
1070 buf = util.chunkbuffer(bundler.getchunks())
1071 buf = util.chunkbuffer(bundler.getchunks())
1071 fd, bundlefile = pycompat.mkstemp()
1072 fd, bundlefile = pycompat.mkstemp()
1072 try:
1073 try:
1073 try:
1074 try:
1074 fp = os.fdopen(fd, 'wb')
1075 fp = os.fdopen(fd, 'wb')
1075 fp.write(buf.read())
1076 fp.write(buf.read())
1076 finally:
1077 finally:
1077 fp.close()
1078 fp.close()
1078 storebundle(op, {}, bundlefile)
1079 storebundle(op, {}, bundlefile)
1079 finally:
1080 finally:
1080 try:
1081 try:
1081 os.unlink(bundlefile)
1082 os.unlink(bundlefile)
1082 except Exception:
1083 except Exception:
1083 # we would rather see the original exception
1084 # we would rather see the original exception
1084 pass
1085 pass
1085
1086
1086
1087
1087 def processparts(orig, repo, op, unbundler):
1088 def processparts(orig, repo, op, unbundler):
1088
1089
1089 # make sure we don't wrap processparts in case of `hg unbundle`
1090 # make sure we don't wrap processparts in case of `hg unbundle`
1090 if op.source == b'unbundle':
1091 if op.source == b'unbundle':
1091 return orig(repo, op, unbundler)
1092 return orig(repo, op, unbundler)
1092
1093
1093 # this server routes each push to bundle store
1094 # this server routes each push to bundle store
1094 if repo.ui.configbool(b'infinitepush', b'pushtobundlestore'):
1095 if repo.ui.configbool(b'infinitepush', b'pushtobundlestore'):
1095 return storetobundlestore(orig, repo, op, unbundler)
1096 return storetobundlestore(orig, repo, op, unbundler)
1096
1097
1097 if unbundler.params.get(b'infinitepush') != b'True':
1098 if unbundler.params.get(b'infinitepush') != b'True':
1098 return orig(repo, op, unbundler)
1099 return orig(repo, op, unbundler)
1099
1100
1100 handleallparts = repo.ui.configbool(b'infinitepush', b'storeallparts')
1101 handleallparts = repo.ui.configbool(b'infinitepush', b'storeallparts')
1101
1102
1102 bundler = bundle2.bundle20(repo.ui)
1103 bundler = bundle2.bundle20(repo.ui)
1103 cgparams = None
1104 cgparams = None
1104 with bundle2.partiterator(repo, op, unbundler) as parts:
1105 with bundle2.partiterator(repo, op, unbundler) as parts:
1105 for part in parts:
1106 for part in parts:
1106 bundlepart = None
1107 bundlepart = None
1107 if part.type == b'replycaps':
1108 if part.type == b'replycaps':
1108 # This configures the current operation to allow reply parts.
1109 # This configures the current operation to allow reply parts.
1109 bundle2._processpart(op, part)
1110 bundle2._processpart(op, part)
1110 elif part.type == bundleparts.scratchbranchparttype:
1111 elif part.type == bundleparts.scratchbranchparttype:
1111 # Scratch branch parts need to be converted to normal
1112 # Scratch branch parts need to be converted to normal
1112 # changegroup parts, and the extra parameters stored for later
1113 # changegroup parts, and the extra parameters stored for later
1113 # when we upload to the store. Eventually those parameters will
1114 # when we upload to the store. Eventually those parameters will
1114 # be put on the actual bundle instead of this part, then we can
1115 # be put on the actual bundle instead of this part, then we can
1115 # send a vanilla changegroup instead of the scratchbranch part.
1116 # send a vanilla changegroup instead of the scratchbranch part.
1116 cgversion = part.params.get(b'cgversion', b'01')
1117 cgversion = part.params.get(b'cgversion', b'01')
1117 bundlepart = bundle2.bundlepart(
1118 bundlepart = bundle2.bundlepart(
1118 b'changegroup', data=part.read()
1119 b'changegroup', data=part.read()
1119 )
1120 )
1120 bundlepart.addparam(b'version', cgversion)
1121 bundlepart.addparam(b'version', cgversion)
1121 cgparams = part.params
1122 cgparams = part.params
1122
1123
1123 # If we're not dumping all parts into the new bundle, we need to
1124 # If we're not dumping all parts into the new bundle, we need to
1124 # alert the future pushkey and phase-heads handler to skip
1125 # alert the future pushkey and phase-heads handler to skip
1125 # the part.
1126 # the part.
1126 if not handleallparts:
1127 if not handleallparts:
1127 op.records.add(
1128 op.records.add(
1128 scratchbranchparttype + b'_skippushkey', True
1129 scratchbranchparttype + b'_skippushkey', True
1129 )
1130 )
1130 op.records.add(
1131 op.records.add(
1131 scratchbranchparttype + b'_skipphaseheads', True
1132 scratchbranchparttype + b'_skipphaseheads', True
1132 )
1133 )
1133 else:
1134 else:
1134 if handleallparts:
1135 if handleallparts:
1135 # Ideally we would not process any parts, and instead just
1136 # Ideally we would not process any parts, and instead just
1136 # forward them to the bundle for storage, but since this
1137 # forward them to the bundle for storage, but since this
1137 # differs from previous behavior, we need to put it behind a
1138 # differs from previous behavior, we need to put it behind a
1138 # config flag for incremental rollout.
1139 # config flag for incremental rollout.
1139 bundlepart = bundle2.bundlepart(part.type, data=part.read())
1140 bundlepart = bundle2.bundlepart(part.type, data=part.read())
1140 for key, value in pycompat.iteritems(part.params):
1141 for key, value in pycompat.iteritems(part.params):
1141 bundlepart.addparam(key, value)
1142 bundlepart.addparam(key, value)
1142
1143
1143 # Certain parts require a response
1144 # Certain parts require a response
1144 if part.type == b'pushkey':
1145 if part.type == b'pushkey':
1145 if op.reply is not None:
1146 if op.reply is not None:
1146 rpart = op.reply.newpart(b'reply:pushkey')
1147 rpart = op.reply.newpart(b'reply:pushkey')
1147 rpart.addparam(
1148 rpart.addparam(
1148 b'in-reply-to', str(part.id), mandatory=False
1149 b'in-reply-to', str(part.id), mandatory=False
1149 )
1150 )
1150 rpart.addparam(b'return', b'1', mandatory=False)
1151 rpart.addparam(b'return', b'1', mandatory=False)
1151 else:
1152 else:
1152 bundle2._processpart(op, part)
1153 bundle2._processpart(op, part)
1153
1154
1154 if handleallparts:
1155 if handleallparts:
1155 op.records.add(
1156 op.records.add(
1156 part.type,
1157 part.type,
1157 {
1158 {
1158 b'return': 1,
1159 b'return': 1,
1159 },
1160 },
1160 )
1161 )
1161 if bundlepart:
1162 if bundlepart:
1162 bundler.addpart(bundlepart)
1163 bundler.addpart(bundlepart)
1163
1164
1164 # If commits were sent, store them
1165 # If commits were sent, store them
1165 if cgparams:
1166 if cgparams:
1166 buf = util.chunkbuffer(bundler.getchunks())
1167 buf = util.chunkbuffer(bundler.getchunks())
1167 fd, bundlefile = pycompat.mkstemp()
1168 fd, bundlefile = pycompat.mkstemp()
1168 try:
1169 try:
1169 try:
1170 try:
1170 fp = os.fdopen(fd, 'wb')
1171 fp = os.fdopen(fd, 'wb')
1171 fp.write(buf.read())
1172 fp.write(buf.read())
1172 finally:
1173 finally:
1173 fp.close()
1174 fp.close()
1174 storebundle(op, cgparams, bundlefile)
1175 storebundle(op, cgparams, bundlefile)
1175 finally:
1176 finally:
1176 try:
1177 try:
1177 os.unlink(bundlefile)
1178 os.unlink(bundlefile)
1178 except Exception:
1179 except Exception:
1179 # we would rather see the original exception
1180 # we would rather see the original exception
1180 pass
1181 pass
1181
1182
1182
1183
1183 def storebundle(op, params, bundlefile):
1184 def storebundle(op, params, bundlefile):
1184 log = _getorcreateinfinitepushlogger(op)
1185 log = _getorcreateinfinitepushlogger(op)
1185 parthandlerstart = time.time()
1186 parthandlerstart = time.time()
1186 log(scratchbranchparttype, eventtype=b'start')
1187 log(scratchbranchparttype, eventtype=b'start')
1187 index = op.repo.bundlestore.index
1188 index = op.repo.bundlestore.index
1188 store = op.repo.bundlestore.store
1189 store = op.repo.bundlestore.store
1189 op.records.add(scratchbranchparttype + b'_skippushkey', True)
1190 op.records.add(scratchbranchparttype + b'_skippushkey', True)
1190
1191
1191 bundle = None
1192 bundle = None
1192 try: # guards bundle
1193 try: # guards bundle
1193 bundlepath = b"bundle:%s+%s" % (op.repo.root, bundlefile)
1194 bundlepath = b"bundle:%s+%s" % (op.repo.root, bundlefile)
1194 bundle = hg.repository(op.repo.ui, bundlepath)
1195 bundle = hg.repository(op.repo.ui, bundlepath)
1195
1196
1196 bookmark = params.get(b'bookmark')
1197 bookmark = params.get(b'bookmark')
1197 bookprevnode = params.get(b'bookprevnode', b'')
1198 bookprevnode = params.get(b'bookprevnode', b'')
1198 force = params.get(b'force')
1199 force = params.get(b'force')
1199
1200
1200 if bookmark:
1201 if bookmark:
1201 oldnode = index.getnode(bookmark)
1202 oldnode = index.getnode(bookmark)
1202 else:
1203 else:
1203 oldnode = None
1204 oldnode = None
1204 bundleheads = bundle.revs(b'heads(bundle())')
1205 bundleheads = bundle.revs(b'heads(bundle())')
1205 if bookmark and len(bundleheads) > 1:
1206 if bookmark and len(bundleheads) > 1:
1206 raise error.Abort(
1207 raise error.Abort(
1207 _(b'cannot push more than one head to a scratch branch')
1208 _(b'cannot push more than one head to a scratch branch')
1208 )
1209 )
1209
1210
1210 revs = _getrevs(bundle, oldnode, force, bookmark)
1211 revs = _getrevs(bundle, oldnode, force, bookmark)
1211
1212
1212 # Notify the user of what is being pushed
1213 # Notify the user of what is being pushed
1213 plural = b's' if len(revs) > 1 else b''
1214 plural = b's' if len(revs) > 1 else b''
1214 op.repo.ui.warn(_(b"pushing %d commit%s:\n") % (len(revs), plural))
1215 op.repo.ui.warn(_(b"pushing %d commit%s:\n") % (len(revs), plural))
1215 maxoutput = 10
1216 maxoutput = 10
1216 for i in range(0, min(len(revs), maxoutput)):
1217 for i in range(0, min(len(revs), maxoutput)):
1217 firstline = bundle[revs[i]].description().split(b'\n')[0][:50]
1218 firstline = bundle[revs[i]].description().split(b'\n')[0][:50]
1218 op.repo.ui.warn(b" %s %s\n" % (revs[i], firstline))
1219 op.repo.ui.warn(b" %s %s\n" % (revs[i], firstline))
1219
1220
1220 if len(revs) > maxoutput + 1:
1221 if len(revs) > maxoutput + 1:
1221 op.repo.ui.warn(b" ...\n")
1222 op.repo.ui.warn(b" ...\n")
1222 firstline = bundle[revs[-1]].description().split(b'\n')[0][:50]
1223 firstline = bundle[revs[-1]].description().split(b'\n')[0][:50]
1223 op.repo.ui.warn(b" %s %s\n" % (revs[-1], firstline))
1224 op.repo.ui.warn(b" %s %s\n" % (revs[-1], firstline))
1224
1225
1225 nodesctx = [bundle[rev] for rev in revs]
1226 nodesctx = [bundle[rev] for rev in revs]
1226 inindex = lambda rev: bool(index.getbundle(bundle[rev].hex()))
1227 inindex = lambda rev: bool(index.getbundle(bundle[rev].hex()))
1227 if bundleheads:
1228 if bundleheads:
1228 newheadscount = sum(not inindex(rev) for rev in bundleheads)
1229 newheadscount = sum(not inindex(rev) for rev in bundleheads)
1229 else:
1230 else:
1230 newheadscount = 0
1231 newheadscount = 0
1231 # If there's a bookmark specified, there should be only one head,
1232 # If there's a bookmark specified, there should be only one head,
1232 # so we choose the last node, which will be that head.
1233 # so we choose the last node, which will be that head.
1233 # If a bug or malicious client allows there to be a bookmark
1234 # If a bug or malicious client allows there to be a bookmark
1234 # with multiple heads, we will place the bookmark on the last head.
1235 # with multiple heads, we will place the bookmark on the last head.
1235 bookmarknode = nodesctx[-1].hex() if nodesctx else None
1236 bookmarknode = nodesctx[-1].hex() if nodesctx else None
1236 key = None
1237 key = None
1237 if newheadscount:
1238 if newheadscount:
1238 with open(bundlefile, b'rb') as f:
1239 with open(bundlefile, b'rb') as f:
1239 bundledata = f.read()
1240 bundledata = f.read()
1240 with logservicecall(
1241 with logservicecall(
1241 log, b'bundlestore', bundlesize=len(bundledata)
1242 log, b'bundlestore', bundlesize=len(bundledata)
1242 ):
1243 ):
1243 bundlesizelimit = 100 * 1024 * 1024 # 100 MB
1244 bundlesizelimit = 100 * 1024 * 1024 # 100 MB
1244 if len(bundledata) > bundlesizelimit:
1245 if len(bundledata) > bundlesizelimit:
1245 error_msg = (
1246 error_msg = (
1246 b'bundle is too big: %d bytes. '
1247 b'bundle is too big: %d bytes. '
1247 + b'max allowed size is 100 MB'
1248 + b'max allowed size is 100 MB'
1248 )
1249 )
1249 raise error.Abort(error_msg % (len(bundledata),))
1250 raise error.Abort(error_msg % (len(bundledata),))
1250 key = store.write(bundledata)
1251 key = store.write(bundledata)
1251
1252
1252 with logservicecall(log, b'index', newheadscount=newheadscount), index:
1253 with logservicecall(log, b'index', newheadscount=newheadscount), index:
1253 if key:
1254 if key:
1254 index.addbundle(key, nodesctx)
1255 index.addbundle(key, nodesctx)
1255 if bookmark:
1256 if bookmark:
1256 index.addbookmark(bookmark, bookmarknode)
1257 index.addbookmark(bookmark, bookmarknode)
1257 _maybeaddpushbackpart(
1258 _maybeaddpushbackpart(
1258 op, bookmark, bookmarknode, bookprevnode, params
1259 op, bookmark, bookmarknode, bookprevnode, params
1259 )
1260 )
1260 log(
1261 log(
1261 scratchbranchparttype,
1262 scratchbranchparttype,
1262 eventtype=b'success',
1263 eventtype=b'success',
1263 elapsedms=(time.time() - parthandlerstart) * 1000,
1264 elapsedms=(time.time() - parthandlerstart) * 1000,
1264 )
1265 )
1265
1266
1266 except Exception as e:
1267 except Exception as e:
1267 log(
1268 log(
1268 scratchbranchparttype,
1269 scratchbranchparttype,
1269 eventtype=b'failure',
1270 eventtype=b'failure',
1270 elapsedms=(time.time() - parthandlerstart) * 1000,
1271 elapsedms=(time.time() - parthandlerstart) * 1000,
1271 errormsg=stringutil.forcebytestr(e),
1272 errormsg=stringutil.forcebytestr(e),
1272 )
1273 )
1273 raise
1274 raise
1274 finally:
1275 finally:
1275 if bundle:
1276 if bundle:
1276 bundle.close()
1277 bundle.close()
1277
1278
1278
1279
1279 @bundle2.parthandler(
1280 @bundle2.parthandler(
1280 scratchbranchparttype,
1281 scratchbranchparttype,
1281 (
1282 (
1282 b'bookmark',
1283 b'bookmark',
1283 b'bookprevnode',
1284 b'bookprevnode',
1284 b'force',
1285 b'force',
1285 b'pushbackbookmarks',
1286 b'pushbackbookmarks',
1286 b'cgversion',
1287 b'cgversion',
1287 ),
1288 ),
1288 )
1289 )
1289 def bundle2scratchbranch(op, part):
1290 def bundle2scratchbranch(op, part):
1290 '''unbundle a bundle2 part containing a changegroup to store'''
1291 '''unbundle a bundle2 part containing a changegroup to store'''
1291
1292
1292 bundler = bundle2.bundle20(op.repo.ui)
1293 bundler = bundle2.bundle20(op.repo.ui)
1293 cgversion = part.params.get(b'cgversion', b'01')
1294 cgversion = part.params.get(b'cgversion', b'01')
1294 cgpart = bundle2.bundlepart(b'changegroup', data=part.read())
1295 cgpart = bundle2.bundlepart(b'changegroup', data=part.read())
1295 cgpart.addparam(b'version', cgversion)
1296 cgpart.addparam(b'version', cgversion)
1296 bundler.addpart(cgpart)
1297 bundler.addpart(cgpart)
1297 buf = util.chunkbuffer(bundler.getchunks())
1298 buf = util.chunkbuffer(bundler.getchunks())
1298
1299
1299 fd, bundlefile = pycompat.mkstemp()
1300 fd, bundlefile = pycompat.mkstemp()
1300 try:
1301 try:
1301 try:
1302 try:
1302 fp = os.fdopen(fd, 'wb')
1303 fp = os.fdopen(fd, 'wb')
1303 fp.write(buf.read())
1304 fp.write(buf.read())
1304 finally:
1305 finally:
1305 fp.close()
1306 fp.close()
1306 storebundle(op, part.params, bundlefile)
1307 storebundle(op, part.params, bundlefile)
1307 finally:
1308 finally:
1308 try:
1309 try:
1309 os.unlink(bundlefile)
1310 os.unlink(bundlefile)
1310 except OSError as e:
1311 except OSError as e:
1311 if e.errno != errno.ENOENT:
1312 if e.errno != errno.ENOENT:
1312 raise
1313 raise
1313
1314
1314 return 1
1315 return 1
1315
1316
1316
1317
1317 def _maybeaddpushbackpart(op, bookmark, newnode, oldnode, params):
1318 def _maybeaddpushbackpart(op, bookmark, newnode, oldnode, params):
1318 if params.get(b'pushbackbookmarks'):
1319 if params.get(b'pushbackbookmarks'):
1319 if op.reply and b'pushback' in op.reply.capabilities:
1320 if op.reply and b'pushback' in op.reply.capabilities:
1320 params = {
1321 params = {
1321 b'namespace': b'bookmarks',
1322 b'namespace': b'bookmarks',
1322 b'key': bookmark,
1323 b'key': bookmark,
1323 b'new': newnode,
1324 b'new': newnode,
1324 b'old': oldnode,
1325 b'old': oldnode,
1325 }
1326 }
1326 op.reply.newpart(
1327 op.reply.newpart(
1327 b'pushkey', mandatoryparams=pycompat.iteritems(params)
1328 b'pushkey', mandatoryparams=pycompat.iteritems(params)
1328 )
1329 )
1329
1330
1330
1331
1331 def bundle2pushkey(orig, op, part):
1332 def bundle2pushkey(orig, op, part):
1332 """Wrapper of bundle2.handlepushkey()
1333 """Wrapper of bundle2.handlepushkey()
1333
1334
1334 The only goal is to skip calling the original function if flag is set.
1335 The only goal is to skip calling the original function if flag is set.
1335 It's set if infinitepush push is happening.
1336 It's set if infinitepush push is happening.
1336 """
1337 """
1337 if op.records[scratchbranchparttype + b'_skippushkey']:
1338 if op.records[scratchbranchparttype + b'_skippushkey']:
1338 if op.reply is not None:
1339 if op.reply is not None:
1339 rpart = op.reply.newpart(b'reply:pushkey')
1340 rpart = op.reply.newpart(b'reply:pushkey')
1340 rpart.addparam(b'in-reply-to', str(part.id), mandatory=False)
1341 rpart.addparam(b'in-reply-to', str(part.id), mandatory=False)
1341 rpart.addparam(b'return', b'1', mandatory=False)
1342 rpart.addparam(b'return', b'1', mandatory=False)
1342 return 1
1343 return 1
1343
1344
1344 return orig(op, part)
1345 return orig(op, part)
1345
1346
1346
1347
1347 def bundle2handlephases(orig, op, part):
1348 def bundle2handlephases(orig, op, part):
1348 """Wrapper of bundle2.handlephases()
1349 """Wrapper of bundle2.handlephases()
1349
1350
1350 The only goal is to skip calling the original function if flag is set.
1351 The only goal is to skip calling the original function if flag is set.
1351 It's set if infinitepush push is happening.
1352 It's set if infinitepush push is happening.
1352 """
1353 """
1353
1354
1354 if op.records[scratchbranchparttype + b'_skipphaseheads']:
1355 if op.records[scratchbranchparttype + b'_skipphaseheads']:
1355 return
1356 return
1356
1357
1357 return orig(op, part)
1358 return orig(op, part)
1358
1359
1359
1360
1360 def _asyncsavemetadata(root, nodes):
1361 def _asyncsavemetadata(root, nodes):
1361 """starts a separate process that fills metadata for the nodes
1362 """starts a separate process that fills metadata for the nodes
1362
1363
1363 This function creates a separate process and doesn't wait for it's
1364 This function creates a separate process and doesn't wait for it's
1364 completion. This was done to avoid slowing down pushes
1365 completion. This was done to avoid slowing down pushes
1365 """
1366 """
1366
1367
1367 maxnodes = 50
1368 maxnodes = 50
1368 if len(nodes) > maxnodes:
1369 if len(nodes) > maxnodes:
1369 return
1370 return
1370 nodesargs = []
1371 nodesargs = []
1371 for node in nodes:
1372 for node in nodes:
1372 nodesargs.append(b'--node')
1373 nodesargs.append(b'--node')
1373 nodesargs.append(node)
1374 nodesargs.append(node)
1374 with open(os.devnull, b'w+b') as devnull:
1375 with open(os.devnull, b'w+b') as devnull:
1375 cmdline = [
1376 cmdline = [
1376 util.hgexecutable(),
1377 util.hgexecutable(),
1377 b'debugfillinfinitepushmetadata',
1378 b'debugfillinfinitepushmetadata',
1378 b'-R',
1379 b'-R',
1379 root,
1380 root,
1380 ] + nodesargs
1381 ] + nodesargs
1381 # Process will run in background. We don't care about the return code
1382 # Process will run in background. We don't care about the return code
1382 subprocess.Popen(
1383 subprocess.Popen(
1383 pycompat.rapply(procutil.tonativestr, cmdline),
1384 pycompat.rapply(procutil.tonativestr, cmdline),
1384 close_fds=True,
1385 close_fds=True,
1385 shell=False,
1386 shell=False,
1386 stdin=devnull,
1387 stdin=devnull,
1387 stdout=devnull,
1388 stdout=devnull,
1388 stderr=devnull,
1389 stderr=devnull,
1389 )
1390 )
@@ -1,217 +1,218 b''
1 # Copyright 2011 Fog Creek Software
1 # Copyright 2011 Fog Creek Software
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5 from __future__ import absolute_import
5 from __future__ import absolute_import
6
6
7 import os
7 import os
8
8
9 from mercurial.i18n import _
9 from mercurial.i18n import _
10 from mercurial.pycompat import open
10 from mercurial.pycompat import open
11
11
12 from mercurial import (
12 from mercurial import (
13 error,
13 error,
14 exthelper,
14 exthelper,
15 httppeer,
15 httppeer,
16 util,
16 util,
17 wireprototypes,
17 wireprototypes,
18 wireprotov1peer,
18 wireprotov1peer,
19 wireprotov1server,
19 wireprotov1server,
20 )
20 )
21
21
22 from . import lfutil
22 from . import lfutil
23
23
24 urlerr = util.urlerr
24 urlerr = util.urlerr
25 urlreq = util.urlreq
25 urlreq = util.urlreq
26
26
27 LARGEFILES_REQUIRED_MSG = (
27 LARGEFILES_REQUIRED_MSG = (
28 b'\nThis repository uses the largefiles extension.'
28 b'\nThis repository uses the largefiles extension.'
29 b'\n\nPlease enable it in your Mercurial config '
29 b'\n\nPlease enable it in your Mercurial config '
30 b'file.\n'
30 b'file.\n'
31 )
31 )
32
32
33 eh = exthelper.exthelper()
33 eh = exthelper.exthelper()
34
34
35
35
36 def putlfile(repo, proto, sha):
36 def putlfile(repo, proto, sha):
37 """Server command for putting a largefile into a repository's local store
37 """Server command for putting a largefile into a repository's local store
38 and into the user cache."""
38 and into the user cache."""
39 with proto.mayberedirectstdio() as output:
39 with proto.mayberedirectstdio() as output:
40 path = lfutil.storepath(repo, sha)
40 path = lfutil.storepath(repo, sha)
41 util.makedirs(os.path.dirname(path))
41 util.makedirs(os.path.dirname(path))
42 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
42 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
43
43
44 try:
44 try:
45 for p in proto.getpayload():
45 for p in proto.getpayload():
46 tmpfp.write(p)
46 tmpfp.write(p)
47 tmpfp._fp.seek(0)
47 tmpfp._fp.seek(0)
48 if sha != lfutil.hexsha1(tmpfp._fp):
48 if sha != lfutil.hexsha1(tmpfp._fp):
49 raise IOError(0, _(b'largefile contents do not match hash'))
49 raise IOError(0, _(b'largefile contents do not match hash'))
50 tmpfp.close()
50 tmpfp.close()
51 lfutil.linktousercache(repo, sha)
51 lfutil.linktousercache(repo, sha)
52 except IOError as e:
52 except IOError as e:
53 repo.ui.warn(
53 repo.ui.warn(
54 _(b'largefiles: failed to put %s into store: %s\n')
54 _(b'largefiles: failed to put %s into store: %s\n')
55 % (sha, e.strerror)
55 % (sha, e.strerror)
56 )
56 )
57 return wireprototypes.pushres(
57 return wireprototypes.pushres(
58 1, output.getvalue() if output else b''
58 1, output.getvalue() if output else b''
59 )
59 )
60 finally:
60 finally:
61 tmpfp.discard()
61 tmpfp.discard()
62
62
63 return wireprototypes.pushres(0, output.getvalue() if output else b'')
63 return wireprototypes.pushres(0, output.getvalue() if output else b'')
64
64
65
65
66 def getlfile(repo, proto, sha):
66 def getlfile(repo, proto, sha):
67 """Server command for retrieving a largefile from the repository-local
67 """Server command for retrieving a largefile from the repository-local
68 cache or user cache."""
68 cache or user cache."""
69 filename = lfutil.findfile(repo, sha)
69 filename = lfutil.findfile(repo, sha)
70 if not filename:
70 if not filename:
71 raise error.Abort(
71 raise error.Abort(
72 _(b'requested largefile %s not present in cache') % sha
72 _(b'requested largefile %s not present in cache') % sha
73 )
73 )
74 f = open(filename, b'rb')
74 f = open(filename, b'rb')
75 length = os.fstat(f.fileno())[6]
75 length = os.fstat(f.fileno())[6]
76
76
77 # Since we can't set an HTTP content-length header here, and
77 # Since we can't set an HTTP content-length header here, and
78 # Mercurial core provides no way to give the length of a streamres
78 # Mercurial core provides no way to give the length of a streamres
79 # (and reading the entire file into RAM would be ill-advised), we
79 # (and reading the entire file into RAM would be ill-advised), we
80 # just send the length on the first line of the response, like the
80 # just send the length on the first line of the response, like the
81 # ssh proto does for string responses.
81 # ssh proto does for string responses.
82 def generator():
82 def generator():
83 yield b'%d\n' % length
83 yield b'%d\n' % length
84 for chunk in util.filechunkiter(f):
84 for chunk in util.filechunkiter(f):
85 yield chunk
85 yield chunk
86
86
87 return wireprototypes.streamreslegacy(gen=generator())
87 return wireprototypes.streamreslegacy(gen=generator())
88
88
89
89
90 def statlfile(repo, proto, sha):
90 def statlfile(repo, proto, sha):
91 """Server command for checking if a largefile is present - returns '2\n' if
91 """Server command for checking if a largefile is present - returns '2\n' if
92 the largefile is missing, '0\n' if it seems to be in good condition.
92 the largefile is missing, '0\n' if it seems to be in good condition.
93
93
94 The value 1 is reserved for mismatched checksum, but that is too expensive
94 The value 1 is reserved for mismatched checksum, but that is too expensive
95 to be verified on every stat and must be caught be running 'hg verify'
95 to be verified on every stat and must be caught be running 'hg verify'
96 server side."""
96 server side."""
97 filename = lfutil.findfile(repo, sha)
97 filename = lfutil.findfile(repo, sha)
98 if not filename:
98 if not filename:
99 return wireprototypes.bytesresponse(b'2\n')
99 return wireprototypes.bytesresponse(b'2\n')
100 return wireprototypes.bytesresponse(b'0\n')
100 return wireprototypes.bytesresponse(b'0\n')
101
101
102
102
103 def wirereposetup(ui, repo):
103 def wirereposetup(ui, repo):
104 orig_commandexecutor = repo.commandexecutor
104 orig_commandexecutor = repo.commandexecutor
105
105
106 class lfileswirerepository(repo.__class__):
106 class lfileswirerepository(repo.__class__):
107 def commandexecutor(self):
107 def commandexecutor(self):
108 executor = orig_commandexecutor()
108 executor = orig_commandexecutor()
109 if self.capable(b'largefiles'):
109 if self.capable(b'largefiles'):
110 orig_callcommand = executor.callcommand
110 orig_callcommand = executor.callcommand
111
111
112 class lfscommandexecutor(executor.__class__):
112 class lfscommandexecutor(executor.__class__):
113 def callcommand(self, command, args):
113 def callcommand(self, command, args):
114 if command == b'heads':
114 if command == b'heads':
115 command = b'lheads'
115 command = b'lheads'
116 return orig_callcommand(command, args)
116 return orig_callcommand(command, args)
117
117
118 executor.__class__ = lfscommandexecutor
118 executor.__class__ = lfscommandexecutor
119 return executor
119 return executor
120
120
121 @wireprotov1peer.batchable
121 @wireprotov1peer.batchable
122 def lheads(self):
122 def lheads(self):
123 return self.heads.batchable(self)
123 return self.heads.batchable(self)
124
124
125 def putlfile(self, sha, fd):
125 def putlfile(self, sha, fd):
126 # unfortunately, httprepository._callpush tries to convert its
126 # unfortunately, httprepository._callpush tries to convert its
127 # input file-like into a bundle before sending it, so we can't use
127 # input file-like into a bundle before sending it, so we can't use
128 # it ...
128 # it ...
129 if issubclass(self.__class__, httppeer.httppeer):
129 if issubclass(self.__class__, httppeer.httppeer):
130 res = self._call(
130 res = self._call(
131 b'putlfile',
131 b'putlfile',
132 data=fd,
132 data=fd,
133 sha=sha,
133 sha=sha,
134 headers={'content-type': 'application/mercurial-0.1'},
134 headers={'content-type': 'application/mercurial-0.1'},
135 )
135 )
136 try:
136 try:
137 d, output = res.split(b'\n', 1)
137 d, output = res.split(b'\n', 1)
138 for l in output.splitlines(True):
138 for l in output.splitlines(True):
139 self.ui.warn(_(b'remote: '), l) # assume l ends with \n
139 self.ui.warn(_(b'remote: '), l) # assume l ends with \n
140 return int(d)
140 return int(d)
141 except ValueError:
141 except ValueError:
142 self.ui.warn(_(b'unexpected putlfile response: %r\n') % res)
142 self.ui.warn(_(b'unexpected putlfile response: %r\n') % res)
143 return 1
143 return 1
144 # ... but we can't use sshrepository._call because the data=
144 # ... but we can't use sshrepository._call because the data=
145 # argument won't get sent, and _callpush does exactly what we want
145 # argument won't get sent, and _callpush does exactly what we want
146 # in this case: send the data straight through
146 # in this case: send the data straight through
147 else:
147 else:
148 try:
148 try:
149 ret, output = self._callpush(b"putlfile", fd, sha=sha)
149 ret, output = self._callpush(b"putlfile", fd, sha=sha)
150 if ret == b"":
150 if ret == b"":
151 raise error.ResponseError(
151 raise error.ResponseError(
152 _(b'putlfile failed:'), output
152 _(b'putlfile failed:'), output
153 )
153 )
154 return int(ret)
154 return int(ret)
155 except IOError:
155 except IOError:
156 return 1
156 return 1
157 except ValueError:
157 except ValueError:
158 raise error.ResponseError(
158 raise error.ResponseError(
159 _(b'putlfile failed (unexpected response):'), ret
159 _(b'putlfile failed (unexpected response):'), ret
160 )
160 )
161
161
162 def getlfile(self, sha):
162 def getlfile(self, sha):
163 """returns an iterable with the chunks of the file with sha sha"""
163 """returns an iterable with the chunks of the file with sha sha"""
164 stream = self._callstream(b"getlfile", sha=sha)
164 stream = self._callstream(b"getlfile", sha=sha)
165 length = stream.readline()
165 length = stream.readline()
166 try:
166 try:
167 length = int(length)
167 length = int(length)
168 except ValueError:
168 except ValueError:
169 self._abort(
169 self._abort(
170 error.ResponseError(_(b"unexpected response:"), length)
170 error.ResponseError(_(b"unexpected response:"), length)
171 )
171 )
172
172
173 # SSH streams will block if reading more than length
173 # SSH streams will block if reading more than length
174 for chunk in util.filechunkiter(stream, limit=length):
174 for chunk in util.filechunkiter(stream, limit=length):
175 yield chunk
175 yield chunk
176 # HTTP streams must hit the end to process the last empty
176 # HTTP streams must hit the end to process the last empty
177 # chunk of Chunked-Encoding so the connection can be reused.
177 # chunk of Chunked-Encoding so the connection can be reused.
178 if issubclass(self.__class__, httppeer.httppeer):
178 if issubclass(self.__class__, httppeer.httppeer):
179 chunk = stream.read(1)
179 chunk = stream.read(1)
180 if chunk:
180 if chunk:
181 self._abort(
181 self._abort(
182 error.ResponseError(_(b"unexpected response:"), chunk)
182 error.ResponseError(_(b"unexpected response:"), chunk)
183 )
183 )
184
184
185 @wireprotov1peer.batchable
185 @wireprotov1peer.batchable
186 def statlfile(self, sha):
186 def statlfile(self, sha):
187 f = wireprotov1peer.future()
187 def decode(d):
188 result = {b'sha': sha}
189 yield result, f
190 try:
188 try:
191 yield int(f.value)
189 return int(d)
192 except (ValueError, urlerr.httperror):
190 except (ValueError, urlerr.httperror):
193 # If the server returns anything but an integer followed by a
191 # If the server returns anything but an integer followed by a
194 # newline, newline, it's not speaking our language; if we get
192 # newline, newline, it's not speaking our language; if we get
195 # an HTTP error, we can't be sure the largefile is present;
193 # an HTTP error, we can't be sure the largefile is present;
196 # either way, consider it missing.
194 # either way, consider it missing.
197 yield 2
195 return 2
196
197 result = {b'sha': sha}
198 return result, decode
198
199
199 repo.__class__ = lfileswirerepository
200 repo.__class__ = lfileswirerepository
200
201
201
202
202 # advertise the largefiles=serve capability
203 # advertise the largefiles=serve capability
203 @eh.wrapfunction(wireprotov1server, b'_capabilities')
204 @eh.wrapfunction(wireprotov1server, b'_capabilities')
204 def _capabilities(orig, repo, proto):
205 def _capabilities(orig, repo, proto):
205 '''announce largefile server capability'''
206 '''announce largefile server capability'''
206 caps = orig(repo, proto)
207 caps = orig(repo, proto)
207 caps.append(b'largefiles=serve')
208 caps.append(b'largefiles=serve')
208 return caps
209 return caps
209
210
210
211
211 def heads(orig, repo, proto):
212 def heads(orig, repo, proto):
212 """Wrap server command - largefile capable clients will know to call
213 """Wrap server command - largefile capable clients will know to call
213 lheads instead"""
214 lheads instead"""
214 if lfutil.islfilesrepo(repo):
215 if lfutil.islfilesrepo(repo):
215 return wireprototypes.ooberror(LARGEFILES_REQUIRED_MSG)
216 return wireprototypes.ooberror(LARGEFILES_REQUIRED_MSG)
216
217
217 return orig(repo, proto)
218 return orig(repo, proto)
@@ -1,670 +1,673 b''
1 # fileserverclient.py - client for communicating with the cache process
1 # fileserverclient.py - client for communicating with the cache process
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import io
10 import io
11 import os
11 import os
12 import threading
12 import threading
13 import time
13 import time
14 import zlib
14 import zlib
15
15
16 from mercurial.i18n import _
16 from mercurial.i18n import _
17 from mercurial.node import bin, hex
17 from mercurial.node import bin, hex
18 from mercurial import (
18 from mercurial import (
19 error,
19 error,
20 pycompat,
20 pycompat,
21 revlog,
21 revlog,
22 sshpeer,
22 sshpeer,
23 util,
23 util,
24 wireprotov1peer,
24 wireprotov1peer,
25 )
25 )
26 from mercurial.utils import (
26 from mercurial.utils import (
27 hashutil,
27 hashutil,
28 procutil,
28 procutil,
29 )
29 )
30
30
31 from . import (
31 from . import (
32 constants,
32 constants,
33 contentstore,
33 contentstore,
34 metadatastore,
34 metadatastore,
35 )
35 )
36
36
37 _sshv1peer = sshpeer.sshv1peer
37 _sshv1peer = sshpeer.sshv1peer
38
38
39 # Statistics for debugging
39 # Statistics for debugging
40 fetchcost = 0
40 fetchcost = 0
41 fetches = 0
41 fetches = 0
42 fetched = 0
42 fetched = 0
43 fetchmisses = 0
43 fetchmisses = 0
44
44
45 _lfsmod = None
45 _lfsmod = None
46
46
47
47
48 def getcachekey(reponame, file, id):
48 def getcachekey(reponame, file, id):
49 pathhash = hex(hashutil.sha1(file).digest())
49 pathhash = hex(hashutil.sha1(file).digest())
50 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
50 return os.path.join(reponame, pathhash[:2], pathhash[2:], id)
51
51
52
52
53 def getlocalkey(file, id):
53 def getlocalkey(file, id):
54 pathhash = hex(hashutil.sha1(file).digest())
54 pathhash = hex(hashutil.sha1(file).digest())
55 return os.path.join(pathhash, id)
55 return os.path.join(pathhash, id)
56
56
57
57
58 def peersetup(ui, peer):
58 def peersetup(ui, peer):
59 class remotefilepeer(peer.__class__):
59 class remotefilepeer(peer.__class__):
60 @wireprotov1peer.batchable
60 @wireprotov1peer.batchable
61 def x_rfl_getfile(self, file, node):
61 def x_rfl_getfile(self, file, node):
62 if not self.capable(b'x_rfl_getfile'):
62 if not self.capable(b'x_rfl_getfile'):
63 raise error.Abort(
63 raise error.Abort(
64 b'configured remotefile server does not support getfile'
64 b'configured remotefile server does not support getfile'
65 )
65 )
66 f = wireprotov1peer.future()
66
67 yield {b'file': file, b'node': node}, f
67 def decode(d):
68 code, data = f.value.split(b'\0', 1)
68 code, data = d.split(b'\0', 1)
69 if int(code):
69 if int(code):
70 raise error.LookupError(file, node, data)
70 raise error.LookupError(file, node, data)
71 yield data
71 return data
72
73 return {b'file': file, b'node': node}, decode
72
74
73 @wireprotov1peer.batchable
75 @wireprotov1peer.batchable
74 def x_rfl_getflogheads(self, path):
76 def x_rfl_getflogheads(self, path):
75 if not self.capable(b'x_rfl_getflogheads'):
77 if not self.capable(b'x_rfl_getflogheads'):
76 raise error.Abort(
78 raise error.Abort(
77 b'configured remotefile server does not '
79 b'configured remotefile server does not '
78 b'support getflogheads'
80 b'support getflogheads'
79 )
81 )
80 f = wireprotov1peer.future()
82
81 yield {b'path': path}, f
83 def decode(d):
82 heads = f.value.split(b'\n') if f.value else []
84 return d.split(b'\n') if d else []
83 yield heads
85
86 return {b'path': path}, decode
84
87
85 def _updatecallstreamopts(self, command, opts):
88 def _updatecallstreamopts(self, command, opts):
86 if command != b'getbundle':
89 if command != b'getbundle':
87 return
90 return
88 if (
91 if (
89 constants.NETWORK_CAP_LEGACY_SSH_GETFILES
92 constants.NETWORK_CAP_LEGACY_SSH_GETFILES
90 not in self.capabilities()
93 not in self.capabilities()
91 ):
94 ):
92 return
95 return
93 if not util.safehasattr(self, '_localrepo'):
96 if not util.safehasattr(self, '_localrepo'):
94 return
97 return
95 if (
98 if (
96 constants.SHALLOWREPO_REQUIREMENT
99 constants.SHALLOWREPO_REQUIREMENT
97 not in self._localrepo.requirements
100 not in self._localrepo.requirements
98 ):
101 ):
99 return
102 return
100
103
101 bundlecaps = opts.get(b'bundlecaps')
104 bundlecaps = opts.get(b'bundlecaps')
102 if bundlecaps:
105 if bundlecaps:
103 bundlecaps = [bundlecaps]
106 bundlecaps = [bundlecaps]
104 else:
107 else:
105 bundlecaps = []
108 bundlecaps = []
106
109
107 # shallow, includepattern, and excludepattern are a hacky way of
110 # shallow, includepattern, and excludepattern are a hacky way of
108 # carrying over data from the local repo to this getbundle
111 # carrying over data from the local repo to this getbundle
109 # command. We need to do it this way because bundle1 getbundle
112 # command. We need to do it this way because bundle1 getbundle
110 # doesn't provide any other place we can hook in to manipulate
113 # doesn't provide any other place we can hook in to manipulate
111 # getbundle args before it goes across the wire. Once we get rid
114 # getbundle args before it goes across the wire. Once we get rid
112 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
115 # of bundle1, we can use bundle2's _pullbundle2extraprepare to
113 # do this more cleanly.
116 # do this more cleanly.
114 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
117 bundlecaps.append(constants.BUNDLE2_CAPABLITY)
115 if self._localrepo.includepattern:
118 if self._localrepo.includepattern:
116 patterns = b'\0'.join(self._localrepo.includepattern)
119 patterns = b'\0'.join(self._localrepo.includepattern)
117 includecap = b"includepattern=" + patterns
120 includecap = b"includepattern=" + patterns
118 bundlecaps.append(includecap)
121 bundlecaps.append(includecap)
119 if self._localrepo.excludepattern:
122 if self._localrepo.excludepattern:
120 patterns = b'\0'.join(self._localrepo.excludepattern)
123 patterns = b'\0'.join(self._localrepo.excludepattern)
121 excludecap = b"excludepattern=" + patterns
124 excludecap = b"excludepattern=" + patterns
122 bundlecaps.append(excludecap)
125 bundlecaps.append(excludecap)
123 opts[b'bundlecaps'] = b','.join(bundlecaps)
126 opts[b'bundlecaps'] = b','.join(bundlecaps)
124
127
125 def _sendrequest(self, command, args, **opts):
128 def _sendrequest(self, command, args, **opts):
126 self._updatecallstreamopts(command, args)
129 self._updatecallstreamopts(command, args)
127 return super(remotefilepeer, self)._sendrequest(
130 return super(remotefilepeer, self)._sendrequest(
128 command, args, **opts
131 command, args, **opts
129 )
132 )
130
133
131 def _callstream(self, command, **opts):
134 def _callstream(self, command, **opts):
132 supertype = super(remotefilepeer, self)
135 supertype = super(remotefilepeer, self)
133 if not util.safehasattr(supertype, '_sendrequest'):
136 if not util.safehasattr(supertype, '_sendrequest'):
134 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
137 self._updatecallstreamopts(command, pycompat.byteskwargs(opts))
135 return super(remotefilepeer, self)._callstream(command, **opts)
138 return super(remotefilepeer, self)._callstream(command, **opts)
136
139
137 peer.__class__ = remotefilepeer
140 peer.__class__ = remotefilepeer
138
141
139
142
140 class cacheconnection(object):
143 class cacheconnection(object):
141 """The connection for communicating with the remote cache. Performs
144 """The connection for communicating with the remote cache. Performs
142 gets and sets by communicating with an external process that has the
145 gets and sets by communicating with an external process that has the
143 cache-specific implementation.
146 cache-specific implementation.
144 """
147 """
145
148
146 def __init__(self):
149 def __init__(self):
147 self.pipeo = self.pipei = self.pipee = None
150 self.pipeo = self.pipei = self.pipee = None
148 self.subprocess = None
151 self.subprocess = None
149 self.connected = False
152 self.connected = False
150
153
151 def connect(self, cachecommand):
154 def connect(self, cachecommand):
152 if self.pipeo:
155 if self.pipeo:
153 raise error.Abort(_(b"cache connection already open"))
156 raise error.Abort(_(b"cache connection already open"))
154 self.pipei, self.pipeo, self.pipee, self.subprocess = procutil.popen4(
157 self.pipei, self.pipeo, self.pipee, self.subprocess = procutil.popen4(
155 cachecommand
158 cachecommand
156 )
159 )
157 self.connected = True
160 self.connected = True
158
161
159 def close(self):
162 def close(self):
160 def tryclose(pipe):
163 def tryclose(pipe):
161 try:
164 try:
162 pipe.close()
165 pipe.close()
163 except Exception:
166 except Exception:
164 pass
167 pass
165
168
166 if self.connected:
169 if self.connected:
167 try:
170 try:
168 self.pipei.write(b"exit\n")
171 self.pipei.write(b"exit\n")
169 except Exception:
172 except Exception:
170 pass
173 pass
171 tryclose(self.pipei)
174 tryclose(self.pipei)
172 self.pipei = None
175 self.pipei = None
173 tryclose(self.pipeo)
176 tryclose(self.pipeo)
174 self.pipeo = None
177 self.pipeo = None
175 tryclose(self.pipee)
178 tryclose(self.pipee)
176 self.pipee = None
179 self.pipee = None
177 try:
180 try:
178 # Wait for process to terminate, making sure to avoid deadlock.
181 # Wait for process to terminate, making sure to avoid deadlock.
179 # See https://docs.python.org/2/library/subprocess.html for
182 # See https://docs.python.org/2/library/subprocess.html for
180 # warnings about wait() and deadlocking.
183 # warnings about wait() and deadlocking.
181 self.subprocess.communicate()
184 self.subprocess.communicate()
182 except Exception:
185 except Exception:
183 pass
186 pass
184 self.subprocess = None
187 self.subprocess = None
185 self.connected = False
188 self.connected = False
186
189
187 def request(self, request, flush=True):
190 def request(self, request, flush=True):
188 if self.connected:
191 if self.connected:
189 try:
192 try:
190 self.pipei.write(request)
193 self.pipei.write(request)
191 if flush:
194 if flush:
192 self.pipei.flush()
195 self.pipei.flush()
193 except IOError:
196 except IOError:
194 self.close()
197 self.close()
195
198
196 def receiveline(self):
199 def receiveline(self):
197 if not self.connected:
200 if not self.connected:
198 return None
201 return None
199 try:
202 try:
200 result = self.pipeo.readline()[:-1]
203 result = self.pipeo.readline()[:-1]
201 if not result:
204 if not result:
202 self.close()
205 self.close()
203 except IOError:
206 except IOError:
204 self.close()
207 self.close()
205
208
206 return result
209 return result
207
210
208
211
209 def _getfilesbatch(
212 def _getfilesbatch(
210 remote, receivemissing, progresstick, missed, idmap, batchsize
213 remote, receivemissing, progresstick, missed, idmap, batchsize
211 ):
214 ):
212 # Over http(s), iterbatch is a streamy method and we can start
215 # Over http(s), iterbatch is a streamy method and we can start
213 # looking at results early. This means we send one (potentially
216 # looking at results early. This means we send one (potentially
214 # large) request, but then we show nice progress as we process
217 # large) request, but then we show nice progress as we process
215 # file results, rather than showing chunks of $batchsize in
218 # file results, rather than showing chunks of $batchsize in
216 # progress.
219 # progress.
217 #
220 #
218 # Over ssh, iterbatch isn't streamy because batch() wasn't
221 # Over ssh, iterbatch isn't streamy because batch() wasn't
219 # explicitly designed as a streaming method. In the future we
222 # explicitly designed as a streaming method. In the future we
220 # should probably introduce a streambatch() method upstream and
223 # should probably introduce a streambatch() method upstream and
221 # use that for this.
224 # use that for this.
222 with remote.commandexecutor() as e:
225 with remote.commandexecutor() as e:
223 futures = []
226 futures = []
224 for m in missed:
227 for m in missed:
225 futures.append(
228 futures.append(
226 e.callcommand(
229 e.callcommand(
227 b'x_rfl_getfile', {b'file': idmap[m], b'node': m[-40:]}
230 b'x_rfl_getfile', {b'file': idmap[m], b'node': m[-40:]}
228 )
231 )
229 )
232 )
230
233
231 for i, m in enumerate(missed):
234 for i, m in enumerate(missed):
232 r = futures[i].result()
235 r = futures[i].result()
233 futures[i] = None # release memory
236 futures[i] = None # release memory
234 file_ = idmap[m]
237 file_ = idmap[m]
235 node = m[-40:]
238 node = m[-40:]
236 receivemissing(io.BytesIO(b'%d\n%s' % (len(r), r)), file_, node)
239 receivemissing(io.BytesIO(b'%d\n%s' % (len(r), r)), file_, node)
237 progresstick()
240 progresstick()
238
241
239
242
240 def _getfiles_optimistic(
243 def _getfiles_optimistic(
241 remote, receivemissing, progresstick, missed, idmap, step
244 remote, receivemissing, progresstick, missed, idmap, step
242 ):
245 ):
243 remote._callstream(b"x_rfl_getfiles")
246 remote._callstream(b"x_rfl_getfiles")
244 i = 0
247 i = 0
245 pipeo = remote._pipeo
248 pipeo = remote._pipeo
246 pipei = remote._pipei
249 pipei = remote._pipei
247 while i < len(missed):
250 while i < len(missed):
248 # issue a batch of requests
251 # issue a batch of requests
249 start = i
252 start = i
250 end = min(len(missed), start + step)
253 end = min(len(missed), start + step)
251 i = end
254 i = end
252 for missingid in missed[start:end]:
255 for missingid in missed[start:end]:
253 # issue new request
256 # issue new request
254 versionid = missingid[-40:]
257 versionid = missingid[-40:]
255 file = idmap[missingid]
258 file = idmap[missingid]
256 sshrequest = b"%s%s\n" % (versionid, file)
259 sshrequest = b"%s%s\n" % (versionid, file)
257 pipeo.write(sshrequest)
260 pipeo.write(sshrequest)
258 pipeo.flush()
261 pipeo.flush()
259
262
260 # receive batch results
263 # receive batch results
261 for missingid in missed[start:end]:
264 for missingid in missed[start:end]:
262 versionid = missingid[-40:]
265 versionid = missingid[-40:]
263 file = idmap[missingid]
266 file = idmap[missingid]
264 receivemissing(pipei, file, versionid)
267 receivemissing(pipei, file, versionid)
265 progresstick()
268 progresstick()
266
269
267 # End the command
270 # End the command
268 pipeo.write(b'\n')
271 pipeo.write(b'\n')
269 pipeo.flush()
272 pipeo.flush()
270
273
271
274
272 def _getfiles_threaded(
275 def _getfiles_threaded(
273 remote, receivemissing, progresstick, missed, idmap, step
276 remote, receivemissing, progresstick, missed, idmap, step
274 ):
277 ):
275 remote._callstream(b"x_rfl_getfiles")
278 remote._callstream(b"x_rfl_getfiles")
276 pipeo = remote._pipeo
279 pipeo = remote._pipeo
277 pipei = remote._pipei
280 pipei = remote._pipei
278
281
279 def writer():
282 def writer():
280 for missingid in missed:
283 for missingid in missed:
281 versionid = missingid[-40:]
284 versionid = missingid[-40:]
282 file = idmap[missingid]
285 file = idmap[missingid]
283 sshrequest = b"%s%s\n" % (versionid, file)
286 sshrequest = b"%s%s\n" % (versionid, file)
284 pipeo.write(sshrequest)
287 pipeo.write(sshrequest)
285 pipeo.flush()
288 pipeo.flush()
286
289
287 writerthread = threading.Thread(target=writer)
290 writerthread = threading.Thread(target=writer)
288 writerthread.daemon = True
291 writerthread.daemon = True
289 writerthread.start()
292 writerthread.start()
290
293
291 for missingid in missed:
294 for missingid in missed:
292 versionid = missingid[-40:]
295 versionid = missingid[-40:]
293 file = idmap[missingid]
296 file = idmap[missingid]
294 receivemissing(pipei, file, versionid)
297 receivemissing(pipei, file, versionid)
295 progresstick()
298 progresstick()
296
299
297 writerthread.join()
300 writerthread.join()
298 # End the command
301 # End the command
299 pipeo.write(b'\n')
302 pipeo.write(b'\n')
300 pipeo.flush()
303 pipeo.flush()
301
304
302
305
303 class fileserverclient(object):
306 class fileserverclient(object):
304 """A client for requesting files from the remote file server."""
307 """A client for requesting files from the remote file server."""
305
308
306 def __init__(self, repo):
309 def __init__(self, repo):
307 ui = repo.ui
310 ui = repo.ui
308 self.repo = repo
311 self.repo = repo
309 self.ui = ui
312 self.ui = ui
310 self.cacheprocess = ui.config(b"remotefilelog", b"cacheprocess")
313 self.cacheprocess = ui.config(b"remotefilelog", b"cacheprocess")
311 if self.cacheprocess:
314 if self.cacheprocess:
312 self.cacheprocess = util.expandpath(self.cacheprocess)
315 self.cacheprocess = util.expandpath(self.cacheprocess)
313
316
314 # This option causes remotefilelog to pass the full file path to the
317 # This option causes remotefilelog to pass the full file path to the
315 # cacheprocess instead of a hashed key.
318 # cacheprocess instead of a hashed key.
316 self.cacheprocesspasspath = ui.configbool(
319 self.cacheprocesspasspath = ui.configbool(
317 b"remotefilelog", b"cacheprocess.includepath"
320 b"remotefilelog", b"cacheprocess.includepath"
318 )
321 )
319
322
320 self.debugoutput = ui.configbool(b"remotefilelog", b"debug")
323 self.debugoutput = ui.configbool(b"remotefilelog", b"debug")
321
324
322 self.remotecache = cacheconnection()
325 self.remotecache = cacheconnection()
323
326
324 def setstore(self, datastore, historystore, writedata, writehistory):
327 def setstore(self, datastore, historystore, writedata, writehistory):
325 self.datastore = datastore
328 self.datastore = datastore
326 self.historystore = historystore
329 self.historystore = historystore
327 self.writedata = writedata
330 self.writedata = writedata
328 self.writehistory = writehistory
331 self.writehistory = writehistory
329
332
330 def _connect(self):
333 def _connect(self):
331 return self.repo.connectionpool.get(self.repo.fallbackpath)
334 return self.repo.connectionpool.get(self.repo.fallbackpath)
332
335
333 def request(self, fileids):
336 def request(self, fileids):
334 """Takes a list of filename/node pairs and fetches them from the
337 """Takes a list of filename/node pairs and fetches them from the
335 server. Files are stored in the local cache.
338 server. Files are stored in the local cache.
336 A list of nodes that the server couldn't find is returned.
339 A list of nodes that the server couldn't find is returned.
337 If the connection fails, an exception is raised.
340 If the connection fails, an exception is raised.
338 """
341 """
339 if not self.remotecache.connected:
342 if not self.remotecache.connected:
340 self.connect()
343 self.connect()
341 cache = self.remotecache
344 cache = self.remotecache
342 writedata = self.writedata
345 writedata = self.writedata
343
346
344 repo = self.repo
347 repo = self.repo
345 total = len(fileids)
348 total = len(fileids)
346 request = b"get\n%d\n" % total
349 request = b"get\n%d\n" % total
347 idmap = {}
350 idmap = {}
348 reponame = repo.name
351 reponame = repo.name
349 for file, id in fileids:
352 for file, id in fileids:
350 fullid = getcachekey(reponame, file, id)
353 fullid = getcachekey(reponame, file, id)
351 if self.cacheprocesspasspath:
354 if self.cacheprocesspasspath:
352 request += file + b'\0'
355 request += file + b'\0'
353 request += fullid + b"\n"
356 request += fullid + b"\n"
354 idmap[fullid] = file
357 idmap[fullid] = file
355
358
356 cache.request(request)
359 cache.request(request)
357
360
358 progress = self.ui.makeprogress(_(b'downloading'), total=total)
361 progress = self.ui.makeprogress(_(b'downloading'), total=total)
359 progress.update(0)
362 progress.update(0)
360
363
361 missed = []
364 missed = []
362 while True:
365 while True:
363 missingid = cache.receiveline()
366 missingid = cache.receiveline()
364 if not missingid:
367 if not missingid:
365 missedset = set(missed)
368 missedset = set(missed)
366 for missingid in idmap:
369 for missingid in idmap:
367 if not missingid in missedset:
370 if not missingid in missedset:
368 missed.append(missingid)
371 missed.append(missingid)
369 self.ui.warn(
372 self.ui.warn(
370 _(
373 _(
371 b"warning: cache connection closed early - "
374 b"warning: cache connection closed early - "
372 + b"falling back to server\n"
375 + b"falling back to server\n"
373 )
376 )
374 )
377 )
375 break
378 break
376 if missingid == b"0":
379 if missingid == b"0":
377 break
380 break
378 if missingid.startswith(b"_hits_"):
381 if missingid.startswith(b"_hits_"):
379 # receive progress reports
382 # receive progress reports
380 parts = missingid.split(b"_")
383 parts = missingid.split(b"_")
381 progress.increment(int(parts[2]))
384 progress.increment(int(parts[2]))
382 continue
385 continue
383
386
384 missed.append(missingid)
387 missed.append(missingid)
385
388
386 global fetchmisses
389 global fetchmisses
387 fetchmisses += len(missed)
390 fetchmisses += len(missed)
388
391
389 fromcache = total - len(missed)
392 fromcache = total - len(missed)
390 progress.update(fromcache, total=total)
393 progress.update(fromcache, total=total)
391 self.ui.log(
394 self.ui.log(
392 b"remotefilelog",
395 b"remotefilelog",
393 b"remote cache hit rate is %r of %r\n",
396 b"remote cache hit rate is %r of %r\n",
394 fromcache,
397 fromcache,
395 total,
398 total,
396 hit=fromcache,
399 hit=fromcache,
397 total=total,
400 total=total,
398 )
401 )
399
402
400 oldumask = os.umask(0o002)
403 oldumask = os.umask(0o002)
401 try:
404 try:
402 # receive cache misses from master
405 # receive cache misses from master
403 if missed:
406 if missed:
404 # When verbose is true, sshpeer prints 'running ssh...'
407 # When verbose is true, sshpeer prints 'running ssh...'
405 # to stdout, which can interfere with some command
408 # to stdout, which can interfere with some command
406 # outputs
409 # outputs
407 verbose = self.ui.verbose
410 verbose = self.ui.verbose
408 self.ui.verbose = False
411 self.ui.verbose = False
409 try:
412 try:
410 with self._connect() as conn:
413 with self._connect() as conn:
411 remote = conn.peer
414 remote = conn.peer
412 if remote.capable(
415 if remote.capable(
413 constants.NETWORK_CAP_LEGACY_SSH_GETFILES
416 constants.NETWORK_CAP_LEGACY_SSH_GETFILES
414 ):
417 ):
415 if not isinstance(remote, _sshv1peer):
418 if not isinstance(remote, _sshv1peer):
416 raise error.Abort(
419 raise error.Abort(
417 b'remotefilelog requires ssh servers'
420 b'remotefilelog requires ssh servers'
418 )
421 )
419 step = self.ui.configint(
422 step = self.ui.configint(
420 b'remotefilelog', b'getfilesstep'
423 b'remotefilelog', b'getfilesstep'
421 )
424 )
422 getfilestype = self.ui.config(
425 getfilestype = self.ui.config(
423 b'remotefilelog', b'getfilestype'
426 b'remotefilelog', b'getfilestype'
424 )
427 )
425 if getfilestype == b'threaded':
428 if getfilestype == b'threaded':
426 _getfiles = _getfiles_threaded
429 _getfiles = _getfiles_threaded
427 else:
430 else:
428 _getfiles = _getfiles_optimistic
431 _getfiles = _getfiles_optimistic
429 _getfiles(
432 _getfiles(
430 remote,
433 remote,
431 self.receivemissing,
434 self.receivemissing,
432 progress.increment,
435 progress.increment,
433 missed,
436 missed,
434 idmap,
437 idmap,
435 step,
438 step,
436 )
439 )
437 elif remote.capable(b"x_rfl_getfile"):
440 elif remote.capable(b"x_rfl_getfile"):
438 if remote.capable(b'batch'):
441 if remote.capable(b'batch'):
439 batchdefault = 100
442 batchdefault = 100
440 else:
443 else:
441 batchdefault = 10
444 batchdefault = 10
442 batchsize = self.ui.configint(
445 batchsize = self.ui.configint(
443 b'remotefilelog', b'batchsize', batchdefault
446 b'remotefilelog', b'batchsize', batchdefault
444 )
447 )
445 self.ui.debug(
448 self.ui.debug(
446 b'requesting %d files from '
449 b'requesting %d files from '
447 b'remotefilelog server...\n' % len(missed)
450 b'remotefilelog server...\n' % len(missed)
448 )
451 )
449 _getfilesbatch(
452 _getfilesbatch(
450 remote,
453 remote,
451 self.receivemissing,
454 self.receivemissing,
452 progress.increment,
455 progress.increment,
453 missed,
456 missed,
454 idmap,
457 idmap,
455 batchsize,
458 batchsize,
456 )
459 )
457 else:
460 else:
458 raise error.Abort(
461 raise error.Abort(
459 b"configured remotefilelog server"
462 b"configured remotefilelog server"
460 b" does not support remotefilelog"
463 b" does not support remotefilelog"
461 )
464 )
462
465
463 self.ui.log(
466 self.ui.log(
464 b"remotefilefetchlog",
467 b"remotefilefetchlog",
465 b"Success\n",
468 b"Success\n",
466 fetched_files=progress.pos - fromcache,
469 fetched_files=progress.pos - fromcache,
467 total_to_fetch=total - fromcache,
470 total_to_fetch=total - fromcache,
468 )
471 )
469 except Exception:
472 except Exception:
470 self.ui.log(
473 self.ui.log(
471 b"remotefilefetchlog",
474 b"remotefilefetchlog",
472 b"Fail\n",
475 b"Fail\n",
473 fetched_files=progress.pos - fromcache,
476 fetched_files=progress.pos - fromcache,
474 total_to_fetch=total - fromcache,
477 total_to_fetch=total - fromcache,
475 )
478 )
476 raise
479 raise
477 finally:
480 finally:
478 self.ui.verbose = verbose
481 self.ui.verbose = verbose
479 # send to memcache
482 # send to memcache
480 request = b"set\n%d\n%s\n" % (len(missed), b"\n".join(missed))
483 request = b"set\n%d\n%s\n" % (len(missed), b"\n".join(missed))
481 cache.request(request)
484 cache.request(request)
482
485
483 progress.complete()
486 progress.complete()
484
487
485 # mark ourselves as a user of this cache
488 # mark ourselves as a user of this cache
486 writedata.markrepo(self.repo.path)
489 writedata.markrepo(self.repo.path)
487 finally:
490 finally:
488 os.umask(oldumask)
491 os.umask(oldumask)
489
492
490 def receivemissing(self, pipe, filename, node):
493 def receivemissing(self, pipe, filename, node):
491 line = pipe.readline()[:-1]
494 line = pipe.readline()[:-1]
492 if not line:
495 if not line:
493 raise error.ResponseError(
496 raise error.ResponseError(
494 _(b"error downloading file contents:"),
497 _(b"error downloading file contents:"),
495 _(b"connection closed early"),
498 _(b"connection closed early"),
496 )
499 )
497 size = int(line)
500 size = int(line)
498 data = pipe.read(size)
501 data = pipe.read(size)
499 if len(data) != size:
502 if len(data) != size:
500 raise error.ResponseError(
503 raise error.ResponseError(
501 _(b"error downloading file contents:"),
504 _(b"error downloading file contents:"),
502 _(b"only received %s of %s bytes") % (len(data), size),
505 _(b"only received %s of %s bytes") % (len(data), size),
503 )
506 )
504
507
505 self.writedata.addremotefilelognode(
508 self.writedata.addremotefilelognode(
506 filename, bin(node), zlib.decompress(data)
509 filename, bin(node), zlib.decompress(data)
507 )
510 )
508
511
509 def connect(self):
512 def connect(self):
510 if self.cacheprocess:
513 if self.cacheprocess:
511 cmd = b"%s %s" % (self.cacheprocess, self.writedata._path)
514 cmd = b"%s %s" % (self.cacheprocess, self.writedata._path)
512 self.remotecache.connect(cmd)
515 self.remotecache.connect(cmd)
513 else:
516 else:
514 # If no cache process is specified, we fake one that always
517 # If no cache process is specified, we fake one that always
515 # returns cache misses. This enables tests to run easily
518 # returns cache misses. This enables tests to run easily
516 # and may eventually allow us to be a drop in replacement
519 # and may eventually allow us to be a drop in replacement
517 # for the largefiles extension.
520 # for the largefiles extension.
518 class simplecache(object):
521 class simplecache(object):
519 def __init__(self):
522 def __init__(self):
520 self.missingids = []
523 self.missingids = []
521 self.connected = True
524 self.connected = True
522
525
523 def close(self):
526 def close(self):
524 pass
527 pass
525
528
526 def request(self, value, flush=True):
529 def request(self, value, flush=True):
527 lines = value.split(b"\n")
530 lines = value.split(b"\n")
528 if lines[0] != b"get":
531 if lines[0] != b"get":
529 return
532 return
530 self.missingids = lines[2:-1]
533 self.missingids = lines[2:-1]
531 self.missingids.append(b'0')
534 self.missingids.append(b'0')
532
535
533 def receiveline(self):
536 def receiveline(self):
534 if len(self.missingids) > 0:
537 if len(self.missingids) > 0:
535 return self.missingids.pop(0)
538 return self.missingids.pop(0)
536 return None
539 return None
537
540
538 self.remotecache = simplecache()
541 self.remotecache = simplecache()
539
542
540 def close(self):
543 def close(self):
541 if fetches:
544 if fetches:
542 msg = (
545 msg = (
543 b"%d files fetched over %d fetches - "
546 b"%d files fetched over %d fetches - "
544 + b"(%d misses, %0.2f%% hit ratio) over %0.2fs\n"
547 + b"(%d misses, %0.2f%% hit ratio) over %0.2fs\n"
545 ) % (
548 ) % (
546 fetched,
549 fetched,
547 fetches,
550 fetches,
548 fetchmisses,
551 fetchmisses,
549 float(fetched - fetchmisses) / float(fetched) * 100.0,
552 float(fetched - fetchmisses) / float(fetched) * 100.0,
550 fetchcost,
553 fetchcost,
551 )
554 )
552 if self.debugoutput:
555 if self.debugoutput:
553 self.ui.warn(msg)
556 self.ui.warn(msg)
554 self.ui.log(
557 self.ui.log(
555 b"remotefilelog.prefetch",
558 b"remotefilelog.prefetch",
556 msg.replace(b"%", b"%%"),
559 msg.replace(b"%", b"%%"),
557 remotefilelogfetched=fetched,
560 remotefilelogfetched=fetched,
558 remotefilelogfetches=fetches,
561 remotefilelogfetches=fetches,
559 remotefilelogfetchmisses=fetchmisses,
562 remotefilelogfetchmisses=fetchmisses,
560 remotefilelogfetchtime=fetchcost * 1000,
563 remotefilelogfetchtime=fetchcost * 1000,
561 )
564 )
562
565
563 if self.remotecache.connected:
566 if self.remotecache.connected:
564 self.remotecache.close()
567 self.remotecache.close()
565
568
566 def prefetch(
569 def prefetch(
567 self, fileids, force=False, fetchdata=True, fetchhistory=False
570 self, fileids, force=False, fetchdata=True, fetchhistory=False
568 ):
571 ):
569 """downloads the given file versions to the cache"""
572 """downloads the given file versions to the cache"""
570 repo = self.repo
573 repo = self.repo
571 idstocheck = []
574 idstocheck = []
572 for file, id in fileids:
575 for file, id in fileids:
573 # hack
576 # hack
574 # - we don't use .hgtags
577 # - we don't use .hgtags
575 # - workingctx produces ids with length 42,
578 # - workingctx produces ids with length 42,
576 # which we skip since they aren't in any cache
579 # which we skip since they aren't in any cache
577 if (
580 if (
578 file == b'.hgtags'
581 file == b'.hgtags'
579 or len(id) == 42
582 or len(id) == 42
580 or not repo.shallowmatch(file)
583 or not repo.shallowmatch(file)
581 ):
584 ):
582 continue
585 continue
583
586
584 idstocheck.append((file, bin(id)))
587 idstocheck.append((file, bin(id)))
585
588
586 datastore = self.datastore
589 datastore = self.datastore
587 historystore = self.historystore
590 historystore = self.historystore
588 if force:
591 if force:
589 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
592 datastore = contentstore.unioncontentstore(*repo.shareddatastores)
590 historystore = metadatastore.unionmetadatastore(
593 historystore = metadatastore.unionmetadatastore(
591 *repo.sharedhistorystores
594 *repo.sharedhistorystores
592 )
595 )
593
596
594 missingids = set()
597 missingids = set()
595 if fetchdata:
598 if fetchdata:
596 missingids.update(datastore.getmissing(idstocheck))
599 missingids.update(datastore.getmissing(idstocheck))
597 if fetchhistory:
600 if fetchhistory:
598 missingids.update(historystore.getmissing(idstocheck))
601 missingids.update(historystore.getmissing(idstocheck))
599
602
600 # partition missing nodes into nullid and not-nullid so we can
603 # partition missing nodes into nullid and not-nullid so we can
601 # warn about this filtering potentially shadowing bugs.
604 # warn about this filtering potentially shadowing bugs.
602 nullids = len(
605 nullids = len(
603 [None for unused, id in missingids if id == self.repo.nullid]
606 [None for unused, id in missingids if id == self.repo.nullid]
604 )
607 )
605 if nullids:
608 if nullids:
606 missingids = [
609 missingids = [
607 (f, id) for f, id in missingids if id != self.repo.nullid
610 (f, id) for f, id in missingids if id != self.repo.nullid
608 ]
611 ]
609 repo.ui.develwarn(
612 repo.ui.develwarn(
610 (
613 (
611 b'remotefilelog not fetching %d null revs'
614 b'remotefilelog not fetching %d null revs'
612 b' - this is likely hiding bugs' % nullids
615 b' - this is likely hiding bugs' % nullids
613 ),
616 ),
614 config=b'remotefilelog-ext',
617 config=b'remotefilelog-ext',
615 )
618 )
616 if missingids:
619 if missingids:
617 global fetches, fetched, fetchcost
620 global fetches, fetched, fetchcost
618 fetches += 1
621 fetches += 1
619
622
620 # We want to be able to detect excess individual file downloads, so
623 # We want to be able to detect excess individual file downloads, so
621 # let's log that information for debugging.
624 # let's log that information for debugging.
622 if fetches >= 15 and fetches < 18:
625 if fetches >= 15 and fetches < 18:
623 if fetches == 15:
626 if fetches == 15:
624 fetchwarning = self.ui.config(
627 fetchwarning = self.ui.config(
625 b'remotefilelog', b'fetchwarning'
628 b'remotefilelog', b'fetchwarning'
626 )
629 )
627 if fetchwarning:
630 if fetchwarning:
628 self.ui.warn(fetchwarning + b'\n')
631 self.ui.warn(fetchwarning + b'\n')
629 self.logstacktrace()
632 self.logstacktrace()
630 missingids = [(file, hex(id)) for file, id in sorted(missingids)]
633 missingids = [(file, hex(id)) for file, id in sorted(missingids)]
631 fetched += len(missingids)
634 fetched += len(missingids)
632 start = time.time()
635 start = time.time()
633 missingids = self.request(missingids)
636 missingids = self.request(missingids)
634 if missingids:
637 if missingids:
635 raise error.Abort(
638 raise error.Abort(
636 _(b"unable to download %d files") % len(missingids)
639 _(b"unable to download %d files") % len(missingids)
637 )
640 )
638 fetchcost += time.time() - start
641 fetchcost += time.time() - start
639 self._lfsprefetch(fileids)
642 self._lfsprefetch(fileids)
640
643
641 def _lfsprefetch(self, fileids):
644 def _lfsprefetch(self, fileids):
642 if not _lfsmod or not util.safehasattr(
645 if not _lfsmod or not util.safehasattr(
643 self.repo.svfs, b'lfslocalblobstore'
646 self.repo.svfs, b'lfslocalblobstore'
644 ):
647 ):
645 return
648 return
646 if not _lfsmod.wrapper.candownload(self.repo):
649 if not _lfsmod.wrapper.candownload(self.repo):
647 return
650 return
648 pointers = []
651 pointers = []
649 store = self.repo.svfs.lfslocalblobstore
652 store = self.repo.svfs.lfslocalblobstore
650 for file, id in fileids:
653 for file, id in fileids:
651 node = bin(id)
654 node = bin(id)
652 rlog = self.repo.file(file)
655 rlog = self.repo.file(file)
653 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
656 if rlog.flags(node) & revlog.REVIDX_EXTSTORED:
654 text = rlog.rawdata(node)
657 text = rlog.rawdata(node)
655 p = _lfsmod.pointer.deserialize(text)
658 p = _lfsmod.pointer.deserialize(text)
656 oid = p.oid()
659 oid = p.oid()
657 if not store.has(oid):
660 if not store.has(oid):
658 pointers.append(p)
661 pointers.append(p)
659 if len(pointers) > 0:
662 if len(pointers) > 0:
660 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
663 self.repo.svfs.lfsremoteblobstore.readbatch(pointers, store)
661 assert all(store.has(p.oid()) for p in pointers)
664 assert all(store.has(p.oid()) for p in pointers)
662
665
663 def logstacktrace(self):
666 def logstacktrace(self):
664 import traceback
667 import traceback
665
668
666 self.ui.log(
669 self.ui.log(
667 b'remotefilelog',
670 b'remotefilelog',
668 b'excess remotefilelog fetching:\n%s\n',
671 b'excess remotefilelog fetching:\n%s\n',
669 b''.join(pycompat.sysbytes(s) for s in traceback.format_stack()),
672 b''.join(pycompat.sysbytes(s) for s in traceback.format_stack()),
670 )
673 )
@@ -1,673 +1,649 b''
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 #
2 #
3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
3 # Copyright 2005-2010 Olivia Mackall <olivia@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import sys
10 import sys
11 import weakref
11 import weakref
12
12
13 from .i18n import _
13 from .i18n import _
14 from .node import bin
14 from .node import bin
15 from .pycompat import (
15 from .pycompat import (
16 getattr,
16 getattr,
17 setattr,
17 setattr,
18 )
18 )
19 from . import (
19 from . import (
20 bundle2,
20 bundle2,
21 changegroup as changegroupmod,
21 changegroup as changegroupmod,
22 encoding,
22 encoding,
23 error,
23 error,
24 pushkey as pushkeymod,
24 pushkey as pushkeymod,
25 pycompat,
25 pycompat,
26 util,
26 util,
27 wireprototypes,
27 wireprototypes,
28 )
28 )
29 from .interfaces import (
29 from .interfaces import (
30 repository,
30 repository,
31 util as interfaceutil,
31 util as interfaceutil,
32 )
32 )
33 from .utils import hashutil
33 from .utils import hashutil
34
34
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37
37
38 def batchable_new_style(f):
38 def batchable(f):
39 """annotation for batchable methods
39 """annotation for batchable methods
40
40
41 Such methods must implement a coroutine as follows:
41 Such methods must implement a coroutine as follows:
42
42
43 @batchable
43 @batchable
44 def sample(self, one, two=None):
44 def sample(self, one, two=None):
45 # Build list of encoded arguments suitable for your wire protocol:
45 # Build list of encoded arguments suitable for your wire protocol:
46 encoded_args = [('one', encode(one),), ('two', encode(two),)]
46 encoded_args = [('one', encode(one),), ('two', encode(two),)]
47 # Return it, along with a function that will receive the result
47 # Return it, along with a function that will receive the result
48 # from the batched request.
48 # from the batched request.
49 return encoded_args, decode
49 return encoded_args, decode
50
50
51 The decorator returns a function which wraps this coroutine as a plain
51 The decorator returns a function which wraps this coroutine as a plain
52 method, but adds the original method as an attribute called "batchable",
52 method, but adds the original method as an attribute called "batchable",
53 which is used by remotebatch to split the call into separate encoding and
53 which is used by remotebatch to split the call into separate encoding and
54 decoding phases.
54 decoding phases.
55 """
55 """
56
56
57 def plain(*args, **opts):
57 def plain(*args, **opts):
58 encoded_args_or_res, decode = f(*args, **opts)
58 encoded_args_or_res, decode = f(*args, **opts)
59 if not decode:
59 if not decode:
60 return encoded_args_or_res # a local result in this case
60 return encoded_args_or_res # a local result in this case
61 self = args[0]
61 self = args[0]
62 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
62 cmd = pycompat.bytesurl(f.__name__) # ensure cmd is ascii bytestr
63 encoded_res = self._submitone(cmd, encoded_args_or_res)
63 encoded_res = self._submitone(cmd, encoded_args_or_res)
64 return decode(encoded_res)
64 return decode(encoded_res)
65
65
66 setattr(plain, 'batchable', f)
66 setattr(plain, 'batchable', f)
67 setattr(plain, '__name__', f.__name__)
67 setattr(plain, '__name__', f.__name__)
68 return plain
68 return plain
69
69
70
70
71 def batchable(f):
72 def upgraded(*args, **opts):
73 batchable = f(*args, **opts)
74 encoded_args_or_res, encoded_res_future = next(batchable)
75 if not encoded_res_future:
76 decode = None
77 else:
78
79 def decode(d):
80 encoded_res_future.set(d)
81 return next(batchable)
82
83 return encoded_args_or_res, decode
84
85 setattr(upgraded, '__name__', f.__name__)
86 return batchable_new_style(upgraded)
87
88
89 class future(object):
90 '''placeholder for a value to be set later'''
91
92 def set(self, value):
93 if util.safehasattr(self, b'value'):
94 raise error.RepoError(b"future is already set")
95 self.value = value
96
97
98 def encodebatchcmds(req):
71 def encodebatchcmds(req):
99 """Return a ``cmds`` argument value for the ``batch`` command."""
72 """Return a ``cmds`` argument value for the ``batch`` command."""
100 escapearg = wireprototypes.escapebatcharg
73 escapearg = wireprototypes.escapebatcharg
101
74
102 cmds = []
75 cmds = []
103 for op, argsdict in req:
76 for op, argsdict in req:
104 # Old servers didn't properly unescape argument names. So prevent
77 # Old servers didn't properly unescape argument names. So prevent
105 # the sending of argument names that may not be decoded properly by
78 # the sending of argument names that may not be decoded properly by
106 # servers.
79 # servers.
107 assert all(escapearg(k) == k for k in argsdict)
80 assert all(escapearg(k) == k for k in argsdict)
108
81
109 args = b','.join(
82 args = b','.join(
110 b'%s=%s' % (escapearg(k), escapearg(v))
83 b'%s=%s' % (escapearg(k), escapearg(v))
111 for k, v in pycompat.iteritems(argsdict)
84 for k, v in pycompat.iteritems(argsdict)
112 )
85 )
113 cmds.append(b'%s %s' % (op, args))
86 cmds.append(b'%s %s' % (op, args))
114
87
115 return b';'.join(cmds)
88 return b';'.join(cmds)
116
89
117
90
118 class unsentfuture(pycompat.futures.Future):
91 class unsentfuture(pycompat.futures.Future):
119 """A Future variation to represent an unsent command.
92 """A Future variation to represent an unsent command.
120
93
121 Because we buffer commands and don't submit them immediately, calling
94 Because we buffer commands and don't submit them immediately, calling
122 ``result()`` on an unsent future could deadlock. Futures for buffered
95 ``result()`` on an unsent future could deadlock. Futures for buffered
123 commands are represented by this type, which wraps ``result()`` to
96 commands are represented by this type, which wraps ``result()`` to
124 call ``sendcommands()``.
97 call ``sendcommands()``.
125 """
98 """
126
99
127 def result(self, timeout=None):
100 def result(self, timeout=None):
128 if self.done():
101 if self.done():
129 return pycompat.futures.Future.result(self, timeout)
102 return pycompat.futures.Future.result(self, timeout)
130
103
131 self._peerexecutor.sendcommands()
104 self._peerexecutor.sendcommands()
132
105
133 # This looks like it will infinitely recurse. However,
106 # This looks like it will infinitely recurse. However,
134 # sendcommands() should modify __class__. This call serves as a check
107 # sendcommands() should modify __class__. This call serves as a check
135 # on that.
108 # on that.
136 return self.result(timeout)
109 return self.result(timeout)
137
110
138
111
139 @interfaceutil.implementer(repository.ipeercommandexecutor)
112 @interfaceutil.implementer(repository.ipeercommandexecutor)
140 class peerexecutor(object):
113 class peerexecutor(object):
141 def __init__(self, peer):
114 def __init__(self, peer):
142 self._peer = peer
115 self._peer = peer
143 self._sent = False
116 self._sent = False
144 self._closed = False
117 self._closed = False
145 self._calls = []
118 self._calls = []
146 self._futures = weakref.WeakSet()
119 self._futures = weakref.WeakSet()
147 self._responseexecutor = None
120 self._responseexecutor = None
148 self._responsef = None
121 self._responsef = None
149
122
150 def __enter__(self):
123 def __enter__(self):
151 return self
124 return self
152
125
153 def __exit__(self, exctype, excvalee, exctb):
126 def __exit__(self, exctype, excvalee, exctb):
154 self.close()
127 self.close()
155
128
156 def callcommand(self, command, args):
129 def callcommand(self, command, args):
157 if self._sent:
130 if self._sent:
158 raise error.ProgrammingError(
131 raise error.ProgrammingError(
159 b'callcommand() cannot be used after commands are sent'
132 b'callcommand() cannot be used after commands are sent'
160 )
133 )
161
134
162 if self._closed:
135 if self._closed:
163 raise error.ProgrammingError(
136 raise error.ProgrammingError(
164 b'callcommand() cannot be used after close()'
137 b'callcommand() cannot be used after close()'
165 )
138 )
166
139
167 # Commands are dispatched through methods on the peer.
140 # Commands are dispatched through methods on the peer.
168 fn = getattr(self._peer, pycompat.sysstr(command), None)
141 fn = getattr(self._peer, pycompat.sysstr(command), None)
169
142
170 if not fn:
143 if not fn:
171 raise error.ProgrammingError(
144 raise error.ProgrammingError(
172 b'cannot call command %s: method of same name not available '
145 b'cannot call command %s: method of same name not available '
173 b'on peer' % command
146 b'on peer' % command
174 )
147 )
175
148
176 # Commands are either batchable or they aren't. If a command
149 # Commands are either batchable or they aren't. If a command
177 # isn't batchable, we send it immediately because the executor
150 # isn't batchable, we send it immediately because the executor
178 # can no longer accept new commands after a non-batchable command.
151 # can no longer accept new commands after a non-batchable command.
179 # If a command is batchable, we queue it for later. But we have
152 # If a command is batchable, we queue it for later. But we have
180 # to account for the case of a non-batchable command arriving after
153 # to account for the case of a non-batchable command arriving after
181 # a batchable one and refuse to service it.
154 # a batchable one and refuse to service it.
182
155
183 def addcall():
156 def addcall():
184 f = pycompat.futures.Future()
157 f = pycompat.futures.Future()
185 self._futures.add(f)
158 self._futures.add(f)
186 self._calls.append((command, args, fn, f))
159 self._calls.append((command, args, fn, f))
187 return f
160 return f
188
161
189 if getattr(fn, 'batchable', False):
162 if getattr(fn, 'batchable', False):
190 f = addcall()
163 f = addcall()
191
164
192 # But since we don't issue it immediately, we wrap its result()
165 # But since we don't issue it immediately, we wrap its result()
193 # to trigger sending so we avoid deadlocks.
166 # to trigger sending so we avoid deadlocks.
194 f.__class__ = unsentfuture
167 f.__class__ = unsentfuture
195 f._peerexecutor = self
168 f._peerexecutor = self
196 else:
169 else:
197 if self._calls:
170 if self._calls:
198 raise error.ProgrammingError(
171 raise error.ProgrammingError(
199 b'%s is not batchable and cannot be called on a command '
172 b'%s is not batchable and cannot be called on a command '
200 b'executor along with other commands' % command
173 b'executor along with other commands' % command
201 )
174 )
202
175
203 f = addcall()
176 f = addcall()
204
177
205 # Non-batchable commands can never coexist with another command
178 # Non-batchable commands can never coexist with another command
206 # in this executor. So send the command immediately.
179 # in this executor. So send the command immediately.
207 self.sendcommands()
180 self.sendcommands()
208
181
209 return f
182 return f
210
183
211 def sendcommands(self):
184 def sendcommands(self):
212 if self._sent:
185 if self._sent:
213 return
186 return
214
187
215 if not self._calls:
188 if not self._calls:
216 return
189 return
217
190
218 self._sent = True
191 self._sent = True
219
192
220 # Unhack any future types so caller seens a clean type and to break
193 # Unhack any future types so caller seens a clean type and to break
221 # cycle between us and futures.
194 # cycle between us and futures.
222 for f in self._futures:
195 for f in self._futures:
223 if isinstance(f, unsentfuture):
196 if isinstance(f, unsentfuture):
224 f.__class__ = pycompat.futures.Future
197 f.__class__ = pycompat.futures.Future
225 f._peerexecutor = None
198 f._peerexecutor = None
226
199
227 calls = self._calls
200 calls = self._calls
228 # Mainly to destroy references to futures.
201 # Mainly to destroy references to futures.
229 self._calls = None
202 self._calls = None
230
203
231 # Simple case of a single command. We call it synchronously.
204 # Simple case of a single command. We call it synchronously.
232 if len(calls) == 1:
205 if len(calls) == 1:
233 command, args, fn, f = calls[0]
206 command, args, fn, f = calls[0]
234
207
235 # Future was cancelled. Ignore it.
208 # Future was cancelled. Ignore it.
236 if not f.set_running_or_notify_cancel():
209 if not f.set_running_or_notify_cancel():
237 return
210 return
238
211
239 try:
212 try:
240 result = fn(**pycompat.strkwargs(args))
213 result = fn(**pycompat.strkwargs(args))
241 except Exception:
214 except Exception:
242 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
215 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
243 else:
216 else:
244 f.set_result(result)
217 f.set_result(result)
245
218
246 return
219 return
247
220
248 # Batch commands are a bit harder. First, we have to deal with the
221 # Batch commands are a bit harder. First, we have to deal with the
249 # @batchable coroutine. That's a bit annoying. Furthermore, we also
222 # @batchable coroutine. That's a bit annoying. Furthermore, we also
250 # need to preserve streaming. i.e. it should be possible for the
223 # need to preserve streaming. i.e. it should be possible for the
251 # futures to resolve as data is coming in off the wire without having
224 # futures to resolve as data is coming in off the wire without having
252 # to wait for the final byte of the final response. We do this by
225 # to wait for the final byte of the final response. We do this by
253 # spinning up a thread to read the responses.
226 # spinning up a thread to read the responses.
254
227
255 requests = []
228 requests = []
256 states = []
229 states = []
257
230
258 for command, args, fn, f in calls:
231 for command, args, fn, f in calls:
259 # Future was cancelled. Ignore it.
232 # Future was cancelled. Ignore it.
260 if not f.set_running_or_notify_cancel():
233 if not f.set_running_or_notify_cancel():
261 continue
234 continue
262
235
263 try:
236 try:
264 encoded_args_or_res, decode = fn.batchable(
237 encoded_args_or_res, decode = fn.batchable(
265 fn.__self__, **pycompat.strkwargs(args)
238 fn.__self__, **pycompat.strkwargs(args)
266 )
239 )
267 except Exception:
240 except Exception:
268 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
241 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
269 return
242 return
270
243
271 if not decode:
244 if not decode:
272 f.set_result(encoded_args_or_res)
245 f.set_result(encoded_args_or_res)
273 else:
246 else:
274 requests.append((command, encoded_args_or_res))
247 requests.append((command, encoded_args_or_res))
275 states.append((command, f, batchable, decode))
248 states.append((command, f, batchable, decode))
276
249
277 if not requests:
250 if not requests:
278 return
251 return
279
252
280 # This will emit responses in order they were executed.
253 # This will emit responses in order they were executed.
281 wireresults = self._peer._submitbatch(requests)
254 wireresults = self._peer._submitbatch(requests)
282
255
283 # The use of a thread pool executor here is a bit weird for something
256 # The use of a thread pool executor here is a bit weird for something
284 # that only spins up a single thread. However, thread management is
257 # that only spins up a single thread. However, thread management is
285 # hard and it is easy to encounter race conditions, deadlocks, etc.
258 # hard and it is easy to encounter race conditions, deadlocks, etc.
286 # concurrent.futures already solves these problems and its thread pool
259 # concurrent.futures already solves these problems and its thread pool
287 # executor has minimal overhead. So we use it.
260 # executor has minimal overhead. So we use it.
288 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
261 self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
289 self._responsef = self._responseexecutor.submit(
262 self._responsef = self._responseexecutor.submit(
290 self._readbatchresponse, states, wireresults
263 self._readbatchresponse, states, wireresults
291 )
264 )
292
265
293 def close(self):
266 def close(self):
294 self.sendcommands()
267 self.sendcommands()
295
268
296 if self._closed:
269 if self._closed:
297 return
270 return
298
271
299 self._closed = True
272 self._closed = True
300
273
301 if not self._responsef:
274 if not self._responsef:
302 return
275 return
303
276
304 # We need to wait on our in-flight response and then shut down the
277 # We need to wait on our in-flight response and then shut down the
305 # executor once we have a result.
278 # executor once we have a result.
306 try:
279 try:
307 self._responsef.result()
280 self._responsef.result()
308 finally:
281 finally:
309 self._responseexecutor.shutdown(wait=True)
282 self._responseexecutor.shutdown(wait=True)
310 self._responsef = None
283 self._responsef = None
311 self._responseexecutor = None
284 self._responseexecutor = None
312
285
313 # If any of our futures are still in progress, mark them as
286 # If any of our futures are still in progress, mark them as
314 # errored. Otherwise a result() could wait indefinitely.
287 # errored. Otherwise a result() could wait indefinitely.
315 for f in self._futures:
288 for f in self._futures:
316 if not f.done():
289 if not f.done():
317 f.set_exception(
290 f.set_exception(
318 error.ResponseError(
291 error.ResponseError(
319 _(b'unfulfilled batch command response'), None
292 _(b'unfulfilled batch command response'), None
320 )
293 )
321 )
294 )
322
295
323 self._futures = None
296 self._futures = None
324
297
325 def _readbatchresponse(self, states, wireresults):
298 def _readbatchresponse(self, states, wireresults):
326 # Executes in a thread to read data off the wire.
299 # Executes in a thread to read data off the wire.
327
300
328 for command, f, batchable, decode in states:
301 for command, f, batchable, decode in states:
329 # Grab raw result off the wire and teach the internal future
302 # Grab raw result off the wire and teach the internal future
330 # about it.
303 # about it.
331 try:
304 try:
332 remoteresult = next(wireresults)
305 remoteresult = next(wireresults)
333 except StopIteration:
306 except StopIteration:
334 # This can happen in particular because next(batchable)
307 # This can happen in particular because next(batchable)
335 # in the previous iteration can call peer._abort, which
308 # in the previous iteration can call peer._abort, which
336 # may close the peer.
309 # may close the peer.
337 f.set_exception(
310 f.set_exception(
338 error.ResponseError(
311 error.ResponseError(
339 _(b'unfulfilled batch command response'), None
312 _(b'unfulfilled batch command response'), None
340 )
313 )
341 )
314 )
342 else:
315 else:
343 try:
316 try:
344 result = decode(remoteresult)
317 result = decode(remoteresult)
345 except Exception:
318 except Exception:
346 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
319 pycompat.future_set_exception_info(f, sys.exc_info()[1:])
347 else:
320 else:
348 f.set_result(result)
321 f.set_result(result)
349
322
350
323
351 @interfaceutil.implementer(
324 @interfaceutil.implementer(
352 repository.ipeercommands, repository.ipeerlegacycommands
325 repository.ipeercommands, repository.ipeerlegacycommands
353 )
326 )
354 class wirepeer(repository.peer):
327 class wirepeer(repository.peer):
355 """Client-side interface for communicating with a peer repository.
328 """Client-side interface for communicating with a peer repository.
356
329
357 Methods commonly call wire protocol commands of the same name.
330 Methods commonly call wire protocol commands of the same name.
358
331
359 See also httppeer.py and sshpeer.py for protocol-specific
332 See also httppeer.py and sshpeer.py for protocol-specific
360 implementations of this interface.
333 implementations of this interface.
361 """
334 """
362
335
363 def commandexecutor(self):
336 def commandexecutor(self):
364 return peerexecutor(self)
337 return peerexecutor(self)
365
338
366 # Begin of ipeercommands interface.
339 # Begin of ipeercommands interface.
367
340
368 def clonebundles(self):
341 def clonebundles(self):
369 self.requirecap(b'clonebundles', _(b'clone bundles'))
342 self.requirecap(b'clonebundles', _(b'clone bundles'))
370 return self._call(b'clonebundles')
343 return self._call(b'clonebundles')
371
344
372 @batchable
345 @batchable
373 def lookup(self, key):
346 def lookup(self, key):
374 self.requirecap(b'lookup', _(b'look up remote revision'))
347 self.requirecap(b'lookup', _(b'look up remote revision'))
375 f = future()
348
376 yield {b'key': encoding.fromlocal(key)}, f
349 def decode(d):
377 d = f.value
378 success, data = d[:-1].split(b" ", 1)
350 success, data = d[:-1].split(b" ", 1)
379 if int(success):
351 if int(success):
380 yield bin(data)
352 return bin(data)
381 else:
353 else:
382 self._abort(error.RepoError(data))
354 self._abort(error.RepoError(data))
383
355
356 return {b'key': encoding.fromlocal(key)}, decode
357
384 @batchable
358 @batchable
385 def heads(self):
359 def heads(self):
386 f = future()
360 def decode(d):
387 yield {}, f
388 d = f.value
389 try:
361 try:
390 yield wireprototypes.decodelist(d[:-1])
362 return wireprototypes.decodelist(d[:-1])
391 except ValueError:
363 except ValueError:
392 self._abort(error.ResponseError(_(b"unexpected response:"), d))
364 self._abort(error.ResponseError(_(b"unexpected response:"), d))
393
365
366 return {}, decode
367
394 @batchable
368 @batchable
395 def known(self, nodes):
369 def known(self, nodes):
396 f = future()
370 def decode(d):
397 yield {b'nodes': wireprototypes.encodelist(nodes)}, f
398 d = f.value
399 try:
371 try:
400 yield [bool(int(b)) for b in pycompat.iterbytestr(d)]
372 return [bool(int(b)) for b in pycompat.iterbytestr(d)]
401 except ValueError:
373 except ValueError:
402 self._abort(error.ResponseError(_(b"unexpected response:"), d))
374 self._abort(error.ResponseError(_(b"unexpected response:"), d))
403
375
376 return {b'nodes': wireprototypes.encodelist(nodes)}, decode
377
404 @batchable
378 @batchable
405 def branchmap(self):
379 def branchmap(self):
406 f = future()
380 def decode(d):
407 yield {}, f
408 d = f.value
409 try:
381 try:
410 branchmap = {}
382 branchmap = {}
411 for branchpart in d.splitlines():
383 for branchpart in d.splitlines():
412 branchname, branchheads = branchpart.split(b' ', 1)
384 branchname, branchheads = branchpart.split(b' ', 1)
413 branchname = encoding.tolocal(urlreq.unquote(branchname))
385 branchname = encoding.tolocal(urlreq.unquote(branchname))
414 branchheads = wireprototypes.decodelist(branchheads)
386 branchheads = wireprototypes.decodelist(branchheads)
415 branchmap[branchname] = branchheads
387 branchmap[branchname] = branchheads
416 yield branchmap
388 return branchmap
417 except TypeError:
389 except TypeError:
418 self._abort(error.ResponseError(_(b"unexpected response:"), d))
390 self._abort(error.ResponseError(_(b"unexpected response:"), d))
419
391
392 return {}, decode
393
420 @batchable
394 @batchable
421 def listkeys(self, namespace):
395 def listkeys(self, namespace):
422 if not self.capable(b'pushkey'):
396 if not self.capable(b'pushkey'):
423 yield {}, None
397 return {}, None
424 f = future()
425 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
398 self.ui.debug(b'preparing listkeys for "%s"\n' % namespace)
426 yield {b'namespace': encoding.fromlocal(namespace)}, f
399
427 d = f.value
400 def decode(d):
428 self.ui.debug(
401 self.ui.debug(
429 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
402 b'received listkey for "%s": %i bytes\n' % (namespace, len(d))
430 )
403 )
431 yield pushkeymod.decodekeys(d)
404 return pushkeymod.decodekeys(d)
405
406 return {b'namespace': encoding.fromlocal(namespace)}, decode
432
407
433 @batchable
408 @batchable
434 def pushkey(self, namespace, key, old, new):
409 def pushkey(self, namespace, key, old, new):
435 if not self.capable(b'pushkey'):
410 if not self.capable(b'pushkey'):
436 yield False, None
411 return False, None
437 f = future()
438 self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
412 self.ui.debug(b'preparing pushkey for "%s:%s"\n' % (namespace, key))
439 yield {
413
440 b'namespace': encoding.fromlocal(namespace),
414 def decode(d):
441 b'key': encoding.fromlocal(key),
442 b'old': encoding.fromlocal(old),
443 b'new': encoding.fromlocal(new),
444 }, f
445 d = f.value
446 d, output = d.split(b'\n', 1)
415 d, output = d.split(b'\n', 1)
447 try:
416 try:
448 d = bool(int(d))
417 d = bool(int(d))
449 except ValueError:
418 except ValueError:
450 raise error.ResponseError(
419 raise error.ResponseError(
451 _(b'push failed (unexpected response):'), d
420 _(b'push failed (unexpected response):'), d
452 )
421 )
453 for l in output.splitlines(True):
422 for l in output.splitlines(True):
454 self.ui.status(_(b'remote: '), l)
423 self.ui.status(_(b'remote: '), l)
455 yield d
424 return d
425
426 return {
427 b'namespace': encoding.fromlocal(namespace),
428 b'key': encoding.fromlocal(key),
429 b'old': encoding.fromlocal(old),
430 b'new': encoding.fromlocal(new),
431 }, decode
456
432
457 def stream_out(self):
433 def stream_out(self):
458 return self._callstream(b'stream_out')
434 return self._callstream(b'stream_out')
459
435
460 def getbundle(self, source, **kwargs):
436 def getbundle(self, source, **kwargs):
461 kwargs = pycompat.byteskwargs(kwargs)
437 kwargs = pycompat.byteskwargs(kwargs)
462 self.requirecap(b'getbundle', _(b'look up remote changes'))
438 self.requirecap(b'getbundle', _(b'look up remote changes'))
463 opts = {}
439 opts = {}
464 bundlecaps = kwargs.get(b'bundlecaps') or set()
440 bundlecaps = kwargs.get(b'bundlecaps') or set()
465 for key, value in pycompat.iteritems(kwargs):
441 for key, value in pycompat.iteritems(kwargs):
466 if value is None:
442 if value is None:
467 continue
443 continue
468 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
444 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
469 if keytype is None:
445 if keytype is None:
470 raise error.ProgrammingError(
446 raise error.ProgrammingError(
471 b'Unexpectedly None keytype for key %s' % key
447 b'Unexpectedly None keytype for key %s' % key
472 )
448 )
473 elif keytype == b'nodes':
449 elif keytype == b'nodes':
474 value = wireprototypes.encodelist(value)
450 value = wireprototypes.encodelist(value)
475 elif keytype == b'csv':
451 elif keytype == b'csv':
476 value = b','.join(value)
452 value = b','.join(value)
477 elif keytype == b'scsv':
453 elif keytype == b'scsv':
478 value = b','.join(sorted(value))
454 value = b','.join(sorted(value))
479 elif keytype == b'boolean':
455 elif keytype == b'boolean':
480 value = b'%i' % bool(value)
456 value = b'%i' % bool(value)
481 elif keytype != b'plain':
457 elif keytype != b'plain':
482 raise KeyError(b'unknown getbundle option type %s' % keytype)
458 raise KeyError(b'unknown getbundle option type %s' % keytype)
483 opts[key] = value
459 opts[key] = value
484 f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
460 f = self._callcompressable(b"getbundle", **pycompat.strkwargs(opts))
485 if any((cap.startswith(b'HG2') for cap in bundlecaps)):
461 if any((cap.startswith(b'HG2') for cap in bundlecaps)):
486 return bundle2.getunbundler(self.ui, f)
462 return bundle2.getunbundler(self.ui, f)
487 else:
463 else:
488 return changegroupmod.cg1unpacker(f, b'UN')
464 return changegroupmod.cg1unpacker(f, b'UN')
489
465
490 def unbundle(self, bundle, heads, url):
466 def unbundle(self, bundle, heads, url):
491 """Send cg (a readable file-like object representing the
467 """Send cg (a readable file-like object representing the
492 changegroup to push, typically a chunkbuffer object) to the
468 changegroup to push, typically a chunkbuffer object) to the
493 remote server as a bundle.
469 remote server as a bundle.
494
470
495 When pushing a bundle10 stream, return an integer indicating the
471 When pushing a bundle10 stream, return an integer indicating the
496 result of the push (see changegroup.apply()).
472 result of the push (see changegroup.apply()).
497
473
498 When pushing a bundle20 stream, return a bundle20 stream.
474 When pushing a bundle20 stream, return a bundle20 stream.
499
475
500 `url` is the url the client thinks it's pushing to, which is
476 `url` is the url the client thinks it's pushing to, which is
501 visible to hooks.
477 visible to hooks.
502 """
478 """
503
479
504 if heads != [b'force'] and self.capable(b'unbundlehash'):
480 if heads != [b'force'] and self.capable(b'unbundlehash'):
505 heads = wireprototypes.encodelist(
481 heads = wireprototypes.encodelist(
506 [b'hashed', hashutil.sha1(b''.join(sorted(heads))).digest()]
482 [b'hashed', hashutil.sha1(b''.join(sorted(heads))).digest()]
507 )
483 )
508 else:
484 else:
509 heads = wireprototypes.encodelist(heads)
485 heads = wireprototypes.encodelist(heads)
510
486
511 if util.safehasattr(bundle, b'deltaheader'):
487 if util.safehasattr(bundle, b'deltaheader'):
512 # this a bundle10, do the old style call sequence
488 # this a bundle10, do the old style call sequence
513 ret, output = self._callpush(b"unbundle", bundle, heads=heads)
489 ret, output = self._callpush(b"unbundle", bundle, heads=heads)
514 if ret == b"":
490 if ret == b"":
515 raise error.ResponseError(_(b'push failed:'), output)
491 raise error.ResponseError(_(b'push failed:'), output)
516 try:
492 try:
517 ret = int(ret)
493 ret = int(ret)
518 except ValueError:
494 except ValueError:
519 raise error.ResponseError(
495 raise error.ResponseError(
520 _(b'push failed (unexpected response):'), ret
496 _(b'push failed (unexpected response):'), ret
521 )
497 )
522
498
523 for l in output.splitlines(True):
499 for l in output.splitlines(True):
524 self.ui.status(_(b'remote: '), l)
500 self.ui.status(_(b'remote: '), l)
525 else:
501 else:
526 # bundle2 push. Send a stream, fetch a stream.
502 # bundle2 push. Send a stream, fetch a stream.
527 stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
503 stream = self._calltwowaystream(b'unbundle', bundle, heads=heads)
528 ret = bundle2.getunbundler(self.ui, stream)
504 ret = bundle2.getunbundler(self.ui, stream)
529 return ret
505 return ret
530
506
531 # End of ipeercommands interface.
507 # End of ipeercommands interface.
532
508
533 # Begin of ipeerlegacycommands interface.
509 # Begin of ipeerlegacycommands interface.
534
510
535 def branches(self, nodes):
511 def branches(self, nodes):
536 n = wireprototypes.encodelist(nodes)
512 n = wireprototypes.encodelist(nodes)
537 d = self._call(b"branches", nodes=n)
513 d = self._call(b"branches", nodes=n)
538 try:
514 try:
539 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
515 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
540 return br
516 return br
541 except ValueError:
517 except ValueError:
542 self._abort(error.ResponseError(_(b"unexpected response:"), d))
518 self._abort(error.ResponseError(_(b"unexpected response:"), d))
543
519
544 def between(self, pairs):
520 def between(self, pairs):
545 batch = 8 # avoid giant requests
521 batch = 8 # avoid giant requests
546 r = []
522 r = []
547 for i in pycompat.xrange(0, len(pairs), batch):
523 for i in pycompat.xrange(0, len(pairs), batch):
548 n = b" ".join(
524 n = b" ".join(
549 [
525 [
550 wireprototypes.encodelist(p, b'-')
526 wireprototypes.encodelist(p, b'-')
551 for p in pairs[i : i + batch]
527 for p in pairs[i : i + batch]
552 ]
528 ]
553 )
529 )
554 d = self._call(b"between", pairs=n)
530 d = self._call(b"between", pairs=n)
555 try:
531 try:
556 r.extend(
532 r.extend(
557 l and wireprototypes.decodelist(l) or []
533 l and wireprototypes.decodelist(l) or []
558 for l in d.splitlines()
534 for l in d.splitlines()
559 )
535 )
560 except ValueError:
536 except ValueError:
561 self._abort(error.ResponseError(_(b"unexpected response:"), d))
537 self._abort(error.ResponseError(_(b"unexpected response:"), d))
562 return r
538 return r
563
539
564 def changegroup(self, nodes, source):
540 def changegroup(self, nodes, source):
565 n = wireprototypes.encodelist(nodes)
541 n = wireprototypes.encodelist(nodes)
566 f = self._callcompressable(b"changegroup", roots=n)
542 f = self._callcompressable(b"changegroup", roots=n)
567 return changegroupmod.cg1unpacker(f, b'UN')
543 return changegroupmod.cg1unpacker(f, b'UN')
568
544
569 def changegroupsubset(self, bases, heads, source):
545 def changegroupsubset(self, bases, heads, source):
570 self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
546 self.requirecap(b'changegroupsubset', _(b'look up remote changes'))
571 bases = wireprototypes.encodelist(bases)
547 bases = wireprototypes.encodelist(bases)
572 heads = wireprototypes.encodelist(heads)
548 heads = wireprototypes.encodelist(heads)
573 f = self._callcompressable(
549 f = self._callcompressable(
574 b"changegroupsubset", bases=bases, heads=heads
550 b"changegroupsubset", bases=bases, heads=heads
575 )
551 )
576 return changegroupmod.cg1unpacker(f, b'UN')
552 return changegroupmod.cg1unpacker(f, b'UN')
577
553
578 # End of ipeerlegacycommands interface.
554 # End of ipeerlegacycommands interface.
579
555
580 def _submitbatch(self, req):
556 def _submitbatch(self, req):
581 """run batch request <req> on the server
557 """run batch request <req> on the server
582
558
583 Returns an iterator of the raw responses from the server.
559 Returns an iterator of the raw responses from the server.
584 """
560 """
585 ui = self.ui
561 ui = self.ui
586 if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
562 if ui.debugflag and ui.configbool(b'devel', b'debug.peer-request'):
587 ui.debug(b'devel-peer-request: batched-content\n')
563 ui.debug(b'devel-peer-request: batched-content\n')
588 for op, args in req:
564 for op, args in req:
589 msg = b'devel-peer-request: - %s (%d arguments)\n'
565 msg = b'devel-peer-request: - %s (%d arguments)\n'
590 ui.debug(msg % (op, len(args)))
566 ui.debug(msg % (op, len(args)))
591
567
592 unescapearg = wireprototypes.unescapebatcharg
568 unescapearg = wireprototypes.unescapebatcharg
593
569
594 rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
570 rsp = self._callstream(b"batch", cmds=encodebatchcmds(req))
595 chunk = rsp.read(1024)
571 chunk = rsp.read(1024)
596 work = [chunk]
572 work = [chunk]
597 while chunk:
573 while chunk:
598 while b';' not in chunk and chunk:
574 while b';' not in chunk and chunk:
599 chunk = rsp.read(1024)
575 chunk = rsp.read(1024)
600 work.append(chunk)
576 work.append(chunk)
601 merged = b''.join(work)
577 merged = b''.join(work)
602 while b';' in merged:
578 while b';' in merged:
603 one, merged = merged.split(b';', 1)
579 one, merged = merged.split(b';', 1)
604 yield unescapearg(one)
580 yield unescapearg(one)
605 chunk = rsp.read(1024)
581 chunk = rsp.read(1024)
606 work = [merged, chunk]
582 work = [merged, chunk]
607 yield unescapearg(b''.join(work))
583 yield unescapearg(b''.join(work))
608
584
609 def _submitone(self, op, args):
585 def _submitone(self, op, args):
610 return self._call(op, **pycompat.strkwargs(args))
586 return self._call(op, **pycompat.strkwargs(args))
611
587
612 def debugwireargs(self, one, two, three=None, four=None, five=None):
588 def debugwireargs(self, one, two, three=None, four=None, five=None):
613 # don't pass optional arguments left at their default value
589 # don't pass optional arguments left at their default value
614 opts = {}
590 opts = {}
615 if three is not None:
591 if three is not None:
616 opts['three'] = three
592 opts['three'] = three
617 if four is not None:
593 if four is not None:
618 opts['four'] = four
594 opts['four'] = four
619 return self._call(b'debugwireargs', one=one, two=two, **opts)
595 return self._call(b'debugwireargs', one=one, two=two, **opts)
620
596
621 def _call(self, cmd, **args):
597 def _call(self, cmd, **args):
622 """execute <cmd> on the server
598 """execute <cmd> on the server
623
599
624 The command is expected to return a simple string.
600 The command is expected to return a simple string.
625
601
626 returns the server reply as a string."""
602 returns the server reply as a string."""
627 raise NotImplementedError()
603 raise NotImplementedError()
628
604
629 def _callstream(self, cmd, **args):
605 def _callstream(self, cmd, **args):
630 """execute <cmd> on the server
606 """execute <cmd> on the server
631
607
632 The command is expected to return a stream. Note that if the
608 The command is expected to return a stream. Note that if the
633 command doesn't return a stream, _callstream behaves
609 command doesn't return a stream, _callstream behaves
634 differently for ssh and http peers.
610 differently for ssh and http peers.
635
611
636 returns the server reply as a file like object.
612 returns the server reply as a file like object.
637 """
613 """
638 raise NotImplementedError()
614 raise NotImplementedError()
639
615
640 def _callcompressable(self, cmd, **args):
616 def _callcompressable(self, cmd, **args):
641 """execute <cmd> on the server
617 """execute <cmd> on the server
642
618
643 The command is expected to return a stream.
619 The command is expected to return a stream.
644
620
645 The stream may have been compressed in some implementations. This
621 The stream may have been compressed in some implementations. This
646 function takes care of the decompression. This is the only difference
622 function takes care of the decompression. This is the only difference
647 with _callstream.
623 with _callstream.
648
624
649 returns the server reply as a file like object.
625 returns the server reply as a file like object.
650 """
626 """
651 raise NotImplementedError()
627 raise NotImplementedError()
652
628
653 def _callpush(self, cmd, fp, **args):
629 def _callpush(self, cmd, fp, **args):
654 """execute a <cmd> on server
630 """execute a <cmd> on server
655
631
656 The command is expected to be related to a push. Push has a special
632 The command is expected to be related to a push. Push has a special
657 return method.
633 return method.
658
634
659 returns the server reply as a (ret, output) tuple. ret is either
635 returns the server reply as a (ret, output) tuple. ret is either
660 empty (error) or a stringified int.
636 empty (error) or a stringified int.
661 """
637 """
662 raise NotImplementedError()
638 raise NotImplementedError()
663
639
664 def _calltwowaystream(self, cmd, fp, **args):
640 def _calltwowaystream(self, cmd, fp, **args):
665 """execute <cmd> on server
641 """execute <cmd> on server
666
642
667 The command will send a stream to the server and get a stream in reply.
643 The command will send a stream to the server and get a stream in reply.
668 """
644 """
669 raise NotImplementedError()
645 raise NotImplementedError()
670
646
671 def _abort(self, exception):
647 def _abort(self, exception):
672 """clearly abort the wire protocol connection and raise the exception"""
648 """clearly abort the wire protocol connection and raise the exception"""
673 raise NotImplementedError()
649 raise NotImplementedError()
@@ -1,258 +1,254 b''
1 # test-batching.py - tests for transparent command batching
1 # test-batching.py - tests for transparent command batching
2 #
2 #
3 # Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch>
3 # Copyright 2011 Peter Arrenbrecht <peter@arrenbrecht.ch>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import, print_function
8 from __future__ import absolute_import, print_function
9
9
10 import contextlib
10 import contextlib
11
11
12 from mercurial import (
12 from mercurial import (
13 localrepo,
13 localrepo,
14 pycompat,
14 pycompat,
15 wireprotov1peer,
15 wireprotov1peer,
16 )
16 )
17
17
18
18
19 def bprint(*bs):
19 def bprint(*bs):
20 print(*[pycompat.sysstr(b) for b in bs])
20 print(*[pycompat.sysstr(b) for b in bs])
21
21
22
22
23 # equivalent of repo.repository
23 # equivalent of repo.repository
24 class thing(object):
24 class thing(object):
25 def hello(self):
25 def hello(self):
26 return b"Ready."
26 return b"Ready."
27
27
28
28
29 # equivalent of localrepo.localrepository
29 # equivalent of localrepo.localrepository
30 class localthing(thing):
30 class localthing(thing):
31 def foo(self, one, two=None):
31 def foo(self, one, two=None):
32 if one:
32 if one:
33 return b"%s and %s" % (
33 return b"%s and %s" % (
34 one,
34 one,
35 two,
35 two,
36 )
36 )
37 return b"Nope"
37 return b"Nope"
38
38
39 def bar(self, b, a):
39 def bar(self, b, a):
40 return b"%s und %s" % (
40 return b"%s und %s" % (
41 b,
41 b,
42 a,
42 a,
43 )
43 )
44
44
45 def greet(self, name=None):
45 def greet(self, name=None):
46 return b"Hello, %s" % name
46 return b"Hello, %s" % name
47
47
48 @contextlib.contextmanager
48 @contextlib.contextmanager
49 def commandexecutor(self):
49 def commandexecutor(self):
50 e = localrepo.localcommandexecutor(self)
50 e = localrepo.localcommandexecutor(self)
51 try:
51 try:
52 yield e
52 yield e
53 finally:
53 finally:
54 e.close()
54 e.close()
55
55
56
56
57 # usage of "thing" interface
57 # usage of "thing" interface
58 def use(it):
58 def use(it):
59
59
60 # Direct call to base method shared between client and server.
60 # Direct call to base method shared between client and server.
61 bprint(it.hello())
61 bprint(it.hello())
62
62
63 # Direct calls to proxied methods. They cause individual roundtrips.
63 # Direct calls to proxied methods. They cause individual roundtrips.
64 bprint(it.foo(b"Un", two=b"Deux"))
64 bprint(it.foo(b"Un", two=b"Deux"))
65 bprint(it.bar(b"Eins", b"Zwei"))
65 bprint(it.bar(b"Eins", b"Zwei"))
66
66
67 # Batched call to a couple of proxied methods.
67 # Batched call to a couple of proxied methods.
68
68
69 with it.commandexecutor() as e:
69 with it.commandexecutor() as e:
70 ffoo = e.callcommand(b'foo', {b'one': b'One', b'two': b'Two'})
70 ffoo = e.callcommand(b'foo', {b'one': b'One', b'two': b'Two'})
71 fbar = e.callcommand(b'bar', {b'b': b'Eins', b'a': b'Zwei'})
71 fbar = e.callcommand(b'bar', {b'b': b'Eins', b'a': b'Zwei'})
72 fbar2 = e.callcommand(b'bar', {b'b': b'Uno', b'a': b'Due'})
72 fbar2 = e.callcommand(b'bar', {b'b': b'Uno', b'a': b'Due'})
73
73
74 bprint(ffoo.result())
74 bprint(ffoo.result())
75 bprint(fbar.result())
75 bprint(fbar.result())
76 bprint(fbar2.result())
76 bprint(fbar2.result())
77
77
78
78
79 # local usage
79 # local usage
80 mylocal = localthing()
80 mylocal = localthing()
81 print()
81 print()
82 bprint(b"== Local")
82 bprint(b"== Local")
83 use(mylocal)
83 use(mylocal)
84
84
85 # demo remoting; mimicks what wireproto and HTTP/SSH do
85 # demo remoting; mimicks what wireproto and HTTP/SSH do
86
86
87 # shared
87 # shared
88
88
89
89
90 def escapearg(plain):
90 def escapearg(plain):
91 return (
91 return (
92 plain.replace(b':', b'::')
92 plain.replace(b':', b'::')
93 .replace(b',', b':,')
93 .replace(b',', b':,')
94 .replace(b';', b':;')
94 .replace(b';', b':;')
95 .replace(b'=', b':=')
95 .replace(b'=', b':=')
96 )
96 )
97
97
98
98
99 def unescapearg(escaped):
99 def unescapearg(escaped):
100 return (
100 return (
101 escaped.replace(b':=', b'=')
101 escaped.replace(b':=', b'=')
102 .replace(b':;', b';')
102 .replace(b':;', b';')
103 .replace(b':,', b',')
103 .replace(b':,', b',')
104 .replace(b'::', b':')
104 .replace(b'::', b':')
105 )
105 )
106
106
107
107
108 # server side
108 # server side
109
109
110 # equivalent of wireproto's global functions
110 # equivalent of wireproto's global functions
111 class server(object):
111 class server(object):
112 def __init__(self, local):
112 def __init__(self, local):
113 self.local = local
113 self.local = local
114
114
115 def _call(self, name, args):
115 def _call(self, name, args):
116 args = dict(arg.split(b'=', 1) for arg in args)
116 args = dict(arg.split(b'=', 1) for arg in args)
117 return getattr(self, name)(**args)
117 return getattr(self, name)(**args)
118
118
119 def perform(self, req):
119 def perform(self, req):
120 bprint(b"REQ:", req)
120 bprint(b"REQ:", req)
121 name, args = req.split(b'?', 1)
121 name, args = req.split(b'?', 1)
122 args = args.split(b'&')
122 args = args.split(b'&')
123 vals = dict(arg.split(b'=', 1) for arg in args)
123 vals = dict(arg.split(b'=', 1) for arg in args)
124 res = getattr(self, pycompat.sysstr(name))(**pycompat.strkwargs(vals))
124 res = getattr(self, pycompat.sysstr(name))(**pycompat.strkwargs(vals))
125 bprint(b" ->", res)
125 bprint(b" ->", res)
126 return res
126 return res
127
127
128 def batch(self, cmds):
128 def batch(self, cmds):
129 res = []
129 res = []
130 for pair in cmds.split(b';'):
130 for pair in cmds.split(b';'):
131 name, args = pair.split(b':', 1)
131 name, args = pair.split(b':', 1)
132 vals = {}
132 vals = {}
133 for a in args.split(b','):
133 for a in args.split(b','):
134 if a:
134 if a:
135 n, v = a.split(b'=')
135 n, v = a.split(b'=')
136 vals[n] = unescapearg(v)
136 vals[n] = unescapearg(v)
137 res.append(
137 res.append(
138 escapearg(
138 escapearg(
139 getattr(self, pycompat.sysstr(name))(
139 getattr(self, pycompat.sysstr(name))(
140 **pycompat.strkwargs(vals)
140 **pycompat.strkwargs(vals)
141 )
141 )
142 )
142 )
143 )
143 )
144 return b';'.join(res)
144 return b';'.join(res)
145
145
146 def foo(self, one, two):
146 def foo(self, one, two):
147 return mangle(self.local.foo(unmangle(one), unmangle(two)))
147 return mangle(self.local.foo(unmangle(one), unmangle(two)))
148
148
149 def bar(self, b, a):
149 def bar(self, b, a):
150 return mangle(self.local.bar(unmangle(b), unmangle(a)))
150 return mangle(self.local.bar(unmangle(b), unmangle(a)))
151
151
152 def greet(self, name):
152 def greet(self, name):
153 return mangle(self.local.greet(unmangle(name)))
153 return mangle(self.local.greet(unmangle(name)))
154
154
155
155
156 myserver = server(mylocal)
156 myserver = server(mylocal)
157
157
158 # local side
158 # local side
159
159
160 # equivalent of wireproto.encode/decodelist, that is, type-specific marshalling
160 # equivalent of wireproto.encode/decodelist, that is, type-specific marshalling
161 # here we just transform the strings a bit to check we're properly en-/decoding
161 # here we just transform the strings a bit to check we're properly en-/decoding
162 def mangle(s):
162 def mangle(s):
163 return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s))
163 return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s))
164
164
165
165
166 def unmangle(s):
166 def unmangle(s):
167 return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s))
167 return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s))
168
168
169
169
170 # equivalent of wireproto.wirerepository and something like http's wire format
170 # equivalent of wireproto.wirerepository and something like http's wire format
171 class remotething(thing):
171 class remotething(thing):
172 def __init__(self, server):
172 def __init__(self, server):
173 self.server = server
173 self.server = server
174
174
175 def _submitone(self, name, args):
175 def _submitone(self, name, args):
176 req = name + b'?' + b'&'.join([b'%s=%s' % (n, v) for n, v in args])
176 req = name + b'?' + b'&'.join([b'%s=%s' % (n, v) for n, v in args])
177 return self.server.perform(req)
177 return self.server.perform(req)
178
178
179 def _submitbatch(self, cmds):
179 def _submitbatch(self, cmds):
180 req = []
180 req = []
181 for name, args in cmds:
181 for name, args in cmds:
182 args = b','.join(n + b'=' + escapearg(v) for n, v in args)
182 args = b','.join(n + b'=' + escapearg(v) for n, v in args)
183 req.append(name + b':' + args)
183 req.append(name + b':' + args)
184 req = b';'.join(req)
184 req = b';'.join(req)
185 res = self._submitone(
185 res = self._submitone(
186 b'batch',
186 b'batch',
187 [
187 [
188 (
188 (
189 b'cmds',
189 b'cmds',
190 req,
190 req,
191 )
191 )
192 ],
192 ],
193 )
193 )
194 for r in res.split(b';'):
194 for r in res.split(b';'):
195 yield r
195 yield r
196
196
197 @contextlib.contextmanager
197 @contextlib.contextmanager
198 def commandexecutor(self):
198 def commandexecutor(self):
199 e = wireprotov1peer.peerexecutor(self)
199 e = wireprotov1peer.peerexecutor(self)
200 try:
200 try:
201 yield e
201 yield e
202 finally:
202 finally:
203 e.close()
203 e.close()
204
204
205 @wireprotov1peer.batchable
205 @wireprotov1peer.batchable
206 def foo(self, one, two=None):
206 def foo(self, one, two=None):
207 encoded_args = [
207 encoded_args = [
208 (
208 (
209 b'one',
209 b'one',
210 mangle(one),
210 mangle(one),
211 ),
211 ),
212 (
212 (
213 b'two',
213 b'two',
214 mangle(two),
214 mangle(two),
215 ),
215 ),
216 ]
216 ]
217 encoded_res_future = wireprotov1peer.future()
217 return encoded_args, unmangle
218 yield encoded_args, encoded_res_future
219 yield unmangle(encoded_res_future.value)
220
218
221 @wireprotov1peer.batchable
219 @wireprotov1peer.batchable
222 def bar(self, b, a):
220 def bar(self, b, a):
223 encresref = wireprotov1peer.future()
221 return [
224 yield [
225 (
222 (
226 b'b',
223 b'b',
227 mangle(b),
224 mangle(b),
228 ),
225 ),
229 (
226 (
230 b'a',
227 b'a',
231 mangle(a),
228 mangle(a),
232 ),
229 ),
233 ], encresref
230 ], unmangle
234 yield unmangle(encresref.value)
235
231
236 # greet is coded directly. It therefore does not support batching. If it
232 # greet is coded directly. It therefore does not support batching. If it
237 # does appear in a batch, the batch is split around greet, and the call to
233 # does appear in a batch, the batch is split around greet, and the call to
238 # greet is done in its own roundtrip.
234 # greet is done in its own roundtrip.
239 def greet(self, name=None):
235 def greet(self, name=None):
240 return unmangle(
236 return unmangle(
241 self._submitone(
237 self._submitone(
242 b'greet',
238 b'greet',
243 [
239 [
244 (
240 (
245 b'name',
241 b'name',
246 mangle(name),
242 mangle(name),
247 )
243 )
248 ],
244 ],
249 )
245 )
250 )
246 )
251
247
252
248
253 # demo remote usage
249 # demo remote usage
254
250
255 myproxy = remotething(myserver)
251 myproxy = remotething(myserver)
256 print()
252 print()
257 bprint(b"== Remote")
253 bprint(b"== Remote")
258 use(myproxy)
254 use(myproxy)
@@ -1,126 +1,124 b''
1 from __future__ import absolute_import, print_function
1 from __future__ import absolute_import, print_function
2
2
3 import sys
3 import sys
4
4
5 from mercurial import (
5 from mercurial import (
6 error,
6 error,
7 pycompat,
7 pycompat,
8 ui as uimod,
8 ui as uimod,
9 util,
9 util,
10 wireprototypes,
10 wireprototypes,
11 wireprotov1peer,
11 wireprotov1peer,
12 wireprotov1server,
12 wireprotov1server,
13 )
13 )
14 from mercurial.utils import stringutil
14 from mercurial.utils import stringutil
15
15
16 stringio = util.stringio
16 stringio = util.stringio
17
17
18
18
19 class proto(object):
19 class proto(object):
20 def __init__(self, args):
20 def __init__(self, args):
21 self.args = args
21 self.args = args
22 self.name = 'dummyproto'
22 self.name = 'dummyproto'
23
23
24 def getargs(self, spec):
24 def getargs(self, spec):
25 args = self.args
25 args = self.args
26 args.setdefault(b'*', {})
26 args.setdefault(b'*', {})
27 names = spec.split()
27 names = spec.split()
28 return [args[n] for n in names]
28 return [args[n] for n in names]
29
29
30 def checkperm(self, perm):
30 def checkperm(self, perm):
31 pass
31 pass
32
32
33
33
34 wireprototypes.TRANSPORTS['dummyproto'] = {
34 wireprototypes.TRANSPORTS['dummyproto'] = {
35 'transport': 'dummy',
35 'transport': 'dummy',
36 'version': 1,
36 'version': 1,
37 }
37 }
38
38
39
39
40 class clientpeer(wireprotov1peer.wirepeer):
40 class clientpeer(wireprotov1peer.wirepeer):
41 def __init__(self, serverrepo, ui):
41 def __init__(self, serverrepo, ui):
42 self.serverrepo = serverrepo
42 self.serverrepo = serverrepo
43 self.ui = ui
43 self.ui = ui
44
44
45 def url(self):
45 def url(self):
46 return b'test'
46 return b'test'
47
47
48 def local(self):
48 def local(self):
49 return None
49 return None
50
50
51 def peer(self):
51 def peer(self):
52 return self
52 return self
53
53
54 def canpush(self):
54 def canpush(self):
55 return True
55 return True
56
56
57 def close(self):
57 def close(self):
58 pass
58 pass
59
59
60 def capabilities(self):
60 def capabilities(self):
61 return [b'batch']
61 return [b'batch']
62
62
63 def _call(self, cmd, **args):
63 def _call(self, cmd, **args):
64 args = pycompat.byteskwargs(args)
64 args = pycompat.byteskwargs(args)
65 res = wireprotov1server.dispatch(self.serverrepo, proto(args), cmd)
65 res = wireprotov1server.dispatch(self.serverrepo, proto(args), cmd)
66 if isinstance(res, wireprototypes.bytesresponse):
66 if isinstance(res, wireprototypes.bytesresponse):
67 return res.data
67 return res.data
68 elif isinstance(res, bytes):
68 elif isinstance(res, bytes):
69 return res
69 return res
70 else:
70 else:
71 raise error.Abort('dummy client does not support response type')
71 raise error.Abort('dummy client does not support response type')
72
72
73 def _callstream(self, cmd, **args):
73 def _callstream(self, cmd, **args):
74 return stringio(self._call(cmd, **args))
74 return stringio(self._call(cmd, **args))
75
75
76 @wireprotov1peer.batchable
76 @wireprotov1peer.batchable
77 def greet(self, name):
77 def greet(self, name):
78 f = wireprotov1peer.future()
78 return {b'name': mangle(name)}, unmangle
79 yield {b'name': mangle(name)}, f
80 yield unmangle(f.value)
81
79
82
80
83 class serverrepo(object):
81 class serverrepo(object):
84 def __init__(self, ui):
82 def __init__(self, ui):
85 self.ui = ui
83 self.ui = ui
86
84
87 def greet(self, name):
85 def greet(self, name):
88 return b"Hello, " + name
86 return b"Hello, " + name
89
87
90 def filtered(self, name):
88 def filtered(self, name):
91 return self
89 return self
92
90
93
91
94 def mangle(s):
92 def mangle(s):
95 return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s))
93 return b''.join(pycompat.bytechr(ord(c) + 1) for c in pycompat.bytestr(s))
96
94
97
95
98 def unmangle(s):
96 def unmangle(s):
99 return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s))
97 return b''.join(pycompat.bytechr(ord(c) - 1) for c in pycompat.bytestr(s))
100
98
101
99
102 def greet(repo, proto, name):
100 def greet(repo, proto, name):
103 return mangle(repo.greet(unmangle(name)))
101 return mangle(repo.greet(unmangle(name)))
104
102
105
103
106 wireprotov1server.commands[b'greet'] = (greet, b'name')
104 wireprotov1server.commands[b'greet'] = (greet, b'name')
107
105
108 srv = serverrepo(uimod.ui())
106 srv = serverrepo(uimod.ui())
109 clt = clientpeer(srv, uimod.ui())
107 clt = clientpeer(srv, uimod.ui())
110
108
111
109
112 def printb(data, end=b'\n'):
110 def printb(data, end=b'\n'):
113 out = getattr(sys.stdout, 'buffer', sys.stdout)
111 out = getattr(sys.stdout, 'buffer', sys.stdout)
114 out.write(data + end)
112 out.write(data + end)
115 out.flush()
113 out.flush()
116
114
117
115
118 printb(clt.greet(b"Foobar"))
116 printb(clt.greet(b"Foobar"))
119
117
120 with clt.commandexecutor() as e:
118 with clt.commandexecutor() as e:
121 fgreet1 = e.callcommand(b'greet', {b'name': b'Fo, =;:<o'})
119 fgreet1 = e.callcommand(b'greet', {b'name': b'Fo, =;:<o'})
122 fgreet2 = e.callcommand(b'greet', {b'name': b'Bar'})
120 fgreet2 = e.callcommand(b'greet', {b'name': b'Bar'})
123
121
124 printb(
122 printb(
125 stringutil.pprint([f.result() for f in (fgreet1, fgreet2)], bprefix=True)
123 stringutil.pprint([f.result() for f in (fgreet1, fgreet2)], bprefix=True)
126 )
124 )
General Comments 0
You need to be logged in to leave comments. Login now