Coretex
node.py
1 # Copyright (C) 2023 Coretex LLC
2 
3 # This file is part of Coretex.ai
4 
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
14 
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <https://www.gnu.org/licenses/>.
17 
18 from typing import Any, Dict, Optional, Tuple
19 from enum import Enum
20 from pathlib import Path
21 from base64 import b64encode
22 
23 import os
24 import logging
25 import requests
26 import platform
27 
28 import click
29 
30 from . import ui
31 from .utils import isGPUAvailable
32 from ...cryptography import rsa
33 from ...networking import networkManager, NetworkRequestError
34 from ...utils import CommandException, docker
35 from ...node import NodeMode, NodeStatus
36 from ...configuration import config_defaults, NodeConfiguration, InvalidConfiguration, ConfigurationNotFound
37 
38 
class NodeException(Exception):
    """Error raised by the node helpers in this module when a docker-backed operation fails."""
41 
42 
class ImageType(Enum):
    """Choice between the official Coretex node image and a user supplied one."""

    # Image published by Coretex
    official = "official"
    # Image whose URL is entered by the user
    custom = "custom"
47 
48 
def pull(image: str) -> None:
    """
    Pulls the given docker image, reporting progress through the CLI ui.

    Parameters
    ----------
    image : str
        image reference handed to docker.imagePull

    Raises
    ------
    NodeException -> if anything fails while pulling; the underlying error
        is logged at debug level on the "cli" logger and attached as the cause
    """

    try:
        ui.progressEcho(f"Fetching image {image}...")
        docker.imagePull(image)
        ui.successEcho(f"Image {image} successfully fetched.")
    except BaseException as ex:
        # BaseException is kept (not narrowed) so callers keep receiving
        # NodeException for exactly the same failures as before
        logging.getLogger("cli").debug(ex, exc_info = ex)
        raise NodeException("Failed to fetch latest node version.") from ex
57 
58 
def isRunning() -> bool:
    """Returns what docker.containerRunning reports for the Coretex Node container name."""

    containerName = config_defaults.DOCKER_CONTAINER_NAME
    return docker.containerRunning(containerName)
61 
62 
def exists() -> bool:
    """Returns what docker.containerExists reports for the Coretex Node container name."""

    containerName = config_defaults.DOCKER_CONTAINER_NAME
    return docker.containerExists(containerName)
65 
66 
def start(dockerImage: str, nodeConfig: NodeConfiguration) -> None:
    """
    Creates the docker network and starts the Coretex Node container
    using the provided image and configuration.

    Parameters
    ----------
    dockerImage : str
        image from which the container is started
    nodeConfig : NodeConfiguration
        source of the access token, mode, resource limits,
        storage path, secret and init script

    Raises
    ------
    NodeException -> if any step fails (including a missing CTX_API_URL
        environment variable); the underlying error is logged at debug
        level on the "cli" logger and attached as the cause
    """

    try:
        ui.progressEcho("Starting Coretex Node...")
        docker.createNetwork(config_defaults.DOCKER_CONTAINER_NETWORK)

        environ = {
            "CTX_API_URL": os.environ["CTX_API_URL"],
            "CTX_STORAGE_PATH": "/root/.coretex",
            "CTX_NODE_ACCESS_TOKEN": nodeConfig.accessToken,
            "CTX_NODE_MODE": str(nodeConfig.mode),
            "CTX_HEARTBEAT_INTERVAL": str(nodeConfig.heartbeatInterval)
        }

        if nodeConfig.modelId is not None:
            environ["CTX_MODEL_ID"] = str(nodeConfig.modelId)

        # The secret is exported only when it is a string different from the default one
        nodeSecret = nodeConfig.secret if nodeConfig.secret is not None else config_defaults.DEFAULT_NODE_SECRET  # change in configuration
        if isinstance(nodeSecret, str) and nodeSecret != config_defaults.DEFAULT_NODE_SECRET:
            environ["CTX_NODE_SECRET"] = nodeSecret

        # (host path, container path) pairs
        volumes = [
            (nodeConfig.storagePath, "/root/.coretex")
        ]

        if nodeConfig.allowDocker:
            # Exposes the host docker socket inside the container
            volumes.append(("/var/run/docker.sock", "/var/run/docker.sock"))

        initScript = nodeConfig.getInitScriptPath()
        if initScript is not None:
            volumes.append((str(initScript), "/script/init.sh"))

        docker.start(
            config_defaults.DOCKER_CONTAINER_NAME,
            dockerImage,
            nodeConfig.allowGpu,
            nodeConfig.ram,
            nodeConfig.swap,
            nodeConfig.sharedMemory,
            nodeConfig.cpuCount,
            environ,
            volumes
        )

        ui.successEcho("Successfully started Coretex Node.")
    except BaseException as ex:
        # BaseException is kept (not narrowed) so callers keep receiving
        # NodeException for exactly the same failures as before
        logging.getLogger("cli").debug(ex, exc_info = ex)
        raise NodeException("Failed to start Coretex Node.") from ex
114 
115 
def clean() -> None:
    """
    Removes the Coretex Node container and then its docker network.

    Raises
    ------
    NodeException -> if either removal fails; the underlying error is
        logged at debug level on the "cli" logger and attached as the cause
    """

    try:
        docker.removeContainer(config_defaults.DOCKER_CONTAINER_NAME)
        docker.removeNetwork(config_defaults.DOCKER_CONTAINER_NETWORK)
    except BaseException as ex:
        # BaseException is kept (not narrowed) so callers keep receiving
        # NodeException for exactly the same failures as before
        logging.getLogger("cli").debug(ex, exc_info = ex)
        raise NodeException("Failed to clean inactive Coretex Node.") from ex
123 
124 
def deactivateNode(id: Optional[int]) -> None:
    """
    Posts the node id to the "service/deactivate" endpoint.

    Raises
    ------
    NetworkRequestError -> if the response reports a failure
    """

    response = networkManager.post("service/deactivate", { "id": id })
    if not response.hasFailed():
        return

    raise NetworkRequestError(response, "Failed to deactivate node.")
133 
134 
def stop(nodeId: Optional[int] = None) -> None:
    """
    Stops the Coretex Node container, deactivates the node when an id is
    provided and removes the container together with its network.

    Parameters
    ----------
    nodeId : Optional[int]
        id passed to deactivateNode; deactivation is skipped when it is None

    Raises
    ------
    NodeException -> if stopping, deactivating or cleaning fails; the
        underlying error is logged at debug level on the "cli" logger
        and attached as the cause
    """

    try:
        ui.progressEcho("Stopping Coretex Node...")
        docker.stopContainer(config_defaults.DOCKER_CONTAINER_NAME)

        if nodeId is not None:
            deactivateNode(nodeId)

        clean()
        ui.successEcho("Successfully stopped Coretex Node....")
    except BaseException as ex:
        # BaseException is kept (not narrowed) so callers keep receiving
        # NodeException for exactly the same failures as before
        logging.getLogger("cli").debug(ex, exc_info = ex)
        raise NodeException("Failed to stop Coretex Node.") from ex
148 
149 
def getNodeStatus() -> NodeStatus:
    """
    Queries the status endpoint on localhost port 21000 (1 second timeout)
    and converts the "status" field of the JSON response to NodeStatus.

    Returns
    -------
    NodeStatus -> reported status, or NodeStatus.inactive if the request,
        the JSON decoding or the conversion fails
    """

    try:
        response = requests.get("http://localhost:21000/status", timeout = 1)
        status = response.json()["status"]
        return NodeStatus(status)
    except Exception:
        # Best-effort probe: any ordinary failure means "inactive".
        # KeyboardInterrupt/SystemExit are intentionally not swallowed.
        return NodeStatus.inactive
157 
158 
def getRepoFromImageUrl(image: str) -> str:
    """
    Returns the image reference without its tag.

    Only a ":" in the last path component counts as a tag separator,
    so a registry port such as "localhost:5000/img" is left intact.

    Parameters
    ----------
    image : str
        image reference, with or without a tag

    Returns
    -------
    str -> everything before the last ":" when a tag is present,
        otherwise the unchanged image reference
    """

    imageName = image.rsplit("/", 1)[-1]
    if ":" not in imageName:
        return image

    # A ":" in the last component is also the last ":" of the whole reference
    repository, _, _ = image.rpartition(":")
    return repository
169 
170 
def getTagFromImageUrl(image: str) -> str:
    """
    Returns the tag of the image reference.

    Only a ":" in the last path component counts as a tag separator,
    so a registry port such as "localhost:5000/img" is not mistaken for a tag.

    Parameters
    ----------
    image : str
        image reference, with or without a tag

    Returns
    -------
    str -> everything after the last ":" when a tag is present,
        otherwise "latest"
    """

    imageName = image.rsplit("/", 1)[-1]
    if ":" not in imageName:
        return "latest"

    # A ":" in the last component is also the last ":" of the whole reference
    _, _, tag = image.rpartition(":")
    return tag
181 
182 
def shouldUpdate(image: str) -> bool:
    """
    Decides whether the image should be pulled.

    Returns
    -------
    bool -> True if the image can't be inspected locally or none of its
        "RepoDigests" entries contains both the repository and the digest
        of the inspected manifest; False if such an entry exists or if
        the manifest can't be inspected
    """

    repository = getRepoFromImageUrl(image)

    try:
        localImage = docker.imageInspect(image)
    except CommandException:
        # imageInspect() will raise an error if image doesn't exist locally
        return True

    try:
        remoteManifest = docker.manifestInspect(image)
    except CommandException:
        return False

    # The manifest digest is looked up lazily, only for entries which match the repository
    isUpToDate = any(
        repository in repoDigest and remoteManifest["Descriptor"]["digest"] in repoDigest
        for repoDigest in localImage["RepoDigests"]
    )

    return not isUpToDate
201 
202 
def showLogs(tail: Optional[int], follow: bool, timestamps: bool) -> None:
    """Forwards the log options to docker.getLogs for the Coretex Node container."""

    containerName = config_defaults.DOCKER_CONTAINER_NAME
    docker.getLogs(containerName, tail, follow, timestamps)
205 
206 
def registerNode(
    name: str,
    nodeMode: NodeMode,
    publicKey: Optional[bytes] = None,
    nearWalletId: Optional[str] = None,
    endpointInvocationPrice: Optional[float] = None
) -> Tuple[int, str]:
    """
    Registers the node by posting its details to the "service" endpoint.

    Parameters
    ----------
    name : str
        sent as "machine_name"
    nodeMode : NodeMode
        its value is sent as "mode"
    publicKey : Optional[bytes]
        sent base64 encoded as "public_key" when provided
    nearWalletId : Optional[str]
        sent as "near_wallet_id" when provided
    endpointInvocationPrice : Optional[float]
        sent as "endpoint_invocation_price" when provided

    Returns
    -------
    Tuple[int, str] -> node id and access token taken from the response

    Raises
    ------
    NetworkRequestError -> if the request has failed
    TypeError -> if "access_token" is not a str or "id" is not an int
    """

    payload: Dict[str, Any] = {
        "machine_name": name,
        "mode": nodeMode.value
    }

    if publicKey is not None:
        payload["public_key"] = b64encode(publicKey).decode("utf-8")

    # Remaining optional values are sent as they are, but only when provided
    optionalFields = {
        "near_wallet_id": nearWalletId,
        "endpoint_invocation_price": endpointInvocationPrice
    }

    for key, value in optionalFields.items():
        if value is not None:
            payload[key] = value

    response = networkManager.post("service", payload)
    if response.hasFailed():
        raise NetworkRequestError(response, "Failed to configure node. Please try again...")

    accessToken = response.getJson(dict).get("access_token")
    nodeId = response.getJson(dict).get("id")

    if isinstance(accessToken, str) and isinstance(nodeId, int):
        return nodeId, accessToken

    raise TypeError("Something went wrong. Please try again...")
241 
242 
def selectImageType() -> ImageType:
    """Shows an arrow-key prompt with the image options and returns the matching ImageType."""

    # Insertion order of the dict is the order of choices in the prompt
    labelToType = {
        "Official Coretex image": ImageType.official,
        "Custom image": ImageType.custom,
    }

    selectedLabel = ui.arrowPrompt(
        list(labelToType),
        "Please select image that you want to use (use arrow keys to select an option):"
    )

    return labelToType[selectedLabel]
253 
254 
def selectNodeMode() -> NodeMode:
    """Shows an arrow-key prompt with the selectable node modes and returns the chosen one."""

    # Modes which can be picked; their order here is the
    # order in which the choices are displayed in the terminal
    selectableModes = (
        NodeMode.any,
        NodeMode.execution,
        NodeMode.endpointReserved,
        NodeMode.endpointShared
    )

    labelToMode = {}
    for mode in selectableModes:
        labelToMode[mode.toString()] = mode

    selectedLabel = ui.arrowPrompt(
        list(labelToMode),
        "Please select Coretex Node mode (use arrow keys to select an option):"
    )

    return labelToMode[selectedLabel]
271 
272 
def promptCpu(cpuLimit: int) -> int:
    """
    Prompts for the number of CPUs the container will use until a valid value is entered.

    Parameters
    ----------
    cpuLimit : int
        highest accepted value, also offered as the default

    Returns
    -------
    int -> value in range [1, cpuLimit]
    """

    while True:
        cpuCount: int = ui.clickPrompt(
            f"Enter the number of CPUs the container will use (Maximum: {cpuLimit}) (press enter to use default)",
            cpuLimit,
            type = int
        )

        # Negative values are rejected together with zero
        if cpuCount <= 0:
            ui.errorEcho("ERROR: Number of CPU's the container will use must be higher than 0")
            continue

        if cpuCount > cpuLimit:
            ui.errorEcho(f"ERROR: CPU limit in Docker Desktop ({cpuLimit}) is lower than the specified value ({cpuCount})")
            continue

        return cpuCount
285 
286 
def promptRam(ramLimit: int) -> int:
    """
    Prompts for the Node RAM limit (in GB) until a valid value is entered.

    Parameters
    ----------
    ramLimit : int
        highest accepted value, also offered as the default

    Returns
    -------
    int -> value in range [config_defaults.MINIMUM_RAM, ramLimit]
    """

    while True:
        nodeRam: int = ui.clickPrompt(
            f"Node RAM memory limit in GB (Minimum: {config_defaults.MINIMUM_RAM}GB, "
            f"Maximum: {ramLimit}GB) (press enter to use default)",
            ramLimit,
            type = int
        )

        if nodeRam > ramLimit:
            ui.errorEcho(
                f"ERROR: RAM limit in Docker Desktop ({ramLimit}GB) is lower than the configured value ({nodeRam}GB). "
                "Please adjust resource limitations in Docker Desktop settings."
            )
            continue

        if nodeRam < config_defaults.MINIMUM_RAM:
            # Both fragments must be f-strings, otherwise the minimum is printed as a literal placeholder
            ui.errorEcho(
                f"ERROR: Configured RAM ({nodeRam}GB) is lower than "
                f"the minimum Node RAM requirement ({config_defaults.MINIMUM_RAM}GB)."
            )
            continue

        return nodeRam
310 
311 
def promptSwap(nodeRam: int, swapLimit: int) -> int:
    """
    Prompts for the Node SWAP limit (in GB) until the entered value does not exceed swapLimit.

    The offered default is twice the nodeRam, capped at swapLimit.
    """

    while True:
        defaultSwap = min(swapLimit, nodeRam * 2)
        nodeSwap: int = ui.clickPrompt(
            f"Node SWAP memory limit in GB (Maximum: {swapLimit}GB) (press enter to use default)",
            defaultSwap,
            type = int
        )

        if nodeSwap <= swapLimit:
            return nodeSwap

        ui.errorEcho(
            f"ERROR: SWAP memory limit in Docker Desktop ({swapLimit}GB) is lower than the configured value ({nodeSwap}GB). "
            f"If you want to use higher value than {swapLimit}GB, you have to change docker limits."
        )
327 
328 
def promptInvocationPrice() -> float:
    """Prompts for the endpoint invocation price until a value which is not negative is entered."""

    while True:
        price: float = ui.clickPrompt(
            "Enter the price of a single endpoint invocation",
            config_defaults.DEFAULT_ENDPOINT_INVOCATION_PRICE,
            type = float
        )

        if price >= 0:
            return price

        ui.errorEcho("Endpoint invocation price cannot be less than 0!")
341 
342 
def _configureInitScript() -> str:
    """
    Prompts for the path to the init script until an acceptable answer is given.

    Returns
    -------
    str -> config_defaults.DEFAULT_INIT_SCRIPT if the default was accepted,
        otherwise the user-expanded, absolute path of an existing
        entry which is not a directory
    """

    while True:
        answer = ui.clickPrompt(
            "Enter a path to sh script which will be executed before Node starts",
            config_defaults.DEFAULT_INIT_SCRIPT,
            type = str
        )

        if answer == config_defaults.DEFAULT_INIT_SCRIPT:
            return config_defaults.DEFAULT_INIT_SCRIPT

        scriptPath = Path(answer).expanduser().absolute()

        if scriptPath.is_dir():
            ui.errorEcho("Provided path is pointing to a directory, file expected!")
        elif not scriptPath.exists():
            ui.errorEcho("Provided file does not exist!")
        else:
            return str(scriptPath)
364 
365 
def checkResourceLimitations() -> None:
    """
    Verifies that the RAM limit reported by docker.getResourceLimits
    satisfies the minimum Node RAM requirement.

    Raises
    ------
    RuntimeError -> if the RAM limit is lower than config_defaults.MINIMUM_RAM
    """

    _, ramLimit = docker.getResourceLimits()

    if ramLimit < config_defaults.MINIMUM_RAM:
        # Every fragment with a placeholder must be an f-string,
        # otherwise the limit is printed as a literal "{ramLimit}"
        raise RuntimeError(
            f"Minimum Node RAM requirement ({config_defaults.MINIMUM_RAM}GB) "
            f"is higher than your current Docker desktop RAM limit ({ramLimit}GB). "
            "Please adjust resource limitations in Docker Desktop settings to match Node requirements."
        )
375 
376 
def configureNode(advanced: bool) -> NodeConfiguration:
    """
    Interactively builds a new node configuration and registers the node.

    Name, image and (when offered) GPU access are always prompted.
    The remaining values are filled from config_defaults and the docker
    limits, and are prompted only when "advanced" is True.

    Parameters
    ----------
    advanced : bool
        if True mode, storage path, CPU, RAM, SWAP, shared memory,
        docker access, init script, secret and heartbeat interval are
        prompted (plus NEAR wallet id and invocation price for endpoint modes)

    Returns
    -------
    NodeConfiguration -> configuration with "id" and "accessToken"
        set from the registerNode result; it is not saved here
    """

    ui.highlightEcho("[Node Configuration]")
    nodeConfig = NodeConfiguration({})  # create new empty node config
    currentOS = platform.system().lower()

    cpuLimit, ramLimit = docker.getResourceLimits()
    swapLimit = docker.getDockerSwapLimit()

    nodeConfig.name = ui.clickPrompt("Node name", type = str)

    imageType = selectImageType()
    if imageType == ImageType.custom:
        nodeConfig.image = ui.clickPrompt("Specify URL of docker image that you want to use:", type = str)
    else:
        # The tag is appended further down, once the GPU choice is known
        nodeConfig.image = "coretexai/coretex-node"

    # GPU Access is supported for:
    # - Linux (Docker Engine)
    # - Windows (Docker Desktop)

    # i.e. GPU is not offered when Docker Desktop is used on anything other than Windows
    if isGPUAvailable() and not (docker.isDockerDesktop() and currentOS != "windows"):
        nodeConfig.allowGpu = ui.clickPrompt("Do you want to allow the Node to access your GPU? (Y/n)", type = bool, default = True)
    else:
        nodeConfig.allowGpu = False

    # On Linux with GPU enabled, offer to update the docker daemon file unless it is already updated
    if nodeConfig.allowGpu and platform.system().lower() == "linux" and not docker.isDaemonFileUpdated():
        shouldUpdateDockerConfig = ui.clickPrompt(
            "NVIDIA has a bug where a docker container running Coretex Node can lose access to GPU "
            "(https://github.com/NVIDIA/nvidia-container-toolkit/issues/48). "
            "\nDo you want Coretex CLI to apply a workaround for this bug "
            "(NOTE: This requires docker daemon restart)? (Y/n)",
            type = bool,
            default = True
        )

        if shouldUpdateDockerConfig:
            docker.updateDaemonFile()
            shouldRestartDocker = ui.clickPrompt("Do you want to restart Docker to apply the changes? (Y/n)", type = bool, default = True)

            if shouldRestartDocker:
                docker.restartDocker()
            else:
                ui.warningEcho(
                    "Warning: The changes will not take effect until Docker is restarted. "
                    "(https://github.com/NVIDIA/nvidia-container-toolkit/issues/48)"
                )
        else:
            ui.warningEcho(
                "Warning: Not updating the daemon.json file may lead to GPU access issues in Docker "
                "containers. (https://github.com/NVIDIA/nvidia-container-toolkit/issues/48)"
            )

    # Official image gets the "latest-gpu" or "latest-cpu" tag, custom image is used as entered
    if imageType == ImageType.official:
        tag = "gpu" if nodeConfig.allowGpu else "cpu"
        nodeConfig.image += f":latest-{tag}"

    # Defaults, overridden by the prompts below when "advanced" is True
    nodeConfig.storagePath = config_defaults.DEFAULT_STORAGE_PATH
    # At least MINIMUM_RAM or the docker limit (whichever is higher), capped at DEFAULT_RAM
    nodeConfig.ram = int(min(max(config_defaults.MINIMUM_RAM, ramLimit), config_defaults.DEFAULT_RAM))
    # NOTE(review): for an int swapLimit this expression always evaluates to swapLimit,
    # DEFAULT_SWAP_MEMORY never lowers it - confirm whether it was meant to act as a cap
    nodeConfig.swap = min(swapLimit, int(max(config_defaults.DEFAULT_SWAP_MEMORY, swapLimit)))
    nodeConfig.sharedMemory = config_defaults.DEFAULT_SHARED_MEMORY
    nodeConfig.cpuCount = int(min(cpuLimit, config_defaults.DEFAULT_CPU_COUNT))
    nodeConfig.mode = config_defaults.DEFAULT_NODE_MODE
    nodeConfig.allowDocker = config_defaults.DEFAULT_ALLOW_DOCKER
    nodeConfig.secret = config_defaults.DEFAULT_NODE_SECRET
    nodeConfig.initScript = config_defaults.DEFAULT_INIT_SCRIPT

    if advanced:
        nodeConfig.mode = selectNodeMode()
        nodeConfig.storagePath = ui.clickPrompt("Storage path (press enter to use default)", config_defaults.DEFAULT_STORAGE_PATH, type = str)

        nodeConfig.cpuCount = promptCpu(cpuLimit)
        nodeConfig.ram = promptRam(ramLimit)
        # Default SWAP offered by the prompt depends on the RAM chosen above
        nodeConfig.swap = promptSwap(nodeConfig.ram, swapLimit)

        nodeConfig.sharedMemory = ui.clickPrompt(
            "Node POSIX shared memory limit in GB (press enter to use default)",
            config_defaults.DEFAULT_SHARED_MEMORY,
            type = int
        )

        nodeConfig.allowDocker = ui.clickPrompt(
            "Allow Node to access system docker? This is a security risk! (Y/n)",
            config_defaults.DEFAULT_ALLOW_DOCKER,
            type = bool
        )

        nodeConfig.initScript = _configureInitScript()

        nodeConfig.secret = ui.clickPrompt(
            "Enter a secret which will be used to generate RSA key-pair for Node",
            config_defaults.DEFAULT_NODE_SECRET,
            type = str,
            hide_input = True
        )

        # Wallet and pricing are asked only for the endpoint modes
        if nodeConfig.mode in [NodeMode.endpointReserved, NodeMode.endpointShared]:
            nodeConfig.nearWalletId = ui.clickPrompt(
                "Enter a NEAR wallet id to which the funds will be transfered when executing endpoints",
                config_defaults.DEFAULT_NEAR_WALLET_ID,
                type = str
            )

            nodeConfig.endpointInvocationPrice = promptInvocationPrice()

        # Prompt works with seconds: the default is divided by 1000 and the answer multiplied by 1000
        nodeConfig.heartbeatInterval = ui.clickPrompt(
            "Enter interval (seconds) at which the Node will send heartbeat to Coretex Server",
            config_defaults.HEARTBEAT_INTERVAL // 1000,
            type = int
        ) * 1000  # Node expects the value in ms
    else:
        ui.stdEcho("To configure node manually run coretex node config with --advanced flag.")

    # Public key is generated (and sent on registration) only for a non-default secret
    publicKey: Optional[bytes] = None
    if nodeConfig.secret is not None and nodeConfig.secret != config_defaults.DEFAULT_NODE_SECRET:
        ui.progressEcho("Generating RSA key-pair (2048 bits long) using provided node secret...")
        rsaKey = rsa.generateKey(2048, nodeConfig.secret.encode("utf-8"))
        publicKey = rsa.getPublicKeyBytes(rsaKey.public_key())

    nodeConfig.id, nodeConfig.accessToken = registerNode(
        nodeConfig.name,
        nodeConfig.mode,
        publicKey,
        nodeConfig.nearWalletId,
        nodeConfig.endpointInvocationPrice
    )

    return nodeConfig
504 
505 
def initializeNodeConfiguration() -> None:
    """
    Makes sure a loadable node configuration exists.

    Returns immediately if the configuration loads. Otherwise the problem
    is reported and, after the user confirms, a running node is stopped
    and a new (non-advanced) configuration is created and saved.

    Raises
    ------
    ConfigurationNotFound, InvalidConfiguration -> re-raised if the user
        declines to (re)configure the node
    """

    try:
        NodeConfiguration.load()
        return
    except ConfigurationNotFound:
        ui.errorEcho("Node configuration not found.")
        if not click.confirm("Would you like to configure the node?", default = True):
            raise
    except InvalidConfiguration as ex:
        for error in ex.errors:
            ui.errorEcho(error)

        if not click.confirm("Would you like to update the configuration?", default = True):
            raise

    if isRunning():
        if not click.confirm("Node is already running. Do you wish to stop the Node?", default = True):
            ui.errorEcho("If you wish to reconfigure your node, use \"coretex node stop\" command first.")
            return

        # Loading the configuration has just failed above, so it is expected to
        # fail here as well. In that case the node is stopped without an id
        # (which skips deactivation) instead of aborting the reconfiguration.
        nodeId: Optional[int] = None
        try:
            nodeId = NodeConfiguration.load().id
        except (ConfigurationNotFound, InvalidConfiguration) as ex:
            logging.getLogger("cli").debug(ex, exc_info = ex)

        stop(nodeId)

    nodeConfig = configureNode(advanced = False)
    nodeConfig.save()