Coretex
decryptor.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Generator
19 from pathlib import Path
20 
21 from cryptography.hazmat.primitives import padding
22 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
23 
24 from .constants import DEFAULT_CHUNK_SIZE, AES_KEY_SIZE, IV_SIZE, AES_BLOCK_SIZE
25 from .utils import DataBuffer
26 
27 
29 
30  """
31  Implements functionality for decrypting a stream of bytes
32  using AES 256.
33  """
34 
35  def __init__(self, key: bytes, iv: bytes, chunkSize: int = DEFAULT_CHUNK_SIZE) -> None:
36  if len(key) != AES_KEY_SIZE:
37  raise ValueError(f"AES key size: {len(key)}, but {AES_KEY_SIZE} expected")
38 
39  if len(iv) != IV_SIZE:
40  raise ValueError(f"IV size: {len(iv)}, but {IV_SIZE} expected")
41 
42  self._key_key = key
43  self._decryptor_decryptor = Cipher(algorithms.AES256(self._key_key), modes.CBC(iv)).decryptor()
44  self._buffer_buffer = DataBuffer()
45 
46  self.iviv = iv
47  self.chunkSizechunkSize = chunkSize
48 
49  def feed(self, data: bytes) -> Generator[bytes, None, None]:
50  """
51  Decrypts the data using AES 256.
52 
53  Parameters
54  ----------
55  data : bytes
56  data which should be decrypted
57 
58  Returns
59  -------
60  Generator[bytes, None, None] -> yields decrypted data in chunks
61  """
62 
63  self._buffer_buffer.append(data)
64 
65  while self._buffer_buffer.remaining >= self.chunkSizechunkSize:
66  chunk = self._buffer_buffer.getBytes(self.chunkSizechunkSize)
67  yield self._decryptor_decryptor.update(chunk)
68 
69  def flush(self) -> bytes:
70  """
71  Decrypts any remaining data using AES 256.
72 
73  Returns
74  -------
75  bytes -> decrypted data
76  """
77 
78  # Decrypt chunk
79  chunk = self._buffer_buffer.getRemaining()
80  chunk = self._decryptor_decryptor.update(chunk) + self._decryptor_decryptor.finalize()
81 
82  # Unpad chunk
83  try:
84  unpadder = padding.PKCS7(AES_BLOCK_SIZE * 8).unpadder()
85  return unpadder.update(chunk) + unpadder.finalize()
86  except ValueError:
87  # If unpadding failed either the key is wrong or
88  # the padding was not performed during encryption
89  # because ciphertext bytes were divisible by AES
90  # block size so there was no need for padding
91  return chunk
92 
93 
94 def decryptFile(key: bytes, sourcePath: Path, destinationPath: Path) -> None:
95  """
96  Decrypts a file using AES 256.
97 
98  Parameters
99  ----------
100  key : bytes
101  key which will be used for decrypting
102  sourcePath : Path
103  path to the file which will be decrypted
104  destinationPath : Path
105  path to the decrypted file
106  """
107 
108  with sourcePath.open("rb") as source, destinationPath.open("wb") as destination:
109  iv = source.read(IV_SIZE)
110  decryptor = StreamDecryptor(key, iv)
111 
112  while (chunk := source.read(DEFAULT_CHUNK_SIZE)):
113  for decryptedData in decryptor.feed(chunk):
114  destination.write(decryptedData)
115 
116  destination.write(decryptor.flush())
Generator[bytes, None, None] feed(self, bytes data)
Definition: decryptor.py:49