18 from typing
import Optional, Generator
19 from pathlib
import Path
23 from cryptography.hazmat.primitives
import padding
24 from cryptography.hazmat.primitives.ciphers
import Cipher, algorithms, modes
26 from .constants
import DEFAULT_CHUNK_SIZE, AES_KEY_SIZE, IV_SIZE, AES_BLOCK_SIZE
27 from .utils
import DataBuffer
33 Implements functionality for encrypting a stream of bytes
37 def __init__(self, key: bytes, iv: Optional[bytes] =
None, chunkSize: int = DEFAULT_CHUNK_SIZE) ->
None:
38 if len(key) != AES_KEY_SIZE:
39 raise ValueError(f
"AES key size: {len(key)}, but {AES_KEY_SIZE} expected")
43 iv = os.urandom(IV_SIZE)
45 if len(iv) != IV_SIZE:
46 raise ValueError(f
"IV size: {len(iv)}, but {IV_SIZE} expected")
49 self.
_encryptor_encryptor = Cipher(algorithms.AES256(self.
_key_key), modes.CBC(iv)).encryptor()
50 self.
_buffer_buffer = DataBuffer()
55 def feed(self, data: bytes) -> Generator[bytes,
None,
None]:
57 Encrypts the data using AES 256.
62 data which should be encrypted
66 Generator[bytes, None, None] -> yields encrypted data in chunks
69 self.
_buffer_buffer.append(data)
77 Encrypts any remaining data using AES 256.
81 bytes -> encrypted data
84 chunk = self.
_buffer_buffer.getRemaining()
87 if len(chunk) % AES_BLOCK_SIZE != 0:
88 padder = padding.PKCS7(AES_BLOCK_SIZE * 8).padder()
89 chunk = padder.update(chunk) + padder.finalize()
94 def encryptFile(key: bytes, sourcePath: Path, destinationPath: Path) ->
None:
96 Encrypts a file using AES 256. IV gets stored as the first 16 bytes
97 of the encrypted file. IV is generated using "os.urandom".
102 key which will be used for encrypting
104 path to the file which will be encrypted
105 destinationPath : Path
106 path to the encrypted file
110 RuntimeError -> if file size % AES_BLOCK_SIZE has remainder
113 with sourcePath.open(
"rb")
as source, destinationPath.open(
"wb")
as destination:
115 destination.write(encryptor.iv)
117 while (chunk := source.read(DEFAULT_CHUNK_SIZE)):
118 for encryptedData
in encryptor.feed(chunk):
119 destination.write(encryptedData)
121 destination.write(encryptor.flush())
123 if destinationPath.stat().st_size % AES_BLOCK_SIZE != 0:
124 raise RuntimeError(
"File was corrupted during encryption")
Generator[bytes, None, None] feed(self, bytes data)