Skip to content

Commit

Permalink
Merge pull request #28 from breezy-team/bencode-utf8
Browse files Browse the repository at this point in the history
Add bencode_utf8/bdecode_utf8 functions

Fixes #27
  • Loading branch information
jelmer authored Apr 14, 2024
2 parents b062865 + a4d3fc7 commit a15c097
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 58 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ __pycache__
fastbencode.egg-info
*.pyc
dist
*~
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ Example:
>>> bdecode(bencode([1, 2, b'a', {b'd': 3}]))
[1, 2, b'a', {b'd': 3}]

The default ``bencode``/``bdecode`` functions just operate on
bytestrings. Use ``bencode_utf8`` / ``bdecode_utf8`` to
serialize/deserialize all plain strings as UTF-8 bytestrings.
Note that for performance reasons, all dictionary keys still have to be
bytestrings.

License
=======
fastbencode is available under the GNU GPL, version 2 or later.
Expand Down
116 changes: 70 additions & 46 deletions fastbencode/_bencode_py.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,28 @@

class BDecoder:

def __init__(self, yield_tuples=False) -> None:
def __init__(self, yield_tuples=False, bytestring_encoding=None) -> None:
"""Constructor.
:param yield_tuples: if true, decode "l" elements as tuples rather than
lists.
"""
self.yield_tuples = yield_tuples
self.bytestring_encoding = bytestring_encoding
decode_func = {}
decode_func[b'l'] = self.decode_list
decode_func[b'd'] = self.decode_dict
decode_func[b'i'] = self.decode_int
decode_func[b'0'] = self.decode_string
decode_func[b'1'] = self.decode_string
decode_func[b'2'] = self.decode_string
decode_func[b'3'] = self.decode_string
decode_func[b'4'] = self.decode_string
decode_func[b'5'] = self.decode_string
decode_func[b'6'] = self.decode_string
decode_func[b'7'] = self.decode_string
decode_func[b'8'] = self.decode_string
decode_func[b'9'] = self.decode_string
decode_func[b'0'] = self.decode_bytes
decode_func[b'1'] = self.decode_bytes
decode_func[b'2'] = self.decode_bytes
decode_func[b'3'] = self.decode_bytes
decode_func[b'4'] = self.decode_bytes
decode_func[b'5'] = self.decode_bytes
decode_func[b'6'] = self.decode_bytes
decode_func[b'7'] = self.decode_bytes
decode_func[b'8'] = self.decode_bytes
decode_func[b'9'] = self.decode_bytes
self.decode_func = decode_func

def decode_int(self, x, f):
Expand All @@ -54,13 +55,16 @@ def decode_int(self, x, f):
raise ValueError
return (n, newf + 1)

def decode_string(self, x, f):
def decode_bytes(self, x, f):
colon = x.index(b':', f)
n = int(x[f:colon])
if x[f:f + 1] == b'0' and colon != f + 1:
raise ValueError
colon += 1
return (x[colon:colon + n], colon + n)
d = x[colon:colon + n]
if self.bytestring_encoding:
d = d.decode(self.bytestring_encoding)
return (d, colon + n)

def decode_list(self, x, f):
r, f = [], f + 1
Expand All @@ -75,7 +79,7 @@ def decode_dict(self, x, f):
r, f = {}, f + 1
lastkey = None
while x[f:f + 1] != b'e':
k, f = self.decode_string(x, f)
k, f = self.decode_bytes(x, f)
if lastkey is not None and lastkey >= k:
raise ValueError
lastkey = k
Expand All @@ -100,6 +104,9 @@ def bdecode(self, x):
# Shared decoder constructed with yield_tuples=True: "l" elements are
# returned as tuples rather than lists.
_tuple_decoder = BDecoder(True)
bdecode_as_tuple = _tuple_decoder.bdecode

# Shared decoder that decodes every length-prefixed string as UTF-8 text.
# Dictionary keys are read through decode_bytes as well, so they are
# decoded the same way.
_utf8_decoder = BDecoder(bytestring_encoding='utf-8')
bdecode_utf8 = _utf8_decoder.bdecode


class Bencached:
__slots__ = ['bencoded']
Expand All @@ -108,55 +115,72 @@ def __init__(self, s) -> None:
self.bencoded = s


def encode_bencached(x, r):
r.append(x.bencoded)
class BEncoder:

def __init__(self, bytestring_encoding=None):
    """Create an encoder and build its per-type dispatch table.

    :param bytestring_encoding: codec name used by ``encode_str`` to turn
        ``str`` values into bytes. When None, ``encode_str`` raises
        TypeError instead of encoding.
    """
    self.bytestring_encoding = bytestring_encoding
    # ``encode`` looks handlers up by exact type, so bool needs its own
    # entry even though it is a subclass of int.
    handlers = (
        (Bencached, self.encode_bencached),
        (int, self.encode_int),
        (bytes, self.encode_bytes),
        (list, self.encode_list),
        (tuple, self.encode_list),
        (dict, self.encode_dict),
        (bool, self.encode_bool),
        (str, self.encode_str),
    )
    self.encode_func: Dict[Type, Callable[[object, List[bytes]], None]] = (
        dict(handlers))

def encode_bencached(self, x, r):
    """Append the already-encoded payload of *x* to *r* unchanged.

    :param x: object exposing a ``bencoded`` attribute.
    :param r: list collecting the output chunks.
    """
    payload = x.bencoded
    r.append(payload)

def encode_bool(x, r):
encode_int(int(x), r)

def encode_bool(self, x, r):
self.encode_int(int(x), r)

def encode_int(x, r):
r.extend((b'i', int_to_bytes(x), b'e'))

def encode_int(self, x, r):
    """Append integer *x* to *r* as the chunks ``i``, decimal digits, ``e``.

    :param x: integer to encode.
    :param r: list collecting the output chunks.
    """
    digits = int_to_bytes(x)
    r.extend([b'i', digits, b'e'])

def encode_string(x, r):
r.extend((int_to_bytes(len(x)), b':', x))

def encode_bytes(self, x, r):
    """Append *x* to *r* as ``<length>:<data>``.

    :param x: bytestring to encode; its ``len()`` becomes the prefix.
    :param r: list collecting the output chunks.
    """
    length_prefix = int_to_bytes(len(x))
    r.extend([length_prefix, b':', x])

def encode_list(x, r):
r.append(b'l')
for i in x:
encode_func[type(i)](i, r)
r.append(b'e')
def encode_list(self, x, r):
r.append(b'l')
for i in x:
self.encode(i, r)
r.append(b'e')


def encode_dict(x, r):
r.append(b'd')
ilist = sorted(x.items())
for k, v in ilist:
r.extend((int_to_bytes(len(k)), b':', k))
encode_func[type(v)](v, r)
r.append(b'e')
def encode_dict(self, x, r):
    """Encode mapping *x* as ``d`` + sorted key/value pairs + ``e``.

    Keys are written verbatim after their length prefix and are not
    passed through the type dispatch; values are encoded via ``encode``.

    :param x: dict to encode; its items are emitted in sorted order.
    :param r: list collecting the output chunks.
    """
    r.append(b'd')
    for key, value in sorted(x.items()):
        key_length = int_to_bytes(len(key))
        r.extend((key_length, b':', key))
        self.encode(value, r)
    r.append(b'e')

def encode_str(self, x, r):
if self.bytestring_encoding is None:
raise TypeError("string found but no encoding specified. "
"Use bencode_utf8 rather bencode?")
return self.encode_bytes(x.encode(self.bytestring_encoding), r)

encode_func: Dict[Type, Callable[[object, List[bytes]], None]] = {}
encode_func[type(Bencached(0))] = encode_bencached
encode_func[int] = encode_int
def encode(self, x, r):
self.encode_func[type(x)](x, r)


def int_to_bytes(n):
    """Return *n* formatted as decimal ASCII digits in a bytestring."""
    return b'%d' % n


encode_func[bytes] = encode_string
encode_func[list] = encode_list
encode_func[tuple] = encode_list
encode_func[dict] = encode_dict
encode_func[bool] = encode_bool


def bencode(x):
r = []
encode_func[type(x)](x, r)
encoder = BEncoder()
encoder.encode(x, r)
return b''.join(r)

def bencode_utf8(x):
    """Bencode *x*, encoding any ``str`` values as UTF-8 bytestrings.

    :param x: value to encode.
    :return: the encoded chunks joined into a single bytestring.
    """
    chunks = []
    BEncoder(bytestring_encoding='utf-8').encode(x, chunks)
    return b''.join(chunks)
Loading

0 comments on commit a15c097

Please sign in to comment.