##// END OF EJS Templates
typing: add type hints to mpatch implementations...
Matt Harbison -
r50494:94a79703 default
parent child Browse files
Show More
@@ -1,48 +1,50 b''
1 1 # mpatch.py - CFFI implementation of mpatch.c
2 2 #
3 3 # Copyright 2016 Maciej Fijalkowski <fijall@gmail.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 from typing import List
10
9 11 from ..pure.mpatch import *
10 12 from ..pure.mpatch import mpatchError # silence pyflakes
11 13 from . import _mpatch # pytype: disable=import-error
12 14
13 15 ffi = _mpatch.ffi
14 16 lib = _mpatch.lib
15 17
16 18
17 19 @ffi.def_extern()
18 20 def cffi_get_next_item(arg, pos):
19 21 all, bins = ffi.from_handle(arg)
20 22 container = ffi.new(b"struct mpatch_flist*[1]")
21 23 to_pass = ffi.new(b"char[]", str(bins[pos]))
22 24 all.append(to_pass)
23 25 r = lib.mpatch_decode(to_pass, len(to_pass) - 1, container)
24 26 if r < 0:
25 27 return ffi.NULL
26 28 return container[0]
27 29
28 30
29 def patches(text, bins):
31 def patches(text: bytes, bins: List[bytes]) -> bytes:
30 32 lgt = len(bins)
31 33 all = []
32 34 if not lgt:
33 35 return text
34 36 arg = (all, bins)
35 37 patch = lib.mpatch_fold(ffi.new_handle(arg), lib.cffi_get_next_item, 0, lgt)
36 38 if not patch:
37 39 raise mpatchError(b"cannot decode chunk")
38 40 outlen = lib.mpatch_calcsize(len(text), patch)
39 41 if outlen < 0:
40 42 lib.mpatch_lfree(patch)
41 43 raise mpatchError(b"inconsistency detected")
42 44 buf = ffi.new(b"char[]", outlen)
43 45 if lib.mpatch_apply(buf, text, len(text), patch) < 0:
44 46 lib.mpatch_lfree(patch)
45 47 raise mpatchError(b"error applying patches")
46 48 res = ffi.buffer(buf, outlen)[:]
47 49 lib.mpatch_lfree(patch)
48 50 return res
@@ -1,134 +1,143 b''
1 1 # mpatch.py - Python implementation of mpatch.c
2 2 #
3 3 # Copyright 2009 Olivia Mackall <olivia@selenic.com> and others
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8
9 9 import io
10 10 import struct
11 11
12 from typing import (
13 List,
14 Tuple,
15 )
16
12 17
13 18 stringio = io.BytesIO
14 19
15 20
16 21 class mpatchError(Exception):
17 22 """error raised when a delta cannot be decoded"""
18 23
19 24
20 25 # This attempts to apply a series of patches in time proportional to
21 26 # the total size of the patches, rather than patches * len(text). This
22 27 # means rather than shuffling strings around, we shuffle around
23 28 # pointers to fragments with fragment lists.
24 29 #
25 30 # When the fragment lists get too long, we collapse them. To do this
26 31 # efficiently, we do all our operations inside a buffer created by
27 32 # mmap and simply use memmove. This avoids creating a bunch of large
28 33 # temporary string buffers.
29 34
30 35
31 def _pull(dst, src, l): # pull l bytes from src
36 def _pull(
37 dst: List[Tuple[int, int]], src: List[Tuple[int, int]], l: int
38 ) -> None: # pull l bytes from src
32 39 while l:
33 40 f = src.pop()
34 41 if f[0] > l: # do we need to split?
35 42 src.append((f[0] - l, f[1] + l))
36 43 dst.append((l, f[1]))
37 44 return
38 45 dst.append(f)
39 46 l -= f[0]
40 47
41 48
42 def _move(m, dest, src, count):
49 def _move(m: stringio, dest: int, src: int, count: int) -> None:
43 50 """move count bytes from src to dest
44 51
45 52 The file pointer is left at the end of dest.
46 53 """
47 54 m.seek(src)
48 55 buf = m.read(count)
49 56 m.seek(dest)
50 57 m.write(buf)
51 58
52 59
53 def _collect(m, buf, list):
60 def _collect(
61 m: stringio, buf: int, list: List[Tuple[int, int]]
62 ) -> Tuple[int, int]:
54 63 start = buf
55 64 for l, p in reversed(list):
56 65 _move(m, buf, p, l)
57 66 buf += l
58 67 return (buf - start, start)
59 68
60 69
61 def patches(a, bins):
70 def patches(a: bytes, bins: List[bytes]) -> bytes:
62 71 if not bins:
63 72 return a
64 73
65 74 plens = [len(x) for x in bins]
66 75 pl = sum(plens)
67 76 bl = len(a) + pl
68 77 tl = bl + bl + pl # enough for the patches and two working texts
69 78 b1, b2 = 0, bl
70 79
71 80 if not tl:
72 81 return a
73 82
74 83 m = stringio()
75 84
76 85 # load our original text
77 86 m.write(a)
78 87 frags = [(len(a), b1)]
79 88
80 89 # copy all the patches into our segment so we can memmove from them
81 90 pos = b2 + bl
82 91 m.seek(pos)
83 92 for p in bins:
84 93 m.write(p)
85 94
86 95 for plen in plens:
87 96 # if our list gets too long, execute it
88 97 if len(frags) > 128:
89 98 b2, b1 = b1, b2
90 99 frags = [_collect(m, b1, frags)]
91 100
92 101 new = []
93 102 end = pos + plen
94 103 last = 0
95 104 while pos < end:
96 105 m.seek(pos)
97 106 try:
98 107 p1, p2, l = struct.unpack(b">lll", m.read(12))
99 108 except struct.error:
100 109 raise mpatchError(b"patch cannot be decoded")
101 110 _pull(new, frags, p1 - last) # what didn't change
102 111 _pull([], frags, p2 - p1) # what got deleted
103 112 new.append((l, pos + 12)) # what got added
104 113 pos += l + 12
105 114 last = p2
106 115 frags.extend(reversed(new)) # what was left at the end
107 116
108 117 t = _collect(m, b2, frags)
109 118
110 119 m.seek(t[1])
111 120 return m.read(t[0])
112 121
113 122
114 def patchedsize(orig, delta):
123 def patchedsize(orig: int, delta: bytes) -> int:
115 124 outlen, last, bin = 0, 0, 0
116 125 binend = len(delta)
117 126 data = 12
118 127
119 128 while data <= binend:
120 129 decode = delta[bin : bin + 12]
121 130 start, end, length = struct.unpack(b">lll", decode)
122 131 if start > end:
123 132 break
124 133 bin = data + length
125 134 data = bin + 12
126 135 outlen += start - last
127 136 last = end
128 137 outlen += length
129 138
130 139 if bin != binend:
131 140 raise mpatchError(b"patch cannot be decoded")
132 141
133 142 outlen += orig - last
134 143 return outlen
General Comments 0
You need to be logged in to leave comments. Login now