Coretex
worker.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Optional, Type
19 from typing_extensions import Self
20 from types import TracebackType, FrameType
21 from multiprocessing.connection import Connection
22 
23 import time
24 import timeit
25 import os
26 import logging
27 import multiprocessing
28 import signal
29 
30 import psutil
31 
32 from . import utils, metrics, artifacts
33 from ...entities import TaskRun
34 from ...networking import networkManager, NetworkRequestError
35 
36 
def _update(taskRun: TaskRun) -> None:
    """
    Run one worker tick for the given TaskRun.

    Sends a heartbeat first and then uploads the metrics, logging
    each step at debug level.

    Parameters
    ----------
    taskRun : TaskRun
        TaskRun for which the heartbeat and the metrics are sent
    """

    logger = logging.getLogger("coretexpylib")

    # Calling updateStatus with no arguments is considered a heartbeat
    logger.debug(">> [Coretex] Heartbeat")
    taskRun.updateStatus()

    logger.debug(">> [Coretex] Uploading metrics")
    metrics.upload(taskRun)
43 
44 
def _taskRunWorker(output: Connection, refreshToken: str, taskRunId: int, parentId: int) -> None:
    """
    Entry point of the TaskRun worker process.

    Authenticates using the refresh token, fetches the TaskRun and creates
    its metrics. The outcome of that startup sequence is reported through
    "output" (via utils.sendFailure / utils.sendSuccess). After a successful
    startup the function keeps sending heartbeat and metrics for as long as
    the parent process is running and no SIGTERM was received, while
    tracking artifacts for the TaskRun.

    Parameters
    ----------
    output : Connection
        connection used for reporting the startup result
    refreshToken : str
        token used for authenticating the worker process
    taskRunId : int
        id of the TaskRun which is fetched and updated
    parentId : int
        pid of the process whose lifetime bounds the update loop
    """

    # Seconds between the starts of two consecutive heartbeat/metrics updates
    updateInterval = 5.0

    # Longest single uninterrupted sleep. time.sleep() resumes after a signal
    # handler which does not raise (PEP 475), so sleeping the whole interval
    # in one call would delay reacting to SIGTERM by up to updateInterval.
    sleepSlice = 0.5

    isStopped = False

    def handleTerminateSignal(signum: int, frame: Optional[FrameType]) -> None:
        nonlocal isStopped

        if signum != signal.SIGTERM:
            return

        # Logger is looked up here (not captured) because the handler can
        # fire at any point after it is installed
        logging.getLogger("coretexpylib").debug(">> [Coretex] Received terminate signal. Terminating...")
        isStopped = True

    signal.signal(signal.SIGTERM, handleTerminateSignal)
    utils.initializeLogger(taskRunId)

    response = networkManager.authenticateWithRefreshToken(refreshToken)
    if response.hasFailed():
        utils.sendFailure(output, "Failed to authenticate with refresh token")
        return

    try:
        taskRun: TaskRun = TaskRun.fetchById(taskRunId)
    except NetworkRequestError:
        utils.sendFailure(output, f"Failed to fetch TaskRun with id \"{taskRunId}\"")
        return

    try:
        metrics.create(taskRun)
    except NetworkRequestError:
        utils.sendFailure(output, "Failed to create metrics")
        return

    utils.sendSuccess(output, "TaskRun worker successfully started")

    parent = psutil.Process(parentId)
    current = psutil.Process(os.getpid())
    logger = logging.getLogger("coretexpylib")

    # Start tracking files which are created inside current working directory
    with artifacts.track(taskRun):
        while parent.is_running() and not isStopped:
            logger.debug(f">> [Coretex] Worker process id {current.pid}, parent process id {parent.pid}")

            # Measure elapsed time to calculate for how long should the process sleep
            start = timeit.default_timer()
            _update(taskRun)
            diff = timeit.default_timer() - start

            # Make sure that metrics and heartbeat are sent every updateInterval seconds
            if diff < updateInterval:
                sleepTime = updateInterval - diff
                logger.debug(f">> [Coretex] Sleeping for {sleepTime}s")

                # Sleep in short slices so a terminate request is noticed quickly
                wakeUpAt = timeit.default_timer() + sleepTime
                while not isStopped:
                    remaining = wakeUpAt - timeit.default_timer()
                    if remaining <= 0:
                        break

                    time.sleep(min(remaining, sleepSlice))

    logger.debug(">> [Coretex] Finished")
99 
100 
101 class TaskRunWorker:
102 
103  def __init__(self, refreshToken: str, taskRunId: int) -> None:
104  self._refreshToken = refreshToken
105 
106  output, input = multiprocessing.Pipe()
107  self.__input = input
108 
109  self.__process = multiprocessing.Process(
110  name = f"TaskRun {taskRunId} worker process",
111  target = _taskRunWorker,
112  args = (output, refreshToken, taskRunId, os.getpid()),
113  daemon = True
114  )
115 
116  def start(self) -> None:
117  self.__process.start()
118 
119  result = self.__input.recv()
120  if result["code"] != 0:
121  raise RuntimeError(result["message"])
122 
123  message = result["message"]
124  logging.getLogger("coretexpylib").info(f">> [Coretex] {message}")
125 
126  def stop(self) -> None:
127  logging.getLogger("coretexpylib").info(">> [Coretex] Stopping the worker process")
128 
129  self.__process.terminate()
130  self.__process.join()
131 
132  def __enter__(self) -> Self:
133  self.start()
134  return self
135 
136  def __exit__(
137  self,
138  exceptionType: Optional[Type[BaseException]],
139  exceptionValue: Optional[BaseException],
140  exceptionTraceback: Optional[TracebackType]
141  ) -> None:
142 
143  if self.__process.is_alive():
144  self.stop()
145 
146  def kill(self) -> None:
147  logging.getLogger("coretexpylib").info(">> [Coretex] Killing the worker process")
148  self.__process.kill()