Coretex
file.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Generator, Optional, Union
19 from pathlib import Path
20 from zipfile import ZipFile
21 
22 import mimetypes
23 import zipfile
24 import tarfile
25 import gzip
26 import shutil
27 import logging
28 
29 
class InvalidFileExtension(Exception):
    """
        Raised when a file extension is unknown or invalid
    """
37 
38 
def guessMimeType(filePath: Union[Path, str]) -> str:
    """
        Guesses the mime type of a file based on its path

        Parameters
        ----------
        filePath : Union[Path, str]
            path of the file whose mime type should be guessed

        Returns
        -------
        str -> the guessed mime type, "application/octet-stream"
        is used as a fallback when no guess could be made
    """

    # guess_type returns a (type, encoding) pair, only the type is needed
    guessedType, _ = mimetypes.guess_type(filePath)

    return "application/octet-stream" if guessedType is None else guessedType
61 
62 
def isGzip(path: Path) -> bool:
    """
        Checks whether the file is gz-compressed

        The check is a heuristic: the file must start with the two
        byte gz header (0x1F, 0x8B) and its name must end with ".gz"

        Parameters
        ----------
        path : Path
            file which should be checked

        Returns
        -------
        bool -> True if the file looks like a gz-compressed file,
        False otherwise (including when path is not a regular file)
    """

    # Every .gz file starts with the bytes 0x1F 0x8B. The header alone is not
    # conclusive (other files may start with the same bytes), which is why
    # the extension has to match as well
    # Ref: https://stackoverflow.com/a/3703300/7585106
    gzipMagic = b'\x1f\x8b'

    if not path.is_file():
        return False

    with path.open('rb') as stream:
        header = stream.read(2)

    return header == gzipMagic and path.name.endswith(".gz")
92 
93 
def isArchive(path: Path) -> bool:
    """
        Checks if the file is an archive (zip or tar)

        Parameters
        ----------
        path : Path
            file to be checked

        Returns
        -------
        bool -> True if it is a zip or tar archive, False otherwise
        (including when path is a directory or does not exist)
    """

    # tarfile.is_tarfile only swallows tarfile errors, so handing it a
    # directory or a missing path raises OSError instead of returning False.
    # Rejecting anything that is not a regular file up front avoids that
    if not Path(path).is_file():
        return False

    return zipfile.is_zipfile(path) or tarfile.is_tarfile(path)
109 
110 
def gzipDecompress(source: Path, destination: Path) -> None:
    """
        Decompresses a gz-compressed file into the destination file

        Parameters
        ----------
        source : Path
            gz-compressed file which should be decompressed
        destination : Path
            path of the file into which the decompressed content is written

        Raises
        ------
        ValueError -> if source is not recognized as a gz-compressed file
    """

    if not isGzip(source):
        raise ValueError(">> [Coretex] Not a .gz file")

    # Stream the content so the whole file never has to fit into memory
    with gzip.open(source, "rb") as compressed:
        with open(destination, "wb") as output:
            shutil.copyfileobj(compressed, output)
132 
133 
def archive(source: Path, destination: Path) -> None:
    """
        Archives and compresses the provided file or directory
        using ZipFile module

        A single file is stored under its own name. For a directory every
        file below it is stored under its path relative to the directory;
        entries which are not files (e.g. empty directories) are not stored.

        Parameters
        ----------
        source : Path
            file or directory to be archived and compressed
        destination : Path
            location to which the zip file will be stored
    """

    with ZipFile(destination, "w", zipfile.ZIP_DEFLATED) as destinationFile:
        if source.is_file():
            destinationFile.write(source, source.name)
            return

        # The zip file already exists on disk at this point, so when it is
        # located inside of the source directory the walk below would pick it
        # up and add the (half written) archive to itself
        destinationResolved = Path(destination).resolve()

        for path in source.rglob("*"):
            if not path.is_file():
                continue

            if path.resolve() == destinationResolved:
                continue

            destinationFile.write(path, path.relative_to(source))
156 
157 
def walk(path: Path) -> Generator[Path, None, None]:
    """
        Walks the directory tree, similar to os.walk, but for pathlib.Path

        Every entry is yielded as a resolved path, and a directory is
        always yielded before its own content.

        Parameters
        ----------
        path : Path
            directory from which the walk starts

        Returns
        -------
        Generator[Path, None, None] -> generator which yields all
        files and directories located under the starting directory
    """

    # Stack of directory iterators, the innermost directory is on top
    pending = [path.iterdir()]

    while pending:
        entry = next(pending[-1], None)

        if entry is None:
            # Directory on top of the stack is exhausted
            pending.pop()
            continue

        yield entry.resolve()

        if entry.is_dir():
            pending.append(entry.iterdir())
178 
179 
def recursiveUnzip(entryPoint: Path, destination: Optional[Path] = None, remove: bool = False) -> None:
    """
        Recursively unarchives the file

        gz-compressed files are decompressed first, zip and tar archives
        are extracted, and every archive or gz file found in the extracted
        content is unarchived (and deleted) in turn.

        Parameters
        ----------
        entryPoint : Path
            initial archive
        destination : Optional[Path]
            destination of unarchived files, if not provided the path
            of the archive without its last suffix is used
        remove : bool
            delete archive after unarchive is done

        Raises
        ------
        ValueError -> if the path is not an archive
    """

    logging.getLogger("coretexpylib").debug(f">> [Coretex] recursiveUnzip: source = {str(entryPoint)}, destination = {str(destination)}")

    if destination is None:
        destination = entryPoint.parent / entryPoint.stem

    # Decompress with gzip if is gzip
    if isGzip(entryPoint):
        gzipDecompress(entryPoint, destination)

        if remove:
            entryPoint.unlink()

        if not isArchive(destination):
            return

        # gzip naming convention is .original_file_ext.gz, so by calling .stem we remove .gz
        # for destination
        # NOTE(review): if destination has no suffix, destination.stem is equal to its name,
        # which means the decompressed archive would be extracted onto its own path -
        # confirm that callers never pass such a destination for gz-compressed archives
        recursiveUnzip(destination, destination.parent / destination.stem, remove = True)
        return

    if not isArchive(entryPoint):
        raise ValueError(">> [Coretex] Not an archive")

    if zipfile.is_zipfile(entryPoint):
        with ZipFile(entryPoint, "r") as zipFile:
            zipFile.extractall(destination)

    if tarfile.is_tarfile(entryPoint):
        with tarfile.open(entryPoint, "r") as tarFile:
            # NOTE(review): extractall is called without an extraction filter, members
            # of an untrusted tar archive may be written outside of destination -
            # see the "Extraction filters" section of the tarfile documentation
            tarFile.extractall(destination)

    if remove:
        entryPoint.unlink()

    for path in walk(destination):
        # walk yields directories too, and tarfile.is_tarfile (used by isArchive)
        # raises an OSError for a directory instead of returning False
        if not path.is_file():
            continue

        if isArchive(path) or isGzip(path):
            recursiveUnzip(path, remove = True)