Coretex
network_manager_base.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Optional, Any, Dict, List, Union, Tuple
19 from pathlib import Path
20 from abc import ABC, abstractmethod
21 from contextlib import ExitStack
22 from http import HTTPStatus
23 from importlib.metadata import version as getLibraryVersion
24 
25 import os
26 import json
27 import logging
28 import platform
29 
30 import requests
31 import requests.adapters
32 
33 from .utils import RequestBodyType, RequestFormType, logFilesData, logRequestFailure, sleepBeforeRetry, getTimeoutForRetry
34 from .request_type import RequestType
35 from .network_response import NetworkResponse, NetworkRequestError
36 from .file_data import FileData
37 
38 
# Logger shared by the networking layer
logger = logging.getLogger("coretexpylib")

# Timeouts are (connection, read) tuples in seconds.
# Each MAX_* value is passed to "request" as "maxTimeout" together with its base timeout.
REQUEST_TIMEOUT = (5, 10) # Connection = 5 seconds, Read = 10 seconds
MAX_REQUEST_TIMEOUT = (60, 180) # Connection = 1 minute, Read = 3 minutes
DOWNLOAD_TIMEOUT = (5, 60) # Connection = 5 seconds, Read = 1 minute
MAX_DOWNLOAD_TIMEOUT = (60, 180) # Connection = 1 minute, Read = 3 minutes
UPLOAD_TIMEOUT = (5, 60) # Connection = 5 seconds, Read = 1 minute
MAX_UPLOAD_TIMEOUT = (60, 1800) # Connection = 1 minute, Read = 30 minutes

MAX_RETRY_COUNT = 5 # A request is retried at most 5 times (checked by "shouldRetry")
MAX_DELAY_BEFORE_RETRY = 180 # 3 minutes (not referenced in this file)

LOGIN_ENDPOINT = "user/login"
REFRESH_ENDPOINT = "user/refresh"
API_TOKEN_HEADER = "api-token" # Header which carries the API token (or the refresh token for REFRESH_ENDPOINT)
API_TOKEN_KEY = "token" # Key of the API token in the login/refresh response json
REFRESH_TOKEN_KEY = "refresh_token" # Key of the refresh token in the login response json

# Status codes for which a failed request is retried (see "shouldRetry")
RETRY_STATUS_CODES = [
    HTTPStatus.TOO_MANY_REQUESTS,
    HTTPStatus.INTERNAL_SERVER_ERROR,
    HTTPStatus.SERVICE_UNAVAILABLE
]

# Chunk size used when a downloaded file is written to disk
DOWNLOAD_CHUNK_SIZE = 8 * 1024 * 1024 # 8 MB
64 
65 
class RequestFailedError(Exception):

    """
    Error raised by NetworkManagerBase.request when sending the request
    raised an exception and the request will not be retried anymore
    """

    def __init__(self, endpoint: str, type_: RequestType) -> None:
        # The message names the request type and the endpoint which failed
        message = f">> [Coretex] \"{type_.name}\" request failed for endpoint \"{endpoint}\""
        super().__init__(message)
70 
71 
72 class NetworkManagerBase(ABC):
73 
74  def __init__(self) -> None:
75  self._session = requests.Session()
76 
77  cpuCount = os.cpu_count()
78  if cpuCount is None:
79  cpuCount = 1
80 
81  # 10 is default, keep that value for machines which have <= 10 cores
82  adapter = requests.adapters.HTTPAdapter(pool_maxsize = max(10, cpuCount))
83  self._session.mount("http://", adapter)
84  self._session.mount("https://", adapter)
85 
86  @property
87  def serverUrl(self) -> str:
88  return os.environ["CTX_API_URL"] + "api/v1/"
89 
90  @property
91  @abstractmethod
92  def _apiToken(self) -> Optional[str]:
93  pass
94 
95  @_apiToken.setter
96  @abstractmethod
97  def _apiToken(self, value: Optional[str]) -> None:
98  pass
99 
100  @property
101  @abstractmethod
102  def _refreshToken(self) -> Optional[str]:
103  pass
104 
105  @_refreshToken.setter
106  @abstractmethod
107  def _refreshToken(self, value: Optional[str]) -> None:
108  pass
109 
110  @property
111  def userAgent(self) -> str:
112  coretexpylibVersion = getLibraryVersion("coretex")
113  return f"coretexpylib;{coretexpylibVersion};python;{platform.python_version()}"
114 
115  @property
116  def hasStoredCredentials(self) -> bool:
117  """
118  To use this functions call it using coretex.networking.networkManager
119 
120  Raises
121  ------
122  NotImplementedError
123  """
124 
125  raise NotImplementedError
126 
127  def _headers(self, contentType: str = "application/json") -> Dict[str, str]:
128  headers = {
129  "Content-Type": contentType,
130  "Connection": "keep-alive",
131  "X-User-Agent": self.userAgent
132  }
133 
134  if self._apiToken is not None:
135  headers[API_TOKEN_HEADER] = self._apiToken
136 
137  return headers
138 
139  def shouldRetry(self, retryCount: int, response: Optional[NetworkResponse]) -> bool:
140  """
141  Checks if network request should be repeated based on the number of repetitions
142  as well as the response from previous repetition
143 
144  Parameters
145  ----------
146  retryCount : int
147  number of repeated function calls
148  response : Optional[NetworkResponse]
149  response of the request which is pending for retry
150 
151  Returns
152  -------
153  bool -> True if the request should be retried, False if not
154  """
155 
156  # Limit retry count to 3 times
157  if retryCount >= MAX_RETRY_COUNT:
158  return False
159 
160  if response is not None:
161  # If we get unauthorized maybe API token is expired
162  # If refresh endpoint failed with unauthorized do not retry
163  if response.isUnauthorized() and response.endpoint != REFRESH_ENDPOINT:
164  refreshTokenResponse = self.refreshToken()
165  return not refreshTokenResponse.hasFailed()
166 
167  return response.statusCode in RETRY_STATUS_CODES
168 
169  return True
170 
171  def request(
172  self,
173  endpoint: str,
174  requestType: RequestType,
175  headers: Optional[Dict[str, str]] = None,
176  query: Optional[Dict[str, Any]] = None,
177  body: Optional[RequestBodyType] = None,
178  files: Optional[RequestFormType] = None,
179  auth: Optional[Tuple[str, str]] = None,
180  timeout: Tuple[int, int] = REQUEST_TIMEOUT,
181  maxTimeout: Tuple[int, int] = MAX_REQUEST_TIMEOUT,
182  stream: bool = False,
183  retryCount: int = 0
184  ) -> NetworkResponse:
185 
186  """
187  Sends an HTTP request with provided parameters
188  This method is used as a base for all other networkManager methods
189 
190  Parameters
191  ----------
192  endpoint : str
193  endpoint to which the request is sent
194  requestType : RequestType
195  type of the request which is sent (get, post, put, delete, etc...)
196  headers : Optional[Dict[str, Any]]
197  headers which will be sent with request, if None default values will be used
198  query : Optional[Dict[str, Any]]
199  parameters which will be sent as query parameters
200  body : Optional[RequestBodyType]
201  parameters which will be sent as request body
202  files : Optional[RequestFormType]
203  files which will be sent as a part of form data request
204  auth : Optional[Tuple[str, str]]
205  credentials which will be send as basic auth header
206  stream : bool
207  defines if request body will be downloaded as a stream or not
208  timeout : Tuple[int, int]
209  timeout for the request, default <connection: 5s>, <read: 10s>
210  maxTimeout : Tuple[int, int]
211  timeout for the request, default <connection: 60s>, <read: 180s>
212  retryCount : int
213  retry number of request - only for internal use
214 
215  Returns
216  -------
217  NetworkResponse -> object containing the request response
218 
219  Raises
220  ------
221  RequestFailedError -> if request failed due to connection issues
222  """
223 
224  if headers is None:
225  headers = self._headers()
226 
227  url = self.serverUrl + endpoint
228 
229  # Log request debug data
230  logger.debug(f">> [Coretex] Sending request to \"{url}\"")
231  logger.debug(f"\tType: {requestType}")
232  logger.debug(f"\tHeaders: {headers}")
233  logger.debug(f"\tQuery: {query}")
234  logger.debug(f"\tBody: {body}")
235  logger.debug(f"\tFiles: {logFilesData(files)}")
236  logger.debug(f"\tAuth: {auth}")
237  logger.debug(f"\tStream: {stream}")
238  logger.debug(f"\tTimeout: {timeout}")
239  logger.debug(f"\tMax timeout: {maxTimeout}")
240  logger.debug(f"\tRetry count: {retryCount}")
241 
242  # If Content-Type is application/json make sure that body is converted to json
243  data: Optional[Any] = body
244  if headers.get("Content-Type") == "application/json" and data is not None:
245  data = json.dumps(body)
246 
247  try:
248  rawResponse = self._session.request(
249  requestType.value,
250  url,
251  params = query,
252  data = data,
253  auth = auth,
254  timeout = timeout,
255  files = files,
256  headers = headers
257  )
258 
259  response = NetworkResponse(rawResponse, endpoint)
260  if response.hasFailed():
261  logRequestFailure(endpoint, response)
262 
263  if self.shouldRetry(retryCount, response):
264  if self._apiToken is not None:
265  headers[API_TOKEN_HEADER] = self._apiToken
266 
267  if response.statusCode == HTTPStatus.TOO_MANY_REQUESTS:
268  # If the rate limiter is hit sleep before retrying the request
269  sleepBeforeRetry(retryCount, endpoint)
270 
271  return self.request(endpoint, requestType, headers, query, body, files, auth, timeout, maxTimeout, stream, retryCount + 1)
272 
273  return response
274  except requests.exceptions.RequestException as ex:
275  logger.debug(f">> [Coretex] Request failed. Reason \"{ex}\"", exc_info = ex)
276 
277  if self.shouldRetry(retryCount, None):
278  # If an exception happened during the request add a delay before retrying
279  sleepBeforeRetry(retryCount, endpoint)
280 
281  if isinstance(ex, requests.exceptions.ConnectionError) and "timeout" in str(ex):
282  # If request failed due to timeout recalculate (increase) the timeout
283  oldTimeout = timeout
284  timeout = getTimeoutForRetry(retryCount + 1, timeout, maxTimeout)
285 
286  logger.debug(f">> [Coretex] \"{endpoint}\" failed failed due to timeout. Increasing the timeout from {oldTimeout} to {timeout}")
287 
288  if self._apiToken is not None:
289  headers[API_TOKEN_HEADER] = self._apiToken
290 
291  return self.request(endpoint, requestType, headers, query, body, files, auth, timeout, maxTimeout, stream, retryCount + 1)
292 
293  raise RequestFailedError(endpoint, requestType)
294 
295  def head(
296  self,
297  endpoint: str,
298  params: Optional[Dict[str, Any]] = None,
299  headers: Optional[Dict[str, str]] = None
300  ) -> NetworkResponse:
301 
302  """
303  Sends head HTTP request
304 
305  Parameters
306  ----------
307  endpoint : str
308  endpoint to which the request is sent
309  params : Optional[RequestBodyType]
310  query parameters of the request
311  headers : Optional[Dict[str, str]]
312  additional headers of the request
313 
314  Returns
315  -------
316  NetworkResponse -> object containing the request response
317 
318  Raises
319  ------
320  RequestFailedError -> if request failed due to connection/timeout issues
321  """
322 
323  if headers is not None:
324  headers = {**self._headers(), **headers}
325 
326  return self.request(endpoint, RequestType.head, headers, query = params)
327 
328  def post(self, endpoint: str, params: Optional[RequestBodyType] = None) -> NetworkResponse:
329  """
330  Sends post HTTP request
331 
332  Parameters
333  ----------
334  endpoint : str
335  endpoint to which the request is sent
336  params : Optional[RequestBodyType]
337  body of the request
338 
339  Returns
340  -------
341  NetworkResponse -> object containing the request response
342 
343  Raises
344  ------
345  RequestFailedError -> if request failed due to connection issues
346  """
347 
348  return self.request(endpoint, RequestType.post, body = params)
349 
350  def get(self, endpoint: str, params: Optional[Dict[str, Any]] = None) -> NetworkResponse:
351  """
352  Sends get HTTP request
353 
354  Parameters
355  ----------
356  endpoint : str
357  endpoint to which the request is sent
358  params : Optional[RequestBodyType]
359  query parameters of the request
360 
361  Returns
362  -------
363  NetworkResponse -> object containing the request response
364 
365  Raises
366  ------
367  RequestFailedError -> if request failed due to connection issues
368  """
369 
370  return self.request(endpoint, RequestType.get, query = params)
371 
372  def put(self, endpoint: str, params: Optional[RequestBodyType] = None) -> NetworkResponse:
373  """
374  Sends put HTTP request
375 
376  Parameters
377  ----------
378  endpoint : str
379  endpoint to which the request is sent
380  params : Optional[RequestBodyType]
381  body of the request
382 
383  Returns
384  -------
385  NetworkResponse -> object containing the request response
386 
387  Raises
388  ------
389  RequestFailedError -> if request failed due to connection issues
390  """
391 
392  return self.request(endpoint, RequestType.put, body = params)
393 
394  def delete(self, endpoint: str) -> NetworkResponse:
395  """
396  Sends delete HTTP request
397 
398  Parameters
399  ----------
400  endpoint : str
401  endpoint to which the request is sent
402 
403  Returns
404  -------
405  NetworkResponse -> object containing the request response
406 
407  Raises
408  ------
409  RequestFailedError -> if request failed due to connection issues
410  """
411 
412  return self.request(endpoint, RequestType.delete)
413 
414  def formData(
415  self,
416  endpoint: str,
417  params: Optional[Dict[str, Any]] = None,
418  files: Optional[List[FileData]] = None
419  ) -> NetworkResponse:
420 
421  """
422  Sends multipart/form-data request
423 
424  Parameters
425  ----------
426  endpoint : str
427  endpoint to which the request is sent
428  params : Optional[Dict[str, Any]]
429  form data parameters
430  files : Optional[List[FileData]]
431  form data files
432 
433  Returns
434  -------
435  NetworkResponse -> object containing the request response
436 
437  Example
438  -------
439  >>> from coretex import networkManager
440  \b
441  >>> response = networkManager.formData(
442  endpoint = "dummyObject/form",
443  params = {
444  "key": "value"
445  }
446  )
447  >>> if response.hasFailed():
448  print("Failed to send form data request")
449  """
450 
451  if files is None:
452  files = []
453 
454  with ExitStack() as stack:
455  filesData = [file.prepareForUpload(stack) for file in files]
456 
457  headers = self._headers("multipart/form-data")
458  del headers["Content-Type"]
459 
460  if len(files) > 0:
461  response = self.request(endpoint, RequestType.options)
462  if response.hasFailed():
463  raise NetworkRequestError(response, "Could not establish a connection with the server")
464 
465  # If files are being uploaded bigger timeout is required
466  timeout = UPLOAD_TIMEOUT
467  maxTimeout = MAX_UPLOAD_TIMEOUT
468  else:
469  # If there are no files there is no need for big timeouts
470  timeout = REQUEST_TIMEOUT
471  maxTimeout = MAX_REQUEST_TIMEOUT
472 
473  return self.request(
474  endpoint,
475  RequestType.post,
476  headers,
477  body = params,
478  files = filesData,
479  timeout = timeout,
480  maxTimeout = maxTimeout
481  )
482 
483  # mypy is complaining about missing return statement but this code is unreachable
484  # see: https://github.com/python/mypy/issues/7726
485  raise RuntimeError("Unreachable")
486 
487  def authenticate(self, username: str, password: str, storeCredentials: bool = True) -> NetworkResponse:
488  """
489  Authenticates user with provided credentials
490 
491  Parameters
492  ----------
493  username : str
494  Coretex.ai username
495  password : str
496  Coretex.ai password
497  storeCredentials : bool
498  If true credentials will be stored in User object for reuse,
499  ignored for all managers except coretex.networking.networkManager
500 
501  Returns
502  -------
503  NetworkResponse -> object containing the request response
504 
505  Example
506  -------
507  >>> from coretex.networking import networkManager
508  \b
509  >>> response = networkManager.authenticate("dummy@coretex.ai", "123456")
510  >>> if response.hasFailed():
511  print("Failed to authenticate")
512  """
513 
514  # authenticate using credentials stored in requests.Session.auth
515 
516  response = self.request(LOGIN_ENDPOINT, RequestType.post, auth = (username, password))
517  if response.hasFailed():
518  return response
519 
520  responseJson = response.getJson(dict)
521 
522  self._apiToken = responseJson[API_TOKEN_KEY]
523  self._refreshToken = responseJson[REFRESH_TOKEN_KEY]
524 
525  return response
526 
527  def authenticateWithStoredCredentials(self) -> NetworkResponse:
528  """
529  To use this functions call it using coretex.networking.networkManager
530 
531  Raises
532  ------
533  NotImplementedError
534  """
535 
536  raise NotImplementedError
537 
538  def authenticateWithRefreshToken(self, token: str) -> NetworkResponse:
539  """
540  Authenticates user with provided refresh token
541 
542  Parameters
543  ----------
544  token : str
545  refresh token
546 
547  Returns
548  -------
549  NetworkResponse -> object containing the request response
550  """
551 
552  self._refreshToken = token
553  return self.refreshToken()
554 
555  def download(
556  self,
557  endpoint: str,
558  destination: Union[Path, str],
559  params: Optional[Dict[str, Any]] = None,
560  headers: Optional[Dict[str, str]] = None
561  ) -> NetworkResponse:
562 
563  """
564  Downloads file to the given destination
565 
566  Parameters
567  ----------
568  endpoint : str
569  endpoint to which the request is sent
570  destination : Union[Path, str]
571  path to save file
572  params : Optional[Dict[str, Any]]
573  query parameters of the request
574  headers : Optional[Dict[str, str]]
575  additional headers of the request
576 
577  Returns
578  -------
579  NetworkResponse -> object containing the request response
580 
581  Example
582  -------
583  >>> from coretex import networkManager
584  \b
585  >>> response = networkManager.genericDownload(
586  endpoint = "dummyObject/download",
587  destination = "path/to/destination/folder"
588  )
589  >>> if response.hasFailed():
590  print("Failed to download the file")
591  """
592 
593  if isinstance(destination, str):
594  destination = Path(destination)
595 
596  # If the destination exists check if it's corrupted
597  if destination.exists():
598  response = self.head(endpoint, params, headers)
599  if response.hasFailed():
600  return response
601 
602  # If the Content-Length returned by the head request is not equal to destination's
603  # file size force the file to be re-downloaded
604  try:
605  contentLength = int(response.headers["Content-Length"])
606  if destination.stat().st_size == contentLength:
607  return response
608  except (ValueError, KeyError):
609  # KeyError - Content-Length is not present in headers
610  # ValueError - Content-Length cannot be converted to int
611  pass
612 
613  if headers is not None:
614  headers = {**self._headers(), **headers}
615 
616  # Timeout for download applies per chunk, not for the full file download
617  response = self.request(
618  endpoint,
619  RequestType.get,
620  headers,
621  query = params,
622  stream = True,
623  timeout = DOWNLOAD_TIMEOUT,
624  maxTimeout = MAX_DOWNLOAD_TIMEOUT
625  )
626 
627  if response.hasFailed():
628  return response
629 
630  with destination.open("wb") as file:
631  for chunk in response.stream(chunkSize = DOWNLOAD_CHUNK_SIZE):
632  file.write(chunk)
633 
634  return response
635 
636  def refreshToken(self) -> NetworkResponse:
637  """
638  Uses refresh token functionality to fetch new API access token
639 
640  Returns
641  -------
642  NetworkResponse -> object containing the request response
643  """
644 
645  if self._refreshToken is None:
646  raise ValueError(f">> [Coretex] Cannot send \"{REFRESH_ENDPOINT}\" request, refreshToken is None")
647 
648  headers = self._headers()
649  headers[API_TOKEN_HEADER] = self._refreshToken
650 
651  response = self.request(REFRESH_ENDPOINT, RequestType.post, headers = headers)
652  if response.hasFailed():
653  return response
654 
655  responseJson = response.getJson(dict)
656  self._apiToken = responseJson[API_TOKEN_KEY]
657 
658  return response
659 
660  def reset(self) -> None:
661  """
662  Removes api and refresh token
663  """
664 
665  self._apiToken = None
666  self._refreshToken = None