##// END OF EJS Templates
clonebundle: use 'repo.vfs' instead of 'repo.opener'...
Pierre-Yves David -
r31146:16d8bec0 default
parent child Browse files
Show More
@@ -1,186 +1,186
1 # This software may be used and distributed according to the terms of the
1 # This software may be used and distributed according to the terms of the
2 # GNU General Public License version 2 or any later version.
2 # GNU General Public License version 2 or any later version.
3
3
4 """advertise pre-generated bundles to seed clones
4 """advertise pre-generated bundles to seed clones
5
5
6 "clonebundles" is a server-side extension used to advertise the existence
6 "clonebundles" is a server-side extension used to advertise the existence
7 of pre-generated, externally hosted bundle files to clients that are
7 of pre-generated, externally hosted bundle files to clients that are
8 cloning so that cloning can be faster, more reliable, and require less
8 cloning so that cloning can be faster, more reliable, and require less
9 resources on the server.
9 resources on the server.
10
10
11 Cloning can be a CPU and I/O intensive operation on servers. Traditionally,
11 Cloning can be a CPU and I/O intensive operation on servers. Traditionally,
12 the server, in response to a client's request to clone, dynamically generates
12 the server, in response to a client's request to clone, dynamically generates
13 a bundle containing the entire repository content and sends it to the client.
13 a bundle containing the entire repository content and sends it to the client.
14 There is no caching on the server and the server will have to redundantly
14 There is no caching on the server and the server will have to redundantly
15 generate the same outgoing bundle in response to each clone request. For
15 generate the same outgoing bundle in response to each clone request. For
16 servers with large repositories or with high clone volume, the load from
16 servers with large repositories or with high clone volume, the load from
17 clones can make scaling the server challenging and costly.
17 clones can make scaling the server challenging and costly.
18
18
19 This extension provides server operators the ability to offload potentially
19 This extension provides server operators the ability to offload potentially
20 expensive clone load to an external service. Here's how it works.
20 expensive clone load to an external service. Here's how it works.
21
21
22 1. A server operator establishes a mechanism for making bundle files available
22 1. A server operator establishes a mechanism for making bundle files available
23 on a hosting service where Mercurial clients can fetch them.
23 on a hosting service where Mercurial clients can fetch them.
24 2. A manifest file listing available bundle URLs and some optional metadata
24 2. A manifest file listing available bundle URLs and some optional metadata
25 is added to the Mercurial repository on the server.
25 is added to the Mercurial repository on the server.
26 3. A client initiates a clone against a clone bundles aware server.
26 3. A client initiates a clone against a clone bundles aware server.
27 4. The client sees the server is advertising clone bundles and fetches the
27 4. The client sees the server is advertising clone bundles and fetches the
28 manifest listing available bundles.
28 manifest listing available bundles.
29 5. The client filters and sorts the available bundles based on what it
29 5. The client filters and sorts the available bundles based on what it
30 supports and prefers.
30 supports and prefers.
31 6. The client downloads and applies an available bundle from the
31 6. The client downloads and applies an available bundle from the
32 server-specified URL.
32 server-specified URL.
33 7. The client reconnects to the original server and performs the equivalent
33 7. The client reconnects to the original server and performs the equivalent
34 of :hg:`pull` to retrieve all repository data not in the bundle. (The
34 of :hg:`pull` to retrieve all repository data not in the bundle. (The
35 repository could have been updated between when the bundle was created
35 repository could have been updated between when the bundle was created
36 and when the client started the clone.)
36 and when the client started the clone.)
37
37
38 Instead of the server generating full repository bundles for every clone
38 Instead of the server generating full repository bundles for every clone
39 request, it generates full bundles once and they are subsequently reused to
39 request, it generates full bundles once and they are subsequently reused to
40 bootstrap new clones. The server may still transfer data at clone time.
40 bootstrap new clones. The server may still transfer data at clone time.
41 However, this is only data that has been added/changed since the bundle was
41 However, this is only data that has been added/changed since the bundle was
42 created. For large, established repositories, this can reduce server load for
42 created. For large, established repositories, this can reduce server load for
43 clones to less than 1% of original.
43 clones to less than 1% of original.
44
44
45 To work, this extension requires the following of server operators:
45 To work, this extension requires the following of server operators:
46
46
47 * Generating bundle files of repository content (typically periodically,
47 * Generating bundle files of repository content (typically periodically,
48 such as once per day).
48 such as once per day).
49 * A file server that clients have network access to and that Python knows
49 * A file server that clients have network access to and that Python knows
50 how to talk to through its normal URL handling facility (typically an
50 how to talk to through its normal URL handling facility (typically an
51 HTTP server).
51 HTTP server).
52 * A process for keeping the bundles manifest in sync with available bundle
52 * A process for keeping the bundles manifest in sync with available bundle
53 files.
53 files.
54
54
55 Strictly speaking, using a static file hosting server isn't required: a server
55 Strictly speaking, using a static file hosting server isn't required: a server
56 operator could use a dynamic service for retrieving bundle data. However,
56 operator could use a dynamic service for retrieving bundle data. However,
57 static file hosting services are simple and scalable and should be sufficient
57 static file hosting services are simple and scalable and should be sufficient
58 for most needs.
58 for most needs.
59
59
60 Bundle files can be generated with the :hg:`bundle` command. Typically
60 Bundle files can be generated with the :hg:`bundle` command. Typically
61 :hg:`bundle --all` is used to produce a bundle of the entire repository.
61 :hg:`bundle --all` is used to produce a bundle of the entire repository.
62
62
63 :hg:`debugcreatestreamclonebundle` can be used to produce a special
63 :hg:`debugcreatestreamclonebundle` can be used to produce a special
64 *streaming clone bundle*. These are bundle files that are extremely efficient
64 *streaming clone bundle*. These are bundle files that are extremely efficient
65 to produce and consume (read: fast). However, they are larger than
65 to produce and consume (read: fast). However, they are larger than
66 traditional bundle formats and require that clients support the exact set
66 traditional bundle formats and require that clients support the exact set
67 of repository data store formats in use by the repository that created them.
67 of repository data store formats in use by the repository that created them.
68 Typically, a newer server can serve data that is compatible with older clients.
68 Typically, a newer server can serve data that is compatible with older clients.
69 However, *streaming clone bundles* don't have this guarantee. **Server
69 However, *streaming clone bundles* don't have this guarantee. **Server
70 operators need to be aware that newer versions of Mercurial may produce
70 operators need to be aware that newer versions of Mercurial may produce
71 streaming clone bundles incompatible with older Mercurial versions.**
71 streaming clone bundles incompatible with older Mercurial versions.**
72
72
73 A server operator is responsible for creating a ``.hg/clonebundles.manifest``
73 A server operator is responsible for creating a ``.hg/clonebundles.manifest``
74 file containing the list of available bundle files suitable for seeding
74 file containing the list of available bundle files suitable for seeding
75 clones. If this file does not exist, the repository will not advertise the
75 clones. If this file does not exist, the repository will not advertise the
76 existence of clone bundles when clients connect.
76 existence of clone bundles when clients connect.
77
77
78 The manifest file contains a newline (\n) delimited list of entries.
78 The manifest file contains a newline (\n) delimited list of entries.
79
79
80 Each line in this file defines an available bundle. Lines have the format:
80 Each line in this file defines an available bundle. Lines have the format:
81
81
82 <URL> [<key>=<value>[ <key>=<value>]]
82 <URL> [<key>=<value>[ <key>=<value>]]
83
83
84 That is, a URL followed by an optional, space-delimited list of key=value
84 That is, a URL followed by an optional, space-delimited list of key=value
85 pairs describing additional properties of this bundle. Both keys and values
85 pairs describing additional properties of this bundle. Both keys and values
86 are URI encoded.
86 are URI encoded.
87
87
88 Keys in UPPERCASE are reserved for use by Mercurial and are defined below.
88 Keys in UPPERCASE are reserved for use by Mercurial and are defined below.
89 All non-uppercase keys can be used by site installations. An example use
89 All non-uppercase keys can be used by site installations. An example use
90 for custom properties is to use the *datacenter* attribute to define which
90 for custom properties is to use the *datacenter* attribute to define which
91 data center a file is hosted in. Clients could then prefer a server in the
91 data center a file is hosted in. Clients could then prefer a server in the
92 data center closest to them.
92 data center closest to them.
93
93
94 The following reserved keys are currently defined:
94 The following reserved keys are currently defined:
95
95
96 BUNDLESPEC
96 BUNDLESPEC
97 A "bundle specification" string that describes the type of the bundle.
97 A "bundle specification" string that describes the type of the bundle.
98
98
99 These are string values that are accepted by the "--type" argument of
99 These are string values that are accepted by the "--type" argument of
100 :hg:`bundle`.
100 :hg:`bundle`.
101
101
102 The values are parsed in strict mode, which means they must be of the
102 The values are parsed in strict mode, which means they must be of the
103 "<compression>-<type>" form. See
103 "<compression>-<type>" form. See
104 mercurial.exchange.parsebundlespec() for more details.
104 mercurial.exchange.parsebundlespec() for more details.
105
105
106 :hg:`debugbundle --spec` can be used to print the bundle specification
106 :hg:`debugbundle --spec` can be used to print the bundle specification
107 string for a bundle file. The output of this command can be used verbatim
107 string for a bundle file. The output of this command can be used verbatim
108 for the value of ``BUNDLESPEC`` (it is already escaped).
108 for the value of ``BUNDLESPEC`` (it is already escaped).
109
109
110 Clients will automatically filter out specifications that are unknown or
110 Clients will automatically filter out specifications that are unknown or
111 unsupported so they won't attempt to download something that likely won't
111 unsupported so they won't attempt to download something that likely won't
112 apply.
112 apply.
113
113
114 The actual value doesn't impact client behavior beyond filtering:
114 The actual value doesn't impact client behavior beyond filtering:
115 clients will still sniff the bundle type from the header of downloaded
115 clients will still sniff the bundle type from the header of downloaded
116 files.
116 files.
117
117
118 **Use of this key is highly recommended**, as it allows clients to
118 **Use of this key is highly recommended**, as it allows clients to
119 easily skip unsupported bundles. If this key is not defined, an old
119 easily skip unsupported bundles. If this key is not defined, an old
120 client may attempt to apply a bundle that it is incapable of reading.
120 client may attempt to apply a bundle that it is incapable of reading.
121
121
122 REQUIRESNI
122 REQUIRESNI
123 Whether Server Name Indication (SNI) is required to connect to the URL.
123 Whether Server Name Indication (SNI) is required to connect to the URL.
124 SNI allows servers to use multiple certificates on the same IP. It is
124 SNI allows servers to use multiple certificates on the same IP. It is
125 somewhat common in CDNs and other hosting providers. Older Python
125 somewhat common in CDNs and other hosting providers. Older Python
126 versions do not support SNI. Defining this attribute enables clients
126 versions do not support SNI. Defining this attribute enables clients
127 with older Python versions to filter this entry without experiencing
127 with older Python versions to filter this entry without experiencing
128 an opaque SSL failure at connection time.
128 an opaque SSL failure at connection time.
129
129
130 If this is defined, it is important to advertise a non-SNI fallback
130 If this is defined, it is important to advertise a non-SNI fallback
131 URL or clients running old Python releases may not be able to clone
131 URL or clients running old Python releases may not be able to clone
132 with the clonebundles facility.
132 with the clonebundles facility.
133
133
134 Value should be "true".
134 Value should be "true".
135
135
136 Manifests can contain multiple entries. Assuming metadata is defined, clients
136 Manifests can contain multiple entries. Assuming metadata is defined, clients
137 will filter entries from the manifest that they don't support. The remaining
137 will filter entries from the manifest that they don't support. The remaining
138 entries are optionally sorted by client preferences
138 entries are optionally sorted by client preferences
139 (``experimental.clonebundleprefers`` config option). The client then attempts
139 (``experimental.clonebundleprefers`` config option). The client then attempts
140 to fetch the bundle at the first URL in the remaining list.
140 to fetch the bundle at the first URL in the remaining list.
141
141
142 **Errors when downloading a bundle will fail the entire clone operation:
142 **Errors when downloading a bundle will fail the entire clone operation:
143 clients do not automatically fall back to a traditional clone.** The reason
143 clients do not automatically fall back to a traditional clone.** The reason
144 for this is that if a server is using clone bundles, it is probably doing so
144 for this is that if a server is using clone bundles, it is probably doing so
145 because the feature is necessary to help it scale. In other words, there
145 because the feature is necessary to help it scale. In other words, there
146 is an assumption that clone load will be offloaded to another service and
146 is an assumption that clone load will be offloaded to another service and
147 that the Mercurial server isn't responsible for serving this clone load.
147 that the Mercurial server isn't responsible for serving this clone load.
148 If that other service experiences issues and clients start mass falling back to
148 If that other service experiences issues and clients start mass falling back to
149 the original Mercurial server, the added clone load could overwhelm the server
149 the original Mercurial server, the added clone load could overwhelm the server
150 due to unexpected load and effectively take it offline. Not having clients
150 due to unexpected load and effectively take it offline. Not having clients
151 automatically fall back to cloning from the original server mitigates this
151 automatically fall back to cloning from the original server mitigates this
152 scenario.
152 scenario.
153
153
154 Because there is no automatic Mercurial server fallback on failure of the
154 Because there is no automatic Mercurial server fallback on failure of the
155 bundle hosting service, it is important for server operators to view the bundle
155 bundle hosting service, it is important for server operators to view the bundle
156 hosting service as an extension of the Mercurial server in terms of
156 hosting service as an extension of the Mercurial server in terms of
157 availability and service level agreements: if the bundle hosting service goes
157 availability and service level agreements: if the bundle hosting service goes
158 down, so does the ability for clients to clone. Note: clients will see a
158 down, so does the ability for clients to clone. Note: clients will see a
159 message informing them how to bypass the clone bundles facility when a failure
159 message informing them how to bypass the clone bundles facility when a failure
160 occurs. So server operators should prepare for some people to follow these
160 occurs. So server operators should prepare for some people to follow these
161 instructions when a failure occurs, thus driving more load to the original
161 instructions when a failure occurs, thus driving more load to the original
162 Mercurial server when the bundle hosting service fails.
162 Mercurial server when the bundle hosting service fails.
163 """
163 """
164
164
165 from __future__ import absolute_import
165 from __future__ import absolute_import
166
166
167 from mercurial import (
167 from mercurial import (
168 extensions,
168 extensions,
169 wireproto,
169 wireproto,
170 )
170 )
171
171
172 testedwith = 'ships-with-hg-core'
172 testedwith = 'ships-with-hg-core'
173
173
174 def capabilities(orig, repo, proto):
174 def capabilities(orig, repo, proto):
175 caps = orig(repo, proto)
175 caps = orig(repo, proto)
176
176
177 # Only advertise if a manifest exists. This does add some I/O to requests.
177 # Only advertise if a manifest exists. This does add some I/O to requests.
178 # But this should be cheaper than a wasted network round trip due to
178 # But this should be cheaper than a wasted network round trip due to
179 # missing file.
179 # missing file.
180 if repo.opener.exists('clonebundles.manifest'):
180 if repo.vfs.exists('clonebundles.manifest'):
181 caps.append('clonebundles')
181 caps.append('clonebundles')
182
182
183 return caps
183 return caps
184
184
185 def extsetup(ui):
185 def extsetup(ui):
186 extensions.wrapfunction(wireproto, '_capabilities', capabilities)
186 extensions.wrapfunction(wireproto, '_capabilities', capabilities)
@@ -1,1050 +1,1050
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import tempfile
13 import tempfile
14
14
15 from .i18n import _
15 from .i18n import _
16 from .node import (
16 from .node import (
17 bin,
17 bin,
18 hex,
18 hex,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 encoding,
24 encoding,
25 error,
25 error,
26 exchange,
26 exchange,
27 peer,
27 peer,
28 pushkey as pushkeymod,
28 pushkey as pushkeymod,
29 pycompat,
29 pycompat,
30 streamclone,
30 streamclone,
31 util,
31 util,
32 )
32 )
33
33
34 urlerr = util.urlerr
34 urlerr = util.urlerr
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
37 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
38 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
38 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
39 'IncompatibleClient')
39 'IncompatibleClient')
40 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
40 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
41
41
42 class abstractserverproto(object):
42 class abstractserverproto(object):
43 """abstract class that summarizes the protocol API
43 """abstract class that summarizes the protocol API
44
44
45 Used as reference and documentation.
45 Used as reference and documentation.
46 """
46 """
47
47
48 def getargs(self, args):
48 def getargs(self, args):
49 """return the value for arguments in <args>
49 """return the value for arguments in <args>
50
50
51 returns a list of values (same order as <args>)"""
51 returns a list of values (same order as <args>)"""
52 raise NotImplementedError()
52 raise NotImplementedError()
53
53
54 def getfile(self, fp):
54 def getfile(self, fp):
55 """write the whole content of a file into a file like object
55 """write the whole content of a file into a file like object
56
56
57 The file is in the form::
57 The file is in the form::
58
58
59 (<chunk-size>\n<chunk>)+0\n
59 (<chunk-size>\n<chunk>)+0\n
60
60
61 chunk size is the ascii version of the int.
61 chunk size is the ascii version of the int.
62 """
62 """
63 raise NotImplementedError()
63 raise NotImplementedError()
64
64
65 def redirect(self):
65 def redirect(self):
66 """may setup interception for stdout and stderr
66 """may setup interception for stdout and stderr
67
67
68 See also the `restore` method."""
68 See also the `restore` method."""
69 raise NotImplementedError()
69 raise NotImplementedError()
70
70
71 # If the `redirect` function does install interception, the `restore`
71 # If the `redirect` function does install interception, the `restore`
72 # function MUST be defined. If interception is not used, this function
72 # function MUST be defined. If interception is not used, this function
73 # MUST NOT be defined.
73 # MUST NOT be defined.
74 #
74 #
75 # left commented here on purpose
75 # left commented here on purpose
76 #
76 #
77 #def restore(self):
77 #def restore(self):
78 # """reinstall previous stdout and stderr and return intercepted stdout
78 # """reinstall previous stdout and stderr and return intercepted stdout
79 # """
79 # """
80 # raise NotImplementedError()
80 # raise NotImplementedError()
81
81
82 class remotebatch(peer.batcher):
82 class remotebatch(peer.batcher):
83 '''batches the queued calls; uses as few roundtrips as possible'''
83 '''batches the queued calls; uses as few roundtrips as possible'''
84 def __init__(self, remote):
84 def __init__(self, remote):
85 '''remote must support _submitbatch(encbatch) and
85 '''remote must support _submitbatch(encbatch) and
86 _submitone(op, encargs)'''
86 _submitone(op, encargs)'''
87 peer.batcher.__init__(self)
87 peer.batcher.__init__(self)
88 self.remote = remote
88 self.remote = remote
89 def submit(self):
89 def submit(self):
90 req, rsp = [], []
90 req, rsp = [], []
91 for name, args, opts, resref in self.calls:
91 for name, args, opts, resref in self.calls:
92 mtd = getattr(self.remote, name)
92 mtd = getattr(self.remote, name)
93 batchablefn = getattr(mtd, 'batchable', None)
93 batchablefn = getattr(mtd, 'batchable', None)
94 if batchablefn is not None:
94 if batchablefn is not None:
95 batchable = batchablefn(mtd.im_self, *args, **opts)
95 batchable = batchablefn(mtd.im_self, *args, **opts)
96 encargsorres, encresref = next(batchable)
96 encargsorres, encresref = next(batchable)
97 if encresref:
97 if encresref:
98 req.append((name, encargsorres,))
98 req.append((name, encargsorres,))
99 rsp.append((batchable, encresref, resref,))
99 rsp.append((batchable, encresref, resref,))
100 else:
100 else:
101 resref.set(encargsorres)
101 resref.set(encargsorres)
102 else:
102 else:
103 if req:
103 if req:
104 self._submitreq(req, rsp)
104 self._submitreq(req, rsp)
105 req, rsp = [], []
105 req, rsp = [], []
106 resref.set(mtd(*args, **opts))
106 resref.set(mtd(*args, **opts))
107 if req:
107 if req:
108 self._submitreq(req, rsp)
108 self._submitreq(req, rsp)
109 def _submitreq(self, req, rsp):
109 def _submitreq(self, req, rsp):
110 encresults = self.remote._submitbatch(req)
110 encresults = self.remote._submitbatch(req)
111 for encres, r in zip(encresults, rsp):
111 for encres, r in zip(encresults, rsp):
112 batchable, encresref, resref = r
112 batchable, encresref, resref = r
113 encresref.set(encres)
113 encresref.set(encres)
114 resref.set(next(batchable))
114 resref.set(next(batchable))
115
115
116 class remoteiterbatcher(peer.iterbatcher):
116 class remoteiterbatcher(peer.iterbatcher):
117 def __init__(self, remote):
117 def __init__(self, remote):
118 super(remoteiterbatcher, self).__init__()
118 super(remoteiterbatcher, self).__init__()
119 self._remote = remote
119 self._remote = remote
120
120
121 def __getattr__(self, name):
121 def __getattr__(self, name):
122 if not getattr(self._remote, name, False):
122 if not getattr(self._remote, name, False):
123 raise AttributeError(
123 raise AttributeError(
124 'Attempted to iterbatch non-batchable call to %r' % name)
124 'Attempted to iterbatch non-batchable call to %r' % name)
125 return super(remoteiterbatcher, self).__getattr__(name)
125 return super(remoteiterbatcher, self).__getattr__(name)
126
126
127 def submit(self):
127 def submit(self):
128 """Break the batch request into many patch calls and pipeline them.
128 """Break the batch request into many patch calls and pipeline them.
129
129
130 This is mostly valuable over http where request sizes can be
130 This is mostly valuable over http where request sizes can be
131 limited, but can be used in other places as well.
131 limited, but can be used in other places as well.
132 """
132 """
133 req, rsp = [], []
133 req, rsp = [], []
134 for name, args, opts, resref in self.calls:
134 for name, args, opts, resref in self.calls:
135 mtd = getattr(self._remote, name)
135 mtd = getattr(self._remote, name)
136 batchable = mtd.batchable(mtd.im_self, *args, **opts)
136 batchable = mtd.batchable(mtd.im_self, *args, **opts)
137 encargsorres, encresref = next(batchable)
137 encargsorres, encresref = next(batchable)
138 assert encresref
138 assert encresref
139 req.append((name, encargsorres))
139 req.append((name, encargsorres))
140 rsp.append((batchable, encresref))
140 rsp.append((batchable, encresref))
141 if req:
141 if req:
142 self._resultiter = self._remote._submitbatch(req)
142 self._resultiter = self._remote._submitbatch(req)
143 self._rsp = rsp
143 self._rsp = rsp
144
144
145 def results(self):
145 def results(self):
146 for (batchable, encresref), encres in itertools.izip(
146 for (batchable, encresref), encres in itertools.izip(
147 self._rsp, self._resultiter):
147 self._rsp, self._resultiter):
148 encresref.set(encres)
148 encresref.set(encres)
149 yield next(batchable)
149 yield next(batchable)
150
150
151 # Forward a couple of names from peer to make wireproto interactions
151 # Forward a couple of names from peer to make wireproto interactions
152 # slightly more sensible.
152 # slightly more sensible.
153 batchable = peer.batchable
153 batchable = peer.batchable
154 future = peer.future
154 future = peer.future
155
155
156 # list of nodes encoding / decoding
156 # list of nodes encoding / decoding
157
157
158 def decodelist(l, sep=' '):
158 def decodelist(l, sep=' '):
159 if l:
159 if l:
160 return map(bin, l.split(sep))
160 return map(bin, l.split(sep))
161 return []
161 return []
162
162
163 def encodelist(l, sep=' '):
163 def encodelist(l, sep=' '):
164 try:
164 try:
165 return sep.join(map(hex, l))
165 return sep.join(map(hex, l))
166 except TypeError:
166 except TypeError:
167 raise
167 raise
168
168
169 # batched call argument encoding
169 # batched call argument encoding
170
170
171 def escapearg(plain):
171 def escapearg(plain):
172 return (plain
172 return (plain
173 .replace(':', ':c')
173 .replace(':', ':c')
174 .replace(',', ':o')
174 .replace(',', ':o')
175 .replace(';', ':s')
175 .replace(';', ':s')
176 .replace('=', ':e'))
176 .replace('=', ':e'))
177
177
178 def unescapearg(escaped):
178 def unescapearg(escaped):
179 return (escaped
179 return (escaped
180 .replace(':e', '=')
180 .replace(':e', '=')
181 .replace(':s', ';')
181 .replace(':s', ';')
182 .replace(':o', ',')
182 .replace(':o', ',')
183 .replace(':c', ':'))
183 .replace(':c', ':'))
184
184
185 def encodebatchcmds(req):
185 def encodebatchcmds(req):
186 """Return a ``cmds`` argument value for the ``batch`` command."""
186 """Return a ``cmds`` argument value for the ``batch`` command."""
187 cmds = []
187 cmds = []
188 for op, argsdict in req:
188 for op, argsdict in req:
189 # Old servers didn't properly unescape argument names. So prevent
189 # Old servers didn't properly unescape argument names. So prevent
190 # the sending of argument names that may not be decoded properly by
190 # the sending of argument names that may not be decoded properly by
191 # servers.
191 # servers.
192 assert all(escapearg(k) == k for k in argsdict)
192 assert all(escapearg(k) == k for k in argsdict)
193
193
194 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
194 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
195 for k, v in argsdict.iteritems())
195 for k, v in argsdict.iteritems())
196 cmds.append('%s %s' % (op, args))
196 cmds.append('%s %s' % (op, args))
197
197
198 return ';'.join(cmds)
198 return ';'.join(cmds)
199
199
200 # mapping of options accepted by getbundle and their types
200 # mapping of options accepted by getbundle and their types
201 #
201 #
202 # Meant to be extended by extensions. It is extensions responsibility to ensure
202 # Meant to be extended by extensions. It is extensions responsibility to ensure
203 # such options are properly processed in exchange.getbundle.
203 # such options are properly processed in exchange.getbundle.
204 #
204 #
205 # supported types are:
205 # supported types are:
206 #
206 #
207 # :nodes: list of binary nodes
207 # :nodes: list of binary nodes
208 # :csv: list of comma-separated values
208 # :csv: list of comma-separated values
209 # :scsv: list of comma-separated values return as set
209 # :scsv: list of comma-separated values return as set
210 # :plain: string with no transformation needed.
210 # :plain: string with no transformation needed.
211 gboptsmap = {'heads': 'nodes',
211 gboptsmap = {'heads': 'nodes',
212 'common': 'nodes',
212 'common': 'nodes',
213 'obsmarkers': 'boolean',
213 'obsmarkers': 'boolean',
214 'bundlecaps': 'scsv',
214 'bundlecaps': 'scsv',
215 'listkeys': 'csv',
215 'listkeys': 'csv',
216 'cg': 'boolean',
216 'cg': 'boolean',
217 'cbattempted': 'boolean'}
217 'cbattempted': 'boolean'}
218
218
219 # client side
219 # client side
220
220
221 class wirepeer(peer.peerrepository):
221 class wirepeer(peer.peerrepository):
222 """Client-side interface for communicating with a peer repository.
222 """Client-side interface for communicating with a peer repository.
223
223
224 Methods commonly call wire protocol commands of the same name.
224 Methods commonly call wire protocol commands of the same name.
225
225
226 See also httppeer.py and sshpeer.py for protocol-specific
226 See also httppeer.py and sshpeer.py for protocol-specific
227 implementations of this interface.
227 implementations of this interface.
228 """
228 """
229 def batch(self):
229 def batch(self):
230 if self.capable('batch'):
230 if self.capable('batch'):
231 return remotebatch(self)
231 return remotebatch(self)
232 else:
232 else:
233 return peer.localbatch(self)
233 return peer.localbatch(self)
234 def _submitbatch(self, req):
234 def _submitbatch(self, req):
235 """run batch request <req> on the server
235 """run batch request <req> on the server
236
236
237 Returns an iterator of the raw responses from the server.
237 Returns an iterator of the raw responses from the server.
238 """
238 """
239 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
239 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
240 chunk = rsp.read(1024)
240 chunk = rsp.read(1024)
241 work = [chunk]
241 work = [chunk]
242 while chunk:
242 while chunk:
243 while ';' not in chunk and chunk:
243 while ';' not in chunk and chunk:
244 chunk = rsp.read(1024)
244 chunk = rsp.read(1024)
245 work.append(chunk)
245 work.append(chunk)
246 merged = ''.join(work)
246 merged = ''.join(work)
247 while ';' in merged:
247 while ';' in merged:
248 one, merged = merged.split(';', 1)
248 one, merged = merged.split(';', 1)
249 yield unescapearg(one)
249 yield unescapearg(one)
250 chunk = rsp.read(1024)
250 chunk = rsp.read(1024)
251 work = [merged, chunk]
251 work = [merged, chunk]
252 yield unescapearg(''.join(work))
252 yield unescapearg(''.join(work))
253
253
254 def _submitone(self, op, args):
254 def _submitone(self, op, args):
255 return self._call(op, **args)
255 return self._call(op, **args)
256
256
257 def iterbatch(self):
257 def iterbatch(self):
258 return remoteiterbatcher(self)
258 return remoteiterbatcher(self)
259
259
260 @batchable
260 @batchable
261 def lookup(self, key):
261 def lookup(self, key):
262 self.requirecap('lookup', _('look up remote revision'))
262 self.requirecap('lookup', _('look up remote revision'))
263 f = future()
263 f = future()
264 yield {'key': encoding.fromlocal(key)}, f
264 yield {'key': encoding.fromlocal(key)}, f
265 d = f.value
265 d = f.value
266 success, data = d[:-1].split(" ", 1)
266 success, data = d[:-1].split(" ", 1)
267 if int(success):
267 if int(success):
268 yield bin(data)
268 yield bin(data)
269 self._abort(error.RepoError(data))
269 self._abort(error.RepoError(data))
270
270
271 @batchable
271 @batchable
272 def heads(self):
272 def heads(self):
273 f = future()
273 f = future()
274 yield {}, f
274 yield {}, f
275 d = f.value
275 d = f.value
276 try:
276 try:
277 yield decodelist(d[:-1])
277 yield decodelist(d[:-1])
278 except ValueError:
278 except ValueError:
279 self._abort(error.ResponseError(_("unexpected response:"), d))
279 self._abort(error.ResponseError(_("unexpected response:"), d))
280
280
281 @batchable
281 @batchable
282 def known(self, nodes):
282 def known(self, nodes):
283 f = future()
283 f = future()
284 yield {'nodes': encodelist(nodes)}, f
284 yield {'nodes': encodelist(nodes)}, f
285 d = f.value
285 d = f.value
286 try:
286 try:
287 yield [bool(int(b)) for b in d]
287 yield [bool(int(b)) for b in d]
288 except ValueError:
288 except ValueError:
289 self._abort(error.ResponseError(_("unexpected response:"), d))
289 self._abort(error.ResponseError(_("unexpected response:"), d))
290
290
291 @batchable
291 @batchable
292 def branchmap(self):
292 def branchmap(self):
293 f = future()
293 f = future()
294 yield {}, f
294 yield {}, f
295 d = f.value
295 d = f.value
296 try:
296 try:
297 branchmap = {}
297 branchmap = {}
298 for branchpart in d.splitlines():
298 for branchpart in d.splitlines():
299 branchname, branchheads = branchpart.split(' ', 1)
299 branchname, branchheads = branchpart.split(' ', 1)
300 branchname = encoding.tolocal(urlreq.unquote(branchname))
300 branchname = encoding.tolocal(urlreq.unquote(branchname))
301 branchheads = decodelist(branchheads)
301 branchheads = decodelist(branchheads)
302 branchmap[branchname] = branchheads
302 branchmap[branchname] = branchheads
303 yield branchmap
303 yield branchmap
304 except TypeError:
304 except TypeError:
305 self._abort(error.ResponseError(_("unexpected response:"), d))
305 self._abort(error.ResponseError(_("unexpected response:"), d))
306
306
307 def branches(self, nodes):
307 def branches(self, nodes):
308 n = encodelist(nodes)
308 n = encodelist(nodes)
309 d = self._call("branches", nodes=n)
309 d = self._call("branches", nodes=n)
310 try:
310 try:
311 br = [tuple(decodelist(b)) for b in d.splitlines()]
311 br = [tuple(decodelist(b)) for b in d.splitlines()]
312 return br
312 return br
313 except ValueError:
313 except ValueError:
314 self._abort(error.ResponseError(_("unexpected response:"), d))
314 self._abort(error.ResponseError(_("unexpected response:"), d))
315
315
316 def between(self, pairs):
316 def between(self, pairs):
317 batch = 8 # avoid giant requests
317 batch = 8 # avoid giant requests
318 r = []
318 r = []
319 for i in xrange(0, len(pairs), batch):
319 for i in xrange(0, len(pairs), batch):
320 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
320 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
321 d = self._call("between", pairs=n)
321 d = self._call("between", pairs=n)
322 try:
322 try:
323 r.extend(l and decodelist(l) or [] for l in d.splitlines())
323 r.extend(l and decodelist(l) or [] for l in d.splitlines())
324 except ValueError:
324 except ValueError:
325 self._abort(error.ResponseError(_("unexpected response:"), d))
325 self._abort(error.ResponseError(_("unexpected response:"), d))
326 return r
326 return r
327
327
328 @batchable
328 @batchable
329 def pushkey(self, namespace, key, old, new):
329 def pushkey(self, namespace, key, old, new):
330 if not self.capable('pushkey'):
330 if not self.capable('pushkey'):
331 yield False, None
331 yield False, None
332 f = future()
332 f = future()
333 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
333 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
334 yield {'namespace': encoding.fromlocal(namespace),
334 yield {'namespace': encoding.fromlocal(namespace),
335 'key': encoding.fromlocal(key),
335 'key': encoding.fromlocal(key),
336 'old': encoding.fromlocal(old),
336 'old': encoding.fromlocal(old),
337 'new': encoding.fromlocal(new)}, f
337 'new': encoding.fromlocal(new)}, f
338 d = f.value
338 d = f.value
339 d, output = d.split('\n', 1)
339 d, output = d.split('\n', 1)
340 try:
340 try:
341 d = bool(int(d))
341 d = bool(int(d))
342 except ValueError:
342 except ValueError:
343 raise error.ResponseError(
343 raise error.ResponseError(
344 _('push failed (unexpected response):'), d)
344 _('push failed (unexpected response):'), d)
345 for l in output.splitlines(True):
345 for l in output.splitlines(True):
346 self.ui.status(_('remote: '), l)
346 self.ui.status(_('remote: '), l)
347 yield d
347 yield d
348
348
349 @batchable
349 @batchable
350 def listkeys(self, namespace):
350 def listkeys(self, namespace):
351 if not self.capable('pushkey'):
351 if not self.capable('pushkey'):
352 yield {}, None
352 yield {}, None
353 f = future()
353 f = future()
354 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
354 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
355 yield {'namespace': encoding.fromlocal(namespace)}, f
355 yield {'namespace': encoding.fromlocal(namespace)}, f
356 d = f.value
356 d = f.value
357 self.ui.debug('received listkey for "%s": %i bytes\n'
357 self.ui.debug('received listkey for "%s": %i bytes\n'
358 % (namespace, len(d)))
358 % (namespace, len(d)))
359 yield pushkeymod.decodekeys(d)
359 yield pushkeymod.decodekeys(d)
360
360
361 def stream_out(self):
361 def stream_out(self):
362 return self._callstream('stream_out')
362 return self._callstream('stream_out')
363
363
364 def changegroup(self, nodes, kind):
364 def changegroup(self, nodes, kind):
365 n = encodelist(nodes)
365 n = encodelist(nodes)
366 f = self._callcompressable("changegroup", roots=n)
366 f = self._callcompressable("changegroup", roots=n)
367 return changegroupmod.cg1unpacker(f, 'UN')
367 return changegroupmod.cg1unpacker(f, 'UN')
368
368
369 def changegroupsubset(self, bases, heads, kind):
369 def changegroupsubset(self, bases, heads, kind):
370 self.requirecap('changegroupsubset', _('look up remote changes'))
370 self.requirecap('changegroupsubset', _('look up remote changes'))
371 bases = encodelist(bases)
371 bases = encodelist(bases)
372 heads = encodelist(heads)
372 heads = encodelist(heads)
373 f = self._callcompressable("changegroupsubset",
373 f = self._callcompressable("changegroupsubset",
374 bases=bases, heads=heads)
374 bases=bases, heads=heads)
375 return changegroupmod.cg1unpacker(f, 'UN')
375 return changegroupmod.cg1unpacker(f, 'UN')
376
376
377 def getbundle(self, source, **kwargs):
377 def getbundle(self, source, **kwargs):
378 self.requirecap('getbundle', _('look up remote changes'))
378 self.requirecap('getbundle', _('look up remote changes'))
379 opts = {}
379 opts = {}
380 bundlecaps = kwargs.get('bundlecaps')
380 bundlecaps = kwargs.get('bundlecaps')
381 if bundlecaps is not None:
381 if bundlecaps is not None:
382 kwargs['bundlecaps'] = sorted(bundlecaps)
382 kwargs['bundlecaps'] = sorted(bundlecaps)
383 else:
383 else:
384 bundlecaps = () # kwargs could have it to None
384 bundlecaps = () # kwargs could have it to None
385 for key, value in kwargs.iteritems():
385 for key, value in kwargs.iteritems():
386 if value is None:
386 if value is None:
387 continue
387 continue
388 keytype = gboptsmap.get(key)
388 keytype = gboptsmap.get(key)
389 if keytype is None:
389 if keytype is None:
390 assert False, 'unexpected'
390 assert False, 'unexpected'
391 elif keytype == 'nodes':
391 elif keytype == 'nodes':
392 value = encodelist(value)
392 value = encodelist(value)
393 elif keytype in ('csv', 'scsv'):
393 elif keytype in ('csv', 'scsv'):
394 value = ','.join(value)
394 value = ','.join(value)
395 elif keytype == 'boolean':
395 elif keytype == 'boolean':
396 value = '%i' % bool(value)
396 value = '%i' % bool(value)
397 elif keytype != 'plain':
397 elif keytype != 'plain':
398 raise KeyError('unknown getbundle option type %s'
398 raise KeyError('unknown getbundle option type %s'
399 % keytype)
399 % keytype)
400 opts[key] = value
400 opts[key] = value
401 f = self._callcompressable("getbundle", **opts)
401 f = self._callcompressable("getbundle", **opts)
402 if any((cap.startswith('HG2') for cap in bundlecaps)):
402 if any((cap.startswith('HG2') for cap in bundlecaps)):
403 return bundle2.getunbundler(self.ui, f)
403 return bundle2.getunbundler(self.ui, f)
404 else:
404 else:
405 return changegroupmod.cg1unpacker(f, 'UN')
405 return changegroupmod.cg1unpacker(f, 'UN')
406
406
407 def unbundle(self, cg, heads, url):
407 def unbundle(self, cg, heads, url):
408 '''Send cg (a readable file-like object representing the
408 '''Send cg (a readable file-like object representing the
409 changegroup to push, typically a chunkbuffer object) to the
409 changegroup to push, typically a chunkbuffer object) to the
410 remote server as a bundle.
410 remote server as a bundle.
411
411
412 When pushing a bundle10 stream, return an integer indicating the
412 When pushing a bundle10 stream, return an integer indicating the
413 result of the push (see localrepository.addchangegroup()).
413 result of the push (see localrepository.addchangegroup()).
414
414
415 When pushing a bundle20 stream, return a bundle20 stream.
415 When pushing a bundle20 stream, return a bundle20 stream.
416
416
417 `url` is the url the client thinks it's pushing to, which is
417 `url` is the url the client thinks it's pushing to, which is
418 visible to hooks.
418 visible to hooks.
419 '''
419 '''
420
420
421 if heads != ['force'] and self.capable('unbundlehash'):
421 if heads != ['force'] and self.capable('unbundlehash'):
422 heads = encodelist(['hashed',
422 heads = encodelist(['hashed',
423 hashlib.sha1(''.join(sorted(heads))).digest()])
423 hashlib.sha1(''.join(sorted(heads))).digest()])
424 else:
424 else:
425 heads = encodelist(heads)
425 heads = encodelist(heads)
426
426
427 if util.safehasattr(cg, 'deltaheader'):
427 if util.safehasattr(cg, 'deltaheader'):
428 # this a bundle10, do the old style call sequence
428 # this a bundle10, do the old style call sequence
429 ret, output = self._callpush("unbundle", cg, heads=heads)
429 ret, output = self._callpush("unbundle", cg, heads=heads)
430 if ret == "":
430 if ret == "":
431 raise error.ResponseError(
431 raise error.ResponseError(
432 _('push failed:'), output)
432 _('push failed:'), output)
433 try:
433 try:
434 ret = int(ret)
434 ret = int(ret)
435 except ValueError:
435 except ValueError:
436 raise error.ResponseError(
436 raise error.ResponseError(
437 _('push failed (unexpected response):'), ret)
437 _('push failed (unexpected response):'), ret)
438
438
439 for l in output.splitlines(True):
439 for l in output.splitlines(True):
440 self.ui.status(_('remote: '), l)
440 self.ui.status(_('remote: '), l)
441 else:
441 else:
442 # bundle2 push. Send a stream, fetch a stream.
442 # bundle2 push. Send a stream, fetch a stream.
443 stream = self._calltwowaystream('unbundle', cg, heads=heads)
443 stream = self._calltwowaystream('unbundle', cg, heads=heads)
444 ret = bundle2.getunbundler(self.ui, stream)
444 ret = bundle2.getunbundler(self.ui, stream)
445 return ret
445 return ret
446
446
447 def debugwireargs(self, one, two, three=None, four=None, five=None):
447 def debugwireargs(self, one, two, three=None, four=None, five=None):
448 # don't pass optional arguments left at their default value
448 # don't pass optional arguments left at their default value
449 opts = {}
449 opts = {}
450 if three is not None:
450 if three is not None:
451 opts['three'] = three
451 opts['three'] = three
452 if four is not None:
452 if four is not None:
453 opts['four'] = four
453 opts['four'] = four
454 return self._call('debugwireargs', one=one, two=two, **opts)
454 return self._call('debugwireargs', one=one, two=two, **opts)
455
455
456 def _call(self, cmd, **args):
456 def _call(self, cmd, **args):
457 """execute <cmd> on the server
457 """execute <cmd> on the server
458
458
459 The command is expected to return a simple string.
459 The command is expected to return a simple string.
460
460
461 returns the server reply as a string."""
461 returns the server reply as a string."""
462 raise NotImplementedError()
462 raise NotImplementedError()
463
463
464 def _callstream(self, cmd, **args):
464 def _callstream(self, cmd, **args):
465 """execute <cmd> on the server
465 """execute <cmd> on the server
466
466
467 The command is expected to return a stream. Note that if the
467 The command is expected to return a stream. Note that if the
468 command doesn't return a stream, _callstream behaves
468 command doesn't return a stream, _callstream behaves
469 differently for ssh and http peers.
469 differently for ssh and http peers.
470
470
471 returns the server reply as a file like object.
471 returns the server reply as a file like object.
472 """
472 """
473 raise NotImplementedError()
473 raise NotImplementedError()
474
474
475 def _callcompressable(self, cmd, **args):
475 def _callcompressable(self, cmd, **args):
476 """execute <cmd> on the server
476 """execute <cmd> on the server
477
477
478 The command is expected to return a stream.
478 The command is expected to return a stream.
479
479
480 The stream may have been compressed in some implementations. This
480 The stream may have been compressed in some implementations. This
481 function takes care of the decompression. This is the only difference
481 function takes care of the decompression. This is the only difference
482 with _callstream.
482 with _callstream.
483
483
484 returns the server reply as a file like object.
484 returns the server reply as a file like object.
485 """
485 """
486 raise NotImplementedError()
486 raise NotImplementedError()
487
487
488 def _callpush(self, cmd, fp, **args):
488 def _callpush(self, cmd, fp, **args):
489 """execute a <cmd> on server
489 """execute a <cmd> on server
490
490
491 The command is expected to be related to a push. Push has a special
491 The command is expected to be related to a push. Push has a special
492 return method.
492 return method.
493
493
494 returns the server reply as a (ret, output) tuple. ret is either
494 returns the server reply as a (ret, output) tuple. ret is either
495 empty (error) or a stringified int.
495 empty (error) or a stringified int.
496 """
496 """
497 raise NotImplementedError()
497 raise NotImplementedError()
498
498
499 def _calltwowaystream(self, cmd, fp, **args):
499 def _calltwowaystream(self, cmd, fp, **args):
500 """execute <cmd> on server
500 """execute <cmd> on server
501
501
502 The command will send a stream to the server and get a stream in reply.
502 The command will send a stream to the server and get a stream in reply.
503 """
503 """
504 raise NotImplementedError()
504 raise NotImplementedError()
505
505
506 def _abort(self, exception):
506 def _abort(self, exception):
507 """clearly abort the wire protocol connection and raise the exception
507 """clearly abort the wire protocol connection and raise the exception
508 """
508 """
509 raise NotImplementedError()
509 raise NotImplementedError()
510
510
511 # server side
511 # server side
512
512
513 # wire protocol command can either return a string or one of these classes.
513 # wire protocol command can either return a string or one of these classes.
514 class streamres(object):
514 class streamres(object):
515 """wireproto reply: binary stream
515 """wireproto reply: binary stream
516
516
517 The call was successful and the result is a stream.
517 The call was successful and the result is a stream.
518
518
519 Accepts either a generator or an object with a ``read(size)`` method.
519 Accepts either a generator or an object with a ``read(size)`` method.
520
520
521 ``v1compressible`` indicates whether this data can be compressed to
521 ``v1compressible`` indicates whether this data can be compressed to
522 "version 1" clients (technically: HTTP peers using
522 "version 1" clients (technically: HTTP peers using
523 application/mercurial-0.1 media type). This flag should NOT be used on
523 application/mercurial-0.1 media type). This flag should NOT be used on
524 new commands because new clients should support a more modern compression
524 new commands because new clients should support a more modern compression
525 mechanism.
525 mechanism.
526 """
526 """
527 def __init__(self, gen=None, reader=None, v1compressible=False):
527 def __init__(self, gen=None, reader=None, v1compressible=False):
528 self.gen = gen
528 self.gen = gen
529 self.reader = reader
529 self.reader = reader
530 self.v1compressible = v1compressible
530 self.v1compressible = v1compressible
531
531
532 class pushres(object):
532 class pushres(object):
533 """wireproto reply: success with simple integer return
533 """wireproto reply: success with simple integer return
534
534
535 The call was successful and returned an integer contained in `self.res`.
535 The call was successful and returned an integer contained in `self.res`.
536 """
536 """
537 def __init__(self, res):
537 def __init__(self, res):
538 self.res = res
538 self.res = res
539
539
540 class pusherr(object):
540 class pusherr(object):
541 """wireproto reply: failure
541 """wireproto reply: failure
542
542
543 The call failed. The `self.res` attribute contains the error message.
543 The call failed. The `self.res` attribute contains the error message.
544 """
544 """
545 def __init__(self, res):
545 def __init__(self, res):
546 self.res = res
546 self.res = res
547
547
548 class ooberror(object):
548 class ooberror(object):
549 """wireproto reply: failure of a batch of operation
549 """wireproto reply: failure of a batch of operation
550
550
551 Something failed during a batch call. The error message is stored in
551 Something failed during a batch call. The error message is stored in
552 `self.message`.
552 `self.message`.
553 """
553 """
554 def __init__(self, message):
554 def __init__(self, message):
555 self.message = message
555 self.message = message
556
556
557 def getdispatchrepo(repo, proto, command):
557 def getdispatchrepo(repo, proto, command):
558 """Obtain the repo used for processing wire protocol commands.
558 """Obtain the repo used for processing wire protocol commands.
559
559
560 The intent of this function is to serve as a monkeypatch point for
560 The intent of this function is to serve as a monkeypatch point for
561 extensions that need commands to operate on different repo views under
561 extensions that need commands to operate on different repo views under
562 specialized circumstances.
562 specialized circumstances.
563 """
563 """
564 return repo.filtered('served')
564 return repo.filtered('served')
565
565
566 def dispatch(repo, proto, command):
566 def dispatch(repo, proto, command):
567 repo = getdispatchrepo(repo, proto, command)
567 repo = getdispatchrepo(repo, proto, command)
568 func, spec = commands[command]
568 func, spec = commands[command]
569 args = proto.getargs(spec)
569 args = proto.getargs(spec)
570 return func(repo, proto, *args)
570 return func(repo, proto, *args)
571
571
572 def options(cmd, keys, others):
572 def options(cmd, keys, others):
573 opts = {}
573 opts = {}
574 for k in keys:
574 for k in keys:
575 if k in others:
575 if k in others:
576 opts[k] = others[k]
576 opts[k] = others[k]
577 del others[k]
577 del others[k]
578 if others:
578 if others:
579 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
579 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
580 % (cmd, ",".join(others)))
580 % (cmd, ",".join(others)))
581 return opts
581 return opts
582
582
583 def bundle1allowed(repo, action):
583 def bundle1allowed(repo, action):
584 """Whether a bundle1 operation is allowed from the server.
584 """Whether a bundle1 operation is allowed from the server.
585
585
586 Priority is:
586 Priority is:
587
587
588 1. server.bundle1gd.<action> (if generaldelta active)
588 1. server.bundle1gd.<action> (if generaldelta active)
589 2. server.bundle1.<action>
589 2. server.bundle1.<action>
590 3. server.bundle1gd (if generaldelta active)
590 3. server.bundle1gd (if generaldelta active)
591 4. server.bundle1
591 4. server.bundle1
592 """
592 """
593 ui = repo.ui
593 ui = repo.ui
594 gd = 'generaldelta' in repo.requirements
594 gd = 'generaldelta' in repo.requirements
595
595
596 if gd:
596 if gd:
597 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
597 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
598 if v is not None:
598 if v is not None:
599 return v
599 return v
600
600
601 v = ui.configbool('server', 'bundle1.%s' % action, None)
601 v = ui.configbool('server', 'bundle1.%s' % action, None)
602 if v is not None:
602 if v is not None:
603 return v
603 return v
604
604
605 if gd:
605 if gd:
606 v = ui.configbool('server', 'bundle1gd', None)
606 v = ui.configbool('server', 'bundle1gd', None)
607 if v is not None:
607 if v is not None:
608 return v
608 return v
609
609
610 return ui.configbool('server', 'bundle1', True)
610 return ui.configbool('server', 'bundle1', True)
611
611
612 def supportedcompengines(ui, proto, role):
612 def supportedcompengines(ui, proto, role):
613 """Obtain the list of supported compression engines for a request."""
613 """Obtain the list of supported compression engines for a request."""
614 assert role in (util.CLIENTROLE, util.SERVERROLE)
614 assert role in (util.CLIENTROLE, util.SERVERROLE)
615
615
616 compengines = util.compengines.supportedwireengines(role)
616 compengines = util.compengines.supportedwireengines(role)
617
617
618 # Allow config to override default list and ordering.
618 # Allow config to override default list and ordering.
619 if role == util.SERVERROLE:
619 if role == util.SERVERROLE:
620 configengines = ui.configlist('server', 'compressionengines')
620 configengines = ui.configlist('server', 'compressionengines')
621 config = 'server.compressionengines'
621 config = 'server.compressionengines'
622 else:
622 else:
623 # This is currently implemented mainly to facilitate testing. In most
623 # This is currently implemented mainly to facilitate testing. In most
624 # cases, the server should be in charge of choosing a compression engine
624 # cases, the server should be in charge of choosing a compression engine
625 # because a server has the most to lose from a sub-optimal choice. (e.g.
625 # because a server has the most to lose from a sub-optimal choice. (e.g.
626 # CPU DoS due to an expensive engine or a network DoS due to poor
626 # CPU DoS due to an expensive engine or a network DoS due to poor
627 # compression ratio).
627 # compression ratio).
628 configengines = ui.configlist('experimental',
628 configengines = ui.configlist('experimental',
629 'clientcompressionengines')
629 'clientcompressionengines')
630 config = 'experimental.clientcompressionengines'
630 config = 'experimental.clientcompressionengines'
631
631
632 # No explicit config. Filter out the ones that aren't supposed to be
632 # No explicit config. Filter out the ones that aren't supposed to be
633 # advertised and return default ordering.
633 # advertised and return default ordering.
634 if not configengines:
634 if not configengines:
635 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
635 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
636 return [e for e in compengines
636 return [e for e in compengines
637 if getattr(e.wireprotosupport(), attr) > 0]
637 if getattr(e.wireprotosupport(), attr) > 0]
638
638
639 # If compression engines are listed in the config, assume there is a good
639 # If compression engines are listed in the config, assume there is a good
640 # reason for it (like server operators wanting to achieve specific
640 # reason for it (like server operators wanting to achieve specific
641 # performance characteristics). So fail fast if the config references
641 # performance characteristics). So fail fast if the config references
642 # unusable compression engines.
642 # unusable compression engines.
643 validnames = set(e.name() for e in compengines)
643 validnames = set(e.name() for e in compengines)
644 invalidnames = set(e for e in configengines if e not in validnames)
644 invalidnames = set(e for e in configengines if e not in validnames)
645 if invalidnames:
645 if invalidnames:
646 raise error.Abort(_('invalid compression engine defined in %s: %s') %
646 raise error.Abort(_('invalid compression engine defined in %s: %s') %
647 (config, ', '.join(sorted(invalidnames))))
647 (config, ', '.join(sorted(invalidnames))))
648
648
649 compengines = [e for e in compengines if e.name() in configengines]
649 compengines = [e for e in compengines if e.name() in configengines]
650 compengines = sorted(compengines,
650 compengines = sorted(compengines,
651 key=lambda e: configengines.index(e.name()))
651 key=lambda e: configengines.index(e.name()))
652
652
653 if not compengines:
653 if not compengines:
654 raise error.Abort(_('%s config option does not specify any known '
654 raise error.Abort(_('%s config option does not specify any known '
655 'compression engines') % config,
655 'compression engines') % config,
656 hint=_('usable compression engines: %s') %
656 hint=_('usable compression engines: %s') %
657 ', '.sorted(validnames))
657 ', '.sorted(validnames))
658
658
659 return compengines
659 return compengines
660
660
661 # list of commands
661 # list of commands
662 commands = {}
662 commands = {}
663
663
664 def wireprotocommand(name, args=''):
664 def wireprotocommand(name, args=''):
665 """decorator for wire protocol command"""
665 """decorator for wire protocol command"""
666 def register(func):
666 def register(func):
667 commands[name] = (func, args)
667 commands[name] = (func, args)
668 return func
668 return func
669 return register
669 return register
670
670
671 @wireprotocommand('batch', 'cmds *')
671 @wireprotocommand('batch', 'cmds *')
672 def batch(repo, proto, cmds, others):
672 def batch(repo, proto, cmds, others):
673 repo = repo.filtered("served")
673 repo = repo.filtered("served")
674 res = []
674 res = []
675 for pair in cmds.split(';'):
675 for pair in cmds.split(';'):
676 op, args = pair.split(' ', 1)
676 op, args = pair.split(' ', 1)
677 vals = {}
677 vals = {}
678 for a in args.split(','):
678 for a in args.split(','):
679 if a:
679 if a:
680 n, v = a.split('=')
680 n, v = a.split('=')
681 vals[unescapearg(n)] = unescapearg(v)
681 vals[unescapearg(n)] = unescapearg(v)
682 func, spec = commands[op]
682 func, spec = commands[op]
683 if spec:
683 if spec:
684 keys = spec.split()
684 keys = spec.split()
685 data = {}
685 data = {}
686 for k in keys:
686 for k in keys:
687 if k == '*':
687 if k == '*':
688 star = {}
688 star = {}
689 for key in vals.keys():
689 for key in vals.keys():
690 if key not in keys:
690 if key not in keys:
691 star[key] = vals[key]
691 star[key] = vals[key]
692 data['*'] = star
692 data['*'] = star
693 else:
693 else:
694 data[k] = vals[k]
694 data[k] = vals[k]
695 result = func(repo, proto, *[data[k] for k in keys])
695 result = func(repo, proto, *[data[k] for k in keys])
696 else:
696 else:
697 result = func(repo, proto)
697 result = func(repo, proto)
698 if isinstance(result, ooberror):
698 if isinstance(result, ooberror):
699 return result
699 return result
700 res.append(escapearg(result))
700 res.append(escapearg(result))
701 return ';'.join(res)
701 return ';'.join(res)
702
702
703 @wireprotocommand('between', 'pairs')
703 @wireprotocommand('between', 'pairs')
704 def between(repo, proto, pairs):
704 def between(repo, proto, pairs):
705 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
705 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
706 r = []
706 r = []
707 for b in repo.between(pairs):
707 for b in repo.between(pairs):
708 r.append(encodelist(b) + "\n")
708 r.append(encodelist(b) + "\n")
709 return "".join(r)
709 return "".join(r)
710
710
711 @wireprotocommand('branchmap')
711 @wireprotocommand('branchmap')
712 def branchmap(repo, proto):
712 def branchmap(repo, proto):
713 branchmap = repo.branchmap()
713 branchmap = repo.branchmap()
714 heads = []
714 heads = []
715 for branch, nodes in branchmap.iteritems():
715 for branch, nodes in branchmap.iteritems():
716 branchname = urlreq.quote(encoding.fromlocal(branch))
716 branchname = urlreq.quote(encoding.fromlocal(branch))
717 branchnodes = encodelist(nodes)
717 branchnodes = encodelist(nodes)
718 heads.append('%s %s' % (branchname, branchnodes))
718 heads.append('%s %s' % (branchname, branchnodes))
719 return '\n'.join(heads)
719 return '\n'.join(heads)
720
720
721 @wireprotocommand('branches', 'nodes')
721 @wireprotocommand('branches', 'nodes')
722 def branches(repo, proto, nodes):
722 def branches(repo, proto, nodes):
723 nodes = decodelist(nodes)
723 nodes = decodelist(nodes)
724 r = []
724 r = []
725 for b in repo.branches(nodes):
725 for b in repo.branches(nodes):
726 r.append(encodelist(b) + "\n")
726 r.append(encodelist(b) + "\n")
727 return "".join(r)
727 return "".join(r)
728
728
729 @wireprotocommand('clonebundles', '')
729 @wireprotocommand('clonebundles', '')
730 def clonebundles(repo, proto):
730 def clonebundles(repo, proto):
731 """Server command for returning info for available bundles to seed clones.
731 """Server command for returning info for available bundles to seed clones.
732
732
733 Clients will parse this response and determine what bundle to fetch.
733 Clients will parse this response and determine what bundle to fetch.
734
734
735 Extensions may wrap this command to filter or dynamically emit data
735 Extensions may wrap this command to filter or dynamically emit data
736 depending on the request. e.g. you could advertise URLs for the closest
736 depending on the request. e.g. you could advertise URLs for the closest
737 data center given the client's IP address.
737 data center given the client's IP address.
738 """
738 """
739 return repo.opener.tryread('clonebundles.manifest')
739 return repo.vfs.tryread('clonebundles.manifest')
740
740
741 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
741 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
742 'known', 'getbundle', 'unbundlehash', 'batch']
742 'known', 'getbundle', 'unbundlehash', 'batch']
743
743
744 def _capabilities(repo, proto):
744 def _capabilities(repo, proto):
745 """return a list of capabilities for a repo
745 """return a list of capabilities for a repo
746
746
747 This function exists to allow extensions to easily wrap capabilities
747 This function exists to allow extensions to easily wrap capabilities
748 computation
748 computation
749
749
750 - returns a lists: easy to alter
750 - returns a lists: easy to alter
751 - change done here will be propagated to both `capabilities` and `hello`
751 - change done here will be propagated to both `capabilities` and `hello`
752 command without any other action needed.
752 command without any other action needed.
753 """
753 """
754 # copy to prevent modification of the global list
754 # copy to prevent modification of the global list
755 caps = list(wireprotocaps)
755 caps = list(wireprotocaps)
756 if streamclone.allowservergeneration(repo.ui):
756 if streamclone.allowservergeneration(repo.ui):
757 if repo.ui.configbool('server', 'preferuncompressed', False):
757 if repo.ui.configbool('server', 'preferuncompressed', False):
758 caps.append('stream-preferred')
758 caps.append('stream-preferred')
759 requiredformats = repo.requirements & repo.supportedformats
759 requiredformats = repo.requirements & repo.supportedformats
760 # if our local revlogs are just revlogv1, add 'stream' cap
760 # if our local revlogs are just revlogv1, add 'stream' cap
761 if not requiredformats - set(('revlogv1',)):
761 if not requiredformats - set(('revlogv1',)):
762 caps.append('stream')
762 caps.append('stream')
763 # otherwise, add 'streamreqs' detailing our local revlog format
763 # otherwise, add 'streamreqs' detailing our local revlog format
764 else:
764 else:
765 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
765 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
766 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
766 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
767 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
767 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
768 caps.append('bundle2=' + urlreq.quote(capsblob))
768 caps.append('bundle2=' + urlreq.quote(capsblob))
769 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
769 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
770
770
771 if proto.name == 'http':
771 if proto.name == 'http':
772 caps.append('httpheader=%d' %
772 caps.append('httpheader=%d' %
773 repo.ui.configint('server', 'maxhttpheaderlen', 1024))
773 repo.ui.configint('server', 'maxhttpheaderlen', 1024))
774 if repo.ui.configbool('experimental', 'httppostargs', False):
774 if repo.ui.configbool('experimental', 'httppostargs', False):
775 caps.append('httppostargs')
775 caps.append('httppostargs')
776
776
777 # FUTURE advertise 0.2rx once support is implemented
777 # FUTURE advertise 0.2rx once support is implemented
778 # FUTURE advertise minrx and mintx after consulting config option
778 # FUTURE advertise minrx and mintx after consulting config option
779 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
779 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
780
780
781 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
781 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
782 if compengines:
782 if compengines:
783 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
783 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
784 for e in compengines)
784 for e in compengines)
785 caps.append('compression=%s' % comptypes)
785 caps.append('compression=%s' % comptypes)
786
786
787 return caps
787 return caps
788
788
789 # If you are writing an extension and consider wrapping this function. Wrap
789 # If you are writing an extension and consider wrapping this function. Wrap
790 # `_capabilities` instead.
790 # `_capabilities` instead.
791 @wireprotocommand('capabilities')
791 @wireprotocommand('capabilities')
792 def capabilities(repo, proto):
792 def capabilities(repo, proto):
793 return ' '.join(_capabilities(repo, proto))
793 return ' '.join(_capabilities(repo, proto))
794
794
795 @wireprotocommand('changegroup', 'roots')
795 @wireprotocommand('changegroup', 'roots')
796 def changegroup(repo, proto, roots):
796 def changegroup(repo, proto, roots):
797 nodes = decodelist(roots)
797 nodes = decodelist(roots)
798 cg = changegroupmod.changegroup(repo, nodes, 'serve')
798 cg = changegroupmod.changegroup(repo, nodes, 'serve')
799 return streamres(reader=cg, v1compressible=True)
799 return streamres(reader=cg, v1compressible=True)
800
800
801 @wireprotocommand('changegroupsubset', 'bases heads')
801 @wireprotocommand('changegroupsubset', 'bases heads')
802 def changegroupsubset(repo, proto, bases, heads):
802 def changegroupsubset(repo, proto, bases, heads):
803 bases = decodelist(bases)
803 bases = decodelist(bases)
804 heads = decodelist(heads)
804 heads = decodelist(heads)
805 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
805 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
806 return streamres(reader=cg, v1compressible=True)
806 return streamres(reader=cg, v1compressible=True)
807
807
808 @wireprotocommand('debugwireargs', 'one two *')
808 @wireprotocommand('debugwireargs', 'one two *')
809 def debugwireargs(repo, proto, one, two, others):
809 def debugwireargs(repo, proto, one, two, others):
810 # only accept optional args from the known set
810 # only accept optional args from the known set
811 opts = options('debugwireargs', ['three', 'four'], others)
811 opts = options('debugwireargs', ['three', 'four'], others)
812 return repo.debugwireargs(one, two, **opts)
812 return repo.debugwireargs(one, two, **opts)
813
813
814 @wireprotocommand('getbundle', '*')
814 @wireprotocommand('getbundle', '*')
815 def getbundle(repo, proto, others):
815 def getbundle(repo, proto, others):
816 opts = options('getbundle', gboptsmap.keys(), others)
816 opts = options('getbundle', gboptsmap.keys(), others)
817 for k, v in opts.iteritems():
817 for k, v in opts.iteritems():
818 keytype = gboptsmap[k]
818 keytype = gboptsmap[k]
819 if keytype == 'nodes':
819 if keytype == 'nodes':
820 opts[k] = decodelist(v)
820 opts[k] = decodelist(v)
821 elif keytype == 'csv':
821 elif keytype == 'csv':
822 opts[k] = list(v.split(','))
822 opts[k] = list(v.split(','))
823 elif keytype == 'scsv':
823 elif keytype == 'scsv':
824 opts[k] = set(v.split(','))
824 opts[k] = set(v.split(','))
825 elif keytype == 'boolean':
825 elif keytype == 'boolean':
826 # Client should serialize False as '0', which is a non-empty string
826 # Client should serialize False as '0', which is a non-empty string
827 # so it evaluates as a True bool.
827 # so it evaluates as a True bool.
828 if v == '0':
828 if v == '0':
829 opts[k] = False
829 opts[k] = False
830 else:
830 else:
831 opts[k] = bool(v)
831 opts[k] = bool(v)
832 elif keytype != 'plain':
832 elif keytype != 'plain':
833 raise KeyError('unknown getbundle option type %s'
833 raise KeyError('unknown getbundle option type %s'
834 % keytype)
834 % keytype)
835
835
836 if not bundle1allowed(repo, 'pull'):
836 if not bundle1allowed(repo, 'pull'):
837 if not exchange.bundle2requested(opts.get('bundlecaps')):
837 if not exchange.bundle2requested(opts.get('bundlecaps')):
838 if proto.name == 'http':
838 if proto.name == 'http':
839 return ooberror(bundle2required)
839 return ooberror(bundle2required)
840 raise error.Abort(bundle2requiredmain,
840 raise error.Abort(bundle2requiredmain,
841 hint=bundle2requiredhint)
841 hint=bundle2requiredhint)
842
842
843 try:
843 try:
844 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
844 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
845 except error.Abort as exc:
845 except error.Abort as exc:
846 # cleanly forward Abort error to the client
846 # cleanly forward Abort error to the client
847 if not exchange.bundle2requested(opts.get('bundlecaps')):
847 if not exchange.bundle2requested(opts.get('bundlecaps')):
848 if proto.name == 'http':
848 if proto.name == 'http':
849 return ooberror(str(exc) + '\n')
849 return ooberror(str(exc) + '\n')
850 raise # cannot do better for bundle1 + ssh
850 raise # cannot do better for bundle1 + ssh
851 # bundle2 request expect a bundle2 reply
851 # bundle2 request expect a bundle2 reply
852 bundler = bundle2.bundle20(repo.ui)
852 bundler = bundle2.bundle20(repo.ui)
853 manargs = [('message', str(exc))]
853 manargs = [('message', str(exc))]
854 advargs = []
854 advargs = []
855 if exc.hint is not None:
855 if exc.hint is not None:
856 advargs.append(('hint', exc.hint))
856 advargs.append(('hint', exc.hint))
857 bundler.addpart(bundle2.bundlepart('error:abort',
857 bundler.addpart(bundle2.bundlepart('error:abort',
858 manargs, advargs))
858 manargs, advargs))
859 return streamres(gen=bundler.getchunks(), v1compressible=True)
859 return streamres(gen=bundler.getchunks(), v1compressible=True)
860 return streamres(gen=chunks, v1compressible=True)
860 return streamres(gen=chunks, v1compressible=True)
861
861
862 @wireprotocommand('heads')
862 @wireprotocommand('heads')
863 def heads(repo, proto):
863 def heads(repo, proto):
864 h = repo.heads()
864 h = repo.heads()
865 return encodelist(h) + "\n"
865 return encodelist(h) + "\n"
866
866
867 @wireprotocommand('hello')
867 @wireprotocommand('hello')
868 def hello(repo, proto):
868 def hello(repo, proto):
869 '''the hello command returns a set of lines describing various
869 '''the hello command returns a set of lines describing various
870 interesting things about the server, in an RFC822-like format.
870 interesting things about the server, in an RFC822-like format.
871 Currently the only one defined is "capabilities", which
871 Currently the only one defined is "capabilities", which
872 consists of a line in the form:
872 consists of a line in the form:
873
873
874 capabilities: space separated list of tokens
874 capabilities: space separated list of tokens
875 '''
875 '''
876 return "capabilities: %s\n" % (capabilities(repo, proto))
876 return "capabilities: %s\n" % (capabilities(repo, proto))
877
877
878 @wireprotocommand('listkeys', 'namespace')
878 @wireprotocommand('listkeys', 'namespace')
879 def listkeys(repo, proto, namespace):
879 def listkeys(repo, proto, namespace):
880 d = repo.listkeys(encoding.tolocal(namespace)).items()
880 d = repo.listkeys(encoding.tolocal(namespace)).items()
881 return pushkeymod.encodekeys(d)
881 return pushkeymod.encodekeys(d)
882
882
883 @wireprotocommand('lookup', 'key')
883 @wireprotocommand('lookup', 'key')
884 def lookup(repo, proto, key):
884 def lookup(repo, proto, key):
885 try:
885 try:
886 k = encoding.tolocal(key)
886 k = encoding.tolocal(key)
887 c = repo[k]
887 c = repo[k]
888 r = c.hex()
888 r = c.hex()
889 success = 1
889 success = 1
890 except Exception as inst:
890 except Exception as inst:
891 r = str(inst)
891 r = str(inst)
892 success = 0
892 success = 0
893 return "%s %s\n" % (success, r)
893 return "%s %s\n" % (success, r)
894
894
895 @wireprotocommand('known', 'nodes *')
895 @wireprotocommand('known', 'nodes *')
896 def known(repo, proto, nodes, others):
896 def known(repo, proto, nodes, others):
897 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
897 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
898
898
899 @wireprotocommand('pushkey', 'namespace key old new')
899 @wireprotocommand('pushkey', 'namespace key old new')
900 def pushkey(repo, proto, namespace, key, old, new):
900 def pushkey(repo, proto, namespace, key, old, new):
901 # compatibility with pre-1.8 clients which were accidentally
901 # compatibility with pre-1.8 clients which were accidentally
902 # sending raw binary nodes rather than utf-8-encoded hex
902 # sending raw binary nodes rather than utf-8-encoded hex
903 if len(new) == 20 and new.encode('string-escape') != new:
903 if len(new) == 20 and new.encode('string-escape') != new:
904 # looks like it could be a binary node
904 # looks like it could be a binary node
905 try:
905 try:
906 new.decode('utf-8')
906 new.decode('utf-8')
907 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
907 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
908 except UnicodeDecodeError:
908 except UnicodeDecodeError:
909 pass # binary, leave unmodified
909 pass # binary, leave unmodified
910 else:
910 else:
911 new = encoding.tolocal(new) # normal path
911 new = encoding.tolocal(new) # normal path
912
912
913 if util.safehasattr(proto, 'restore'):
913 if util.safehasattr(proto, 'restore'):
914
914
915 proto.redirect()
915 proto.redirect()
916
916
917 try:
917 try:
918 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
918 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
919 encoding.tolocal(old), new) or False
919 encoding.tolocal(old), new) or False
920 except error.Abort:
920 except error.Abort:
921 r = False
921 r = False
922
922
923 output = proto.restore()
923 output = proto.restore()
924
924
925 return '%s\n%s' % (int(r), output)
925 return '%s\n%s' % (int(r), output)
926
926
927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
928 encoding.tolocal(old), new)
928 encoding.tolocal(old), new)
929 return '%s\n' % int(r)
929 return '%s\n' % int(r)
930
930
931 @wireprotocommand('stream_out')
931 @wireprotocommand('stream_out')
932 def stream(repo, proto):
932 def stream(repo, proto):
933 '''If the server supports streaming clone, it advertises the "stream"
933 '''If the server supports streaming clone, it advertises the "stream"
934 capability with a value representing the version and flags of the repo
934 capability with a value representing the version and flags of the repo
935 it is serving. Client checks to see if it understands the format.
935 it is serving. Client checks to see if it understands the format.
936 '''
936 '''
937 if not streamclone.allowservergeneration(repo.ui):
937 if not streamclone.allowservergeneration(repo.ui):
938 return '1\n'
938 return '1\n'
939
939
940 def getstream(it):
940 def getstream(it):
941 yield '0\n'
941 yield '0\n'
942 for chunk in it:
942 for chunk in it:
943 yield chunk
943 yield chunk
944
944
945 try:
945 try:
946 # LockError may be raised before the first result is yielded. Don't
946 # LockError may be raised before the first result is yielded. Don't
947 # emit output until we're sure we got the lock successfully.
947 # emit output until we're sure we got the lock successfully.
948 it = streamclone.generatev1wireproto(repo)
948 it = streamclone.generatev1wireproto(repo)
949 return streamres(gen=getstream(it))
949 return streamres(gen=getstream(it))
950 except error.LockError:
950 except error.LockError:
951 return '2\n'
951 return '2\n'
952
952
953 @wireprotocommand('unbundle', 'heads')
953 @wireprotocommand('unbundle', 'heads')
954 def unbundle(repo, proto, heads):
954 def unbundle(repo, proto, heads):
955 their_heads = decodelist(heads)
955 their_heads = decodelist(heads)
956
956
957 try:
957 try:
958 proto.redirect()
958 proto.redirect()
959
959
960 exchange.check_heads(repo, their_heads, 'preparing changes')
960 exchange.check_heads(repo, their_heads, 'preparing changes')
961
961
962 # write bundle data to temporary file because it can be big
962 # write bundle data to temporary file because it can be big
963 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
963 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
964 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
964 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
965 r = 0
965 r = 0
966 try:
966 try:
967 proto.getfile(fp)
967 proto.getfile(fp)
968 fp.seek(0)
968 fp.seek(0)
969 gen = exchange.readbundle(repo.ui, fp, None)
969 gen = exchange.readbundle(repo.ui, fp, None)
970 if (isinstance(gen, changegroupmod.cg1unpacker)
970 if (isinstance(gen, changegroupmod.cg1unpacker)
971 and not bundle1allowed(repo, 'push')):
971 and not bundle1allowed(repo, 'push')):
972 if proto.name == 'http':
972 if proto.name == 'http':
973 # need to special case http because stderr do not get to
973 # need to special case http because stderr do not get to
974 # the http client on failed push so we need to abuse some
974 # the http client on failed push so we need to abuse some
975 # other error type to make sure the message get to the
975 # other error type to make sure the message get to the
976 # user.
976 # user.
977 return ooberror(bundle2required)
977 return ooberror(bundle2required)
978 raise error.Abort(bundle2requiredmain,
978 raise error.Abort(bundle2requiredmain,
979 hint=bundle2requiredhint)
979 hint=bundle2requiredhint)
980
980
981 r = exchange.unbundle(repo, gen, their_heads, 'serve',
981 r = exchange.unbundle(repo, gen, their_heads, 'serve',
982 proto._client())
982 proto._client())
983 if util.safehasattr(r, 'addpart'):
983 if util.safehasattr(r, 'addpart'):
984 # The return looks streamable, we are in the bundle2 case and
984 # The return looks streamable, we are in the bundle2 case and
985 # should return a stream.
985 # should return a stream.
986 return streamres(gen=r.getchunks())
986 return streamres(gen=r.getchunks())
987 return pushres(r)
987 return pushres(r)
988
988
989 finally:
989 finally:
990 fp.close()
990 fp.close()
991 os.unlink(tempname)
991 os.unlink(tempname)
992
992
993 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
993 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
994 # handle non-bundle2 case first
994 # handle non-bundle2 case first
995 if not getattr(exc, 'duringunbundle2', False):
995 if not getattr(exc, 'duringunbundle2', False):
996 try:
996 try:
997 raise
997 raise
998 except error.Abort:
998 except error.Abort:
999 # The old code we moved used util.stderr directly.
999 # The old code we moved used util.stderr directly.
1000 # We did not change it to minimise code change.
1000 # We did not change it to minimise code change.
1001 # This need to be moved to something proper.
1001 # This need to be moved to something proper.
1002 # Feel free to do it.
1002 # Feel free to do it.
1003 util.stderr.write("abort: %s\n" % exc)
1003 util.stderr.write("abort: %s\n" % exc)
1004 if exc.hint is not None:
1004 if exc.hint is not None:
1005 util.stderr.write("(%s)\n" % exc.hint)
1005 util.stderr.write("(%s)\n" % exc.hint)
1006 return pushres(0)
1006 return pushres(0)
1007 except error.PushRaced:
1007 except error.PushRaced:
1008 return pusherr(str(exc))
1008 return pusherr(str(exc))
1009
1009
1010 bundler = bundle2.bundle20(repo.ui)
1010 bundler = bundle2.bundle20(repo.ui)
1011 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1011 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1012 bundler.addpart(out)
1012 bundler.addpart(out)
1013 try:
1013 try:
1014 try:
1014 try:
1015 raise
1015 raise
1016 except error.PushkeyFailed as exc:
1016 except error.PushkeyFailed as exc:
1017 # check client caps
1017 # check client caps
1018 remotecaps = getattr(exc, '_replycaps', None)
1018 remotecaps = getattr(exc, '_replycaps', None)
1019 if (remotecaps is not None
1019 if (remotecaps is not None
1020 and 'pushkey' not in remotecaps.get('error', ())):
1020 and 'pushkey' not in remotecaps.get('error', ())):
1021 # no support remote side, fallback to Abort handler.
1021 # no support remote side, fallback to Abort handler.
1022 raise
1022 raise
1023 part = bundler.newpart('error:pushkey')
1023 part = bundler.newpart('error:pushkey')
1024 part.addparam('in-reply-to', exc.partid)
1024 part.addparam('in-reply-to', exc.partid)
1025 if exc.namespace is not None:
1025 if exc.namespace is not None:
1026 part.addparam('namespace', exc.namespace, mandatory=False)
1026 part.addparam('namespace', exc.namespace, mandatory=False)
1027 if exc.key is not None:
1027 if exc.key is not None:
1028 part.addparam('key', exc.key, mandatory=False)
1028 part.addparam('key', exc.key, mandatory=False)
1029 if exc.new is not None:
1029 if exc.new is not None:
1030 part.addparam('new', exc.new, mandatory=False)
1030 part.addparam('new', exc.new, mandatory=False)
1031 if exc.old is not None:
1031 if exc.old is not None:
1032 part.addparam('old', exc.old, mandatory=False)
1032 part.addparam('old', exc.old, mandatory=False)
1033 if exc.ret is not None:
1033 if exc.ret is not None:
1034 part.addparam('ret', exc.ret, mandatory=False)
1034 part.addparam('ret', exc.ret, mandatory=False)
1035 except error.BundleValueError as exc:
1035 except error.BundleValueError as exc:
1036 errpart = bundler.newpart('error:unsupportedcontent')
1036 errpart = bundler.newpart('error:unsupportedcontent')
1037 if exc.parttype is not None:
1037 if exc.parttype is not None:
1038 errpart.addparam('parttype', exc.parttype)
1038 errpart.addparam('parttype', exc.parttype)
1039 if exc.params:
1039 if exc.params:
1040 errpart.addparam('params', '\0'.join(exc.params))
1040 errpart.addparam('params', '\0'.join(exc.params))
1041 except error.Abort as exc:
1041 except error.Abort as exc:
1042 manargs = [('message', str(exc))]
1042 manargs = [('message', str(exc))]
1043 advargs = []
1043 advargs = []
1044 if exc.hint is not None:
1044 if exc.hint is not None:
1045 advargs.append(('hint', exc.hint))
1045 advargs.append(('hint', exc.hint))
1046 bundler.addpart(bundle2.bundlepart('error:abort',
1046 bundler.addpart(bundle2.bundlepart('error:abort',
1047 manargs, advargs))
1047 manargs, advargs))
1048 except error.PushRaced as exc:
1048 except error.PushRaced as exc:
1049 bundler.newpart('error:pushraced', [('message', str(exc))])
1049 bundler.newpart('error:pushraced', [('message', str(exc))])
1050 return streamres(gen=bundler.getchunks())
1050 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now