Coretex
utils.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
class DataBuffer:

    """
        Growable byte buffer with a forward-only read cursor.
        Data is appended at the end and consumed from the
        current position towards the end
    """

    def __init__(self) -> None:
        # All bytes appended so far (bytes already read are kept)
        self.data = bytearray()
        # Index of the next byte which will be read
        self.position = 0

    @property
    def remaining(self) -> int:
        """
            Returns
            -------
            int -> number of bytes which have not been read yet
        """

        return len(self.data) - self.position

    def append(self, data: bytes) -> None:
        """
            Appends new data to the end of the buffer

            Parameters
            ----------
            data : bytes
                data which will be appended
        """

        self.data.extend(data)

    def get(self) -> int:
        """
            Reads one byte from the buffer and increments
            its internal state by 1

            Returns
            -------
            int -> byte which was read

            Raises
            ------
            OverflowError -> if there are no more bytes to read
        """

        if self.remaining <= 0:
            raise OverflowError("All data was extracted from buffer")

        value = self.data[self.position]
        self.position += 1

        return value

    def getBytes(self, count: int) -> bytes:
        """
            Reads N number of bytes from the buffer and increments
            its internal state by N

            Parameters
            ----------
            count : int
                number of bytes to read, must not be negative

            Returns
            -------
            bytes -> bytes which were read

            Raises
            ------
            ValueError -> if count is negative
            OverflowError -> if count exceeds number of available bytes
        """

        # A negative count would pass the bounds check below and move
        # the read position backwards, so it is rejected explicitly
        if count < 0:
            raise ValueError("count must not be negative")

        if count > self.remaining:
            raise OverflowError("Tried to extract more than than what the buffer has")

        end = self.position + count

        # Slicing a bytearray yields a bytearray, convert it so the
        # returned value is immutable bytes as the signature promises
        values = bytes(self.data[self.position:end])
        self.position = end

        return values

    def getRemaining(self) -> bytes:
        """
            Reads remaining bytes from the buffer

            Returns
            -------
            bytes -> bytes which were read, empty if
            everything was already read
        """

        return self.getBytes(self.remaining)