18 from typing
import Optional, Any, Dict, List, Union, Tuple
19 from pathlib
import Path
20 from abc
import ABC, abstractmethod
21 from contextlib
import ExitStack
22 from http
import HTTPStatus
23 from importlib.metadata
import version
as getLibraryVersion
31 import requests.adapters
33 from .utils
import RequestBodyType, RequestFormType, logFilesData, logRequestFailure, sleepBeforeRetry, getTimeoutForRetry
34 from .request_type
import RequestType
35 from .network_response
import NetworkResponse, NetworkRequestError
36 from .file_data
import FileData
39 logger = logging.getLogger(
"coretexpylib")
41 REQUEST_TIMEOUT = (5, 10)
42 MAX_REQUEST_TIMEOUT = (60, 180)
43 DOWNLOAD_TIMEOUT = (5, 60)
44 MAX_DOWNLOAD_TIMEOUT = (60, 180)
45 UPLOAD_TIMEOUT = (5, 60)
46 MAX_UPLOAD_TIMEOUT = (60, 1800)
49 MAX_DELAY_BEFORE_RETRY = 180
51 LOGIN_ENDPOINT =
"user/login"
52 REFRESH_ENDPOINT =
"user/refresh"
53 API_TOKEN_HEADER =
"api-token"
54 API_TOKEN_KEY =
"token"
55 REFRESH_TOKEN_KEY =
"refresh_token"
57 RETRY_STATUS_CODES = [
58 HTTPStatus.TOO_MANY_REQUESTS,
59 HTTPStatus.INTERNAL_SERVER_ERROR,
60 HTTPStatus.SERVICE_UNAVAILABLE
63 DOWNLOAD_CHUNK_SIZE = 8 * 1024 * 1024
66 class RequestFailedError(Exception):
68 def __init__(self, endpoint: str, type_: RequestType) ->
None:
69 super().__init__(f
">> [Coretex] \"{type_.name}\" request failed for endpoint \"{endpoint}\"")
72 class NetworkManagerBase(ABC):
74 def __init__(self) -> None:
75 self._session = requests.Session()
77 cpuCount = os.cpu_count()
82 adapter = requests.adapters.HTTPAdapter(pool_maxsize = max(10, cpuCount))
83 self._session.mount(
"http://", adapter)
84 self._session.mount(
"https://", adapter)
87 def serverUrl(self) -> str:
88 return os.environ[
"CTX_API_URL"] +
"api/v1/"
92 def _apiToken(self) -> Optional[str]:
97 def _apiToken(self, value: Optional[str]) ->
None:
102 def _refreshToken(self) -> Optional[str]:
105 @_refreshToken.setter
107 def _refreshToken(self, value: Optional[str]) ->
None:
111 def userAgent(self) -> str:
112 coretexpylibVersion = getLibraryVersion(
"coretex")
113 return f
"coretexpylib;{coretexpylibVersion};python;{platform.python_version()}"
116 def hasStoredCredentials(self) -> bool:
118 To use this functions call it using coretex.networking.networkManager
125 raise NotImplementedError
127 def _headers(self, contentType: str =
"application/json") -> Dict[str, str]:
129 "Content-Type": contentType,
130 "Connection":
"keep-alive",
131 "X-User-Agent": self.userAgent
134 if self._apiToken
is not None:
135 headers[API_TOKEN_HEADER] = self._apiToken
139 def shouldRetry(self, retryCount: int, response: Optional[NetworkResponse]) -> bool:
141 Checks if network request should be repeated based on the number of repetitions
142 as well as the response from previous repetition
147 number of repeated function calls
148 response : Optional[NetworkResponse]
149 response of the request which is pending for retry
153 bool -> True if the request should be retried, False if not
157 if retryCount >= MAX_RETRY_COUNT:
160 if response
is not None:
163 if response.isUnauthorized()
and response.endpoint != REFRESH_ENDPOINT:
164 refreshTokenResponse = self.refreshToken()
165 return not refreshTokenResponse.hasFailed()
167 return response.statusCode
in RETRY_STATUS_CODES
174 requestType: RequestType,
175 headers: Optional[Dict[str, str]] =
None,
176 query: Optional[Dict[str, Any]] =
None,
177 body: Optional[RequestBodyType] =
None,
178 files: Optional[RequestFormType] =
None,
179 auth: Optional[Tuple[str, str]] =
None,
180 timeout: Tuple[int, int] = REQUEST_TIMEOUT,
181 maxTimeout: Tuple[int, int] = MAX_REQUEST_TIMEOUT,
182 stream: bool =
False,
184 ) -> NetworkResponse:
187 Sends an HTTP request with provided parameters
188 This method is used as a base for all other networkManager methods
193 endpoint to which the request is sent
194 requestType : RequestType
195 type of the request which is sent (get, post, put, delete, etc...)
196 headers : Optional[Dict[str, Any]]
197 headers which will be sent with request, if None default values will be used
198 query : Optional[Dict[str, Any]]
199 parameters which will be sent as query parameters
200 body : Optional[RequestBodyType]
201 parameters which will be sent as request body
202 files : Optional[RequestFormType]
203 files which will be sent as a part of form data request
204 auth : Optional[Tuple[str, str]]
205 credentials which will be send as basic auth header
207 defines if request body will be downloaded as a stream or not
208 timeout : Tuple[int, int]
209 timeout for the request, default <connection: 5s>, <read: 10s>
210 maxTimeout : Tuple[int, int]
211 timeout for the request, default <connection: 60s>, <read: 180s>
213 retry number of request - only for internal use
217 NetworkResponse -> object containing the request response
221 RequestFailedError -> if request failed due to connection issues
225 headers = self._headers()
227 url = self.serverUrl + endpoint
230 logger.debug(f
">> [Coretex] Sending request to \"{url}\"")
231 logger.debug(f
"\tType: {requestType}")
232 logger.debug(f
"\tHeaders: {headers}")
233 logger.debug(f
"\tQuery: {query}")
234 logger.debug(f
"\tBody: {body}")
235 logger.debug(f
"\tFiles: {logFilesData(files)}")
236 logger.debug(f
"\tAuth: {auth}")
237 logger.debug(f
"\tStream: {stream}")
238 logger.debug(f
"\tTimeout: {timeout}")
239 logger.debug(f
"\tMax timeout: {maxTimeout}")
240 logger.debug(f
"\tRetry count: {retryCount}")
243 data: Optional[Any] = body
244 if headers.get(
"Content-Type") ==
"application/json" and data
is not None:
245 data = json.dumps(body)
248 rawResponse = self._session.request(
259 response = NetworkResponse(rawResponse, endpoint)
260 if response.hasFailed():
261 logRequestFailure(endpoint, response)
263 if self.shouldRetry(retryCount, response):
264 if self._apiToken
is not None:
265 headers[API_TOKEN_HEADER] = self._apiToken
267 if response.statusCode == HTTPStatus.TOO_MANY_REQUESTS:
269 sleepBeforeRetry(retryCount, endpoint)
271 return self.request(endpoint, requestType, headers, query, body, files, auth, timeout, maxTimeout, stream, retryCount + 1)
274 except requests.exceptions.RequestException
as ex:
275 logger.debug(f
">> [Coretex] Request failed. Reason \"{ex}\"", exc_info = ex)
277 if self.shouldRetry(retryCount,
None):
279 sleepBeforeRetry(retryCount, endpoint)
281 if isinstance(ex, requests.exceptions.ConnectionError)
and "timeout" in str(ex):
284 timeout = getTimeoutForRetry(retryCount + 1, timeout, maxTimeout)
286 logger.debug(f
">> [Coretex] \"{endpoint}\" failed failed due to timeout. Increasing the timeout from {oldTimeout} to {timeout}")
288 if self._apiToken
is not None:
289 headers[API_TOKEN_HEADER] = self._apiToken
291 return self.request(endpoint, requestType, headers, query, body, files, auth, timeout, maxTimeout, stream, retryCount + 1)
293 raise RequestFailedError(endpoint, requestType)
298 params: Optional[Dict[str, Any]] =
None,
299 headers: Optional[Dict[str, str]] =
None
300 ) -> NetworkResponse:
303 Sends head HTTP request
308 endpoint to which the request is sent
309 params : Optional[RequestBodyType]
310 query parameters of the request
311 headers : Optional[Dict[str, str]]
312 additional headers of the request
316 NetworkResponse -> object containing the request response
320 RequestFailedError -> if request failed due to connection/timeout issues
323 if headers
is not None:
324 headers = {**self._headers(), **headers}
326 return self.request(endpoint, RequestType.head, headers, query = params)
328 def post(self, endpoint: str, params: Optional[RequestBodyType] =
None) -> NetworkResponse:
330 Sends post HTTP request
335 endpoint to which the request is sent
336 params : Optional[RequestBodyType]
341 NetworkResponse -> object containing the request response
345 RequestFailedError -> if request failed due to connection issues
348 return self.request(endpoint, RequestType.post, body = params)
350 def get(self, endpoint: str, params: Optional[Dict[str, Any]] =
None) -> NetworkResponse:
352 Sends get HTTP request
357 endpoint to which the request is sent
358 params : Optional[RequestBodyType]
359 query parameters of the request
363 NetworkResponse -> object containing the request response
367 RequestFailedError -> if request failed due to connection issues
370 return self.request(endpoint, RequestType.get, query = params)
372 def put(self, endpoint: str, params: Optional[RequestBodyType] =
None) -> NetworkResponse:
374 Sends put HTTP request
379 endpoint to which the request is sent
380 params : Optional[RequestBodyType]
385 NetworkResponse -> object containing the request response
389 RequestFailedError -> if request failed due to connection issues
392 return self.request(endpoint, RequestType.put, body = params)
394 def delete(self, endpoint: str) -> NetworkResponse:
396 Sends delete HTTP request
401 endpoint to which the request is sent
405 NetworkResponse -> object containing the request response
409 RequestFailedError -> if request failed due to connection issues
412 return self.request(endpoint, RequestType.delete)
417 params: Optional[Dict[str, Any]] =
None,
418 files: Optional[List[FileData]] =
None
419 ) -> NetworkResponse:
422 Sends multipart/form-data request
427 endpoint to which the request is sent
428 params : Optional[Dict[str, Any]]
430 files : Optional[List[FileData]]
435 NetworkResponse -> object containing the request response
439 >>> from coretex import networkManager
441 >>> response = networkManager.formData(
442 endpoint = "dummyObject/form",
447 >>> if response.hasFailed():
448 print("Failed to send form data request")
454 with ExitStack()
as stack:
455 filesData = [file.prepareForUpload(stack)
for file
in files]
457 headers = self._headers(
"multipart/form-data")
458 del headers[
"Content-Type"]
461 response = self.request(endpoint, RequestType.options)
462 if response.hasFailed():
463 raise NetworkRequestError(response,
"Could not establish a connection with the server")
466 timeout = UPLOAD_TIMEOUT
467 maxTimeout = MAX_UPLOAD_TIMEOUT
470 timeout = REQUEST_TIMEOUT
471 maxTimeout = MAX_REQUEST_TIMEOUT
480 maxTimeout = maxTimeout
485 raise RuntimeError(
"Unreachable")
487 def authenticate(self, username: str, password: str, storeCredentials: bool =
True) -> NetworkResponse:
489 Authenticates user with provided credentials
497 storeCredentials : bool
498 If true credentials will be stored in User object for reuse,
499 ignored for all managers except coretex.networking.networkManager
503 NetworkResponse -> object containing the request response
507 >>> from coretex.networking import networkManager
509 >>> response = networkManager.authenticate("dummy@coretex.ai", "123456")
510 >>> if response.hasFailed():
511 print("Failed to authenticate")
516 response = self.request(LOGIN_ENDPOINT, RequestType.post, auth = (username, password))
517 if response.hasFailed():
520 responseJson = response.getJson(dict)
522 self._apiToken = responseJson[API_TOKEN_KEY]
523 self._refreshToken = responseJson[REFRESH_TOKEN_KEY]
527 def authenticateWithStoredCredentials(self) -> NetworkResponse:
529 To use this functions call it using coretex.networking.networkManager
536 raise NotImplementedError
538 def authenticateWithRefreshToken(self, token: str) -> NetworkResponse:
540 Authenticates user with provided refresh token
549 NetworkResponse -> object containing the request response
552 self._refreshToken = token
553 return self.refreshToken()
558 destination: Union[Path, str],
559 params: Optional[Dict[str, Any]] =
None,
560 headers: Optional[Dict[str, str]] =
None
561 ) -> NetworkResponse:
564 Downloads file to the given destination
569 endpoint to which the request is sent
570 destination : Union[Path, str]
572 params : Optional[Dict[str, Any]]
573 query parameters of the request
574 headers : Optional[Dict[str, str]]
575 additional headers of the request
579 NetworkResponse -> object containing the request response
583 >>> from coretex import networkManager
585 >>> response = networkManager.genericDownload(
586 endpoint = "dummyObject/download",
587 destination = "path/to/destination/folder"
589 >>> if response.hasFailed():
590 print("Failed to download the file")
593 if isinstance(destination, str):
594 destination = Path(destination)
597 if destination.exists():
598 response = self.head(endpoint, params, headers)
599 if response.hasFailed():
605 contentLength = int(response.headers[
"Content-Length"])
606 if destination.stat().st_size == contentLength:
608 except (ValueError, KeyError):
613 if headers
is not None:
614 headers = {**self._headers(), **headers}
617 response = self.request(
623 timeout = DOWNLOAD_TIMEOUT,
624 maxTimeout = MAX_DOWNLOAD_TIMEOUT
627 if response.hasFailed():
630 with destination.open(
"wb")
as file:
631 for chunk
in response.stream(chunkSize = DOWNLOAD_CHUNK_SIZE):
636 def refreshToken(self) -> NetworkResponse:
638 Uses refresh token functionality to fetch new API access token
642 NetworkResponse -> object containing the request response
645 if self._refreshToken
is None:
646 raise ValueError(f
">> [Coretex] Cannot send \"{REFRESH_ENDPOINT}\" request, refreshToken is None")
648 headers = self._headers()
649 headers[API_TOKEN_HEADER] = self._refreshToken
651 response = self.request(REFRESH_ENDPOINT, RequestType.post, headers = headers)
652 if response.hasFailed():
655 responseJson = response.getJson(dict)
656 self._apiToken = responseJson[API_TOKEN_KEY]
660 def reset(self) -> None:
662 Removes api and refresh token
665 self._apiToken =
None
666 self._refreshToken =
None