Coretex
upload_worker.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import List, Callable
19 from threading import Thread, Event, RLock
20 
21 import time
22 import logging
23 
24 from ...logging import Log
25 
26 
# Initial (and post-success) delay between two upload attempts, in seconds
MAX_WAIT_TIME_BEFORE_UPDATE = 5

# Signature of the function which performs the actual upload: it receives
# the list of pending logs and returns True if they were uploaded successfully
UploadFunction = Callable[[List[Log]], bool]
29 
30 
class LoggerUploadWorker(Thread):

    """
        Not intended for outside use

        A daemon worker thread which periodically uploads the
        pending logs to Coretex backend. The initial wait time between
        two uploads is MAX_WAIT_TIME_BEFORE_UPDATE seconds

        If the upload request fails the wait time is doubled, and
        it is reset to the initial value after a successful upload
    """

    def __init__(self, uploadFunction: UploadFunction) -> None:
        """
            Parameters
            ----------
            uploadFunction : UploadFunction
                function which receives the list of pending logs and
                returns True if the logs were successfully uploaded
        """

        # name/daemon are passed through the constructor since
        # setName/setDaemon are deprecated as of Python 3.10
        super().__init__(name = "LoggerUploadWorker", daemon = True)

        self.__uploadFunction = uploadFunction
        self.__stopped = Event()
        # Guards the pending logs and serializes uploads
        self.__lock = RLock()
        self.__waitTime = MAX_WAIT_TIME_BEFORE_UPDATE
        self.__pendingLogs: List[Log] = []

    @property
    def isStopped(self) -> bool:
        """
            Returns
            -------
            bool -> True if stop() was called, False otherwise
        """

        return self.__stopped.is_set()

    def stop(self) -> None:
        """
            Marks the worker as stopped. After this call no new logs
            are accepted and no further uploads are performed

            Acquires the worker lock, so it waits for an upload
            which is currently in progress to finish
        """

        with self.__lock:
            self.__stopped.set()

    def add(self, log: Log) -> None:
        """
            Queues the log for the next upload.
            The log is silently dropped if the worker is stopped

            Parameters
            ----------
            log : Log
                log which should be uploaded
        """

        with self.__lock:
            if self.isStopped:
                return

            self.__pendingLogs.append(log)

    def uploadLogs(self) -> bool:
        """
            Uploads all pending logs using the upload function
            provided to the constructor. The lock is held for the whole
            duration of the upload, so calls to add() and stop() from
            other threads wait until the upload is finished

            Returns
            -------
            bool -> False if the worker is stopped, True if there was
            nothing to upload, otherwise the result of the upload function
        """

        with self.__lock:
            if self.isStopped:
                return False

            if not self.__pendingLogs:
                return True

            # Uploads logs to Coretex using the provided function
            result = self.__uploadFunction(self.__pendingLogs)

            # Only clear logs if they were successfully uploaded to coretex,
            # otherwise they are kept and sent again with the next attempt
            if result:
                self.__pendingLogs.clear()

            return result

    def run(self) -> None:
        """
            Thread entry point. Waits for the current wait time,
            attempts the upload, and adjusts the wait time based on
            the outcome, until the worker is stopped
        """

        # Event.wait returns True as soon as stop() is called, which ends the
        # loop immediately instead of sleeping through the full wait time
        while not self.__stopped.wait(self.__waitTime):
            try:
                success = self.uploadLogs()
            except BaseException as exception:
                # Deliberately best-effort: a failing upload must never
                # terminate the worker, so the failure is only logged
                logging.getLogger("coretexpylib").debug(">> [Coretex] Failed to upload logs", exc_info = exception)
                success = False

            if success:
                # If upload of logs was success reset wait time
                self.__waitTime = MAX_WAIT_TIME_BEFORE_UPDATE
            else:
                # If upload of logs failed, double the wait time
                self.__waitTime *= 2