Coretex
process_utils.py
1 from typing import List
2 from pathlib import Path
3 
4 import subprocess
5 
6 from threading import Thread
7 
8 from .run_logger import runLogger
9 from ..logging import LogSeverity
10 
11 
12 def captureRunStdout(process: subprocess.Popen) -> None:
13  stdout = process.stdout
14  if stdout is None:
15  raise ValueError("stdout is None for subprocess")
16 
17  while process.poll() is None:
18  line: str = stdout.readline().decode("UTF-8")
19  runLogger.logProcessOutput(line)
20 
21 
22 def captureRunStderr(process: subprocess.Popen, isEnabled: bool) -> None:
23  stderr = process.stderr
24  if stderr is None:
25  raise ValueError("stderr is None for subprocess")
26 
27  lines: List[str] = []
28  while process.poll() is None:
29  line: str = stderr.readline().decode("UTF-8")
30  lines.append(line)
31 
32  # Dump stderr output at the end to perserve stdout order
33  for line in lines:
34  if isEnabled and process.returncode == 0:
35  severity = LogSeverity.warning
36  elif isEnabled:
37  severity = LogSeverity.fatal
38  else:
39  # We always want to know what gets logged to stderr
40  severity = LogSeverity.debug
41 
42  runLogger.logProcessOutput(line, severity)
43 
44 
45 def executeRunLocally(
46  args: List[str],
47  captureErr: bool,
48  cwd: Path = Path.cwd()
49 ) -> int:
50 
51  process = subprocess.Popen(
52  args,
53  shell = False,
54  cwd = cwd,
55  stdout = subprocess.PIPE,
56  stderr = subprocess.PIPE
57  )
58 
59  # Run a thread which captures process stdout and prints it out to Coretex.ai console
60  Thread(
61  target = captureRunStdout,
62  args = (process,),
63  name = "Process stdout reader"
64  ).start()
65 
66  if captureErr:
67  # Run a thread which captures process stderr and dumps it out node log file
68  Thread(
69  target = captureRunStderr,
70  args = (process, captureErr),
71  name = "Process stderr reader"
72  ).start()
73 
74  return process.wait()