Coretex
encryptor.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Optional, Generator
19 from pathlib import Path
20 
21 import os
22 
23 from cryptography.hazmat.primitives import padding
24 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
25 
26 from .constants import DEFAULT_CHUNK_SIZE, AES_KEY_SIZE, IV_SIZE, AES_BLOCK_SIZE
27 from .utils import DataBuffer
28 
29 
31 
32  """
33  Implements functionality for encrypting a stream of bytes
34  using AES 256.
35  """
36 
37  def __init__(self, key: bytes, iv: Optional[bytes] = None, chunkSize: int = DEFAULT_CHUNK_SIZE) -> None:
38  if len(key) != AES_KEY_SIZE:
39  raise ValueError(f"AES key size: {len(key)}, but {AES_KEY_SIZE} expected")
40 
41  if iv is None:
42  # generate random secure IV (initialization vector) if one wasn't provided
43  iv = os.urandom(IV_SIZE)
44 
45  if len(iv) != IV_SIZE:
46  raise ValueError(f"IV size: {len(iv)}, but {IV_SIZE} expected")
47 
48  self._key_key = key
49  self._encryptor_encryptor = Cipher(algorithms.AES256(self._key_key), modes.CBC(iv)).encryptor()
50  self._buffer_buffer = DataBuffer()
51 
52  self.iviv = iv
53  self.chunkSizechunkSize = chunkSize
54 
55  def feed(self, data: bytes) -> Generator[bytes, None, None]:
56  """
57  Encrypts the data using AES 256.
58 
59  Parameters
60  ----------
61  data : bytes
62  data which should be encrypted
63 
64  Returns
65  -------
66  Generator[bytes, None, None] -> yields encrypted data in chunks
67  """
68 
69  self._buffer_buffer.append(data)
70 
71  while self._buffer_buffer.remaining >= self.chunkSizechunkSize:
72  chunk = self._buffer_buffer.getBytes(self.chunkSizechunkSize)
73  yield self._encryptor_encryptor.update(chunk)
74 
75  def flush(self) -> bytes:
76  """
77  Encrypts any remaining data using AES 256.
78 
79  Returns
80  -------
81  bytes -> encrypted data
82  """
83 
84  chunk = self._buffer_buffer.getRemaining()
85 
86  # Pad the data if needed
87  if len(chunk) % AES_BLOCK_SIZE != 0:
88  padder = padding.PKCS7(AES_BLOCK_SIZE * 8).padder()
89  chunk = padder.update(chunk) + padder.finalize()
90 
91  return self._encryptor_encryptor.update(chunk) + self._encryptor_encryptor.finalize()
92 
93 
94 def encryptFile(key: bytes, sourcePath: Path, destinationPath: Path) -> None:
95  """
96  Encrypts a file using AES 256. IV gets stored as the first 16 bytes
97  of the encrypted file. IV is generated using "os.urandom".
98 
99  Parameters
100  ----------
101  key : bytes
102  key which will be used for encrypting
103  sourcePath : Path
104  path to the file which will be encrypted
105  destinationPath : Path
106  path to the encrypted file
107 
108  Raises
109  ------
110  RuntimeError -> if file size % AES_BLOCK_SIZE has remainder
111  """
112 
113  with sourcePath.open("rb") as source, destinationPath.open("wb") as destination:
114  encryptor = StreamEncryptor(key)
115  destination.write(encryptor.iv)
116 
117  while (chunk := source.read(DEFAULT_CHUNK_SIZE)):
118  for encryptedData in encryptor.feed(chunk):
119  destination.write(encryptedData)
120 
121  destination.write(encryptor.flush())
122 
123  if destinationPath.stat().st_size % AES_BLOCK_SIZE != 0:
124  raise RuntimeError("File was corrupted during encryption")
Generator[bytes, None, None] feed(self, bytes data)
Definition: encryptor.py:55