##// END OF EJS Templates
bundle2: implement generic part payload decoder...
Gregory Szorc -
r35110:8aa43ff9 default
parent child Browse files
Show More
@@ -1187,6 +1187,32 b' class interruptoperation(object):'
1187 def gettransaction(self):
1187 def gettransaction(self):
1188 raise TransactionUnavailable('no repo access from stream interruption')
1188 raise TransactionUnavailable('no repo access from stream interruption')
1189
1189
1190 def decodepayloadchunks(ui, fh):
1191 """Reads bundle2 part payload data into chunks.
1192
1193 Part payload data consists of framed chunks. This function takes
1194 a file handle and emits those chunks.
1195 """
1196 headersize = struct.calcsize(_fpayloadsize)
1197 readexactly = changegroup.readexactly
1198
1199 chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
1200 indebug(ui, 'payload chunk size: %i' % chunksize)
1201
1202 while chunksize:
1203 if chunksize >= 0:
1204 yield readexactly(fh, chunksize)
1205 elif chunksize == flaginterrupt:
1206 # Interrupt "signal" detected. The regular stream is interrupted
1207 # and a bundle2 part follows. Consume it.
1208 interrupthandler(ui, fh)()
1209 else:
1210 raise error.BundleValueError(
1211 'negative payload chunk size: %s' % chunksize)
1212
1213 chunksize = _unpack(_fpayloadsize, readexactly(fh, headersize))[0]
1214 indebug(ui, 'payload chunk size: %i' % chunksize)
1215
1190 class unbundlepart(unpackermixin):
1216 class unbundlepart(unpackermixin):
1191 """a bundle part read from a bundle"""
1217 """a bundle part read from a bundle"""
1192
1218
@@ -1270,6 +1296,10 b' class unbundlepart(unpackermixin):'
1270 # we read the data, tell it
1296 # we read the data, tell it
1271 self._initialized = True
1297 self._initialized = True
1272
1298
1299 def _payloadchunks(self):
1300 """Generator of decoded chunks in the payload."""
1301 return decodepayloadchunks(self.ui, self._fp)
1302
1273 def read(self, size=None):
1303 def read(self, size=None):
1274 """read payload data"""
1304 """read payload data"""
1275 if not self._initialized:
1305 if not self._initialized:
@@ -1320,25 +1350,14 b' class seekableunbundlepart(unbundlepart)'
1320 self._seekfp(self._chunkindex[chunknum][1])
1350 self._seekfp(self._chunkindex[chunknum][1])
1321
1351
1322 pos = self._chunkindex[chunknum][0]
1352 pos = self._chunkindex[chunknum][0]
1323 payloadsize = self._unpack(_fpayloadsize)[0]
1353
1324 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1354 for chunk in decodepayloadchunks(self.ui, self._fp):
1325 while payloadsize:
1355 chunknum += 1
1326 if payloadsize == flaginterrupt:
1356 pos += len(chunk)
1327 # interruption detection, the handler will now read a
1357 if chunknum == len(self._chunkindex):
1328 # single part and process it.
1358 self._chunkindex.append((pos, self._tellfp()))
1329 interrupthandler(self.ui, self._fp)()
1359
1330 elif payloadsize < 0:
1360 yield chunk
1331 msg = 'negative payload chunk size: %i' % payloadsize
1332 raise error.BundleValueError(msg)
1333 else:
1334 result = self._readexact(payloadsize)
1335 chunknum += 1
1336 pos += payloadsize
1337 if chunknum == len(self._chunkindex):
1338 self._chunkindex.append((pos, self._tellfp()))
1339 yield result
1340 payloadsize = self._unpack(_fpayloadsize)[0]
1341 indebug(self.ui, 'payload chunk size: %i' % payloadsize)
1342
1361
1343 def _findchunk(self, pos):
1362 def _findchunk(self, pos):
1344 '''for a given payload position, return a chunk number and offset'''
1363 '''for a given payload position, return a chunk number and offset'''
General Comments 0
You need to be logged in to leave comments. Login now