Coretex
docker.py
1 from typing import Dict, Any, List, Tuple, Optional
2 from pathlib import Path
3 
4 import json
5 import platform
6 import tempfile
7 
8 from .process import command, CommandException
9 from ..statistics import getTotalSwapMemory
10 
11 
class DockerConfigurationException(Exception):
    """
        Exception type for Docker configuration problems.

        It adds nothing on top of Exception; it exists so callers can
        catch this category of error specifically.
    """
14 
15 
def isDockerAvailable() -> None:
    """
        Checks Docker availability by running "docker ps".

        Despite the name nothing is returned: the function simply
        completes when the command succeeds.

        Raises
        ------
        RuntimeError -> if running "docker ps" raises CommandException
    """

    probeCommand = ["docker", "ps"]

    try:
        # The output itself is irrelevant, only success/failure matters
        command(probeCommand, ignoreStderr = True, ignoreStdout = True)
    except CommandException:
        raise RuntimeError("Docker not available. Please check that it is properly installed and running on your system.")
22 
23 
def networkExists(name: str) -> bool:
    """
        Reports whether a Docker network called "name" exists.

        The network is probed with "docker network inspect". If the
        command completes the network exists; if running it raises
        any error the network is treated as missing.

        Parameters
        ----------
        name : str
            name of the Docker network to look for

        Returns
        -------
        bool -> True if the inspect command succeeded, False otherwise
    """

    try:
        command(["docker", "network", "inspect", name], ignoreStdout = True, ignoreStderr = True)
        return True
    except Exception:
        # Deliberately best-effort: any failure means "not found".
        # "Exception" (instead of a bare except) lets KeyboardInterrupt
        # and SystemExit propagate so the user can still abort.
        return False
35 
36 
def containerRunning(name: str) -> bool:
    """
        Reports whether a container called "name" is currently running.

        The names printed by "docker ps --format {{.Names}}" (one per line)
        are compared against "name" using an exact match.

        Parameters
        ----------
        name : str
            container name to look for

        Returns
        -------
        bool -> True if "name" is among the listed containers,
        False if it is not or if listing the containers failed
    """

    try:
        _, output, _ = command(["docker", "ps", "--format", "{{.Names}}"], ignoreStderr = True, ignoreStdout = True)
        return name in output.splitlines()
    except Exception:
        # Deliberately best-effort: any failure means "not running".
        # "Exception" (instead of a bare except) lets KeyboardInterrupt
        # and SystemExit propagate so the user can still abort.
        return False
43 
44 
def containerExists(name: str) -> bool:
    """
        Reports whether a container called "name" exists, running or not.

        The names printed by "docker ps -a --format {{.Names}}" (one per line)
        are compared against "name" using an exact match.

        Parameters
        ----------
        name : str
            container name to look for

        Returns
        -------
        bool -> True if "name" is among the listed containers,
        False if it is not or if listing the containers failed
    """

    try:
        _, output, _ = command(["docker", "ps", "-a", "--format", "{{.Names}}"], ignoreStderr = True, ignoreStdout = True)
        return name in output.splitlines()
    except Exception:
        # Deliberately best-effort: any failure means "does not exist".
        # "Exception" (instead of a bare except) lets KeyboardInterrupt
        # and SystemExit propagate so the user can still abort.
        return False
51 
52 
def createNetwork(name: str) -> None:
    """
        Creates a fresh bridge network called "name".

        If a network with that name is already present it is
        removed first, so the network is always newly created.

        Parameters
        ----------
        name : str
            name of the Docker network to create
    """

    alreadyPresent = networkExists(name)
    if alreadyPresent:
        removeNetwork(name)

    createArgs = ["docker", "network", "create", "--driver", "bridge", name]
    command(createArgs, ignoreStdout = True)
58 
59 
def removeNetwork(name: str) -> None:
    """
        Removes the Docker network called "name" using "docker network rm".

        Parameters
        ----------
        name : str
            name of the Docker network to remove
    """

    removeArgs = ["docker", "network", "rm", name]
    command(removeArgs, ignoreStderr = True, ignoreStdout = True)
62 
63 
def removeImage(image: str) -> None:
    """
        Removes a Docker image using "docker image rm".

        Parameters
        ----------
        image : str
            image reference passed to the command as-is
            (this module also calls it with an image ID)
    """

    removeArgs = ["docker", "image", "rm", image]
    command(removeArgs, ignoreStderr = True, ignoreStdout = True)
66 
67 
def removeDanglingImages(repository: str, tag: str) -> None:
    """
        Removes every image of "repository" whose tag is not "tag".

        Images are listed with "docker image ls <repository> --format json",
        which is parsed as one JSON object per output line. Each listed
        image whose "Tag" differs from "tag" is removed by its "ID".

        Parameters
        ----------
        repository : str
            repository whose images are listed
        tag : str
            the only tag which is kept
    """

    listArgs = ["docker", "image", "ls", repository, "--format", "json"]
    _, listing, _ = command(listArgs, ignoreStdout = True, ignoreStderr = True)

    for line in listing.strip().split("\n"):
        # Empty output produces a single empty line - nothing to parse
        if not line:
            continue

        entry = json.loads(line)
        if entry["Tag"] == tag:
            continue

        removeImage(entry["ID"])
79 
80 
def imagePull(image: str) -> None:
    """
        Pulls a Docker image using "docker image pull".

        Parameters
        ----------
        image : str
            reference of the image to pull
    """

    pullArgs = ["docker", "image", "pull", image]
    command(pullArgs)
83 
84 
def start(
    name: str,
    image: str,
    allowGpu: bool,
    ram: int,
    swap: int,
    shm: int,
    cpuCount: int,
    environ: Dict[str, str],
    volumes: List[Tuple[str, str]]
) -> None:

    """
        Starts a detached container from "image" using "docker run".

        The container always restarts, publishes port 21000, gets the
        SYS_PTRACE capability and is attached to the network called "name".

        Parameters
        ----------
        name : str
            used both as the container name and as the network name
        image : str
            image to run
        allowGpu : bool
            if True "--gpus all" is passed
        ram : int
            memory limit, passed with a "G" suffix
        swap : int
            swap amount, added to "ram" to form "--memory-swap"
        shm : int
            shared memory size, passed with a "G" suffix
        cpuCount : int
            value passed to "--cpus"
        environ : Dict[str, str]
            environment variables, each passed as "--env KEY=VALUE"
        volumes : List[Tuple[str, str]]
            (source, destination) pairs, each passed as "-v source:destination"
    """

    # https://github.com/moby/moby/issues/14215#issuecomment-115959661
    # --memory-swap is the total limit -> memory + swap
    fixedOptions = [
        ("--restart", "always"),
        ("-p", "21000:21000"),
        ("--cap-add", "SYS_PTRACE"),
        ("--network", name),
        ("--memory", f"{ram}G"),
        ("--memory-swap", f"{ram + swap}G"),
        ("--shm-size", f"{shm}G"),
        ("--cpus", f"{cpuCount}"),
        ("--name", name),
    ]

    envOptions = [("--env", f"{key}={value}") for key, value in environ.items()]
    volumeOptions = [("-v", f"{source}:{destination}") for source, destination in volumes]
    gpuOptions = [("--gpus", "all")] if allowGpu else []

    runCommand = ["docker", "run", "-d"]
    for flag, value in fixedOptions + envOptions + volumeOptions + gpuOptions:
        runCommand.extend([flag, value])

    # The image has to come after all of the "docker run" options
    runCommand.append(image)
    command(runCommand, ignoreStdout = True, ignoreStderr = True)
125 
126 
def stopContainer(name: str) -> None:
    """
        Stops the container called "name" using "docker stop".

        Parameters
        ----------
        name : str
            name of the container to stop
    """

    stopArgs = ["docker", "stop", name]
    command(stopArgs, ignoreStderr = True, ignoreStdout = True)
129 
130 
def removeContainer(name: str) -> None:
    """
        Removes the container called "name" using "docker rm".

        Parameters
        ----------
        name : str
            name of the container to remove
    """

    removeArgs = ["docker", "rm", name]
    command(removeArgs, ignoreStderr = True, ignoreStdout = True)
133 
134 
def manifestInspect(image: str) -> Dict[str, Any]:
    """
        Returns the parsed output of "docker manifest inspect <image> --verbose".

        Parameters
        ----------
        image : str
            image whose manifest is inspected

        Returns
        -------
        Dict[str, Any] -> the decoded JSON object

        Raises
        ------
        TypeError -> if the decoded JSON is not an object (dict)
    """

    inspectArgs = ["docker", "manifest", "inspect", image, "--verbose"]
    _, rawManifest, _ = command(inspectArgs, ignoreStdout = True)

    manifest = json.loads(rawManifest)
    if isinstance(manifest, dict):
        return manifest

    raise TypeError(f"Invalid function result type \"{type(manifest)}\". Expected: \"dict\"")
142 
143 
def imageInspect(image: str) -> Dict[str, Any]:
    """
        Returns the first entry of the parsed "docker image inspect <image>" output.

        Parameters
        ----------
        image : str
            image to inspect

        Returns
        -------
        Dict[str, Any] -> first element of the decoded JSON list

        Raises
        ------
        TypeError -> if the decoded JSON is not a list,
        or if its first element is not an object (dict)
    """

    inspectArgs = ["docker", "image", "inspect", image]
    _, rawOutput, _ = command(inspectArgs, ignoreStdout = True, ignoreStderr = True)

    parsed = json.loads(rawOutput)
    if not isinstance(parsed, list):
        raise TypeError(f"Invalid json.loads() result type \"{type(parsed)}\". Expected: \"list\"")

    # An image inspected by a specific repository AND tag yields
    # a list holding a single object, so only the first entry is used
    firstEntry = parsed[0]
    if not isinstance(firstEntry, dict):
        raise TypeError(f"Invalid function result type \"{type(firstEntry)}\". Expected: \"dict\"")

    return firstEntry
154 
155 
def getResourceLimits() -> Tuple[int, int]:
    """
        Reads the CPU and memory totals reported by "docker info".

        Returns
        -------
        Tuple[int, int] -> the "NCPU" value, and the "MemTotal" value
        divided by 1024 ** 3 and rounded to the nearest integer
        (presumably bytes converted to GiB - confirm against Docker docs)
    """

    infoArgs = ["docker", "info", "--format", "{{json .}}"]
    _, rawInfo, _ = command(infoArgs, ignoreStdout = True, ignoreStderr = True)

    dockerInfo = json.loads(rawInfo)

    cpuCount = dockerInfo["NCPU"]
    memoryTotal = round(dockerInfo["MemTotal"] / (1024 ** 3))

    return cpuCount, memoryTotal
161 
162 
def getDockerConfigPath() -> Optional[Path]:
    """
        Builds the per-platform path of the Docker "settings.json" file.

        The path is only constructed, its existence is not checked.

        Returns
        -------
        Optional[Path] -> path inside of the user's home directory for
        "Darwin", "Windows" and "Linux", None for any other platform
    """

    # Location of settings.json relative to the home directory
    relativeParts = {
        "Darwin": ("Library", "Group Containers", "group.com.docker", "settings.json"),
        "Windows": ("AppData", "Roaming", "Docker", "settings.json"),
        "Linux": (".docker", "desktop", "settings.json"),
    }.get(platform.system())

    if relativeParts is None:
        return None

    return Path.home().joinpath(*relativeParts)
172 
173 
def getDockerSwapLimit() -> int:
    """
        Determines the swap limit configured for Docker.

        The "swapMiB" value of the Docker settings file (see
        getDockerConfigPath) is divided by 1024 and truncated to an
        integer. Whenever that value can't be obtained - no settings file
        for this platform, file missing, file not containing a valid JSON
        object, or "swapMiB" missing / not an integer - the result
        of getTotalSwapMemory() is returned instead.

        Returns
        -------
        int -> configured swap limit, or the getTotalSwapMemory() fallback
        (presumably both are expressed in GiB - confirm against getTotalSwapMemory)
    """

    configPath = getDockerConfigPath()

    if configPath is None or not configPath.exists():
        return getTotalSwapMemory()

    try:
        with configPath.open("r") as configFile:
            configJson = json.load(configFile)
    except json.JSONDecodeError:
        # A corrupted settings file is handled like a missing one
        return getTotalSwapMemory()

    if not isinstance(configJson, dict):
        # Valid JSON, but not an object - there is no "swapMiB" to read
        return getTotalSwapMemory()

    swapLimit = configJson.get("swapMiB")
    if not isinstance(swapLimit, int):
        return getTotalSwapMemory()

    return int(swapLimit / 1024)
188 
189 
def getContainerImageName(containerName: str) -> str:
    """
        Returns the image a container was created from.

        The value is read with "docker inspect --format {{.Config.Image}}"
        and stripped of surrounding whitespace.

        Parameters
        ----------
        containerName : str
            name of the container to inspect

        Returns
        -------
        str -> stripped output of the inspect command
    """

    inspectArgs = ["docker", "inspect", "--format", "{{.Config.Image}}", containerName]
    _, rawImageName, _ = command(inspectArgs, ignoreStdout = True, ignoreStderr = True)

    return rawImageName.strip()
193 
194 
def getLogs(name: str, tail: Optional[int], follow: bool, timestamps: bool) -> None:
    """
        Runs "docker logs" for the container called "name".

        Nothing is returned, the logs are not captured by this function.

        Parameters
        ----------
        name : str
            name of the container
        tail : Optional[int]
            if it is an int it is passed as "--tail <tail>", otherwise the option is omitted
        follow : bool
            if True "-f" is passed
        timestamps : bool
            if True "-t" is passed
    """

    logsCommand = ["docker", "logs", name]

    if isinstance(tail, int):
        logsCommand += ["--tail", str(tail)]

    # Order matters only for reproducing the same command line: "-t" before "-f"
    optionalFlags = (("-t", timestamps), ("-f", follow))
    logsCommand.extend(flag for flag, enabled in optionalFlags if enabled)

    command(logsCommand)
207 
208 
def isDockerDesktop() -> bool:
    """
        Detects Docker Desktop by looking for "desktop" in the version of
        a client plugin reported by "docker info".

        "ClientInfo.Plugins" from the JSON output is accepted in two shapes:
        a list of plugin objects (the shape the Docker CLI is documented
        to emit - confirm against the Docker version in use), or a single
        plugin object. The result is True as soon as one plugin has a
        string "Version" containing "desktop".

        Returns
        -------
        bool -> True if a matching plugin version was found, False if not,
        if the output has an unexpected structure, or if running
        or parsing "docker info" failed
    """

    try:
        _, output, _ = command(["docker", "info", "--format", "{{json .}}"], ignoreStdout = True, ignoreStderr = True)
        jsonOutput = json.loads(output)

        clientInfo = jsonOutput.get("ClientInfo")
        if not isinstance(clientInfo, dict):
            return False

        pluginsInfo = clientInfo.get("Plugins")

        plugins: List[Any]
        if isinstance(pluginsInfo, dict):
            plugins = [pluginsInfo]
        elif isinstance(pluginsInfo, list):
            plugins = pluginsInfo
        else:
            return False

        for plugin in plugins:
            if not isinstance(plugin, dict):
                continue

            versionInfo = plugin.get("Version")
            if isinstance(versionInfo, str) and "desktop" in versionInfo:
                return True

        return False
    except Exception:
        # Deliberately best-effort: any failure means "not Docker Desktop".
        # "Exception" (instead of a bare except) lets KeyboardInterrupt
        # and SystemExit propagate so the user can still abort.
        return False
229 
230 
def isDaemonFileUpdated(daemonFile: Path = Path("/etc/docker/daemon.json")) -> bool:
    """
        Checks if the Docker daemon configuration already contains
        the "native.cgroupdriver=cgroupfs" entry in its "exec-opts" list.

        Parameters
        ----------
        daemonFile : Path
            daemon configuration file to check,
            defaults to "/etc/docker/daemon.json"

        Returns
        -------
        bool -> True only if the file exists, holds a JSON object, and
        its "exec-opts" value is a list containing the entry.
        False in every other case (missing file, invalid JSON,
        JSON which is not an object, "exec-opts" which is not a list)
    """

    cGroupFix = "native.cgroupdriver=cgroupfs"

    if not daemonFile.exists():
        return False

    with daemonFile.open("r") as file:
        try:
            config = json.load(file)
        except json.JSONDecodeError:
            return False

    # Valid JSON doesn't have to be an object - guard before calling .get
    if not isinstance(config, dict):
        return False

    execOpts = config.get("exec-opts", [])
    if not isinstance(execOpts, list):
        # e.g. null or a plain string - the option is not set as a list entry
        return False

    return cGroupFix in execOpts
245 
246 
def updateDaemonFile() -> None:
    """
        Adds "native.cgroupdriver=cgroupfs" to the "exec-opts" list
        of "/etc/docker/daemon.json".

        The existing configuration is kept if the file exists and holds
        a JSON object. A missing file, invalid JSON, or JSON which is not
        an object all result in starting from an empty configuration.
        The entry is added only if it is not already present.

        The new configuration is written to a temporary file which is
        then moved over the daemon file using "sudo mv".
    """

    daemonFile = Path("/etc/docker/daemon.json")
    cGroupFix = "native.cgroupdriver=cgroupfs"
    config: Dict[str, Any] = {}

    # Only read the file if it is there - opening a missing file would raise
    if daemonFile.exists():
        with daemonFile.open("r") as file:
            try:
                loadedConfig = json.load(file)
            except json.JSONDecodeError:
                loadedConfig = None

        if isinstance(loadedConfig, dict):
            config = loadedConfig

    execOpts = config.get("exec-opts")
    if not isinstance(execOpts, list):
        execOpts = []

    # Repeated calls must not pile up duplicates of the same option
    if cGroupFix not in execOpts:
        execOpts.append(cGroupFix)

    config["exec-opts"] = execOpts

    # delete = False: the file has to be closed (flushed to disk) and still
    # exist when "mv" runs. With delete = True it is removed on close
    with tempfile.NamedTemporaryFile("w", delete = False) as tempFile:
        json.dump(config, tempFile, indent = 4)
        tempFilePath = tempFile.name

    try:
        # Use sudo to move the temporary file to the protected location
        command(["sudo", "mv", tempFilePath, str(daemonFile)], ignoreStderr = True, ignoreStdout = True)
    except Exception:
        # The move failed, don't leave the temporary file behind
        leftover = Path(tempFilePath)
        if leftover.exists():
            leftover.unlink()

        raise
271 
272 
def restartDocker() -> None:
    """
        Restarts the Docker service by running "sudo systemctl restart docker".
    """

    restartArgs = ["sudo", "systemctl", "restart", "docker"]
    command(restartArgs, ignoreStdout = True, ignoreStderr = True)