Show More
@@ -1187,6 +1187,32 b' class interruptoperation(object):' | |||
|
1187 | 1187 | def gettransaction(self): |
|
1188 | 1188 | raise TransactionUnavailable('no repo access from stream interruption') |
|
1189 | 1189 | |
|
1190 | def decodepayloadchunks(ui, fh): | |
|
1191 | """Reads bundle2 part payload data into chunks. | |
|
1192 | ||
|
1193 | Part payload data consists of framed chunks. This function takes | |
|
1194 | a file handle and emits those chunks. | |
|
1195 | """ | |
|
1196 | headersize = struct.calcsize(_fpayloadsize) | |
|
1197 | readexactly = changegroup.readexactly | |
|
1198 | ||
|
1199 | chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0] | |
|
1200 | indebug(ui, 'payload chunk size: %i' % chunksize) | |
|
1201 | ||
|
1202 | while chunksize: | |
|
1203 | if chunksize >= 0: | |
|
1204 | yield readexactly(fh, chunksize) | |
|
1205 | elif chunksize == flaginterrupt: | |
|
1206 | # Interrupt "signal" detected. The regular stream is interrupted | |
|
1207 | # and a bundle2 part follows. Consume it. | |
|
1208 | interrupthandler(ui, fh)() | |
|
1209 | else: | |
|
1210 | raise error.BundleValueError( | |
|
1211 | 'negative payload chunk size: %s' % chunksize) | |
|
1212 | ||
|
1213 | chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0] | |
|
1214 | indebug(ui, 'payload chunk size: %i' % chunksize) | |
|
1215 | ||
|
1190 | 1216 | class unbundlepart(unpackermixin): |
|
1191 | 1217 | """a bundle part read from a bundle""" |
|
1192 | 1218 | |
@@ -1270,6 +1296,10 b' class unbundlepart(unpackermixin):' | |||
|
1270 | 1296 | # we read the data, tell it |
|
1271 | 1297 | self._initialized = True |
|
1272 | 1298 | |
|
1299 | def _payloadchunks(self): | |
|
1300 | """Generator of decoded chunks in the payload.""" | |
|
1301 | return decodepayloadchunks(self.ui, self._fp) | |
|
1302 | ||
|
1273 | 1303 | def read(self, size=None): |
|
1274 | 1304 | """read payload data""" |
|
1275 | 1305 | if not self._initialized: |
@@ -1320,25 +1350,14 b' class seekableunbundlepart(unbundlepart)' | |||
|
1320 | 1350 | self._seekfp(self._chunkindex[chunknum][1]) |
|
1321 | 1351 | |
|
1322 | 1352 | pos = self._chunkindex[chunknum][0] |
|
1323 | payloadsize = self._unpack(_fpayloadsize)[0] | |
|
1324 | indebug(self.ui, 'payload chunk size: %i' % payloadsize) | |
|
1325 | while payloadsize: | |
|
1326 | if payloadsize == flaginterrupt: | |
|
1327 | # interruption detection, the handler will now read a | |
|
1328 | # single part and process it. | |
|
1329 | interrupthandler(self.ui, self._fp)() | |
|
1330 | elif payloadsize < 0: | |
|
1331 | msg = 'negative payload chunk size: %i' % payloadsize | |
|
1332 | raise error.BundleValueError(msg) | |
|
1333 | else: | |
|
1334 | result = self._readexact(payloadsize) | |
|
1335 | chunknum += 1 | |
|
1336 | pos += payloadsize | |
|
1337 | if chunknum == len(self._chunkindex): | |
|
1338 | self._chunkindex.append((pos, self._tellfp())) | |
|
1339 | yield result | |
|
1340 | payloadsize = self._unpack(_fpayloadsize)[0] | |
|
1341 | indebug(self.ui, 'payload chunk size: %i' % payloadsize) | |
|
1353 | ||
|
1354 | for chunk in decodepayloadchunks(self.ui, self._fp): | |
|
1355 | chunknum += 1 | |
|
1356 | pos += len(chunk) | |
|
1357 | if chunknum == len(self._chunkindex): | |
|
1358 | self._chunkindex.append((pos, self._tellfp())) | |
|
1359 | ||
|
1360 | yield chunk | |
|
1342 | 1361 | |
|
1343 | 1362 | def _findchunk(self, pos): |
|
1344 | 1363 | '''for a given payload position, return a chunk number and offset''' |
General Comments 0
You need to be logged in to leave comments.
Login now