Show More
@@ -1187,6 +1187,32 class interruptoperation(object): | |||||
1187 | def gettransaction(self): |
|
1187 | def gettransaction(self): | |
1188 | raise TransactionUnavailable('no repo access from stream interruption') |
|
1188 | raise TransactionUnavailable('no repo access from stream interruption') | |
1189 |
|
1189 | |||
|
1190 | def decodepayloadchunks(ui, fh): | |||
|
1191 | """Reads bundle2 part payload data into chunks. | |||
|
1192 | ||||
|
1193 | Part payload data consists of framed chunks. This function takes | |||
|
1194 | a file handle and emits those chunks. | |||
|
1195 | """ | |||
|
1196 | headersize = struct.calcsize(_fpayloadsize) | |||
|
1197 | readexactly = changegroup.readexactly | |||
|
1198 | ||||
|
1199 | chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0] | |||
|
1200 | indebug(ui, 'payload chunk size: %i' % chunksize) | |||
|
1201 | ||||
|
1202 | while chunksize: | |||
|
1203 | if chunksize >= 0: | |||
|
1204 | yield readexactly(fh, chunksize) | |||
|
1205 | elif chunksize == flaginterrupt: | |||
|
1206 | # Interrupt "signal" detected. The regular stream is interrupted | |||
|
1207 | # and a bundle2 part follows. Consume it. | |||
|
1208 | interrupthandler(ui, fh)() | |||
|
1209 | else: | |||
|
1210 | raise error.BundleValueError( | |||
|
1211 | 'negative payload chunk size: %s' % chunksize) | |||
|
1212 | ||||
|
1213 | chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0] | |||
|
1214 | indebug(ui, 'payload chunk size: %i' % chunksize) | |||
|
1215 | ||||
1190 | class unbundlepart(unpackermixin): |
|
1216 | class unbundlepart(unpackermixin): | |
1191 | """a bundle part read from a bundle""" |
|
1217 | """a bundle part read from a bundle""" | |
1192 |
|
1218 | |||
@@ -1270,6 +1296,10 class unbundlepart(unpackermixin): | |||||
1270 | # we read the data, tell it |
|
1296 | # we read the data, tell it | |
1271 | self._initialized = True |
|
1297 | self._initialized = True | |
1272 |
|
1298 | |||
|
1299 | def _payloadchunks(self): | |||
|
1300 | """Generator of decoded chunks in the payload.""" | |||
|
1301 | return decodepayloadchunks(self.ui, self._fp) | |||
|
1302 | ||||
1273 | def read(self, size=None): |
|
1303 | def read(self, size=None): | |
1274 | """read payload data""" |
|
1304 | """read payload data""" | |
1275 | if not self._initialized: |
|
1305 | if not self._initialized: | |
@@ -1320,25 +1350,14 class seekableunbundlepart(unbundlepart) | |||||
1320 | self._seekfp(self._chunkindex[chunknum][1]) |
|
1350 | self._seekfp(self._chunkindex[chunknum][1]) | |
1321 |
|
1351 | |||
1322 | pos = self._chunkindex[chunknum][0] |
|
1352 | pos = self._chunkindex[chunknum][0] | |
1323 | payloadsize = self._unpack(_fpayloadsize)[0] |
|
1353 | ||
1324 | indebug(self.ui, 'payload chunk size: %i' % payloadsize) |
|
1354 | for chunk in decodepayloadchunks(self.ui, self._fp): | |
1325 | while payloadsize: |
|
|||
1326 | if payloadsize == flaginterrupt: |
|
|||
1327 | # interruption detection, the handler will now read a |
|
|||
1328 | # single part and process it. |
|
|||
1329 | interrupthandler(self.ui, self._fp)() |
|
|||
1330 | elif payloadsize < 0: |
|
|||
1331 | msg = 'negative payload chunk size: %i' % payloadsize |
|
|||
1332 | raise error.BundleValueError(msg) |
|
|||
1333 | else: |
|
|||
1334 | result = self._readexact(payloadsize) |
|
|||
1335 |
|
|
1355 | chunknum += 1 | |
1336 |
|
|
1356 | pos += len(chunk) | |
1337 |
|
|
1357 | if chunknum == len(self._chunkindex): | |
1338 |
|
|
1358 | self._chunkindex.append((pos, self._tellfp())) | |
1339 | yield result |
|
1359 | ||
1340 | payloadsize = self._unpack(_fpayloadsize)[0] |
|
1360 | yield chunk | |
1341 | indebug(self.ui, 'payload chunk size: %i' % payloadsize) |
|
|||
1342 |
|
1361 | |||
1343 | def _findchunk(self, pos): |
|
1362 | def _findchunk(self, pos): | |
1344 | '''for a given payload position, return a chunk number and offset''' |
|
1363 | '''for a given payload position, return a chunk number and offset''' |
General Comments 0
You need to be logged in to leave comments.
Login now