Coretex
chunk_upload_session.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Union
19 from pathlib import Path
20 
21 import logging
22 
23 from .network_manager_base import FileData
24 from .network_manager import networkManager
25 from .network_response import NetworkRequestError
26 
27 
28 MAX_CHUNK_SIZE = 128 * 1024 * 1024 # 128 MiB
29 
30 
31 def _loadChunk(filePath: Path, start: int, chunkSize: int) -> bytes:
32  with filePath.open("rb") as file:
33  file.seek(start)
34  return file.read(chunkSize)
35 
36 
class ChunkUploadSession:

    """
        A class which splits a file into chunks and uploads it
        chunk by chunk. This class should be used for uploading
        files larger than 2 GiB, since Python does not support
        uploading files with a larger size.

        Maximum chunk size is 128 MiB.

        Properties
        ----------
        chunkSize : int
            size of chunks into which the file will be split
            maximum value is 128 MiB, while the minimum value is 1
        filePath : Path
            path to the file which will be uploaded
            (a str passed to the constructor is converted to Path)
        fileSize : int
            size in bytes of the file which will be uploaded
    """

    def __init__(self, chunkSize: int, filePath: Union[Path, str]) -> None:
        """
            Parameters
            ----------
            chunkSize : int
                size of a single chunk in bytes, must be in
                range 1-MAX_CHUNK_SIZE
            filePath : Union[Path, str]
                path to the file which will be uploaded

            Raises
            ------
            ValueError -> if chunkSize is outside of the allowed range
            OSError -> if the file size could not be read
        """

        if chunkSize <= 0 or chunkSize > MAX_CHUNK_SIZE:
            # Zero is rejected above, so the valid range starts at 1
            raise ValueError(f">> [Coretex] Invalid \"chunkSize\" value \"{chunkSize}\". Value must be in range 1-{MAX_CHUNK_SIZE}")

        if isinstance(filePath, str):
            filePath = Path(filePath)

        self.chunkSize = chunkSize
        self.filePath = filePath

        # stat() follows symlinks, so the reported size is the size of the
        # bytes which open() reads during upload. lstat() would report the
        # size of the link itself when filePath is a symlink.
        self.fileSize = filePath.stat().st_size

    def __start(self) -> str:
        """
            Sends the "upload/start" request which announces the
            size of the file to be uploaded

            Returns
            -------
            str -> value of the "id" field from the response, used
            to identify this upload in the chunk requests

            Raises
            ------
            NetworkRequestError -> if the request failed
            ValueError -> if the "id" field of the response is not a str
        """

        parameters = {
            "size": self.fileSize
        }

        response = networkManager.post("upload/start", parameters)
        if response.hasFailed():
            raise NetworkRequestError(response, f"Failed to start chunked upload for \"{self.filePath}\"")

        uploadId = response.getJson(dict).get("id")

        if not isinstance(uploadId, str):
            raise ValueError(f">> [Coretex] Invalid API response, invalid value \"{uploadId}\" for field \"id\"")

        return uploadId

    def __uploadChunk(self, uploadId: str, start: int, end: int) -> None:
        """
            Uploads the bytes in range [start, end) of the file

            Parameters
            ----------
            uploadId : str
                id returned by the "upload/start" request
            start : int
                offset of the first byte of the chunk
            end : int
                offset one past the last byte of the chunk (exclusive)

            Raises
            ------
            NetworkRequestError -> if the request failed
        """

        parameters = {
            "id": uploadId,
            "start": start,
            "end": end - 1  # API expects start/end to be inclusive
        }

        # Read exactly the announced range so the payload length always
        # matches the "start"/"end" parameters sent alongside it
        chunk = _loadChunk(self.filePath, start, end - start)
        files = [
            FileData.createFromBytes("file", chunk, self.filePath.name)
        ]

        response = networkManager.formData("upload/chunk", parameters, files)
        if response.hasFailed():
            raise NetworkRequestError(response, f"Failed to upload file chunk with byte range \"{start}-{end}\"")

        logging.getLogger("coretexpylib").debug(f">> [Coretex] Uploaded chunk with range \"{start}-{end}\"")

    def run(self) -> str:
        """
            Uploads the file to Coretex.ai

            Returns
            -------
            str -> ID of the uploaded file

            Raises
            ------
            NetworkRequestError, ValueError -> if some kind of error happened during
            the upload of the provided file

            Example
            -------
            >>> from coretex.networking import ChunkUploadSession, NetworkRequestError
            \b
            >>> chunkSize = 16 * 1024 * 1024  # chunk size: 16 MiB
            >>> uploadSession = ChunkUploadSession(chunkSize, "path/to/file.ext")
            \b
            >>> try:
                    uploadId = uploadSession.run()
                    print(uploadId)
                except (NetworkRequestError, ValueError):
                    print("Failed to upload file")
        """

        logging.getLogger("coretexpylib").debug(f">> [Coretex] Starting upload for \"{self.filePath}\"")

        uploadId = self.__start()

        # One iteration per chunk: offsets 0, chunkSize, 2 * chunkSize, ...
        # For an empty file the range is empty and no chunk request is sent.
        for start in range(0, self.fileSize, self.chunkSize):
            # The last chunk may be shorter than chunkSize
            end = min(start + self.chunkSize, self.fileSize)

            self.__uploadChunk(uploadId, start, end)

        return uploadId
144 
145 
def fileChunkUpload(path: Path, chunkSize: int = MAX_CHUNK_SIZE) -> str:
    """
        Uploads file in chunks to Coretex.ai server.
        Should be used when uploading large files.

        Parameters
        ----------
        path : Path
            File which will be uploaded in chunks
        chunkSize : int
            Size of the chunks into which file will be split
            before uploading. Maximum value is 128 MiB, larger
            values are reduced to the maximum

        Returns
        -------
        str -> id of the file which was uploaded

        Raises
        ------
        ValueError -> if path is not a file, or if the
        upload session rejects the chunkSize value
        NetworkRequestError -> if a request failed during the upload
    """

    if not path.is_file():
        raise ValueError(f"{path} is not a file")

    # Clamp oversized values to the maximum instead of failing
    chunkSize = min(chunkSize, MAX_CHUNK_SIZE)

    # Pass the requested (clamped) chunk size to the session; previously
    # MAX_CHUNK_SIZE was always passed and the argument was ignored
    uploadSession = ChunkUploadSession(chunkSize, path)
    return uploadSession.run()
# Referenced member: ChunkUploadSession.__uploadChunk(self, uploadId: str, start: int, end: int) -> None