##// END OF EJS Templates
bundle2: extract logic for seeking bundle2 part into own class...
Gregory Szorc -
r35109:073eec08 default
parent child Browse files
Show More
@@ -854,7 +854,7 b' class unbundle20(unpackermixin):'
854 indebug(self.ui, 'start extraction of bundle2 parts')
854 indebug(self.ui, 'start extraction of bundle2 parts')
855 headerblock = self._readpartheader()
855 headerblock = self._readpartheader()
856 while headerblock is not None:
856 while headerblock is not None:
857 part = unbundlepart(self.ui, headerblock, self._fp)
857 part = seekableunbundlepart(self.ui, headerblock, self._fp)
858 yield part
858 yield part
859 # Seek to the end of the part to force it's consumption so the next
859 # Seek to the end of the part to force it's consumption so the next
860 # part can be read. But then seek back to the beginning so the
860 # part can be read. But then seek back to the beginning so the
@@ -1155,7 +1155,7 b' class interrupthandler(unpackermixin):'
1155 if headerblock is None:
1155 if headerblock is None:
1156 indebug(self.ui, 'no part found during interruption.')
1156 indebug(self.ui, 'no part found during interruption.')
1157 return
1157 return
1158 part = unbundlepart(self.ui, headerblock, self._fp)
1158 part = seekableunbundlepart(self.ui, headerblock, self._fp)
1159 op = interruptoperation(self.ui)
1159 op = interruptoperation(self.ui)
1160 hardabort = False
1160 hardabort = False
1161 try:
1161 try:
@@ -1207,10 +1207,8 b' class unbundlepart(unpackermixin):'
1207 self.advisoryparams = None
1207 self.advisoryparams = None
1208 self.params = None
1208 self.params = None
1209 self.mandatorykeys = ()
1209 self.mandatorykeys = ()
1210 self._payloadstream = None
1211 self._readheader()
1210 self._readheader()
1212 self._mandatory = None
1211 self._mandatory = None
1213 self._chunkindex = [] #(payload, file) position tuples for chunk starts
1214 self._pos = 0
1212 self._pos = 0
1215
1213
1216 def _fromheader(self, size):
1214 def _fromheader(self, size):
@@ -1237,46 +1235,6 b' class unbundlepart(unpackermixin):'
1237 self.params.update(self.advisoryparams)
1235 self.params.update(self.advisoryparams)
1238 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1236 self.mandatorykeys = frozenset(p[0] for p in mandatoryparams)
1239
1237
1240 def _payloadchunks(self, chunknum=0):
1241 '''seek to specified chunk and start yielding data'''
1242 if len(self._chunkindex) == 0:
1243 assert chunknum == 0, 'Must start with chunk 0'
1244 self._chunkindex.append((0, self._tellfp()))
1245 else:
1246 assert chunknum < len(self._chunkindex), \
1247 'Unknown chunk %d' % chunknum
1248 self._seekfp(self._chunkindex[chunknum][1])
1249
1250 pos = self._chunkindex[chunknum][0]
1251 payloadsize = self._unpack(_fpayloadsize)[0]
1252 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1253 while payloadsize:
1254 if payloadsize == flaginterrupt:
1255 # interruption detection, the handler will now read a
1256 # single part and process it.
1257 interrupthandler(self.ui, self._fp)()
1258 elif payloadsize < 0:
1259 msg = 'negative payload chunk size: %i' % payloadsize
1260 raise error.BundleValueError(msg)
1261 else:
1262 result = self._readexact(payloadsize)
1263 chunknum += 1
1264 pos += payloadsize
1265 if chunknum == len(self._chunkindex):
1266 self._chunkindex.append((pos, self._tellfp()))
1267 yield result
1268 payloadsize = self._unpack(_fpayloadsize)[0]
1269 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1270
1271 def _findchunk(self, pos):
1272 '''for a given payload position, return a chunk number and offset'''
1273 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1274 if ppos == pos:
1275 return chunk, 0
1276 elif ppos > pos:
1277 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1278 raise ValueError('Unknown chunk')
1279
1280 def _readheader(self):
1238 def _readheader(self):
1281 """read the header and setup the object"""
1239 """read the header and setup the object"""
1282 typesize = self._unpackheader(_fparttypesize)[0]
1240 typesize = self._unpackheader(_fparttypesize)[0]
@@ -1328,6 +1286,69 b' class unbundlepart(unpackermixin):'
1328 self.consumed = True
1286 self.consumed = True
1329 return data
1287 return data
1330
1288
1289 class seekableunbundlepart(unbundlepart):
1290 """A bundle2 part in a bundle that is seekable.
1291
1292 Regular ``unbundlepart`` instances can only be read once. This class
1293 extends ``unbundlepart`` to enable bi-directional seeking within the
1294 part.
1295
1296 Bundle2 part data consists of framed chunks. Offsets when seeking
1297 refer to the decoded data, not the offsets in the underlying bundle2
1298 stream.
1299
1300 To facilitate quickly seeking within the decoded data, instances of this
1301 class maintain a mapping between offsets in the underlying stream and
1302 the decoded payload. This mapping will consume memory in proportion
1303 to the number of chunks within the payload (which almost certainly
1304 increases in proportion with the size of the part).
1305 """
1306 def __init__(self, ui, header, fp):
1307 # (payload, file) offsets for chunk starts.
1308 self._chunkindex = []
1309
1310 super(seekableunbundlepart, self).__init__(ui, header, fp)
1311
1312 def _payloadchunks(self, chunknum=0):
1313 '''seek to specified chunk and start yielding data'''
1314 if len(self._chunkindex) == 0:
1315 assert chunknum == 0, 'Must start with chunk 0'
1316 self._chunkindex.append((0, self._tellfp()))
1317 else:
1318 assert chunknum < len(self._chunkindex), \
1319 'Unknown chunk %d' % chunknum
1320 self._seekfp(self._chunkindex[chunknum][1])
1321
1322 pos = self._chunkindex[chunknum][0]
1323 payloadsize = self._unpack(_fpayloadsize)[0]
1324 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1325 while payloadsize:
1326 if payloadsize == flaginterrupt:
1327 # interruption detection, the handler will now read a
1328 # single part and process it.
1329 interrupthandler(self.ui, self._fp)()
1330 elif payloadsize < 0:
1331 msg = 'negative payload chunk size: %i' % payloadsize
1332 raise error.BundleValueError(msg)
1333 else:
1334 result = self._readexact(payloadsize)
1335 chunknum += 1
1336 pos += payloadsize
1337 if chunknum == len(self._chunkindex):
1338 self._chunkindex.append((pos, self._tellfp()))
1339 yield result
1340 payloadsize = self._unpack(_fpayloadsize)[0]
1341 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1342
1343 def _findchunk(self, pos):
1344 '''for a given payload position, return a chunk number and offset'''
1345 for chunk, (ppos, fpos) in enumerate(self._chunkindex):
1346 if ppos == pos:
1347 return chunk, 0
1348 elif ppos > pos:
1349 return chunk - 1, pos - self._chunkindex[chunk - 1][0]
1350 raise ValueError('Unknown chunk')
1351
1331 def tell(self):
1352 def tell(self):
1332 return self._pos
1353 return self._pos
1333
1354
General Comments 0
You need to be logged in to leave comments. Login now