18 from typing
import Optional, Type
19 from typing_extensions
import Self
20 from types
import TracebackType, FrameType
21 from multiprocessing.connection
import Connection
27 import multiprocessing
32 from .
import utils, metrics, artifacts
33 from ...entities
import TaskRun
34 from ...networking
import networkManager, NetworkRequestError
def _update(taskRun: TaskRun) -> None:
    """Run one heartbeat cycle for ``taskRun``.

    First calls ``taskRun.updateStatus()``, then uploads the run's metrics
    with ``metrics.upload``. Each step is preceded by a debug message on the
    ``coretexpylib`` logger.
    """

    logger = logging.getLogger("coretexpylib")

    logger.debug(">> [Coretex] Heartbeat")
    taskRun.updateStatus()

    logger.debug(">> [Coretex] Uploading metrics")
    metrics.upload(taskRun)
# Entry point of the TaskRun worker child process.
#
# Reports its startup result to the parent over `output` via
# utils.sendFailure / utils.sendSuccess. Then, inside artifacts.track(taskRun),
# it loops while the parent process (`parentId`) is running and `isStopped` is
# false, logging pid and timing debug messages on each iteration.
#
# Args:
#   output: pipe connection used to report startup success/failure to the parent.
#   refreshToken: passed to networkManager.authenticateWithRefreshToken.
#   taskRunId: id passed to TaskRun.fetchById and utils.initializeLogger.
#   parentId: pid of the parent process, watched through psutil.
#
# Startup failures reported via utils.sendFailure: failed authentication,
# NetworkRequestError from TaskRun.fetchById, NetworkRequestError from
# metrics.create.
#
# NOTE(review): this view omits several lines of the function: the `try:`
# headers, whatever follows each sendFailure (presumably an early `return`),
# the `isStopped` setup and its update in the signal handler, and the loop body
# between the two timer reads (presumably _update(taskRun) plus a sleep of
# `sleepTime`). Confirm against the full source before relying on this summary.
# NOTE(review): the success message spells "succcessfully" with three c's; it
# is a runtime string and is intentionally left unchanged here.
45 def _taskRunWorker(output: Connection, refreshToken: str, taskRunId: int, parentId: int) ->
None:
# SIGTERM handler: ignores other signals and logs the terminate request.
# Presumably it also sets `isStopped` so the loop below exits (not visible).
48 def handleTerminateSignal(signum: int, frame: Optional[FrameType]) ->
None:
49 if signum != signal.SIGTERM:
52 logging.getLogger(
"coretexpylib").debug(
">> [Coretex] Received terminate signal. Terminating...")
# Startup sequence: install the handler, set up logging, authenticate, fetch
# the TaskRun and create its metrics. Each failure is reported to the parent.
57 signal.signal(signal.SIGTERM, handleTerminateSignal)
58 utils.initializeLogger(taskRunId)
60 response = networkManager.authenticateWithRefreshToken(refreshToken)
61 if response.hasFailed():
62 utils.sendFailure(output,
"Failed to authenticate with refresh token")
66 taskRun: TaskRun = TaskRun.fetchById(taskRunId)
67 except NetworkRequestError:
68 utils.sendFailure(output, f
"Failed to fetch TaskRun with id \"{taskRunId}\"")
72 metrics.create(taskRun)
73 except NetworkRequestError:
74 utils.sendFailure(output,
"Failed to create metrics")
77 utils.sendSuccess(output,
"TaskRun worker succcessfully started")
# psutil handles: `parent` drives the loop condition, and both pids are logged.
79 parent = psutil.Process(parentId)
80 current = psutil.Process(os.getpid())
83 with artifacts.track(taskRun):
84 while parent.is_running()
and not isStopped:
85 logging.getLogger(
"coretexpylib").debug(f
">> [Coretex] Worker process id {current.pid}, parent process id {parent.pid}")
# Time this iteration's work. `diff` presumably feeds the (not visible)
# computation of `sleepTime` - confirm.
88 start = timeit.default_timer()
90 diff = timeit.default_timer() - start
95 logging.getLogger(
"coretexpylib").debug(f
">> [Coretex] Sleeping for {sleepTime}s")
98 logging.getLogger(
"coretexpylib").debug(
">> [Coretex] Finished")
# Prepare (but do not start) the worker process for a TaskRun.
#
# Creates a multiprocessing.Pipe and a multiprocessing.Process named
# "TaskRun <id> worker process". The process will run _taskRunWorker with the
# pipe's `output` end, the refresh token, the TaskRun id and this process's pid
# (the worker watches that pid to detect when its parent goes away).
#
# Args:
#   refreshToken: stored on self._refreshToken and forwarded to the worker.
#   taskRunId: forwarded to the worker and used in the process name.
#
# NOTE(review): some lines of this method are not visible in this view.
# `start` reads `self.__input`, so the `input` end is presumably stored there -
# confirm. `input` also shadows the builtin of the same name.
103 def __init__(self, refreshToken: str, taskRunId: int) ->
None:
104 self._refreshToken = refreshToken
106 output, input = multiprocessing.Pipe()
109 self.__process = multiprocessing.Process(
110 name = f
"TaskRun {taskRunId} worker process",
111 target = _taskRunWorker,
112 args = (output, refreshToken, taskRunId, os.getpid()),
def start(self) -> None:
    """Start the worker process and wait for its startup report.

    Blocks until the worker sends its first status message over the pipe.
    That message is a mapping with a ``"code"`` key (``0`` means success)
    and a ``"message"`` key. On success the message is logged at INFO level.

    Raises:
        RuntimeError: if the worker reports a non-zero code (the worker's
            message becomes the error text), or if the worker process exits
            without reporting any status.
    """

    self.__process.start()

    # The parent created both pipe ends and, as far as visible, keeps its own
    # reference to the worker's end. A bare recv() therefore never sees EOF
    # if the worker dies before reporting, and would block forever. Poll
    # instead, and check between polls that the worker is still alive.
    while not self.__input.poll(0.1):
        if self.__process.is_alive():
            continue

        # The worker may have written its status right before exiting.
        if self.__input.poll(0):
            break

        raise RuntimeError(
            f"TaskRun worker process exited with code {self.__process.exitcode} "
            "before reporting its startup status"
        )

    result = self.__input.recv()
    if result["code"] != 0:
        raise RuntimeError(result["message"])

    message = result["message"]
    logging.getLogger("coretexpylib").info(f">> [Coretex] {message}")
def stop(self) -> None:
    """Ask the worker process to stop, then wait for it to exit.

    Sends the request with ``Process.terminate()`` and then blocks in
    ``Process.join()`` with no timeout.
    """

    logger = logging.getLogger("coretexpylib")
    logger.info(">> [Coretex] Stopping the worker process")

    worker = self.__process
    worker.terminate()
    worker.join()
# Context-manager support for the worker process.
# NOTE(review): the body of __enter__ is not visible in this view; it is
# presumably `self.start()` followed by `return self` (the annotation is
# `-> Self`) - confirm.
132 def __enter__(self) -> Self:
# __exit__ parameters (standard context-manager protocol); the `def __exit__(`
# line itself is not visible. The visible body only checks whether the worker
# process is still alive. The action taken in that case is not visible
# (presumably stop() or kill() - confirm).
138 exceptionType: Optional[Type[BaseException]],
139 exceptionValue: Optional[BaseException],
140 exceptionTraceback: Optional[TracebackType]
143 if self.__process.is_alive():
def kill(self) -> None:
    """Forcefully kill the worker process via ``Process.kill()``.

    Unlike ``stop``, this does not join the process afterwards.
    """

    logger = logging.getLogger("coretexpylib")
    logger.info(">> [Coretex] Killing the worker process")

    worker = self.__process
    worker.kill()