-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathbac.py
More file actions
118 lines (102 loc) · 2.75 KB
/
bac.py
File metadata and controls
118 lines (102 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import numpy as np
class BACEncoder:
    """Binary arithmetic (range) encoder that emits a list of byte values.

    Bits are coded one at a time with a caller-supplied probability that the
    bit is 0.  Output accumulates in ``cod``; call ``flush()`` once after the
    last bit to terminate the stream.
    """

    def __init__(self):
        # Low end of the current coding interval (kept within 32 bits).
        self.L = 0
        # Width of the current coding interval.
        self.R = 1 << 32
        # Emitted bytes, as ints in 0..255.
        self.cod = []
        # Most recent output byte, held back because a later carry may
        # still increment it.
        self.buf = 0
        # Number of pending bytes after ``buf`` whose value depends on
        # whether a carry arrives (0xff without a carry, 0x00 with one).
        self.cnt = 0

    def encode(self, bit, zero_prob):
        """Encode one bit.

        Args:
            bit: the symbol to encode; 0 selects the "zero" sub-interval,
                any other value selects the "one" sub-interval.
            zero_prob: probability that the bit is 0.  It is quantised to
                16 bits with ``int(65536 * zero_prob)``.

        Raises:
            ValueError: if the quantised probability lies outside
                ``0..65536``, or if the sub-interval of the requested bit has
                zero width (the bit is impossible under ``zero_prob`` after
                quantisation).  The encoder state is left untouched.
        """
        base = 1 << 16
        scaled = int(base * zero_prob)
        if not 0 <= scaled <= base:
            raise ValueError(
                "zero_prob must be within [0, 1], got %r" % (zero_prob,)
            )
        ZR = (self.R * scaled) >> 16
        OR = self.R - ZR

        # A zero-width interval would set R to 0 and the renormalisation
        # loop below would never terminate, so reject it up front.
        if (ZR if bit == 0 else OR) == 0:
            raise ValueError(
                "cannot encode bit %r: its probability quantises to zero "
                "(zero_prob=%r)" % (bit, zero_prob)
            )

        if bit == 0:
            self.R = ZR
        else:
            self.R = OR
            self.L += ZR

        # Carry: L overflowed 32 bits, so propagate +1 into the held-back
        # byte and turn the pending 0xff bytes into 0x00.
        if self.L >= 1 << 32:
            self.buf += 1
            self.L &= 0xffffffff
            if self.cnt > 0:
                self.cod.append(self.buf)
                for _ in range(self.cnt - 1):
                    self.cod.append(0)
                # The last pending byte becomes the new held-back byte.
                self.buf = 0
                self.cnt = 0

        # Renormalise: shift out the top byte of L until R has at least
        # 24 significant bits again.
        while self.R < (1 << 24):
            if self.L < (0xff << 24):
                # Top byte is not 0xff, so earlier output can no longer be
                # affected by a carry: commit it.
                self.cod.append(self.buf)
                for _ in range(self.cnt):
                    self.cod.append(0xff)
                self.cnt = 0
                self.buf = (self.L >> 24) & 0xff
            else:
                # Top byte is 0xff: defer it, a carry could still ripple
                # through.
                self.cnt += 1
            self.R <<= 8
            self.L = (self.L << 8) & 0xffffffff

    def flush(self):
        """Terminate the stream by writing all pending state to ``cod``.

        Appends the held-back byte, the pending bytes and then the four
        bytes of ``L`` (most significant first).  ``L`` is shifted in the
        process, so the encoder should not be used afterwards.
        """
        c = 0xff
        if self.L >= 1 << 32:
            self.buf += 1
            c = 0
        self.cod.append(self.buf)
        for _ in range(self.cnt):
            self.cod.append(c)
        for _ in range(4):
            self.cod.append((self.L >> 24) & 0xff)
            self.L <<= 8
class BACDecoder:
    """Binary arithmetic (range) decoder for a byte sequence from BACEncoder.

    ``cod`` is indexed directly, so it must support integer indexing and
    hold byte values (ints).  Index 0 is never read: decoding starts at
    index 1, and the first four bytes from there form the initial code word.
    """

    def __init__(self, cod):
        # Width of the current coding interval.
        self.R = 1 << 32

        # Assemble the big-endian 32-bit code word from cod[1..4].
        window = 0
        for idx in range(1, 5):
            window = (window << 8) | cod[idx]

        # Index of the next unread byte in ``cod``.
        self.read_pos = 5
        self.cod = cod
        # Code value relative to the low end of the current interval.
        self.C = window
        # Low end of the interval; stays 0 because ``C`` is kept relative.
        self.L = 0

    def decode(self, zero_prob):
        """Decode and return the next bit (0 or 1).

        Args:
            zero_prob: probability that the bit is 0; quantised with
                ``int(65536 * zero_prob)``, as the encoder must have done
                for the same position.

        Raises:
            IndexError: if renormalisation needs a byte past the end of
                ``cod``.
        """
        scaled_prob = int((1 << 16) * zero_prob)
        zero_width = (self.R * scaled_prob) >> 16

        if self.C >= self.L + zero_width:
            # Code value lies in the upper ("one") part of the interval.
            symbol = 1
            self.R -= zero_width
            self.C -= zero_width
        else:
            symbol = 0
            self.R = zero_width

        # Renormalise: pull in one byte at a time until R has at least
        # 24 significant bits again.
        while self.R < (1 << 24):
            self.R <<= 8
            incoming = self.cod[self.read_pos]
            self.C = ((self.C << 8) + incoming) & 0xffffffff
            self.read_pos += 1

        return symbol
def test_stat():
    """Round-trip 10000 random bits through BACEncoder and BACDecoder.

    Each bit is 0 with probability 1/4 (one outcome out of four equally
    likely ones).  Prints the encoded length in bytes, then whether the
    decoded bits equal the source bits.
    """
    zero_prob = 1.0 / 4
    outcomes = 4

    # One RNG draw per bit; outcome 0 maps to bit 0, anything else to bit 1.
    source = [
        0 if np.random.choice(outcomes) == 0 else 1
        for _ in range(10000)
    ]

    encoder = BACEncoder()
    for symbol in source:
        encoder.encode(symbol, zero_prob)
    encoder.flush()
    print(len(encoder.cod))

    decoder = BACDecoder(encoder.cod)
    decoded = [decoder.decode(zero_prob) for _ in range(10000)]
    print(source == decoded)
# Run the encode/decode round-trip check; as a top-level call it executes
# whenever this file is run as a script and also whenever it is imported.
test_stat()