##// END OF EJS Templates
bundle2: rename b2x:error:unknownpart to b2x:error:unsupportedcontent...
Pierre-Yves David -
r21619:292331e9 default
parent child Browse files
Show More
@@ -1,839 +1,839 b''
1 # bundle2.py - generic container format to transmit arbitrary data.
1 # bundle2.py - generic container format to transmit arbitrary data.
2 #
2 #
3 # Copyright 2013 Facebook, Inc.
3 # Copyright 2013 Facebook, Inc.
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7 """Handling of the new bundle2 format
7 """Handling of the new bundle2 format
8
8
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
9 The goal of bundle2 is to act as an atomically packet to transmit a set of
10 payloads in an application agnostic way. It consist in a sequence of "parts"
10 payloads in an application agnostic way. It consist in a sequence of "parts"
11 that will be handed to and processed by the application layer.
11 that will be handed to and processed by the application layer.
12
12
13
13
14 General format architecture
14 General format architecture
15 ===========================
15 ===========================
16
16
17 The format is architectured as follow
17 The format is architectured as follow
18
18
19 - magic string
19 - magic string
20 - stream level parameters
20 - stream level parameters
21 - payload parts (any number)
21 - payload parts (any number)
22 - end of stream marker.
22 - end of stream marker.
23
23
24 the Binary format
24 the Binary format
25 ============================
25 ============================
26
26
27 All numbers are unsigned and big-endian.
27 All numbers are unsigned and big-endian.
28
28
29 stream level parameters
29 stream level parameters
30 ------------------------
30 ------------------------
31
31
32 Binary format is as follow
32 Binary format is as follow
33
33
34 :params size: (16 bits integer)
34 :params size: (16 bits integer)
35
35
36 The total number of Bytes used by the parameters
36 The total number of Bytes used by the parameters
37
37
38 :params value: arbitrary number of Bytes
38 :params value: arbitrary number of Bytes
39
39
40 A blob of `params size` containing the serialized version of all stream level
40 A blob of `params size` containing the serialized version of all stream level
41 parameters.
41 parameters.
42
42
43 The blob contains a space separated list of parameters. Parameters with value
43 The blob contains a space separated list of parameters. Parameters with value
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
44 are stored in the form `<name>=<value>`. Both name and value are urlquoted.
45
45
46 Empty name are obviously forbidden.
46 Empty name are obviously forbidden.
47
47
48 Name MUST start with a letter. If this first letter is lower case, the
48 Name MUST start with a letter. If this first letter is lower case, the
49 parameter is advisory and can be safely ignored. However when the first
49 parameter is advisory and can be safely ignored. However when the first
50 letter is capital, the parameter is mandatory and the bundling process MUST
50 letter is capital, the parameter is mandatory and the bundling process MUST
51 stop if he is not able to proceed it.
51 stop if he is not able to proceed it.
52
52
53 Stream parameters use a simple textual format for two main reasons:
53 Stream parameters use a simple textual format for two main reasons:
54
54
55 - Stream level parameters should remain simple and we want to discourage any
55 - Stream level parameters should remain simple and we want to discourage any
56 crazy usage.
56 crazy usage.
57 - Textual data allow easy human inspection of a bundle2 header in case of
57 - Textual data allow easy human inspection of a bundle2 header in case of
58 troubles.
58 troubles.
59
59
60 Any Applicative level options MUST go into a bundle2 part instead.
60 Any Applicative level options MUST go into a bundle2 part instead.
61
61
62 Payload part
62 Payload part
63 ------------------------
63 ------------------------
64
64
65 Binary format is as follow
65 Binary format is as follow
66
66
67 :header size: (16 bits inter)
67 :header size: (16 bits inter)
68
68
69 The total number of Bytes used by the part headers. When the header is empty
69 The total number of Bytes used by the part headers. When the header is empty
70 (size = 0) this is interpreted as the end of stream marker.
70 (size = 0) this is interpreted as the end of stream marker.
71
71
72 :header:
72 :header:
73
73
74 The header defines how to interpret the part. It contains two piece of
74 The header defines how to interpret the part. It contains two piece of
75 data: the part type, and the part parameters.
75 data: the part type, and the part parameters.
76
76
77 The part type is used to route an application level handler, that can
77 The part type is used to route an application level handler, that can
78 interpret payload.
78 interpret payload.
79
79
80 Part parameters are passed to the application level handler. They are
80 Part parameters are passed to the application level handler. They are
81 meant to convey information that will help the application level object to
81 meant to convey information that will help the application level object to
82 interpret the part payload.
82 interpret the part payload.
83
83
84 The binary format of the header is has follow
84 The binary format of the header is has follow
85
85
86 :typesize: (one byte)
86 :typesize: (one byte)
87
87
88 :parttype: alphanumerical part name
88 :parttype: alphanumerical part name
89
89
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
90 :partid: A 32bits integer (unique in the bundle) that can be used to refer
91 to this part.
91 to this part.
92
92
93 :parameters:
93 :parameters:
94
94
95 Part's parameter may have arbitrary content, the binary structure is::
95 Part's parameter may have arbitrary content, the binary structure is::
96
96
97 <mandatory-count><advisory-count><param-sizes><param-data>
97 <mandatory-count><advisory-count><param-sizes><param-data>
98
98
99 :mandatory-count: 1 byte, number of mandatory parameters
99 :mandatory-count: 1 byte, number of mandatory parameters
100
100
101 :advisory-count: 1 byte, number of advisory parameters
101 :advisory-count: 1 byte, number of advisory parameters
102
102
103 :param-sizes:
103 :param-sizes:
104
104
105 N couple of bytes, where N is the total number of parameters. Each
105 N couple of bytes, where N is the total number of parameters. Each
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
106 couple contains (<size-of-key>, <size-of-value) for one parameter.
107
107
108 :param-data:
108 :param-data:
109
109
110 A blob of bytes from which each parameter key and value can be
110 A blob of bytes from which each parameter key and value can be
111 retrieved using the list of size couples stored in the previous
111 retrieved using the list of size couples stored in the previous
112 field.
112 field.
113
113
114 Mandatory parameters comes first, then the advisory ones.
114 Mandatory parameters comes first, then the advisory ones.
115
115
116 Each parameter's key MUST be unique within the part.
116 Each parameter's key MUST be unique within the part.
117
117
118 :payload:
118 :payload:
119
119
120 payload is a series of `<chunksize><chunkdata>`.
120 payload is a series of `<chunksize><chunkdata>`.
121
121
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
122 `chunksize` is a 32 bits integer, `chunkdata` are plain bytes (as much as
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
123 `chunksize` says)` The payload part is concluded by a zero size chunk.
124
124
125 The current implementation always produces either zero or one chunk.
125 The current implementation always produces either zero or one chunk.
126 This is an implementation limitation that will ultimately be lifted.
126 This is an implementation limitation that will ultimately be lifted.
127
127
128 Bundle processing
128 Bundle processing
129 ============================
129 ============================
130
130
131 Each part is processed in order using a "part handler". Handler are registered
131 Each part is processed in order using a "part handler". Handler are registered
132 for a certain part type.
132 for a certain part type.
133
133
134 The matching of a part to its handler is case insensitive. The case of the
134 The matching of a part to its handler is case insensitive. The case of the
135 part type is used to know if a part is mandatory or advisory. If the Part type
135 part type is used to know if a part is mandatory or advisory. If the Part type
136 contains any uppercase char it is considered mandatory. When no handler is
136 contains any uppercase char it is considered mandatory. When no handler is
137 known for a Mandatory part, the process is aborted and an exception is raised.
137 known for a Mandatory part, the process is aborted and an exception is raised.
138 If the part is advisory and no handler is known, the part is ignored. When the
138 If the part is advisory and no handler is known, the part is ignored. When the
139 process is aborted, the full bundle is still read from the stream to keep the
139 process is aborted, the full bundle is still read from the stream to keep the
140 channel usable. But none of the part read from an abort are processed. In the
140 channel usable. But none of the part read from an abort are processed. In the
141 future, dropping the stream may become an option for channel we do not care to
141 future, dropping the stream may become an option for channel we do not care to
142 preserve.
142 preserve.
143 """
143 """
144
144
145 import util
145 import util
146 import struct
146 import struct
147 import urllib
147 import urllib
148 import string
148 import string
149
149
150 import changegroup, error
150 import changegroup, error
151 from i18n import _
151 from i18n import _
152
152
153 _pack = struct.pack
153 _pack = struct.pack
154 _unpack = struct.unpack
154 _unpack = struct.unpack
155
155
156 _magicstring = 'HG2X'
156 _magicstring = 'HG2X'
157
157
158 _fstreamparamsize = '>H'
158 _fstreamparamsize = '>H'
159 _fpartheadersize = '>H'
159 _fpartheadersize = '>H'
160 _fparttypesize = '>B'
160 _fparttypesize = '>B'
161 _fpartid = '>I'
161 _fpartid = '>I'
162 _fpayloadsize = '>I'
162 _fpayloadsize = '>I'
163 _fpartparamcount = '>BB'
163 _fpartparamcount = '>BB'
164
164
165 preferedchunksize = 4096
165 preferedchunksize = 4096
166
166
167 def _makefpartparamsizes(nbparams):
167 def _makefpartparamsizes(nbparams):
168 """return a struct format to read part parameter sizes
168 """return a struct format to read part parameter sizes
169
169
170 The number parameters is variable so we need to build that format
170 The number parameters is variable so we need to build that format
171 dynamically.
171 dynamically.
172 """
172 """
173 return '>'+('BB'*nbparams)
173 return '>'+('BB'*nbparams)
174
174
175 parthandlermapping = {}
175 parthandlermapping = {}
176
176
177 def parthandler(parttype):
177 def parthandler(parttype):
178 """decorator that register a function as a bundle2 part handler
178 """decorator that register a function as a bundle2 part handler
179
179
180 eg::
180 eg::
181
181
182 @parthandler('myparttype')
182 @parthandler('myparttype')
183 def myparttypehandler(...):
183 def myparttypehandler(...):
184 '''process a part of type "my part".'''
184 '''process a part of type "my part".'''
185 ...
185 ...
186 """
186 """
187 def _decorator(func):
187 def _decorator(func):
188 lparttype = parttype.lower() # enforce lower case matching.
188 lparttype = parttype.lower() # enforce lower case matching.
189 assert lparttype not in parthandlermapping
189 assert lparttype not in parthandlermapping
190 parthandlermapping[lparttype] = func
190 parthandlermapping[lparttype] = func
191 return func
191 return func
192 return _decorator
192 return _decorator
193
193
194 class unbundlerecords(object):
194 class unbundlerecords(object):
195 """keep record of what happens during and unbundle
195 """keep record of what happens during and unbundle
196
196
197 New records are added using `records.add('cat', obj)`. Where 'cat' is a
197 New records are added using `records.add('cat', obj)`. Where 'cat' is a
198 category of record and obj is an arbitrary object.
198 category of record and obj is an arbitrary object.
199
199
200 `records['cat']` will return all entries of this category 'cat'.
200 `records['cat']` will return all entries of this category 'cat'.
201
201
202 Iterating on the object itself will yield `('category', obj)` tuples
202 Iterating on the object itself will yield `('category', obj)` tuples
203 for all entries.
203 for all entries.
204
204
205 All iterations happens in chronological order.
205 All iterations happens in chronological order.
206 """
206 """
207
207
208 def __init__(self):
208 def __init__(self):
209 self._categories = {}
209 self._categories = {}
210 self._sequences = []
210 self._sequences = []
211 self._replies = {}
211 self._replies = {}
212
212
213 def add(self, category, entry, inreplyto=None):
213 def add(self, category, entry, inreplyto=None):
214 """add a new record of a given category.
214 """add a new record of a given category.
215
215
216 The entry can then be retrieved in the list returned by
216 The entry can then be retrieved in the list returned by
217 self['category']."""
217 self['category']."""
218 self._categories.setdefault(category, []).append(entry)
218 self._categories.setdefault(category, []).append(entry)
219 self._sequences.append((category, entry))
219 self._sequences.append((category, entry))
220 if inreplyto is not None:
220 if inreplyto is not None:
221 self.getreplies(inreplyto).add(category, entry)
221 self.getreplies(inreplyto).add(category, entry)
222
222
223 def getreplies(self, partid):
223 def getreplies(self, partid):
224 """get the subrecords that replies to a specific part"""
224 """get the subrecords that replies to a specific part"""
225 return self._replies.setdefault(partid, unbundlerecords())
225 return self._replies.setdefault(partid, unbundlerecords())
226
226
227 def __getitem__(self, cat):
227 def __getitem__(self, cat):
228 return tuple(self._categories.get(cat, ()))
228 return tuple(self._categories.get(cat, ()))
229
229
230 def __iter__(self):
230 def __iter__(self):
231 return iter(self._sequences)
231 return iter(self._sequences)
232
232
233 def __len__(self):
233 def __len__(self):
234 return len(self._sequences)
234 return len(self._sequences)
235
235
236 def __nonzero__(self):
236 def __nonzero__(self):
237 return bool(self._sequences)
237 return bool(self._sequences)
238
238
239 class bundleoperation(object):
239 class bundleoperation(object):
240 """an object that represents a single bundling process
240 """an object that represents a single bundling process
241
241
242 Its purpose is to carry unbundle-related objects and states.
242 Its purpose is to carry unbundle-related objects and states.
243
243
244 A new object should be created at the beginning of each bundle processing.
244 A new object should be created at the beginning of each bundle processing.
245 The object is to be returned by the processing function.
245 The object is to be returned by the processing function.
246
246
247 The object has very little content now it will ultimately contain:
247 The object has very little content now it will ultimately contain:
248 * an access to the repo the bundle is applied to,
248 * an access to the repo the bundle is applied to,
249 * a ui object,
249 * a ui object,
250 * a way to retrieve a transaction to add changes to the repo,
250 * a way to retrieve a transaction to add changes to the repo,
251 * a way to record the result of processing each part,
251 * a way to record the result of processing each part,
252 * a way to construct a bundle response when applicable.
252 * a way to construct a bundle response when applicable.
253 """
253 """
254
254
255 def __init__(self, repo, transactiongetter):
255 def __init__(self, repo, transactiongetter):
256 self.repo = repo
256 self.repo = repo
257 self.ui = repo.ui
257 self.ui = repo.ui
258 self.records = unbundlerecords()
258 self.records = unbundlerecords()
259 self.gettransaction = transactiongetter
259 self.gettransaction = transactiongetter
260 self.reply = None
260 self.reply = None
261
261
262 class TransactionUnavailable(RuntimeError):
262 class TransactionUnavailable(RuntimeError):
263 pass
263 pass
264
264
265 def _notransaction():
265 def _notransaction():
266 """default method to get a transaction while processing a bundle
266 """default method to get a transaction while processing a bundle
267
267
268 Raise an exception to highlight the fact that no transaction was expected
268 Raise an exception to highlight the fact that no transaction was expected
269 to be created"""
269 to be created"""
270 raise TransactionUnavailable()
270 raise TransactionUnavailable()
271
271
272 def processbundle(repo, unbundler, transactiongetter=_notransaction):
272 def processbundle(repo, unbundler, transactiongetter=_notransaction):
273 """This function process a bundle, apply effect to/from a repo
273 """This function process a bundle, apply effect to/from a repo
274
274
275 It iterates over each part then searches for and uses the proper handling
275 It iterates over each part then searches for and uses the proper handling
276 code to process the part. Parts are processed in order.
276 code to process the part. Parts are processed in order.
277
277
278 This is very early version of this function that will be strongly reworked
278 This is very early version of this function that will be strongly reworked
279 before final usage.
279 before final usage.
280
280
281 Unknown Mandatory part will abort the process.
281 Unknown Mandatory part will abort the process.
282 """
282 """
283 op = bundleoperation(repo, transactiongetter)
283 op = bundleoperation(repo, transactiongetter)
284 # todo:
284 # todo:
285 # - replace this is a init function soon.
285 # - replace this is a init function soon.
286 # - exception catching
286 # - exception catching
287 unbundler.params
287 unbundler.params
288 iterparts = unbundler.iterparts()
288 iterparts = unbundler.iterparts()
289 part = None
289 part = None
290 try:
290 try:
291 for part in iterparts:
291 for part in iterparts:
292 parttype = part.type
292 parttype = part.type
293 # part key are matched lower case
293 # part key are matched lower case
294 key = parttype.lower()
294 key = parttype.lower()
295 try:
295 try:
296 handler = parthandlermapping[key]
296 handler = parthandlermapping[key]
297 op.ui.debug('found a handler for part %r\n' % parttype)
297 op.ui.debug('found a handler for part %r\n' % parttype)
298 except KeyError:
298 except KeyError:
299 if key != parttype: # mandatory parts
299 if key != parttype: # mandatory parts
300 # todo:
300 # todo:
301 # - use a more precise exception
301 # - use a more precise exception
302 raise error.BundleValueError(key)
302 raise error.BundleValueError(key)
303 op.ui.debug('ignoring unknown advisory part %r\n' % key)
303 op.ui.debug('ignoring unknown advisory part %r\n' % key)
304 # consuming the part
304 # consuming the part
305 part.read()
305 part.read()
306 continue
306 continue
307
307
308 # handler is called outside the above try block so that we don't
308 # handler is called outside the above try block so that we don't
309 # risk catching KeyErrors from anything other than the
309 # risk catching KeyErrors from anything other than the
310 # parthandlermapping lookup (any KeyError raised by handler()
310 # parthandlermapping lookup (any KeyError raised by handler()
311 # itself represents a defect of a different variety).
311 # itself represents a defect of a different variety).
312 output = None
312 output = None
313 if op.reply is not None:
313 if op.reply is not None:
314 op.ui.pushbuffer(error=True)
314 op.ui.pushbuffer(error=True)
315 output = ''
315 output = ''
316 try:
316 try:
317 handler(op, part)
317 handler(op, part)
318 finally:
318 finally:
319 if output is not None:
319 if output is not None:
320 output = op.ui.popbuffer()
320 output = op.ui.popbuffer()
321 if output:
321 if output:
322 outpart = op.reply.newpart('b2x:output', data=output)
322 outpart = op.reply.newpart('b2x:output', data=output)
323 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
323 outpart.addparam('in-reply-to', str(part.id), mandatory=False)
324 part.read()
324 part.read()
325 except Exception, exc:
325 except Exception, exc:
326 if part is not None:
326 if part is not None:
327 # consume the bundle content
327 # consume the bundle content
328 part.read()
328 part.read()
329 for part in iterparts:
329 for part in iterparts:
330 # consume the bundle content
330 # consume the bundle content
331 part.read()
331 part.read()
332 # Small hack to let caller code distinguish exceptions from bundle2
332 # Small hack to let caller code distinguish exceptions from bundle2
333 # processing fron the ones from bundle1 processing. This is mostly
333 # processing fron the ones from bundle1 processing. This is mostly
334 # needed to handle different return codes to unbundle according to the
334 # needed to handle different return codes to unbundle according to the
335 # type of bundle. We should probably clean up or drop this return code
335 # type of bundle. We should probably clean up or drop this return code
336 # craziness in a future version.
336 # craziness in a future version.
337 exc.duringunbundle2 = True
337 exc.duringunbundle2 = True
338 raise
338 raise
339 return op
339 return op
340
340
341 def decodecaps(blob):
341 def decodecaps(blob):
342 """decode a bundle2 caps bytes blob into a dictionnary
342 """decode a bundle2 caps bytes blob into a dictionnary
343
343
344 The blob is a list of capabilities (one per line)
344 The blob is a list of capabilities (one per line)
345 Capabilities may have values using a line of the form::
345 Capabilities may have values using a line of the form::
346
346
347 capability=value1,value2,value3
347 capability=value1,value2,value3
348
348
349 The values are always a list."""
349 The values are always a list."""
350 caps = {}
350 caps = {}
351 for line in blob.splitlines():
351 for line in blob.splitlines():
352 if not line:
352 if not line:
353 continue
353 continue
354 if '=' not in line:
354 if '=' not in line:
355 key, vals = line, ()
355 key, vals = line, ()
356 else:
356 else:
357 key, vals = line.split('=', 1)
357 key, vals = line.split('=', 1)
358 vals = vals.split(',')
358 vals = vals.split(',')
359 key = urllib.unquote(key)
359 key = urllib.unquote(key)
360 vals = [urllib.unquote(v) for v in vals]
360 vals = [urllib.unquote(v) for v in vals]
361 caps[key] = vals
361 caps[key] = vals
362 return caps
362 return caps
363
363
364 def encodecaps(caps):
364 def encodecaps(caps):
365 """encode a bundle2 caps dictionary into a bytes blob"""
365 """encode a bundle2 caps dictionary into a bytes blob"""
366 chunks = []
366 chunks = []
367 for ca in sorted(caps):
367 for ca in sorted(caps):
368 vals = caps[ca]
368 vals = caps[ca]
369 ca = urllib.quote(ca)
369 ca = urllib.quote(ca)
370 vals = [urllib.quote(v) for v in vals]
370 vals = [urllib.quote(v) for v in vals]
371 if vals:
371 if vals:
372 ca = "%s=%s" % (ca, ','.join(vals))
372 ca = "%s=%s" % (ca, ','.join(vals))
373 chunks.append(ca)
373 chunks.append(ca)
374 return '\n'.join(chunks)
374 return '\n'.join(chunks)
375
375
376 class bundle20(object):
376 class bundle20(object):
377 """represent an outgoing bundle2 container
377 """represent an outgoing bundle2 container
378
378
379 Use the `addparam` method to add stream level parameter. and `newpart` to
379 Use the `addparam` method to add stream level parameter. and `newpart` to
380 populate it. Then call `getchunks` to retrieve all the binary chunks of
380 populate it. Then call `getchunks` to retrieve all the binary chunks of
381 data that compose the bundle2 container."""
381 data that compose the bundle2 container."""
382
382
383 def __init__(self, ui, capabilities=()):
383 def __init__(self, ui, capabilities=()):
384 self.ui = ui
384 self.ui = ui
385 self._params = []
385 self._params = []
386 self._parts = []
386 self._parts = []
387 self.capabilities = dict(capabilities)
387 self.capabilities = dict(capabilities)
388
388
389 # methods used to defines the bundle2 content
389 # methods used to defines the bundle2 content
390 def addparam(self, name, value=None):
390 def addparam(self, name, value=None):
391 """add a stream level parameter"""
391 """add a stream level parameter"""
392 if not name:
392 if not name:
393 raise ValueError('empty parameter name')
393 raise ValueError('empty parameter name')
394 if name[0] not in string.letters:
394 if name[0] not in string.letters:
395 raise ValueError('non letter first character: %r' % name)
395 raise ValueError('non letter first character: %r' % name)
396 self._params.append((name, value))
396 self._params.append((name, value))
397
397
398 def addpart(self, part):
398 def addpart(self, part):
399 """add a new part to the bundle2 container
399 """add a new part to the bundle2 container
400
400
401 Parts contains the actual applicative payload."""
401 Parts contains the actual applicative payload."""
402 assert part.id is None
402 assert part.id is None
403 part.id = len(self._parts) # very cheap counter
403 part.id = len(self._parts) # very cheap counter
404 self._parts.append(part)
404 self._parts.append(part)
405
405
406 def newpart(self, typeid, *args, **kwargs):
406 def newpart(self, typeid, *args, **kwargs):
407 """create a new part and add it to the containers
407 """create a new part and add it to the containers
408
408
409 As the part is directly added to the containers. For now, this means
409 As the part is directly added to the containers. For now, this means
410 that any failure to properly initialize the part after calling
410 that any failure to properly initialize the part after calling
411 ``newpart`` should result in a failure of the whole bundling process.
411 ``newpart`` should result in a failure of the whole bundling process.
412
412
413 You can still fall back to manually create and add if you need better
413 You can still fall back to manually create and add if you need better
414 control."""
414 control."""
415 part = bundlepart(typeid, *args, **kwargs)
415 part = bundlepart(typeid, *args, **kwargs)
416 self.addpart(part)
416 self.addpart(part)
417 return part
417 return part
418
418
419 # methods used to generate the bundle2 stream
419 # methods used to generate the bundle2 stream
420 def getchunks(self):
420 def getchunks(self):
421 self.ui.debug('start emission of %s stream\n' % _magicstring)
421 self.ui.debug('start emission of %s stream\n' % _magicstring)
422 yield _magicstring
422 yield _magicstring
423 param = self._paramchunk()
423 param = self._paramchunk()
424 self.ui.debug('bundle parameter: %s\n' % param)
424 self.ui.debug('bundle parameter: %s\n' % param)
425 yield _pack(_fstreamparamsize, len(param))
425 yield _pack(_fstreamparamsize, len(param))
426 if param:
426 if param:
427 yield param
427 yield param
428
428
429 self.ui.debug('start of parts\n')
429 self.ui.debug('start of parts\n')
430 for part in self._parts:
430 for part in self._parts:
431 self.ui.debug('bundle part: "%s"\n' % part.type)
431 self.ui.debug('bundle part: "%s"\n' % part.type)
432 for chunk in part.getchunks():
432 for chunk in part.getchunks():
433 yield chunk
433 yield chunk
434 self.ui.debug('end of bundle\n')
434 self.ui.debug('end of bundle\n')
435 yield '\0\0'
435 yield '\0\0'
436
436
437 def _paramchunk(self):
437 def _paramchunk(self):
438 """return a encoded version of all stream parameters"""
438 """return a encoded version of all stream parameters"""
439 blocks = []
439 blocks = []
440 for par, value in self._params:
440 for par, value in self._params:
441 par = urllib.quote(par)
441 par = urllib.quote(par)
442 if value is not None:
442 if value is not None:
443 value = urllib.quote(value)
443 value = urllib.quote(value)
444 par = '%s=%s' % (par, value)
444 par = '%s=%s' % (par, value)
445 blocks.append(par)
445 blocks.append(par)
446 return ' '.join(blocks)
446 return ' '.join(blocks)
447
447
448 class unpackermixin(object):
448 class unpackermixin(object):
449 """A mixin to extract bytes and struct data from a stream"""
449 """A mixin to extract bytes and struct data from a stream"""
450
450
451 def __init__(self, fp):
451 def __init__(self, fp):
452 self._fp = fp
452 self._fp = fp
453
453
454 def _unpack(self, format):
454 def _unpack(self, format):
455 """unpack this struct format from the stream"""
455 """unpack this struct format from the stream"""
456 data = self._readexact(struct.calcsize(format))
456 data = self._readexact(struct.calcsize(format))
457 return _unpack(format, data)
457 return _unpack(format, data)
458
458
459 def _readexact(self, size):
459 def _readexact(self, size):
460 """read exactly <size> bytes from the stream"""
460 """read exactly <size> bytes from the stream"""
461 return changegroup.readexactly(self._fp, size)
461 return changegroup.readexactly(self._fp, size)
462
462
463
463
464 class unbundle20(unpackermixin):
464 class unbundle20(unpackermixin):
465 """interpret a bundle2 stream
465 """interpret a bundle2 stream
466
466
467 This class is fed with a binary stream and yields parts through its
467 This class is fed with a binary stream and yields parts through its
468 `iterparts` methods."""
468 `iterparts` methods."""
469
469
470 def __init__(self, ui, fp, header=None):
470 def __init__(self, ui, fp, header=None):
471 """If header is specified, we do not read it out of the stream."""
471 """If header is specified, we do not read it out of the stream."""
472 self.ui = ui
472 self.ui = ui
473 super(unbundle20, self).__init__(fp)
473 super(unbundle20, self).__init__(fp)
474 if header is None:
474 if header is None:
475 header = self._readexact(4)
475 header = self._readexact(4)
476 magic, version = header[0:2], header[2:4]
476 magic, version = header[0:2], header[2:4]
477 if magic != 'HG':
477 if magic != 'HG':
478 raise util.Abort(_('not a Mercurial bundle'))
478 raise util.Abort(_('not a Mercurial bundle'))
479 if version != '2X':
479 if version != '2X':
480 raise util.Abort(_('unknown bundle version %s') % version)
480 raise util.Abort(_('unknown bundle version %s') % version)
481 self.ui.debug('start processing of %s stream\n' % header)
481 self.ui.debug('start processing of %s stream\n' % header)
482
482
483 @util.propertycache
483 @util.propertycache
484 def params(self):
484 def params(self):
485 """dictionary of stream level parameters"""
485 """dictionary of stream level parameters"""
486 self.ui.debug('reading bundle2 stream parameters\n')
486 self.ui.debug('reading bundle2 stream parameters\n')
487 params = {}
487 params = {}
488 paramssize = self._unpack(_fstreamparamsize)[0]
488 paramssize = self._unpack(_fstreamparamsize)[0]
489 if paramssize:
489 if paramssize:
490 for p in self._readexact(paramssize).split(' '):
490 for p in self._readexact(paramssize).split(' '):
491 p = p.split('=', 1)
491 p = p.split('=', 1)
492 p = [urllib.unquote(i) for i in p]
492 p = [urllib.unquote(i) for i in p]
493 if len(p) < 2:
493 if len(p) < 2:
494 p.append(None)
494 p.append(None)
495 self._processparam(*p)
495 self._processparam(*p)
496 params[p[0]] = p[1]
496 params[p[0]] = p[1]
497 return params
497 return params
498
498
499 def _processparam(self, name, value):
499 def _processparam(self, name, value):
500 """process a parameter, applying its effect if needed
500 """process a parameter, applying its effect if needed
501
501
502 Parameter starting with a lower case letter are advisory and will be
502 Parameter starting with a lower case letter are advisory and will be
503 ignored when unknown. Those starting with an upper case letter are
503 ignored when unknown. Those starting with an upper case letter are
504 mandatory and will this function will raise a KeyError when unknown.
504 mandatory and will this function will raise a KeyError when unknown.
505
505
506 Note: no option are currently supported. Any input will be either
506 Note: no option are currently supported. Any input will be either
507 ignored or failing.
507 ignored or failing.
508 """
508 """
509 if not name:
509 if not name:
510 raise ValueError('empty parameter name')
510 raise ValueError('empty parameter name')
511 if name[0] not in string.letters:
511 if name[0] not in string.letters:
512 raise ValueError('non letter first character: %r' % name)
512 raise ValueError('non letter first character: %r' % name)
513 # Some logic will be later added here to try to process the option for
513 # Some logic will be later added here to try to process the option for
514 # a dict of known parameter.
514 # a dict of known parameter.
515 if name[0].islower():
515 if name[0].islower():
516 self.ui.debug("ignoring unknown parameter %r\n" % name)
516 self.ui.debug("ignoring unknown parameter %r\n" % name)
517 else:
517 else:
518 raise KeyError(name)
518 raise KeyError(name)
519
519
520
520
521 def iterparts(self):
521 def iterparts(self):
522 """yield all parts contained in the stream"""
522 """yield all parts contained in the stream"""
523 # make sure param have been loaded
523 # make sure param have been loaded
524 self.params
524 self.params
525 self.ui.debug('start extraction of bundle2 parts\n')
525 self.ui.debug('start extraction of bundle2 parts\n')
526 headerblock = self._readpartheader()
526 headerblock = self._readpartheader()
527 while headerblock is not None:
527 while headerblock is not None:
528 part = unbundlepart(self.ui, headerblock, self._fp)
528 part = unbundlepart(self.ui, headerblock, self._fp)
529 yield part
529 yield part
530 headerblock = self._readpartheader()
530 headerblock = self._readpartheader()
531 self.ui.debug('end of bundle2 stream\n')
531 self.ui.debug('end of bundle2 stream\n')
532
532
533 def _readpartheader(self):
533 def _readpartheader(self):
534 """reads a part header size and return the bytes blob
534 """reads a part header size and return the bytes blob
535
535
536 returns None if empty"""
536 returns None if empty"""
537 headersize = self._unpack(_fpartheadersize)[0]
537 headersize = self._unpack(_fpartheadersize)[0]
538 self.ui.debug('part header size: %i\n' % headersize)
538 self.ui.debug('part header size: %i\n' % headersize)
539 if headersize:
539 if headersize:
540 return self._readexact(headersize)
540 return self._readexact(headersize)
541 return None
541 return None
542
542
543
543
544 class bundlepart(object):
544 class bundlepart(object):
545 """A bundle2 part contains application level payload
545 """A bundle2 part contains application level payload
546
546
547 The part `type` is used to route the part to the application level
547 The part `type` is used to route the part to the application level
548 handler.
548 handler.
549
549
550 The part payload is contained in ``part.data``. It could be raw bytes or a
550 The part payload is contained in ``part.data``. It could be raw bytes or a
551 generator of byte chunks.
551 generator of byte chunks.
552
552
553 You can add parameters to the part using the ``addparam`` method.
553 You can add parameters to the part using the ``addparam`` method.
554 Parameters can be either mandatory (default) or advisory. Remote side
554 Parameters can be either mandatory (default) or advisory. Remote side
555 should be able to safely ignore the advisory ones.
555 should be able to safely ignore the advisory ones.
556
556
557 Both data and parameters cannot be modified after the generation has begun.
557 Both data and parameters cannot be modified after the generation has begun.
558 """
558 """
559
559
560 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
560 def __init__(self, parttype, mandatoryparams=(), advisoryparams=(),
561 data=''):
561 data=''):
562 self.id = None
562 self.id = None
563 self.type = parttype
563 self.type = parttype
564 self._data = data
564 self._data = data
565 self._mandatoryparams = list(mandatoryparams)
565 self._mandatoryparams = list(mandatoryparams)
566 self._advisoryparams = list(advisoryparams)
566 self._advisoryparams = list(advisoryparams)
567 # checking for duplicated entries
567 # checking for duplicated entries
568 self._seenparams = set()
568 self._seenparams = set()
569 for pname, __ in self._mandatoryparams + self._advisoryparams:
569 for pname, __ in self._mandatoryparams + self._advisoryparams:
570 if pname in self._seenparams:
570 if pname in self._seenparams:
571 raise RuntimeError('duplicated params: %s' % pname)
571 raise RuntimeError('duplicated params: %s' % pname)
572 self._seenparams.add(pname)
572 self._seenparams.add(pname)
573 # status of the part's generation:
573 # status of the part's generation:
574 # - None: not started,
574 # - None: not started,
575 # - False: currently generated,
575 # - False: currently generated,
576 # - True: generation done.
576 # - True: generation done.
577 self._generated = None
577 self._generated = None
578
578
579 # methods used to defines the part content
579 # methods used to defines the part content
580 def __setdata(self, data):
580 def __setdata(self, data):
581 if self._generated is not None:
581 if self._generated is not None:
582 raise error.ReadOnlyPartError('part is being generated')
582 raise error.ReadOnlyPartError('part is being generated')
583 self._data = data
583 self._data = data
584 def __getdata(self):
584 def __getdata(self):
585 return self._data
585 return self._data
586 data = property(__getdata, __setdata)
586 data = property(__getdata, __setdata)
587
587
588 @property
588 @property
589 def mandatoryparams(self):
589 def mandatoryparams(self):
590 # make it an immutable tuple to force people through ``addparam``
590 # make it an immutable tuple to force people through ``addparam``
591 return tuple(self._mandatoryparams)
591 return tuple(self._mandatoryparams)
592
592
593 @property
593 @property
594 def advisoryparams(self):
594 def advisoryparams(self):
595 # make it an immutable tuple to force people through ``addparam``
595 # make it an immutable tuple to force people through ``addparam``
596 return tuple(self._advisoryparams)
596 return tuple(self._advisoryparams)
597
597
598 def addparam(self, name, value='', mandatory=True):
598 def addparam(self, name, value='', mandatory=True):
599 if self._generated is not None:
599 if self._generated is not None:
600 raise error.ReadOnlyPartError('part is being generated')
600 raise error.ReadOnlyPartError('part is being generated')
601 if name in self._seenparams:
601 if name in self._seenparams:
602 raise ValueError('duplicated params: %s' % name)
602 raise ValueError('duplicated params: %s' % name)
603 self._seenparams.add(name)
603 self._seenparams.add(name)
604 params = self._advisoryparams
604 params = self._advisoryparams
605 if mandatory:
605 if mandatory:
606 params = self._mandatoryparams
606 params = self._mandatoryparams
607 params.append((name, value))
607 params.append((name, value))
608
608
609 # methods used to generates the bundle2 stream
609 # methods used to generates the bundle2 stream
610 def getchunks(self):
610 def getchunks(self):
611 if self._generated is not None:
611 if self._generated is not None:
612 raise RuntimeError('part can only be consumed once')
612 raise RuntimeError('part can only be consumed once')
613 self._generated = False
613 self._generated = False
614 #### header
614 #### header
615 ## parttype
615 ## parttype
616 header = [_pack(_fparttypesize, len(self.type)),
616 header = [_pack(_fparttypesize, len(self.type)),
617 self.type, _pack(_fpartid, self.id),
617 self.type, _pack(_fpartid, self.id),
618 ]
618 ]
619 ## parameters
619 ## parameters
620 # count
620 # count
621 manpar = self.mandatoryparams
621 manpar = self.mandatoryparams
622 advpar = self.advisoryparams
622 advpar = self.advisoryparams
623 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
623 header.append(_pack(_fpartparamcount, len(manpar), len(advpar)))
624 # size
624 # size
625 parsizes = []
625 parsizes = []
626 for key, value in manpar:
626 for key, value in manpar:
627 parsizes.append(len(key))
627 parsizes.append(len(key))
628 parsizes.append(len(value))
628 parsizes.append(len(value))
629 for key, value in advpar:
629 for key, value in advpar:
630 parsizes.append(len(key))
630 parsizes.append(len(key))
631 parsizes.append(len(value))
631 parsizes.append(len(value))
632 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
632 paramsizes = _pack(_makefpartparamsizes(len(parsizes) / 2), *parsizes)
633 header.append(paramsizes)
633 header.append(paramsizes)
634 # key, value
634 # key, value
635 for key, value in manpar:
635 for key, value in manpar:
636 header.append(key)
636 header.append(key)
637 header.append(value)
637 header.append(value)
638 for key, value in advpar:
638 for key, value in advpar:
639 header.append(key)
639 header.append(key)
640 header.append(value)
640 header.append(value)
641 ## finalize header
641 ## finalize header
642 headerchunk = ''.join(header)
642 headerchunk = ''.join(header)
643 yield _pack(_fpartheadersize, len(headerchunk))
643 yield _pack(_fpartheadersize, len(headerchunk))
644 yield headerchunk
644 yield headerchunk
645 ## payload
645 ## payload
646 for chunk in self._payloadchunks():
646 for chunk in self._payloadchunks():
647 yield _pack(_fpayloadsize, len(chunk))
647 yield _pack(_fpayloadsize, len(chunk))
648 yield chunk
648 yield chunk
649 # end of payload
649 # end of payload
650 yield _pack(_fpayloadsize, 0)
650 yield _pack(_fpayloadsize, 0)
651 self._generated = True
651 self._generated = True
652
652
653 def _payloadchunks(self):
653 def _payloadchunks(self):
654 """yield chunks of a the part payload
654 """yield chunks of a the part payload
655
655
656 Exists to handle the different methods to provide data to a part."""
656 Exists to handle the different methods to provide data to a part."""
657 # we only support fixed size data now.
657 # we only support fixed size data now.
658 # This will be improved in the future.
658 # This will be improved in the future.
659 if util.safehasattr(self.data, 'next'):
659 if util.safehasattr(self.data, 'next'):
660 buff = util.chunkbuffer(self.data)
660 buff = util.chunkbuffer(self.data)
661 chunk = buff.read(preferedchunksize)
661 chunk = buff.read(preferedchunksize)
662 while chunk:
662 while chunk:
663 yield chunk
663 yield chunk
664 chunk = buff.read(preferedchunksize)
664 chunk = buff.read(preferedchunksize)
665 elif len(self.data):
665 elif len(self.data):
666 yield self.data
666 yield self.data
667
667
668 class unbundlepart(unpackermixin):
668 class unbundlepart(unpackermixin):
669 """a bundle part read from a bundle"""
669 """a bundle part read from a bundle"""
670
670
671 def __init__(self, ui, header, fp):
671 def __init__(self, ui, header, fp):
672 super(unbundlepart, self).__init__(fp)
672 super(unbundlepart, self).__init__(fp)
673 self.ui = ui
673 self.ui = ui
674 # unbundle state attr
674 # unbundle state attr
675 self._headerdata = header
675 self._headerdata = header
676 self._headeroffset = 0
676 self._headeroffset = 0
677 self._initialized = False
677 self._initialized = False
678 self.consumed = False
678 self.consumed = False
679 # part data
679 # part data
680 self.id = None
680 self.id = None
681 self.type = None
681 self.type = None
682 self.mandatoryparams = None
682 self.mandatoryparams = None
683 self.advisoryparams = None
683 self.advisoryparams = None
684 self.params = None
684 self.params = None
685 self.mandatorykeys = ()
685 self.mandatorykeys = ()
686 self._payloadstream = None
686 self._payloadstream = None
687 self._readheader()
687 self._readheader()
688
688
689 def _fromheader(self, size):
689 def _fromheader(self, size):
690 """return the next <size> byte from the header"""
690 """return the next <size> byte from the header"""
691 offset = self._headeroffset
691 offset = self._headeroffset
692 data = self._headerdata[offset:(offset + size)]
692 data = self._headerdata[offset:(offset + size)]
693 self._headeroffset = offset + size
693 self._headeroffset = offset + size
694 return data
694 return data
695
695
696 def _unpackheader(self, format):
696 def _unpackheader(self, format):
697 """read given format from header
697 """read given format from header
698
698
699 This automatically compute the size of the format to read."""
699 This automatically compute the size of the format to read."""
700 data = self._fromheader(struct.calcsize(format))
700 data = self._fromheader(struct.calcsize(format))
701 return _unpack(format, data)
701 return _unpack(format, data)
702
702
703 def _initparams(self, mandatoryparams, advisoryparams):
703 def _initparams(self, mandatoryparams, advisoryparams):
704 """internal function to setup all logic related parameters"""
704 """internal function to setup all logic related parameters"""
705 # make it read only to prevent people touching it by mistake.
705 # make it read only to prevent people touching it by mistake.
706 self.mandatoryparams = tuple(mandatoryparams)
706 self.mandatoryparams = tuple(mandatoryparams)
707 self.advisoryparams = tuple(advisoryparams)
707 self.advisoryparams = tuple(advisoryparams)
708 # user friendly UI
708 # user friendly UI
709 self.params = dict(self.mandatoryparams)
709 self.params = dict(self.mandatoryparams)
710 self.params.update(dict(self.advisoryparams))
710 self.params.update(dict(self.advisoryparams))
711 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
711 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
712
712
713 def _readheader(self):
713 def _readheader(self):
714 """read the header and setup the object"""
714 """read the header and setup the object"""
715 typesize = self._unpackheader(_fparttypesize)[0]
715 typesize = self._unpackheader(_fparttypesize)[0]
716 self.type = self._fromheader(typesize)
716 self.type = self._fromheader(typesize)
717 self.ui.debug('part type: "%s"\n' % self.type)
717 self.ui.debug('part type: "%s"\n' % self.type)
718 self.id = self._unpackheader(_fpartid)[0]
718 self.id = self._unpackheader(_fpartid)[0]
719 self.ui.debug('part id: "%s"\n' % self.id)
719 self.ui.debug('part id: "%s"\n' % self.id)
720 ## reading parameters
720 ## reading parameters
721 # param count
721 # param count
722 mancount, advcount = self._unpackheader(_fpartparamcount)
722 mancount, advcount = self._unpackheader(_fpartparamcount)
723 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
723 self.ui.debug('part parameters: %i\n' % (mancount + advcount))
724 # param size
724 # param size
725 fparamsizes = _makefpartparamsizes(mancount + advcount)
725 fparamsizes = _makefpartparamsizes(mancount + advcount)
726 paramsizes = self._unpackheader(fparamsizes)
726 paramsizes = self._unpackheader(fparamsizes)
727 # make it a list of couple again
727 # make it a list of couple again
728 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
728 paramsizes = zip(paramsizes[::2], paramsizes[1::2])
729 # split mandatory from advisory
729 # split mandatory from advisory
730 mansizes = paramsizes[:mancount]
730 mansizes = paramsizes[:mancount]
731 advsizes = paramsizes[mancount:]
731 advsizes = paramsizes[mancount:]
732 # retrive param value
732 # retrive param value
733 manparams = []
733 manparams = []
734 for key, value in mansizes:
734 for key, value in mansizes:
735 manparams.append((self._fromheader(key), self._fromheader(value)))
735 manparams.append((self._fromheader(key), self._fromheader(value)))
736 advparams = []
736 advparams = []
737 for key, value in advsizes:
737 for key, value in advsizes:
738 advparams.append((self._fromheader(key), self._fromheader(value)))
738 advparams.append((self._fromheader(key), self._fromheader(value)))
739 self._initparams(manparams, advparams)
739 self._initparams(manparams, advparams)
740 ## part payload
740 ## part payload
741 def payloadchunks():
741 def payloadchunks():
742 payloadsize = self._unpack(_fpayloadsize)[0]
742 payloadsize = self._unpack(_fpayloadsize)[0]
743 self.ui.debug('payload chunk size: %i\n' % payloadsize)
743 self.ui.debug('payload chunk size: %i\n' % payloadsize)
744 while payloadsize:
744 while payloadsize:
745 yield self._readexact(payloadsize)
745 yield self._readexact(payloadsize)
746 payloadsize = self._unpack(_fpayloadsize)[0]
746 payloadsize = self._unpack(_fpayloadsize)[0]
747 self.ui.debug('payload chunk size: %i\n' % payloadsize)
747 self.ui.debug('payload chunk size: %i\n' % payloadsize)
748 self._payloadstream = util.chunkbuffer(payloadchunks())
748 self._payloadstream = util.chunkbuffer(payloadchunks())
749 # we read the data, tell it
749 # we read the data, tell it
750 self._initialized = True
750 self._initialized = True
751
751
752 def read(self, size=None):
752 def read(self, size=None):
753 """read payload data"""
753 """read payload data"""
754 if not self._initialized:
754 if not self._initialized:
755 self._readheader()
755 self._readheader()
756 if size is None:
756 if size is None:
757 data = self._payloadstream.read()
757 data = self._payloadstream.read()
758 else:
758 else:
759 data = self._payloadstream.read(size)
759 data = self._payloadstream.read(size)
760 if size is None or len(data) < size:
760 if size is None or len(data) < size:
761 self.consumed = True
761 self.consumed = True
762 return data
762 return data
763
763
764
764
765 @parthandler('b2x:changegroup')
765 @parthandler('b2x:changegroup')
766 def handlechangegroup(op, inpart):
766 def handlechangegroup(op, inpart):
767 """apply a changegroup part on the repo
767 """apply a changegroup part on the repo
768
768
769 This is a very early implementation that will massive rework before being
769 This is a very early implementation that will massive rework before being
770 inflicted to any end-user.
770 inflicted to any end-user.
771 """
771 """
772 # Make sure we trigger a transaction creation
772 # Make sure we trigger a transaction creation
773 #
773 #
774 # The addchangegroup function will get a transaction object by itself, but
774 # The addchangegroup function will get a transaction object by itself, but
775 # we need to make sure we trigger the creation of a transaction object used
775 # we need to make sure we trigger the creation of a transaction object used
776 # for the whole processing scope.
776 # for the whole processing scope.
777 op.gettransaction()
777 op.gettransaction()
778 cg = changegroup.unbundle10(inpart, 'UN')
778 cg = changegroup.unbundle10(inpart, 'UN')
779 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
779 ret = changegroup.addchangegroup(op.repo, cg, 'bundle2', 'bundle2')
780 op.records.add('changegroup', {'return': ret})
780 op.records.add('changegroup', {'return': ret})
781 if op.reply is not None:
781 if op.reply is not None:
782 # This is definitly not the final form of this
782 # This is definitly not the final form of this
783 # return. But one need to start somewhere.
783 # return. But one need to start somewhere.
784 part = op.reply.newpart('b2x:reply:changegroup')
784 part = op.reply.newpart('b2x:reply:changegroup')
785 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
785 part.addparam('in-reply-to', str(inpart.id), mandatory=False)
786 part.addparam('return', '%i' % ret, mandatory=False)
786 part.addparam('return', '%i' % ret, mandatory=False)
787 assert not inpart.read()
787 assert not inpart.read()
788
788
789 @parthandler('b2x:reply:changegroup')
789 @parthandler('b2x:reply:changegroup')
790 def handlechangegroup(op, inpart):
790 def handlechangegroup(op, inpart):
791 ret = int(inpart.params['return'])
791 ret = int(inpart.params['return'])
792 replyto = int(inpart.params['in-reply-to'])
792 replyto = int(inpart.params['in-reply-to'])
793 op.records.add('changegroup', {'return': ret}, replyto)
793 op.records.add('changegroup', {'return': ret}, replyto)
794
794
795 @parthandler('b2x:check:heads')
795 @parthandler('b2x:check:heads')
796 def handlechangegroup(op, inpart):
796 def handlechangegroup(op, inpart):
797 """check that head of the repo did not change
797 """check that head of the repo did not change
798
798
799 This is used to detect a push race when using unbundle.
799 This is used to detect a push race when using unbundle.
800 This replaces the "heads" argument of unbundle."""
800 This replaces the "heads" argument of unbundle."""
801 h = inpart.read(20)
801 h = inpart.read(20)
802 heads = []
802 heads = []
803 while len(h) == 20:
803 while len(h) == 20:
804 heads.append(h)
804 heads.append(h)
805 h = inpart.read(20)
805 h = inpart.read(20)
806 assert not h
806 assert not h
807 if heads != op.repo.heads():
807 if heads != op.repo.heads():
808 raise error.PushRaced('repository changed while pushing - '
808 raise error.PushRaced('repository changed while pushing - '
809 'please try again')
809 'please try again')
810
810
811 @parthandler('b2x:output')
811 @parthandler('b2x:output')
812 def handleoutput(op, inpart):
812 def handleoutput(op, inpart):
813 """forward output captured on the server to the client"""
813 """forward output captured on the server to the client"""
814 for line in inpart.read().splitlines():
814 for line in inpart.read().splitlines():
815 op.ui.write(('remote: %s\n' % line))
815 op.ui.write(('remote: %s\n' % line))
816
816
817 @parthandler('b2x:replycaps')
817 @parthandler('b2x:replycaps')
818 def handlereplycaps(op, inpart):
818 def handlereplycaps(op, inpart):
819 """Notify that a reply bundle should be created
819 """Notify that a reply bundle should be created
820
820
821 The payload contains the capabilities information for the reply"""
821 The payload contains the capabilities information for the reply"""
822 caps = decodecaps(inpart.read())
822 caps = decodecaps(inpart.read())
823 if op.reply is None:
823 if op.reply is None:
824 op.reply = bundle20(op.ui, caps)
824 op.reply = bundle20(op.ui, caps)
825
825
826 @parthandler('b2x:error:abort')
826 @parthandler('b2x:error:abort')
827 def handlereplycaps(op, inpart):
827 def handlereplycaps(op, inpart):
828 """Used to transmit abort error over the wire"""
828 """Used to transmit abort error over the wire"""
829 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
829 raise util.Abort(inpart.params['message'], hint=inpart.params.get('hint'))
830
830
831 @parthandler('b2x:error:unknownpart')
831 @parthandler('b2x:error:unsupportedcontent')
832 def handlereplycaps(op, inpart):
832 def handlereplycaps(op, inpart):
833 """Used to transmit unknown part error over the wire"""
833 """Used to transmit unknown content error over the wire"""
834 raise error.BundleValueError(inpart.params['parttype'])
834 raise error.BundleValueError(inpart.params['parttype'])
835
835
836 @parthandler('b2x:error:pushraced')
836 @parthandler('b2x:error:pushraced')
837 def handlereplycaps(op, inpart):
837 def handlereplycaps(op, inpart):
838 """Used to transmit push race error over the wire"""
838 """Used to transmit push race error over the wire"""
839 raise error.ResponseError(_('push failed:'), inpart.params['message'])
839 raise error.ResponseError(_('push failed:'), inpart.params['message'])
@@ -1,833 +1,834 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2
11 import changegroup as changegroupmod, bundle2
12 import peer, error, encoding, util, store, exchange
12 import peer, error, encoding, util, store, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 # abstract batching support
61 # abstract batching support
62
62
63 class future(object):
63 class future(object):
64 '''placeholder for a value to be set later'''
64 '''placeholder for a value to be set later'''
65 def set(self, value):
65 def set(self, value):
66 if util.safehasattr(self, 'value'):
66 if util.safehasattr(self, 'value'):
67 raise error.RepoError("future is already set")
67 raise error.RepoError("future is already set")
68 self.value = value
68 self.value = value
69
69
70 class batcher(object):
70 class batcher(object):
71 '''base class for batches of commands submittable in a single request
71 '''base class for batches of commands submittable in a single request
72
72
73 All methods invoked on instances of this class are simply queued and
73 All methods invoked on instances of this class are simply queued and
74 return a a future for the result. Once you call submit(), all the queued
74 return a a future for the result. Once you call submit(), all the queued
75 calls are performed and the results set in their respective futures.
75 calls are performed and the results set in their respective futures.
76 '''
76 '''
77 def __init__(self):
77 def __init__(self):
78 self.calls = []
78 self.calls = []
79 def __getattr__(self, name):
79 def __getattr__(self, name):
80 def call(*args, **opts):
80 def call(*args, **opts):
81 resref = future()
81 resref = future()
82 self.calls.append((name, args, opts, resref,))
82 self.calls.append((name, args, opts, resref,))
83 return resref
83 return resref
84 return call
84 return call
85 def submit(self):
85 def submit(self):
86 pass
86 pass
87
87
88 class localbatch(batcher):
88 class localbatch(batcher):
89 '''performs the queued calls directly'''
89 '''performs the queued calls directly'''
90 def __init__(self, local):
90 def __init__(self, local):
91 batcher.__init__(self)
91 batcher.__init__(self)
92 self.local = local
92 self.local = local
93 def submit(self):
93 def submit(self):
94 for name, args, opts, resref in self.calls:
94 for name, args, opts, resref in self.calls:
95 resref.set(getattr(self.local, name)(*args, **opts))
95 resref.set(getattr(self.local, name)(*args, **opts))
96
96
97 class remotebatch(batcher):
97 class remotebatch(batcher):
98 '''batches the queued calls; uses as few roundtrips as possible'''
98 '''batches the queued calls; uses as few roundtrips as possible'''
99 def __init__(self, remote):
99 def __init__(self, remote):
100 '''remote must support _submitbatch(encbatch) and
100 '''remote must support _submitbatch(encbatch) and
101 _submitone(op, encargs)'''
101 _submitone(op, encargs)'''
102 batcher.__init__(self)
102 batcher.__init__(self)
103 self.remote = remote
103 self.remote = remote
104 def submit(self):
104 def submit(self):
105 req, rsp = [], []
105 req, rsp = [], []
106 for name, args, opts, resref in self.calls:
106 for name, args, opts, resref in self.calls:
107 mtd = getattr(self.remote, name)
107 mtd = getattr(self.remote, name)
108 batchablefn = getattr(mtd, 'batchable', None)
108 batchablefn = getattr(mtd, 'batchable', None)
109 if batchablefn is not None:
109 if batchablefn is not None:
110 batchable = batchablefn(mtd.im_self, *args, **opts)
110 batchable = batchablefn(mtd.im_self, *args, **opts)
111 encargsorres, encresref = batchable.next()
111 encargsorres, encresref = batchable.next()
112 if encresref:
112 if encresref:
113 req.append((name, encargsorres,))
113 req.append((name, encargsorres,))
114 rsp.append((batchable, encresref, resref,))
114 rsp.append((batchable, encresref, resref,))
115 else:
115 else:
116 resref.set(encargsorres)
116 resref.set(encargsorres)
117 else:
117 else:
118 if req:
118 if req:
119 self._submitreq(req, rsp)
119 self._submitreq(req, rsp)
120 req, rsp = [], []
120 req, rsp = [], []
121 resref.set(mtd(*args, **opts))
121 resref.set(mtd(*args, **opts))
122 if req:
122 if req:
123 self._submitreq(req, rsp)
123 self._submitreq(req, rsp)
124 def _submitreq(self, req, rsp):
124 def _submitreq(self, req, rsp):
125 encresults = self.remote._submitbatch(req)
125 encresults = self.remote._submitbatch(req)
126 for encres, r in zip(encresults, rsp):
126 for encres, r in zip(encresults, rsp):
127 batchable, encresref, resref = r
127 batchable, encresref, resref = r
128 encresref.set(encres)
128 encresref.set(encres)
129 resref.set(batchable.next())
129 resref.set(batchable.next())
130
130
131 def batchable(f):
131 def batchable(f):
132 '''annotation for batchable methods
132 '''annotation for batchable methods
133
133
134 Such methods must implement a coroutine as follows:
134 Such methods must implement a coroutine as follows:
135
135
136 @batchable
136 @batchable
137 def sample(self, one, two=None):
137 def sample(self, one, two=None):
138 # Handle locally computable results first:
138 # Handle locally computable results first:
139 if not one:
139 if not one:
140 yield "a local result", None
140 yield "a local result", None
141 # Build list of encoded arguments suitable for your wire protocol:
141 # Build list of encoded arguments suitable for your wire protocol:
142 encargs = [('one', encode(one),), ('two', encode(two),)]
142 encargs = [('one', encode(one),), ('two', encode(two),)]
143 # Create future for injection of encoded result:
143 # Create future for injection of encoded result:
144 encresref = future()
144 encresref = future()
145 # Return encoded arguments and future:
145 # Return encoded arguments and future:
146 yield encargs, encresref
146 yield encargs, encresref
147 # Assuming the future to be filled with the result from the batched
147 # Assuming the future to be filled with the result from the batched
148 # request now. Decode it:
148 # request now. Decode it:
149 yield decode(encresref.value)
149 yield decode(encresref.value)
150
150
151 The decorator returns a function which wraps this coroutine as a plain
151 The decorator returns a function which wraps this coroutine as a plain
152 method, but adds the original method as an attribute called "batchable",
152 method, but adds the original method as an attribute called "batchable",
153 which is used by remotebatch to split the call into separate encoding and
153 which is used by remotebatch to split the call into separate encoding and
154 decoding phases.
154 decoding phases.
155 '''
155 '''
156 def plain(*args, **opts):
156 def plain(*args, **opts):
157 batchable = f(*args, **opts)
157 batchable = f(*args, **opts)
158 encargsorres, encresref = batchable.next()
158 encargsorres, encresref = batchable.next()
159 if not encresref:
159 if not encresref:
160 return encargsorres # a local result in this case
160 return encargsorres # a local result in this case
161 self = args[0]
161 self = args[0]
162 encresref.set(self._submitone(f.func_name, encargsorres))
162 encresref.set(self._submitone(f.func_name, encargsorres))
163 return batchable.next()
163 return batchable.next()
164 setattr(plain, 'batchable', f)
164 setattr(plain, 'batchable', f)
165 return plain
165 return plain
166
166
167 # list of nodes encoding / decoding
167 # list of nodes encoding / decoding
168
168
169 def decodelist(l, sep=' '):
169 def decodelist(l, sep=' '):
170 if l:
170 if l:
171 return map(bin, l.split(sep))
171 return map(bin, l.split(sep))
172 return []
172 return []
173
173
174 def encodelist(l, sep=' '):
174 def encodelist(l, sep=' '):
175 return sep.join(map(hex, l))
175 return sep.join(map(hex, l))
176
176
177 # batched call argument encoding
177 # batched call argument encoding
178
178
179 def escapearg(plain):
179 def escapearg(plain):
180 return (plain
180 return (plain
181 .replace(':', '::')
181 .replace(':', '::')
182 .replace(',', ':,')
182 .replace(',', ':,')
183 .replace(';', ':;')
183 .replace(';', ':;')
184 .replace('=', ':='))
184 .replace('=', ':='))
185
185
186 def unescapearg(escaped):
186 def unescapearg(escaped):
187 return (escaped
187 return (escaped
188 .replace(':=', '=')
188 .replace(':=', '=')
189 .replace(':;', ';')
189 .replace(':;', ';')
190 .replace(':,', ',')
190 .replace(':,', ',')
191 .replace('::', ':'))
191 .replace('::', ':'))
192
192
193 # client side
193 # client side
194
194
195 class wirepeer(peer.peerrepository):
195 class wirepeer(peer.peerrepository):
196
196
197 def batch(self):
197 def batch(self):
198 return remotebatch(self)
198 return remotebatch(self)
199 def _submitbatch(self, req):
199 def _submitbatch(self, req):
200 cmds = []
200 cmds = []
201 for op, argsdict in req:
201 for op, argsdict in req:
202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
202 args = ','.join('%s=%s' % p for p in argsdict.iteritems())
203 cmds.append('%s %s' % (op, args))
203 cmds.append('%s %s' % (op, args))
204 rsp = self._call("batch", cmds=';'.join(cmds))
204 rsp = self._call("batch", cmds=';'.join(cmds))
205 return rsp.split(';')
205 return rsp.split(';')
206 def _submitone(self, op, args):
206 def _submitone(self, op, args):
207 return self._call(op, **args)
207 return self._call(op, **args)
208
208
209 @batchable
209 @batchable
210 def lookup(self, key):
210 def lookup(self, key):
211 self.requirecap('lookup', _('look up remote revision'))
211 self.requirecap('lookup', _('look up remote revision'))
212 f = future()
212 f = future()
213 yield {'key': encoding.fromlocal(key)}, f
213 yield {'key': encoding.fromlocal(key)}, f
214 d = f.value
214 d = f.value
215 success, data = d[:-1].split(" ", 1)
215 success, data = d[:-1].split(" ", 1)
216 if int(success):
216 if int(success):
217 yield bin(data)
217 yield bin(data)
218 self._abort(error.RepoError(data))
218 self._abort(error.RepoError(data))
219
219
220 @batchable
220 @batchable
221 def heads(self):
221 def heads(self):
222 f = future()
222 f = future()
223 yield {}, f
223 yield {}, f
224 d = f.value
224 d = f.value
225 try:
225 try:
226 yield decodelist(d[:-1])
226 yield decodelist(d[:-1])
227 except ValueError:
227 except ValueError:
228 self._abort(error.ResponseError(_("unexpected response:"), d))
228 self._abort(error.ResponseError(_("unexpected response:"), d))
229
229
230 @batchable
230 @batchable
231 def known(self, nodes):
231 def known(self, nodes):
232 f = future()
232 f = future()
233 yield {'nodes': encodelist(nodes)}, f
233 yield {'nodes': encodelist(nodes)}, f
234 d = f.value
234 d = f.value
235 try:
235 try:
236 yield [bool(int(f)) for f in d]
236 yield [bool(int(f)) for f in d]
237 except ValueError:
237 except ValueError:
238 self._abort(error.ResponseError(_("unexpected response:"), d))
238 self._abort(error.ResponseError(_("unexpected response:"), d))
239
239
240 @batchable
240 @batchable
241 def branchmap(self):
241 def branchmap(self):
242 f = future()
242 f = future()
243 yield {}, f
243 yield {}, f
244 d = f.value
244 d = f.value
245 try:
245 try:
246 branchmap = {}
246 branchmap = {}
247 for branchpart in d.splitlines():
247 for branchpart in d.splitlines():
248 branchname, branchheads = branchpart.split(' ', 1)
248 branchname, branchheads = branchpart.split(' ', 1)
249 branchname = encoding.tolocal(urllib.unquote(branchname))
249 branchname = encoding.tolocal(urllib.unquote(branchname))
250 branchheads = decodelist(branchheads)
250 branchheads = decodelist(branchheads)
251 branchmap[branchname] = branchheads
251 branchmap[branchname] = branchheads
252 yield branchmap
252 yield branchmap
253 except TypeError:
253 except TypeError:
254 self._abort(error.ResponseError(_("unexpected response:"), d))
254 self._abort(error.ResponseError(_("unexpected response:"), d))
255
255
256 def branches(self, nodes):
256 def branches(self, nodes):
257 n = encodelist(nodes)
257 n = encodelist(nodes)
258 d = self._call("branches", nodes=n)
258 d = self._call("branches", nodes=n)
259 try:
259 try:
260 br = [tuple(decodelist(b)) for b in d.splitlines()]
260 br = [tuple(decodelist(b)) for b in d.splitlines()]
261 return br
261 return br
262 except ValueError:
262 except ValueError:
263 self._abort(error.ResponseError(_("unexpected response:"), d))
263 self._abort(error.ResponseError(_("unexpected response:"), d))
264
264
265 def between(self, pairs):
265 def between(self, pairs):
266 batch = 8 # avoid giant requests
266 batch = 8 # avoid giant requests
267 r = []
267 r = []
268 for i in xrange(0, len(pairs), batch):
268 for i in xrange(0, len(pairs), batch):
269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
269 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
270 d = self._call("between", pairs=n)
270 d = self._call("between", pairs=n)
271 try:
271 try:
272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
272 r.extend(l and decodelist(l) or [] for l in d.splitlines())
273 except ValueError:
273 except ValueError:
274 self._abort(error.ResponseError(_("unexpected response:"), d))
274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 return r
275 return r
276
276
277 @batchable
277 @batchable
278 def pushkey(self, namespace, key, old, new):
278 def pushkey(self, namespace, key, old, new):
279 if not self.capable('pushkey'):
279 if not self.capable('pushkey'):
280 yield False, None
280 yield False, None
281 f = future()
281 f = future()
282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
282 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
283 yield {'namespace': encoding.fromlocal(namespace),
283 yield {'namespace': encoding.fromlocal(namespace),
284 'key': encoding.fromlocal(key),
284 'key': encoding.fromlocal(key),
285 'old': encoding.fromlocal(old),
285 'old': encoding.fromlocal(old),
286 'new': encoding.fromlocal(new)}, f
286 'new': encoding.fromlocal(new)}, f
287 d = f.value
287 d = f.value
288 d, output = d.split('\n', 1)
288 d, output = d.split('\n', 1)
289 try:
289 try:
290 d = bool(int(d))
290 d = bool(int(d))
291 except ValueError:
291 except ValueError:
292 raise error.ResponseError(
292 raise error.ResponseError(
293 _('push failed (unexpected response):'), d)
293 _('push failed (unexpected response):'), d)
294 for l in output.splitlines(True):
294 for l in output.splitlines(True):
295 self.ui.status(_('remote: '), l)
295 self.ui.status(_('remote: '), l)
296 yield d
296 yield d
297
297
298 @batchable
298 @batchable
299 def listkeys(self, namespace):
299 def listkeys(self, namespace):
300 if not self.capable('pushkey'):
300 if not self.capable('pushkey'):
301 yield {}, None
301 yield {}, None
302 f = future()
302 f = future()
303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
303 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
304 yield {'namespace': encoding.fromlocal(namespace)}, f
304 yield {'namespace': encoding.fromlocal(namespace)}, f
305 d = f.value
305 d = f.value
306 r = {}
306 r = {}
307 for l in d.splitlines():
307 for l in d.splitlines():
308 k, v = l.split('\t')
308 k, v = l.split('\t')
309 r[encoding.tolocal(k)] = encoding.tolocal(v)
309 r[encoding.tolocal(k)] = encoding.tolocal(v)
310 yield r
310 yield r
311
311
312 def stream_out(self):
312 def stream_out(self):
313 return self._callstream('stream_out')
313 return self._callstream('stream_out')
314
314
315 def changegroup(self, nodes, kind):
315 def changegroup(self, nodes, kind):
316 n = encodelist(nodes)
316 n = encodelist(nodes)
317 f = self._callcompressable("changegroup", roots=n)
317 f = self._callcompressable("changegroup", roots=n)
318 return changegroupmod.unbundle10(f, 'UN')
318 return changegroupmod.unbundle10(f, 'UN')
319
319
320 def changegroupsubset(self, bases, heads, kind):
320 def changegroupsubset(self, bases, heads, kind):
321 self.requirecap('changegroupsubset', _('look up remote changes'))
321 self.requirecap('changegroupsubset', _('look up remote changes'))
322 bases = encodelist(bases)
322 bases = encodelist(bases)
323 heads = encodelist(heads)
323 heads = encodelist(heads)
324 f = self._callcompressable("changegroupsubset",
324 f = self._callcompressable("changegroupsubset",
325 bases=bases, heads=heads)
325 bases=bases, heads=heads)
326 return changegroupmod.unbundle10(f, 'UN')
326 return changegroupmod.unbundle10(f, 'UN')
327
327
328 def getbundle(self, source, heads=None, common=None, bundlecaps=None,
328 def getbundle(self, source, heads=None, common=None, bundlecaps=None,
329 **kwargs):
329 **kwargs):
330 self.requirecap('getbundle', _('look up remote changes'))
330 self.requirecap('getbundle', _('look up remote changes'))
331 opts = {}
331 opts = {}
332 if heads is not None:
332 if heads is not None:
333 opts['heads'] = encodelist(heads)
333 opts['heads'] = encodelist(heads)
334 if common is not None:
334 if common is not None:
335 opts['common'] = encodelist(common)
335 opts['common'] = encodelist(common)
336 if bundlecaps is not None:
336 if bundlecaps is not None:
337 opts['bundlecaps'] = ','.join(bundlecaps)
337 opts['bundlecaps'] = ','.join(bundlecaps)
338 opts.update(kwargs)
338 opts.update(kwargs)
339 f = self._callcompressable("getbundle", **opts)
339 f = self._callcompressable("getbundle", **opts)
340 if bundlecaps is not None and 'HG2X' in bundlecaps:
340 if bundlecaps is not None and 'HG2X' in bundlecaps:
341 return bundle2.unbundle20(self.ui, f)
341 return bundle2.unbundle20(self.ui, f)
342 else:
342 else:
343 return changegroupmod.unbundle10(f, 'UN')
343 return changegroupmod.unbundle10(f, 'UN')
344
344
345 def unbundle(self, cg, heads, source):
345 def unbundle(self, cg, heads, source):
346 '''Send cg (a readable file-like object representing the
346 '''Send cg (a readable file-like object representing the
347 changegroup to push, typically a chunkbuffer object) to the
347 changegroup to push, typically a chunkbuffer object) to the
348 remote server as a bundle.
348 remote server as a bundle.
349
349
350 When pushing a bundle10 stream, return an integer indicating the
350 When pushing a bundle10 stream, return an integer indicating the
351 result of the push (see localrepository.addchangegroup()).
351 result of the push (see localrepository.addchangegroup()).
352
352
353 When pushing a bundle20 stream, return a bundle20 stream.'''
353 When pushing a bundle20 stream, return a bundle20 stream.'''
354
354
355 if heads != ['force'] and self.capable('unbundlehash'):
355 if heads != ['force'] and self.capable('unbundlehash'):
356 heads = encodelist(['hashed',
356 heads = encodelist(['hashed',
357 util.sha1(''.join(sorted(heads))).digest()])
357 util.sha1(''.join(sorted(heads))).digest()])
358 else:
358 else:
359 heads = encodelist(heads)
359 heads = encodelist(heads)
360
360
361 if util.safehasattr(cg, 'deltaheader'):
361 if util.safehasattr(cg, 'deltaheader'):
362 # this a bundle10, do the old style call sequence
362 # this a bundle10, do the old style call sequence
363 ret, output = self._callpush("unbundle", cg, heads=heads)
363 ret, output = self._callpush("unbundle", cg, heads=heads)
364 if ret == "":
364 if ret == "":
365 raise error.ResponseError(
365 raise error.ResponseError(
366 _('push failed:'), output)
366 _('push failed:'), output)
367 try:
367 try:
368 ret = int(ret)
368 ret = int(ret)
369 except ValueError:
369 except ValueError:
370 raise error.ResponseError(
370 raise error.ResponseError(
371 _('push failed (unexpected response):'), ret)
371 _('push failed (unexpected response):'), ret)
372
372
373 for l in output.splitlines(True):
373 for l in output.splitlines(True):
374 self.ui.status(_('remote: '), l)
374 self.ui.status(_('remote: '), l)
375 else:
375 else:
376 # bundle2 push. Send a stream, fetch a stream.
376 # bundle2 push. Send a stream, fetch a stream.
377 stream = self._calltwowaystream('unbundle', cg, heads=heads)
377 stream = self._calltwowaystream('unbundle', cg, heads=heads)
378 ret = bundle2.unbundle20(self.ui, stream)
378 ret = bundle2.unbundle20(self.ui, stream)
379 return ret
379 return ret
380
380
381 def debugwireargs(self, one, two, three=None, four=None, five=None):
381 def debugwireargs(self, one, two, three=None, four=None, five=None):
382 # don't pass optional arguments left at their default value
382 # don't pass optional arguments left at their default value
383 opts = {}
383 opts = {}
384 if three is not None:
384 if three is not None:
385 opts['three'] = three
385 opts['three'] = three
386 if four is not None:
386 if four is not None:
387 opts['four'] = four
387 opts['four'] = four
388 return self._call('debugwireargs', one=one, two=two, **opts)
388 return self._call('debugwireargs', one=one, two=two, **opts)
389
389
390 def _call(self, cmd, **args):
390 def _call(self, cmd, **args):
391 """execute <cmd> on the server
391 """execute <cmd> on the server
392
392
393 The command is expected to return a simple string.
393 The command is expected to return a simple string.
394
394
395 returns the server reply as a string."""
395 returns the server reply as a string."""
396 raise NotImplementedError()
396 raise NotImplementedError()
397
397
398 def _callstream(self, cmd, **args):
398 def _callstream(self, cmd, **args):
399 """execute <cmd> on the server
399 """execute <cmd> on the server
400
400
401 The command is expected to return a stream.
401 The command is expected to return a stream.
402
402
403 returns the server reply as a file like object."""
403 returns the server reply as a file like object."""
404 raise NotImplementedError()
404 raise NotImplementedError()
405
405
406 def _callcompressable(self, cmd, **args):
406 def _callcompressable(self, cmd, **args):
407 """execute <cmd> on the server
407 """execute <cmd> on the server
408
408
409 The command is expected to return a stream.
409 The command is expected to return a stream.
410
410
411 The stream may have been compressed in some implementations. This
411 The stream may have been compressed in some implementations. This
412 function takes care of the decompression. This is the only difference
412 function takes care of the decompression. This is the only difference
413 with _callstream.
413 with _callstream.
414
414
415 returns the server reply as a file like object.
415 returns the server reply as a file like object.
416 """
416 """
417 raise NotImplementedError()
417 raise NotImplementedError()
418
418
419 def _callpush(self, cmd, fp, **args):
419 def _callpush(self, cmd, fp, **args):
420 """execute a <cmd> on server
420 """execute a <cmd> on server
421
421
422 The command is expected to be related to a push. Push has a special
422 The command is expected to be related to a push. Push has a special
423 return method.
423 return method.
424
424
425 returns the server reply as a (ret, output) tuple. ret is either
425 returns the server reply as a (ret, output) tuple. ret is either
426 empty (error) or a stringified int.
426 empty (error) or a stringified int.
427 """
427 """
428 raise NotImplementedError()
428 raise NotImplementedError()
429
429
430 def _calltwowaystream(self, cmd, fp, **args):
430 def _calltwowaystream(self, cmd, fp, **args):
431 """execute <cmd> on server
431 """execute <cmd> on server
432
432
433 The command will send a stream to the server and get a stream in reply.
433 The command will send a stream to the server and get a stream in reply.
434 """
434 """
435 raise NotImplementedError()
435 raise NotImplementedError()
436
436
437 def _abort(self, exception):
437 def _abort(self, exception):
438 """clearly abort the wire protocol connection and raise the exception
438 """clearly abort the wire protocol connection and raise the exception
439 """
439 """
440 raise NotImplementedError()
440 raise NotImplementedError()
441
441
442 # server side
442 # server side
443
443
444 # wire protocol command can either return a string or one of these classes.
444 # wire protocol command can either return a string or one of these classes.
445 class streamres(object):
445 class streamres(object):
446 """wireproto reply: binary stream
446 """wireproto reply: binary stream
447
447
448 The call was successful and the result is a stream.
448 The call was successful and the result is a stream.
449 Iterate on the `self.gen` attribute to retrieve chunks.
449 Iterate on the `self.gen` attribute to retrieve chunks.
450 """
450 """
451 def __init__(self, gen):
451 def __init__(self, gen):
452 self.gen = gen
452 self.gen = gen
453
453
454 class pushres(object):
454 class pushres(object):
455 """wireproto reply: success with simple integer return
455 """wireproto reply: success with simple integer return
456
456
457 The call was successful and returned an integer contained in `self.res`.
457 The call was successful and returned an integer contained in `self.res`.
458 """
458 """
459 def __init__(self, res):
459 def __init__(self, res):
460 self.res = res
460 self.res = res
461
461
462 class pusherr(object):
462 class pusherr(object):
463 """wireproto reply: failure
463 """wireproto reply: failure
464
464
465 The call failed. The `self.res` attribute contains the error message.
465 The call failed. The `self.res` attribute contains the error message.
466 """
466 """
467 def __init__(self, res):
467 def __init__(self, res):
468 self.res = res
468 self.res = res
469
469
470 class ooberror(object):
470 class ooberror(object):
471 """wireproto reply: failure of a batch of operation
471 """wireproto reply: failure of a batch of operation
472
472
473 Something failed during a batch call. The error message is stored in
473 Something failed during a batch call. The error message is stored in
474 `self.message`.
474 `self.message`.
475 """
475 """
476 def __init__(self, message):
476 def __init__(self, message):
477 self.message = message
477 self.message = message
478
478
479 def dispatch(repo, proto, command):
479 def dispatch(repo, proto, command):
480 repo = repo.filtered("served")
480 repo = repo.filtered("served")
481 func, spec = commands[command]
481 func, spec = commands[command]
482 args = proto.getargs(spec)
482 args = proto.getargs(spec)
483 return func(repo, proto, *args)
483 return func(repo, proto, *args)
484
484
485 def options(cmd, keys, others):
485 def options(cmd, keys, others):
486 opts = {}
486 opts = {}
487 for k in keys:
487 for k in keys:
488 if k in others:
488 if k in others:
489 opts[k] = others[k]
489 opts[k] = others[k]
490 del others[k]
490 del others[k]
491 if others:
491 if others:
492 sys.stderr.write("abort: %s got unexpected arguments %s\n"
492 sys.stderr.write("abort: %s got unexpected arguments %s\n"
493 % (cmd, ",".join(others)))
493 % (cmd, ",".join(others)))
494 return opts
494 return opts
495
495
496 # list of commands
496 # list of commands
497 commands = {}
497 commands = {}
498
498
499 def wireprotocommand(name, args=''):
499 def wireprotocommand(name, args=''):
500 """decorator for wire protocol command"""
500 """decorator for wire protocol command"""
501 def register(func):
501 def register(func):
502 commands[name] = (func, args)
502 commands[name] = (func, args)
503 return func
503 return func
504 return register
504 return register
505
505
506 @wireprotocommand('batch', 'cmds *')
506 @wireprotocommand('batch', 'cmds *')
507 def batch(repo, proto, cmds, others):
507 def batch(repo, proto, cmds, others):
508 repo = repo.filtered("served")
508 repo = repo.filtered("served")
509 res = []
509 res = []
510 for pair in cmds.split(';'):
510 for pair in cmds.split(';'):
511 op, args = pair.split(' ', 1)
511 op, args = pair.split(' ', 1)
512 vals = {}
512 vals = {}
513 for a in args.split(','):
513 for a in args.split(','):
514 if a:
514 if a:
515 n, v = a.split('=')
515 n, v = a.split('=')
516 vals[n] = unescapearg(v)
516 vals[n] = unescapearg(v)
517 func, spec = commands[op]
517 func, spec = commands[op]
518 if spec:
518 if spec:
519 keys = spec.split()
519 keys = spec.split()
520 data = {}
520 data = {}
521 for k in keys:
521 for k in keys:
522 if k == '*':
522 if k == '*':
523 star = {}
523 star = {}
524 for key in vals.keys():
524 for key in vals.keys():
525 if key not in keys:
525 if key not in keys:
526 star[key] = vals[key]
526 star[key] = vals[key]
527 data['*'] = star
527 data['*'] = star
528 else:
528 else:
529 data[k] = vals[k]
529 data[k] = vals[k]
530 result = func(repo, proto, *[data[k] for k in keys])
530 result = func(repo, proto, *[data[k] for k in keys])
531 else:
531 else:
532 result = func(repo, proto)
532 result = func(repo, proto)
533 if isinstance(result, ooberror):
533 if isinstance(result, ooberror):
534 return result
534 return result
535 res.append(escapearg(result))
535 res.append(escapearg(result))
536 return ';'.join(res)
536 return ';'.join(res)
537
537
538 @wireprotocommand('between', 'pairs')
538 @wireprotocommand('between', 'pairs')
539 def between(repo, proto, pairs):
539 def between(repo, proto, pairs):
540 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
540 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
541 r = []
541 r = []
542 for b in repo.between(pairs):
542 for b in repo.between(pairs):
543 r.append(encodelist(b) + "\n")
543 r.append(encodelist(b) + "\n")
544 return "".join(r)
544 return "".join(r)
545
545
546 @wireprotocommand('branchmap')
546 @wireprotocommand('branchmap')
547 def branchmap(repo, proto):
547 def branchmap(repo, proto):
548 branchmap = repo.branchmap()
548 branchmap = repo.branchmap()
549 heads = []
549 heads = []
550 for branch, nodes in branchmap.iteritems():
550 for branch, nodes in branchmap.iteritems():
551 branchname = urllib.quote(encoding.fromlocal(branch))
551 branchname = urllib.quote(encoding.fromlocal(branch))
552 branchnodes = encodelist(nodes)
552 branchnodes = encodelist(nodes)
553 heads.append('%s %s' % (branchname, branchnodes))
553 heads.append('%s %s' % (branchname, branchnodes))
554 return '\n'.join(heads)
554 return '\n'.join(heads)
555
555
556 @wireprotocommand('branches', 'nodes')
556 @wireprotocommand('branches', 'nodes')
557 def branches(repo, proto, nodes):
557 def branches(repo, proto, nodes):
558 nodes = decodelist(nodes)
558 nodes = decodelist(nodes)
559 r = []
559 r = []
560 for b in repo.branches(nodes):
560 for b in repo.branches(nodes):
561 r.append(encodelist(b) + "\n")
561 r.append(encodelist(b) + "\n")
562 return "".join(r)
562 return "".join(r)
563
563
564
564
565 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
565 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
566 'known', 'getbundle', 'unbundlehash', 'batch']
566 'known', 'getbundle', 'unbundlehash', 'batch']
567
567
568 def _capabilities(repo, proto):
568 def _capabilities(repo, proto):
569 """return a list of capabilities for a repo
569 """return a list of capabilities for a repo
570
570
571 This function exists to allow extensions to easily wrap capabilities
571 This function exists to allow extensions to easily wrap capabilities
572 computation
572 computation
573
573
574 - returns a lists: easy to alter
574 - returns a lists: easy to alter
575 - change done here will be propagated to both `capabilities` and `hello`
575 - change done here will be propagated to both `capabilities` and `hello`
576 command without any other action needed.
576 command without any other action needed.
577 """
577 """
578 # copy to prevent modification of the global list
578 # copy to prevent modification of the global list
579 caps = list(wireprotocaps)
579 caps = list(wireprotocaps)
580 if _allowstream(repo.ui):
580 if _allowstream(repo.ui):
581 if repo.ui.configbool('server', 'preferuncompressed', False):
581 if repo.ui.configbool('server', 'preferuncompressed', False):
582 caps.append('stream-preferred')
582 caps.append('stream-preferred')
583 requiredformats = repo.requirements & repo.supportedformats
583 requiredformats = repo.requirements & repo.supportedformats
584 # if our local revlogs are just revlogv1, add 'stream' cap
584 # if our local revlogs are just revlogv1, add 'stream' cap
585 if not requiredformats - set(('revlogv1',)):
585 if not requiredformats - set(('revlogv1',)):
586 caps.append('stream')
586 caps.append('stream')
587 # otherwise, add 'streamreqs' detailing our local revlog format
587 # otherwise, add 'streamreqs' detailing our local revlog format
588 else:
588 else:
589 caps.append('streamreqs=%s' % ','.join(requiredformats))
589 caps.append('streamreqs=%s' % ','.join(requiredformats))
590 if repo.ui.configbool('experimental', 'bundle2-exp', False):
590 if repo.ui.configbool('experimental', 'bundle2-exp', False):
591 capsblob = bundle2.encodecaps(repo.bundle2caps)
591 capsblob = bundle2.encodecaps(repo.bundle2caps)
592 caps.append('bundle2-exp=' + urllib.quote(capsblob))
592 caps.append('bundle2-exp=' + urllib.quote(capsblob))
593 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
593 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
594 caps.append('httpheader=1024')
594 caps.append('httpheader=1024')
595 return caps
595 return caps
596
596
597 # If you are writing an extension and consider wrapping this function. Wrap
597 # If you are writing an extension and consider wrapping this function. Wrap
598 # `_capabilities` instead.
598 # `_capabilities` instead.
599 @wireprotocommand('capabilities')
599 @wireprotocommand('capabilities')
600 def capabilities(repo, proto):
600 def capabilities(repo, proto):
601 return ' '.join(_capabilities(repo, proto))
601 return ' '.join(_capabilities(repo, proto))
602
602
603 @wireprotocommand('changegroup', 'roots')
603 @wireprotocommand('changegroup', 'roots')
604 def changegroup(repo, proto, roots):
604 def changegroup(repo, proto, roots):
605 nodes = decodelist(roots)
605 nodes = decodelist(roots)
606 cg = changegroupmod.changegroup(repo, nodes, 'serve')
606 cg = changegroupmod.changegroup(repo, nodes, 'serve')
607 return streamres(proto.groupchunks(cg))
607 return streamres(proto.groupchunks(cg))
608
608
609 @wireprotocommand('changegroupsubset', 'bases heads')
609 @wireprotocommand('changegroupsubset', 'bases heads')
610 def changegroupsubset(repo, proto, bases, heads):
610 def changegroupsubset(repo, proto, bases, heads):
611 bases = decodelist(bases)
611 bases = decodelist(bases)
612 heads = decodelist(heads)
612 heads = decodelist(heads)
613 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
613 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
614 return streamres(proto.groupchunks(cg))
614 return streamres(proto.groupchunks(cg))
615
615
616 @wireprotocommand('debugwireargs', 'one two *')
616 @wireprotocommand('debugwireargs', 'one two *')
617 def debugwireargs(repo, proto, one, two, others):
617 def debugwireargs(repo, proto, one, two, others):
618 # only accept optional args from the known set
618 # only accept optional args from the known set
619 opts = options('debugwireargs', ['three', 'four'], others)
619 opts = options('debugwireargs', ['three', 'four'], others)
620 return repo.debugwireargs(one, two, **opts)
620 return repo.debugwireargs(one, two, **opts)
621
621
622 @wireprotocommand('getbundle', '*')
622 @wireprotocommand('getbundle', '*')
623 def getbundle(repo, proto, others):
623 def getbundle(repo, proto, others):
624 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
624 opts = options('getbundle', ['heads', 'common', 'bundlecaps'], others)
625 for k, v in opts.iteritems():
625 for k, v in opts.iteritems():
626 if k in ('heads', 'common'):
626 if k in ('heads', 'common'):
627 opts[k] = decodelist(v)
627 opts[k] = decodelist(v)
628 elif k == 'bundlecaps':
628 elif k == 'bundlecaps':
629 opts[k] = set(v.split(','))
629 opts[k] = set(v.split(','))
630 cg = exchange.getbundle(repo, 'serve', **opts)
630 cg = exchange.getbundle(repo, 'serve', **opts)
631 return streamres(proto.groupchunks(cg))
631 return streamres(proto.groupchunks(cg))
632
632
633 @wireprotocommand('heads')
633 @wireprotocommand('heads')
634 def heads(repo, proto):
634 def heads(repo, proto):
635 h = repo.heads()
635 h = repo.heads()
636 return encodelist(h) + "\n"
636 return encodelist(h) + "\n"
637
637
638 @wireprotocommand('hello')
638 @wireprotocommand('hello')
639 def hello(repo, proto):
639 def hello(repo, proto):
640 '''the hello command returns a set of lines describing various
640 '''the hello command returns a set of lines describing various
641 interesting things about the server, in an RFC822-like format.
641 interesting things about the server, in an RFC822-like format.
642 Currently the only one defined is "capabilities", which
642 Currently the only one defined is "capabilities", which
643 consists of a line in the form:
643 consists of a line in the form:
644
644
645 capabilities: space separated list of tokens
645 capabilities: space separated list of tokens
646 '''
646 '''
647 return "capabilities: %s\n" % (capabilities(repo, proto))
647 return "capabilities: %s\n" % (capabilities(repo, proto))
648
648
649 @wireprotocommand('listkeys', 'namespace')
649 @wireprotocommand('listkeys', 'namespace')
650 def listkeys(repo, proto, namespace):
650 def listkeys(repo, proto, namespace):
651 d = repo.listkeys(encoding.tolocal(namespace)).items()
651 d = repo.listkeys(encoding.tolocal(namespace)).items()
652 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
652 t = '\n'.join(['%s\t%s' % (encoding.fromlocal(k), encoding.fromlocal(v))
653 for k, v in d])
653 for k, v in d])
654 return t
654 return t
655
655
656 @wireprotocommand('lookup', 'key')
656 @wireprotocommand('lookup', 'key')
657 def lookup(repo, proto, key):
657 def lookup(repo, proto, key):
658 try:
658 try:
659 k = encoding.tolocal(key)
659 k = encoding.tolocal(key)
660 c = repo[k]
660 c = repo[k]
661 r = c.hex()
661 r = c.hex()
662 success = 1
662 success = 1
663 except Exception, inst:
663 except Exception, inst:
664 r = str(inst)
664 r = str(inst)
665 success = 0
665 success = 0
666 return "%s %s\n" % (success, r)
666 return "%s %s\n" % (success, r)
667
667
668 @wireprotocommand('known', 'nodes *')
668 @wireprotocommand('known', 'nodes *')
669 def known(repo, proto, nodes, others):
669 def known(repo, proto, nodes, others):
670 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
670 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
671
671
672 @wireprotocommand('pushkey', 'namespace key old new')
672 @wireprotocommand('pushkey', 'namespace key old new')
673 def pushkey(repo, proto, namespace, key, old, new):
673 def pushkey(repo, proto, namespace, key, old, new):
674 # compatibility with pre-1.8 clients which were accidentally
674 # compatibility with pre-1.8 clients which were accidentally
675 # sending raw binary nodes rather than utf-8-encoded hex
675 # sending raw binary nodes rather than utf-8-encoded hex
676 if len(new) == 20 and new.encode('string-escape') != new:
676 if len(new) == 20 and new.encode('string-escape') != new:
677 # looks like it could be a binary node
677 # looks like it could be a binary node
678 try:
678 try:
679 new.decode('utf-8')
679 new.decode('utf-8')
680 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
680 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
681 except UnicodeDecodeError:
681 except UnicodeDecodeError:
682 pass # binary, leave unmodified
682 pass # binary, leave unmodified
683 else:
683 else:
684 new = encoding.tolocal(new) # normal path
684 new = encoding.tolocal(new) # normal path
685
685
686 if util.safehasattr(proto, 'restore'):
686 if util.safehasattr(proto, 'restore'):
687
687
688 proto.redirect()
688 proto.redirect()
689
689
690 try:
690 try:
691 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
691 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
692 encoding.tolocal(old), new) or False
692 encoding.tolocal(old), new) or False
693 except util.Abort:
693 except util.Abort:
694 r = False
694 r = False
695
695
696 output = proto.restore()
696 output = proto.restore()
697
697
698 return '%s\n%s' % (int(r), output)
698 return '%s\n%s' % (int(r), output)
699
699
700 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
700 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
701 encoding.tolocal(old), new)
701 encoding.tolocal(old), new)
702 return '%s\n' % int(r)
702 return '%s\n' % int(r)
703
703
704 def _allowstream(ui):
704 def _allowstream(ui):
705 return ui.configbool('server', 'uncompressed', True, untrusted=True)
705 return ui.configbool('server', 'uncompressed', True, untrusted=True)
706
706
707 def _walkstreamfiles(repo):
707 def _walkstreamfiles(repo):
708 # this is it's own function so extensions can override it
708 # this is it's own function so extensions can override it
709 return repo.store.walk()
709 return repo.store.walk()
710
710
711 @wireprotocommand('stream_out')
711 @wireprotocommand('stream_out')
712 def stream(repo, proto):
712 def stream(repo, proto):
713 '''If the server supports streaming clone, it advertises the "stream"
713 '''If the server supports streaming clone, it advertises the "stream"
714 capability with a value representing the version and flags of the repo
714 capability with a value representing the version and flags of the repo
715 it is serving. Client checks to see if it understands the format.
715 it is serving. Client checks to see if it understands the format.
716
716
717 The format is simple: the server writes out a line with the amount
717 The format is simple: the server writes out a line with the amount
718 of files, then the total amount of bytes to be transferred (separated
718 of files, then the total amount of bytes to be transferred (separated
719 by a space). Then, for each file, the server first writes the filename
719 by a space). Then, for each file, the server first writes the filename
720 and file size (separated by the null character), then the file contents.
720 and file size (separated by the null character), then the file contents.
721 '''
721 '''
722
722
723 if not _allowstream(repo.ui):
723 if not _allowstream(repo.ui):
724 return '1\n'
724 return '1\n'
725
725
726 entries = []
726 entries = []
727 total_bytes = 0
727 total_bytes = 0
728 try:
728 try:
729 # get consistent snapshot of repo, lock during scan
729 # get consistent snapshot of repo, lock during scan
730 lock = repo.lock()
730 lock = repo.lock()
731 try:
731 try:
732 repo.ui.debug('scanning\n')
732 repo.ui.debug('scanning\n')
733 for name, ename, size in _walkstreamfiles(repo):
733 for name, ename, size in _walkstreamfiles(repo):
734 if size:
734 if size:
735 entries.append((name, size))
735 entries.append((name, size))
736 total_bytes += size
736 total_bytes += size
737 finally:
737 finally:
738 lock.release()
738 lock.release()
739 except error.LockError:
739 except error.LockError:
740 return '2\n' # error: 2
740 return '2\n' # error: 2
741
741
742 def streamer(repo, entries, total):
742 def streamer(repo, entries, total):
743 '''stream out all metadata files in repository.'''
743 '''stream out all metadata files in repository.'''
744 yield '0\n' # success
744 yield '0\n' # success
745 repo.ui.debug('%d files, %d bytes to transfer\n' %
745 repo.ui.debug('%d files, %d bytes to transfer\n' %
746 (len(entries), total_bytes))
746 (len(entries), total_bytes))
747 yield '%d %d\n' % (len(entries), total_bytes)
747 yield '%d %d\n' % (len(entries), total_bytes)
748
748
749 sopener = repo.sopener
749 sopener = repo.sopener
750 oldaudit = sopener.mustaudit
750 oldaudit = sopener.mustaudit
751 debugflag = repo.ui.debugflag
751 debugflag = repo.ui.debugflag
752 sopener.mustaudit = False
752 sopener.mustaudit = False
753
753
754 try:
754 try:
755 for name, size in entries:
755 for name, size in entries:
756 if debugflag:
756 if debugflag:
757 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
757 repo.ui.debug('sending %s (%d bytes)\n' % (name, size))
758 # partially encode name over the wire for backwards compat
758 # partially encode name over the wire for backwards compat
759 yield '%s\0%d\n' % (store.encodedir(name), size)
759 yield '%s\0%d\n' % (store.encodedir(name), size)
760 if size <= 65536:
760 if size <= 65536:
761 fp = sopener(name)
761 fp = sopener(name)
762 try:
762 try:
763 data = fp.read(size)
763 data = fp.read(size)
764 finally:
764 finally:
765 fp.close()
765 fp.close()
766 yield data
766 yield data
767 else:
767 else:
768 for chunk in util.filechunkiter(sopener(name), limit=size):
768 for chunk in util.filechunkiter(sopener(name), limit=size):
769 yield chunk
769 yield chunk
770 # replace with "finally:" when support for python 2.4 has been dropped
770 # replace with "finally:" when support for python 2.4 has been dropped
771 except Exception:
771 except Exception:
772 sopener.mustaudit = oldaudit
772 sopener.mustaudit = oldaudit
773 raise
773 raise
774 sopener.mustaudit = oldaudit
774 sopener.mustaudit = oldaudit
775
775
776 return streamres(streamer(repo, entries, total_bytes))
776 return streamres(streamer(repo, entries, total_bytes))
777
777
778 @wireprotocommand('unbundle', 'heads')
778 @wireprotocommand('unbundle', 'heads')
779 def unbundle(repo, proto, heads):
779 def unbundle(repo, proto, heads):
780 their_heads = decodelist(heads)
780 their_heads = decodelist(heads)
781
781
782 try:
782 try:
783 proto.redirect()
783 proto.redirect()
784
784
785 exchange.check_heads(repo, their_heads, 'preparing changes')
785 exchange.check_heads(repo, their_heads, 'preparing changes')
786
786
787 # write bundle data to temporary file because it can be big
787 # write bundle data to temporary file because it can be big
788 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
788 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
789 fp = os.fdopen(fd, 'wb+')
789 fp = os.fdopen(fd, 'wb+')
790 r = 0
790 r = 0
791 try:
791 try:
792 proto.getfile(fp)
792 proto.getfile(fp)
793 fp.seek(0)
793 fp.seek(0)
794 gen = exchange.readbundle(repo.ui, fp, None)
794 gen = exchange.readbundle(repo.ui, fp, None)
795 r = exchange.unbundle(repo, gen, their_heads, 'serve',
795 r = exchange.unbundle(repo, gen, their_heads, 'serve',
796 proto._client())
796 proto._client())
797 if util.safehasattr(r, 'addpart'):
797 if util.safehasattr(r, 'addpart'):
798 # The return looks streameable, we are in the bundle2 case and
798 # The return looks streameable, we are in the bundle2 case and
799 # should return a stream.
799 # should return a stream.
800 return streamres(r.getchunks())
800 return streamres(r.getchunks())
801 return pushres(r)
801 return pushres(r)
802
802
803 finally:
803 finally:
804 fp.close()
804 fp.close()
805 os.unlink(tempname)
805 os.unlink(tempname)
806 except error.BundleValueError, exc:
806 except error.BundleValueError, exc:
807 bundler = bundle2.bundle20(repo.ui)
807 bundler = bundle2.bundle20(repo.ui)
808 bundler.newpart('B2X:ERROR:UNKNOWNPART', [('parttype', str(exc))])
808 errpart = bundler.newpart('B2X:ERROR:UNSUPPORTEDCONTENT')
809 errpart.addparam('parttype', str(exc))
809 return streamres(bundler.getchunks())
810 return streamres(bundler.getchunks())
810 except util.Abort, inst:
811 except util.Abort, inst:
811 # The old code we moved used sys.stderr directly.
812 # The old code we moved used sys.stderr directly.
812 # We did not change it to minimise code change.
813 # We did not change it to minimise code change.
813 # This need to be moved to something proper.
814 # This need to be moved to something proper.
814 # Feel free to do it.
815 # Feel free to do it.
815 if getattr(inst, 'duringunbundle2', False):
816 if getattr(inst, 'duringunbundle2', False):
816 bundler = bundle2.bundle20(repo.ui)
817 bundler = bundle2.bundle20(repo.ui)
817 manargs = [('message', str(inst))]
818 manargs = [('message', str(inst))]
818 advargs = []
819 advargs = []
819 if inst.hint is not None:
820 if inst.hint is not None:
820 advargs.append(('hint', inst.hint))
821 advargs.append(('hint', inst.hint))
821 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
822 bundler.addpart(bundle2.bundlepart('B2X:ERROR:ABORT',
822 manargs, advargs))
823 manargs, advargs))
823 return streamres(bundler.getchunks())
824 return streamres(bundler.getchunks())
824 else:
825 else:
825 sys.stderr.write("abort: %s\n" % inst)
826 sys.stderr.write("abort: %s\n" % inst)
826 return pushres(0)
827 return pushres(0)
827 except error.PushRaced, exc:
828 except error.PushRaced, exc:
828 if getattr(exc, 'duringunbundle2', False):
829 if getattr(exc, 'duringunbundle2', False):
829 bundler = bundle2.bundle20(repo.ui)
830 bundler = bundle2.bundle20(repo.ui)
830 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
831 bundler.newpart('B2X:ERROR:PUSHRACED', [('message', str(exc))])
831 return streamres(bundler.getchunks())
832 return streamres(bundler.getchunks())
832 else:
833 else:
833 return pusherr(str(exc))
834 return pusherr(str(exc))
General Comments 0
You need to be logged in to leave comments. Login now