18 from typing
import Any, Dict, Optional, Tuple
20 from pathlib
import Path
21 from base64
import b64encode
31 from .utils
import isGPUAvailable
32 from ...cryptography
import rsa
33 from ...networking
import networkManager, NetworkRequestError
34 from ...utils
import CommandException, docker
35 from ...node
import NodeMode, NodeStatus
36 from ...configuration
import config_defaults, NodeConfiguration, InvalidConfiguration, ConfigurationNotFound
# Module error type: the node helpers here (pull, start, stop and the container cleanup
# routine) catch lower-level failures and re-raise them as NodeException with a short,
# user-facing message.
# NOTE(review): the class body (original lines 40-42) is not visible in this view.
39 class NodeException(Exception):
# Kind of Node docker image chosen in selectImageType.
# - ImageType.official: the "coretexai/coretex-node" image, later tagged ":latest-cpu"/":latest-gpu".
# - ImageType.custom: an image URL typed in by the user.
# NOTE(review): the member definitions (original lines 44-48) are not visible here.
43 class ImageType(Enum):
def pull(image: str) -> None:
    """
    Pull a docker image while reporting progress to the user.

    Parameters
    ----------
    image : str
        Image reference, passed unchanged to ``docker.imagePull``.

    Raises
    ------
    NodeException
        If anything (any ``BaseException``) is raised while echoing or pulling.
    """

    try:
        startMessage = f"Fetching image {image}..."
        ui.progressEcho(startMessage)

        docker.imagePull(image)

        doneMessage = f"Image {image} successfully fetched."
        ui.successEcho(doneMessage)
    except BaseException as error:
        # Full details only go to the debug log; the user sees the short NodeException text
        logging.getLogger("cli").debug(error, exc_info = error)
        raise NodeException("Failed to fetch latest node version.")
def isRunning() -> bool:
    """
    Report whether the Coretex Node container is running.

    Returns
    -------
    bool
        Result of ``docker.containerRunning`` for
        ``config_defaults.DOCKER_CONTAINER_NAME``.
    """

    containerName = config_defaults.DOCKER_CONTAINER_NAME
    running = docker.containerRunning(containerName)
    return running
# NOTE(review): fragment. The enclosing `def` line (original line 63) is missing from this view.
# It returns docker.containerExists(...) for the Node container name. Unlike
# docker.containerRunning, the check is about the container existing, not its running state.
# Confirm this against docker.containerExists.
64 return docker.containerExists(config_defaults.DOCKER_CONTAINER_NAME)
# Start the Coretex Node container configured by `nodeConfig` (image: `dockerImage`).
# Visible steps:
#   1. Create the docker network.
#   2. Build the container environment.
#   3. Collect volume mounts.
#   4. Pass everything to a container-start call.
# Any failure is logged at debug level on the "cli" logger and re-raised as NodeException.
# NOTE(review): many original lines (e.g. 68, 71-72, 78-79, 82, 86-87, 89-90, 93, 97-98,
# 100-103, 105-109) are missing from this view; comments below describe only what is visible.
67 def start(dockerImage: str, nodeConfig: NodeConfiguration) ->
None:
69 ui.progressEcho(
"Starting Coretex Node...")
70 docker.createNetwork(config_defaults.DOCKER_CONTAINER_NETWORK)
# Container environment entries (the dict opener is not visible).
# CTX_API_URL is copied from the host environment; os.environ[...] raises KeyError when it is unset.
# The in-container storage path is fixed to /root/.coretex.
73 "CTX_API_URL": os.environ[
"CTX_API_URL"],
74 "CTX_STORAGE_PATH":
"/root/.coretex",
75 "CTX_NODE_ACCESS_TOKEN": nodeConfig.accessToken,
76 "CTX_NODE_MODE": str(nodeConfig.mode),
77 "CTX_HEARTBEAT_INTERVAL": str(nodeConfig.heartbeatInterval)
# The model id is forwarded only when one is configured.
80 if nodeConfig.modelId
is not None:
81 environ[
"CTX_MODEL_ID"] = str(nodeConfig.modelId)
# A missing secret is treated as the default secret.
# CTX_NODE_SECRET is only set for a str secret that differs from DEFAULT_NODE_SECRET.
83 nodeSecret = nodeConfig.secret
if nodeConfig.secret
is not None else config_defaults.DEFAULT_NODE_SECRET
84 if isinstance(nodeSecret, str)
and nodeSecret != config_defaults.DEFAULT_NODE_SECRET:
85 environ[
"CTX_NODE_SECRET"] = nodeSecret
# Host storage path mounted at /root/.coretex.
# This tuple appears to be an entry of the `volumes` list; its opener (original lines 86-87) is not visible.
88 (nodeConfig.storagePath,
"/root/.coretex")
# The host docker socket is mounted only when nodeConfig.allowDocker is set.
91 if nodeConfig.allowDocker:
92 volumes.append((
"/var/run/docker.sock",
"/var/run/docker.sock"))
# An optional init script is mounted at /script/init.sh.
94 initScript = nodeConfig.getInitScriptPath()
95 if initScript
is not None:
96 volumes.append((str(initScript),
"/script/init.sh"))
# Arguments of the container-start call.
# The call itself and most of its arguments (original lines 97-98, 100-103, 105-109) are not visible.
99 config_defaults.DOCKER_CONTAINER_NAME,
104 nodeConfig.sharedMemory,
110 ui.successEcho(
"Successfully started Coretex Node.")
# BaseException also covers KeyboardInterrupt, so Ctrl-C surfaces as NodeException too.
111 except BaseException
as ex:
112 logging.getLogger(
"cli").debug(ex, exc_info = ex)
113 raise NodeException(
"Failed to start Coretex Node.")
# NOTE(review): fragment. The enclosing function's `def` and `try:` lines (original lines 115-117)
# are not visible.
# Removes the Node container and then its docker network.
# Any failure is logged at debug level on the "cli" logger and re-raised as NodeException.
118 docker.removeContainer(config_defaults.DOCKER_CONTAINER_NAME)
119 docker.removeNetwork(config_defaults.DOCKER_CONTAINER_NETWORK)
120 except BaseException
as ex:
121 logging.getLogger(
"cli").debug(ex, exc_info = ex)
122 raise NodeException(
"Failed to clean inactive Coretex Node.")
# Ask the Coretex API to deactivate a node by POSTing to "service/deactivate".
# Raises NetworkRequestError("Failed to deactivate node.") when the request fails.
# NOTE(review): the construction of `params` (original lines 126-129) is not visible.
# It presumably carries `id`; confirm.
# The parameter name `id` shadows the builtin; it is kept for caller compatibility.
125 def deactivateNode(id: Optional[int]) ->
None:
130 response = networkManager.post(
"service/deactivate", params)
131 if response.hasFailed():
132 raise NetworkRequestError(response,
"Failed to deactivate node.")
# Stop the Node container. When `nodeId` is given, also deactivate the node server-side
# via deactivateNode.
# Any failure (BaseException) is logged at debug level on the "cli" logger and re-raised
# as NodeException.
# NOTE(review): original lines 136, 139 and 142-143 are not visible. Line 136 is presumably
# the `try:` opener; 142-143 may hold further cleanup. Confirm.
# The success message ends with four dots, which looks like a typo.
135 def stop(nodeId: Optional[int] =
None) ->
None:
137 ui.progressEcho(
"Stopping Coretex Node...")
138 docker.stopContainer(config_defaults.DOCKER_CONTAINER_NAME)
# Server-side deactivation only happens when the caller knows the node id.
140 if nodeId
is not None:
141 deactivateNode(nodeId)
144 ui.successEcho(
"Successfully stopped Coretex Node....")
145 except BaseException
as ex:
146 logging.getLogger(
"cli").debug(ex, exc_info = ex)
147 raise NodeException(
"Failed to stop Coretex Node.")
# Query the locally running Node's status endpoint (http://localhost:21000/status, 1 second
# timeout). Returns NodeStatus built from the "status" field of the JSON response.
# NOTE(review): the `try:` / `except` lines (original lines 151 and 155) are not visible.
# Line 156 returns NodeStatus.inactive, presumably when the request or parsing fails.
# Confirm which exceptions are caught.
150 def getNodeStatus() -> NodeStatus:
152 response = requests.get(
"http://localhost:21000/status", timeout = 1)
153 status = response.json()[
"status"]
154 return NodeStatus(status)
156 return NodeStatus.inactive
# Return the repository part of a docker image reference, i.e. everything before the tag colon.
# Example: "coretexai/coretex-node:latest-cpu" -> "coretexai/coretex-node".
# Only the last "/"-separated segment is checked for ":". Presumably this keeps a registry
# port (e.g. "host:5000/repo") from being mistaken for a tag.
# NOTE(review): original lines 162-163 and 165 are not visible. Line 162 presumably returns
# `image` unchanged when there is no tag; confirm.
159 def getRepoFromImageUrl(image: str) -> str:
160 imageName = image.split(
"/")[-1]
161 if not ":" in imageName:
# The last segment contains ":", so the last ":" in the whole reference is the tag separator.
164 tagIndex = image.rfind(
":")
166 return image[:tagIndex]
# Return the tag part of a docker image reference, i.e. everything after the tag colon.
# Example: "coretexai/coretex-node:latest-gpu" -> "latest-gpu".
# NOTE(review): original lines 174-175 and 177 are not visible. Line 174 presumably returns
# a default tag when the reference has none; confirm what it is.
171 def getTagFromImageUrl(image: str) -> str:
172 imageName = image.split(
"/")[-1]
173 if not ":" in imageName:
# The last segment contains ":", so the last ":" in the whole reference is the tag separator.
176 tagIndex = image.rfind(
":")
178 return image[tagIndex + 1:]
# Decide whether `image` should be updated. It compares the local image's "RepoDigests"
# (docker.imageInspect) with the remote manifest digest (docker.manifestInspect).
# NOTE(review): the following original lines are not visible:
#   - 185 and 190-191: presumably `try:` openers;
#   - 188-189 and 194-195: what is returned when an inspection raises CommandException;
#   - 198-202: the loop body and the fallback return.
# Presumably a matching digest means "no update needed"; confirm.
183 def shouldUpdate(image: str) -> bool:
184 repository = getRepoFromImageUrl(image)
186 imageJson = docker.imageInspect(image)
187 except CommandException:
192 manifestJson = docker.manifestInspect(image)
193 except CommandException:
# Both checks are substring tests. Another repository whose name contains `repository`
# would also pass the first one; the digest check makes such a false match unlikely.
196 for digest
in imageJson[
"RepoDigests"]:
197 if repository
in digest
and manifestJson[
"Descriptor"][
"digest"]
in digest:
def showLogs(tail: Optional[int], follow: bool, timestamps: bool) -> None:
    """
    Show the Coretex Node container's logs via ``docker.getLogs``.

    Parameters
    ----------
    tail : Optional[int]
        Forwarded unchanged (presumably the number of trailing lines, ``None`` for
        no limit; confirm in ``docker.getLogs``).
    follow : bool
        Forwarded unchanged.
    timestamps : bool
        Forwarded unchanged.
    """

    containerName = config_defaults.DOCKER_CONTAINER_NAME
    docker.getLogs(containerName, tail, follow, timestamps)
# Tail of the node-registration function, called as registerNode(...) from configureNode.
# NOTE(review): its `def` line and first parameters (original lines 207-209) are not visible.
# `name` and `nodeMode` are used below and are presumably among them.
# What the visible part does:
#   - POSTs to "service" with the machine name and mode, plus any optional fields that are not None.
#   - Raises NetworkRequestError when the request fails.
#   - Raises TypeError when the response lacks a str "access_token" or an int "id".
#   - Returns (nodeId, accessToken).
210 publicKey: Optional[bytes] =
None,
211 nearWalletId: Optional[str] =
None,
212 endpointInvocationPrice: Optional[float] =
None
213 ) -> Tuple[int, str]:
215 params: Dict[str, Any] = {
216 "machine_name": name,
217 "mode": nodeMode.value
# The public key is sent base64-encoded as a UTF-8 string.
220 if publicKey
is not None:
221 params[
"public_key"] = b64encode(publicKey).decode(
"utf-8")
223 if nearWalletId
is not None:
224 params[
"near_wallet_id"] = nearWalletId
226 if endpointInvocationPrice
is not None:
227 params[
"endpoint_invocation_price"] = endpointInvocationPrice
229 response = networkManager.post(
"service", params)
231 if response.hasFailed():
232 raise NetworkRequestError(response,
"Failed to configure node. Please try again...")
# NOTE(review): the response JSON is parsed twice (once per field); a single getJson(dict) would do.
234 accessToken = response.getJson(dict).get(
"access_token")
235 nodeId = response.getJson(dict).get(
"id")
# Validate the response shape before returning it to the caller.
237 if not isinstance(accessToken, str)
or not isinstance(nodeId, int):
238 raise TypeError(
"Something went wrong. Please try again...")
240 return nodeId, accessToken
def selectImageType() -> ImageType:
    """
    Ask the user, via an arrow-key menu, which kind of Node image to use.

    Returns
    -------
    ImageType
        ``ImageType.official`` for "Official Coretex image",
        ``ImageType.custom`` for "Custom image".
    """

    # (label, value) pairs in the order they are shown in the menu
    menuEntries = (
        ("Official Coretex image", ImageType.official),
        ("Custom image", ImageType.custom),
    )
    imageTypeByLabel = dict(menuEntries)

    labels = [label for label, _ in menuEntries]
    pickedLabel = ui.arrowPrompt(labels, "Please select image that you want to use (use arrow keys to select an option):")

    # ui.arrowPrompt is expected to return one of `labels`; anything else raises KeyError here
    return imageTypeByLabel[pickedLabel]
# Let the user pick a NodeMode from an arrow-key menu whose labels are mode.toString().
# NOTE(review): the head of the `nodeModes` list (original lines 256-261) is not visible.
# Only NodeMode.endpointReserved and NodeMode.endpointShared appear, as its final entries.
255 def selectNodeMode() -> NodeMode:
262 NodeMode.endpointReserved,
263 NodeMode.endpointShared
# Label -> mode map. Modes with identical toString() output would collapse into one entry.
266 availableNodeModes = { mode.toString(): mode
for mode
in nodeModes }
267 choices = list(availableNodeModes.keys())
269 selectedMode = ui.arrowPrompt(choices,
"Please select Coretex Node mode (use arrow keys to select an option):")
270 return availableNodeModes[selectedMode]
# Prompt for the number of CPUs the Node container may use.
# cpuLimit is passed as the prompt's second (presumably default) argument.
# On invalid input an error is printed and the function re-prompts by calling itself recursively.
# NOTE(review): original lines 275-276 (presumably the lower-bound check behind the
# "must be higher than 0" error) and 283-285 (presumably `return cpuCount`) are not visible.
273 def promptCpu(cpuLimit: int) -> int:
274 cpuCount: int = ui.clickPrompt(f
"Enter the number of CPUs the container will use (Maximum: {cpuLimit}) (press enter to use default)", cpuLimit, type = int)
# NOTE(review): this f-string has no placeholders; the `f` prefix is redundant.
277 ui.errorEcho(f
"ERROR: Number of CPU's the container will use must be higher than 0")
278 return promptCpu(cpuLimit)
# Values above the Docker Desktop CPU limit are rejected.
280 if cpuCount > cpuLimit:
281 ui.errorEcho(f
"ERROR: CPU limit in Docker Desktop ({cpuLimit}) is lower than the specified value ({cpuCount})")
282 return promptCpu(cpuLimit)
287 def promptRam(ramLimit: int) -> int:
288 nodeRam: int = ui.clickPrompt(
289 f
"Node RAM memory limit in GB (Minimum: {config_defaults.MINIMUM_RAM}GB, "
290 f
"Maximum: {ramLimit}GB) (press enter to use default)",
295 if nodeRam > ramLimit:
297 f
"ERROR: RAM limit in Docker Desktop ({ramLimit}GB) is lower than the configured value ({nodeRam}GB). "
298 "Please adjust resource limitations in Docker Desktop settings."
300 return promptRam(ramLimit)
302 if nodeRam < config_defaults.MINIMUM_RAM:
304 f
"ERROR: Configured RAM ({nodeRam}GB) is lower than "
305 "the minimum Node RAM requirement ({config_defaults.MINIMUM_RAM}GB)."
307 return promptRam(ramLimit)
# Prompt for the Node container's swap limit in GB.
# min(swapLimit, nodeRam * 2) is passed as the prompt's second (presumably default) argument.
# A value above `swapLimit` prints an error message and re-prompts recursively.
# NOTE(review): the following original lines are not visible:
#   - 316-318: remaining prompt arguments;
#   - 320 and 323: the call that emits the error message;
#   - 325+: presumably `return nodeSwap`.
# No lower-bound check is visible for negative values.
312 def promptSwap(nodeRam: int, swapLimit: int) -> int:
313 nodeSwap: int = ui.clickPrompt(
314 f
"Node SWAP memory limit in GB (Maximum: {swapLimit}GB) (press enter to use default)",
315 min(swapLimit, nodeRam * 2),
# Values above the Docker swap limit are rejected.
319 if nodeSwap > swapLimit:
321 f
"ERROR: SWAP memory limit in Docker Desktop ({swapLimit}GB) is lower than the configured value ({nodeSwap}GB). "
322 f
"If you want to use higher value than {swapLimit}GB, you have to change docker limits."
324 return promptSwap(nodeRam, swapLimit)
# Prompt for the price charged per endpoint invocation.
# config_defaults.DEFAULT_ENDPOINT_INVOCATION_PRICE is passed as the prompt's second
# (presumably default) argument.
# Negative values print an error and re-prompt recursively; otherwise the entered value is returned.
# NOTE(review): original lines 333-335 (remaining prompt arguments) are not visible.
329 def promptInvocationPrice() -> float:
330 invocationPrice: float = ui.clickPrompt(
331 "Enter the price of a single endpoint invocation",
332 config_defaults.DEFAULT_ENDPOINT_INVOCATION_PRICE,
336 if invocationPrice < 0:
337 ui.errorEcho(
"Endpoint invocation price cannot be less than 0!")
338 return promptInvocationPrice()
340 return invocationPrice
# Prompt for an init script to run before the Node starts.
# Entering config_defaults.DEFAULT_INIT_SCRIPT (the prompt's second, presumably default,
# argument) returns it unchanged.
# Other input is expanded (~) and made absolute; a directory or a missing file prints an
# error and re-prompts recursively.
# NOTE(review): the following original lines are not visible:
#   - 347-349: remaining prompt arguments;
#   - 355: the directory check, presumably path.is_dir();
#   - 362+: presumably returns the validated path as str. Confirm.
343 def _configureInitScript() -> str:
344 initScript = ui.clickPrompt(
345 "Enter a path to sh script which will be executed before Node starts",
346 config_defaults.DEFAULT_INIT_SCRIPT,
# The default value is accepted without filesystem checks.
350 if initScript == config_defaults.DEFAULT_INIT_SCRIPT:
351 return config_defaults.DEFAULT_INIT_SCRIPT
353 path = Path(initScript).expanduser().absolute()
356 ui.errorEcho(
"Provided path is pointing to a directory, file expected!")
357 return _configureInitScript()
359 if not path.exists():
360 ui.errorEcho(
"Provided file does not exist!")
361 return _configureInitScript()
366 def checkResourceLimitations() -> None:
367 _, ramLimit = docker.getResourceLimits()
369 if ramLimit < config_defaults.MINIMUM_RAM:
371 f
"Minimum Node RAM requirement ({config_defaults.MINIMUM_RAM}GB) "
372 "is higher than your current Docker desktop RAM limit ({ramLimit}GB). "
373 "Please adjust resource limitations in Docker Desktop settings to match Node requirements."
# Interactively build a NodeConfiguration and register the Node with the Coretex API.
# Visible flow:
#   1. Read Docker resource limits.
#   2. Prompt for the node name and image.
#   3. Decide GPU access, offering the NVIDIA daemon-file workaround on Linux.
#   4. Fill in defaults.
#   5. Run the detailed prompts (presumably when `advanced` is true).
#   6. Derive an RSA public key from a non-default secret.
#   7. Call registerNode to obtain the node id and access token.
# NOTE(review): many original lines are not visible in this view, e.g. 390, 399, 418-419,
# 442-443, 484-486 and 496-505. They include several `else:` / `if advanced:` lines and the
# end of the function. Comments below describe only what is visible.
377 def configureNode(advanced: bool) -> NodeConfiguration:
378 ui.highlightEcho(
"[Node Configuration]")
379 nodeConfig = NodeConfiguration({})
# Lower-cased platform.system() output; compared against "windows" below.
380 currentOS = platform.system().lower()
# Docker limits: used as prompt maxima and to clamp the defaults below.
382 cpuLimit, ramLimit = docker.getResourceLimits()
383 swapLimit = docker.getDockerSwapLimit()
385 nodeConfig.name = ui.clickPrompt(
"Node name", type = str)
387 imageType = selectImageType()
388 if imageType == ImageType.custom:
389 nodeConfig.image = ui.clickPrompt(
"Specify URL of docker image that you want to use:", type = str)
# Presumably the `else` branch (its `else:` line is not visible): the official image,
# whose tag is appended further down.
391 nodeConfig.image =
"coretexai/coretex-node"
# GPU access is offered only when a GPU is available and Docker is not Docker Desktop
# on a non-Windows OS; line 400 otherwise disables it.
397 if isGPUAvailable()
and not (docker.isDockerDesktop()
and currentOS !=
"windows"):
398 nodeConfig.allowGpu = ui.clickPrompt(
"Do you want to allow the Node to access your GPU? (Y/n)", type = bool, default =
True)
400 nodeConfig.allowGpu =
False
# On Linux with GPU access and an un-patched daemon file, offer the workaround for the
# linked NVIDIA container toolkit issue. It only takes effect after a Docker restart.
# NOTE(review): platform.system().lower() is recomputed here although currentOS already holds it.
402 if nodeConfig.allowGpu
and platform.system().lower() ==
"linux" and not docker.isDaemonFileUpdated():
403 shouldUpdateDockerConfig = ui.clickPrompt(
404 "NVIDIA has a bug where a docker container running Coretex Node can lose access to GPU "
405 "(https://github.com/NVIDIA/nvidia-container-toolkit/issues/48). "
406 "\nDo you want Coretex CLI to apply a workaround for this bug "
407 "(NOTE: This requires docker daemon restart)? (Y/n)",
412 if shouldUpdateDockerConfig:
413 docker.updateDaemonFile()
414 shouldRestartDocker = ui.clickPrompt(
"Do you want to restart Docker to apply the changes? (Y/n)", type = bool, default =
True)
416 if shouldRestartDocker:
417 docker.restartDocker()
# Warning texts. The calls that print them and the branches selecting them
# (original lines 418-419 and 422-424) are not visible.
420 "Warning: The changes will not take effect until Docker is restarted. "
421 "(https://github.com/NVIDIA/nvidia-container-toolkit/issues/48)"
425 "Warning: Not updating the daemon.json file may lead to GPU access issues in Docker "
426 "containers. (https://github.com/NVIDIA/nvidia-container-toolkit/issues/48)"
# The official image is tagged ":latest-gpu" when GPU access is allowed, ":latest-cpu" otherwise.
429 if imageType == ImageType.official:
430 tag =
"gpu" if nodeConfig.allowGpu
else "cpu"
431 nodeConfig.image += f
":latest-{tag}"
# Defaults, applied first; the detailed prompts below overwrite them.
# RAM is clamped to min(max(MINIMUM_RAM, ramLimit), DEFAULT_RAM).
# The CPU count is clamped to min(cpuLimit, DEFAULT_CPU_COUNT).
433 nodeConfig.storagePath = config_defaults.DEFAULT_STORAGE_PATH
434 nodeConfig.ram = int(min(max(config_defaults.MINIMUM_RAM, ramLimit), config_defaults.DEFAULT_RAM))
# NOTE(review): min(swapLimit, int(max(DEFAULT_SWAP_MEMORY, swapLimit))) reduces to swapLimit
# for integer inputs, so DEFAULT_SWAP_MEMORY has no effect here. The RAM/CPU lines clamp
# toward their defaults; confirm the intended swap default.
435 nodeConfig.swap = min(swapLimit, int(max(config_defaults.DEFAULT_SWAP_MEMORY, swapLimit)))
436 nodeConfig.sharedMemory = config_defaults.DEFAULT_SHARED_MEMORY
437 nodeConfig.cpuCount = int(min(cpuLimit, config_defaults.DEFAULT_CPU_COUNT))
438 nodeConfig.mode = config_defaults.DEFAULT_NODE_MODE
439 nodeConfig.allowDocker = config_defaults.DEFAULT_ALLOW_DOCKER
440 nodeConfig.secret = config_defaults.DEFAULT_NODE_SECRET
441 nodeConfig.initScript = config_defaults.DEFAULT_INIT_SCRIPT
# Detailed prompts (the guarding condition, original lines 442-443, is not visible).
444 nodeConfig.mode = selectNodeMode()
445 nodeConfig.storagePath = ui.clickPrompt(
"Storage path (press enter to use default)", config_defaults.DEFAULT_STORAGE_PATH, type = str)
447 nodeConfig.cpuCount = promptCpu(cpuLimit)
448 nodeConfig.ram = promptRam(ramLimit)
# The swap prompt's default depends on the RAM value just entered.
449 nodeConfig.swap = promptSwap(nodeConfig.ram, swapLimit)
451 nodeConfig.sharedMemory = ui.clickPrompt(
452 "Node POSIX shared memory limit in GB (press enter to use default)",
453 config_defaults.DEFAULT_SHARED_MEMORY,
457 nodeConfig.allowDocker = ui.clickPrompt(
458 "Allow Node to access system docker? This is a security risk! (Y/n)",
459 config_defaults.DEFAULT_ALLOW_DOCKER,
463 nodeConfig.initScript = _configureInitScript()
465 nodeConfig.secret = ui.clickPrompt(
466 "Enter a secret which will be used to generate RSA key-pair for Node",
467 config_defaults.DEFAULT_NODE_SECRET,
# Endpoint modes additionally ask for a NEAR wallet id and a per-invocation price.
472 if nodeConfig.mode
in [NodeMode.endpointReserved, NodeMode.endpointShared]:
473 nodeConfig.nearWalletId = ui.clickPrompt(
474 "Enter a NEAR wallet id to which the funds will be transfered when executing endpoints",
475 config_defaults.DEFAULT_NEAR_WALLET_ID,
479 nodeConfig.endpointInvocationPrice = promptInvocationPrice()
# The prompt asks for seconds; its default is HEARTBEAT_INTERVAL // 1000.
# HEARTBEAT_INTERVAL is presumably in milliseconds; confirm.
481 nodeConfig.heartbeatInterval = ui.clickPrompt(
482 "Enter interval (seconds) at which the Node will send heartbeat to Coretex Server",
483 config_defaults.HEARTBEAT_INTERVAL // 1000,
# Presumably printed on the non-detailed path (its `else:` line is not visible).
487 ui.stdEcho(
"To configure node manually run coretex node config with --advanced flag.")
# A public key is only derived from a non-default secret: a 2048-bit RSA key generated
# from the secret's UTF-8 bytes.
489 publicKey: Optional[bytes] =
None
490 if nodeConfig.secret
is not None and nodeConfig.secret != config_defaults.DEFAULT_NODE_SECRET:
491 ui.progressEcho(
"Generating RSA key-pair (2048 bits long) using provided node secret...")
492 rsaKey = rsa.generateKey(2048, nodeConfig.secret.encode(
"utf-8"))
493 publicKey = rsa.getPublicKeyBytes(rsaKey.public_key())
# Register the node and store the returned id and access token.
# The first arguments (original lines 496-498) and the rest of the function are not visible.
495 nodeConfig.id, nodeConfig.accessToken = registerNode(
499 nodeConfig.nearWalletId,
500 nodeConfig.endpointInvocationPrice
506 def initializeNodeConfiguration() -> None:
508 NodeConfiguration.load()
510 except ConfigurationNotFound:
511 ui.errorEcho(
"Node configuration not found.")
512 if not click.confirm(
"Would you like to configure the node?", default =
True):
514 except InvalidConfiguration
as ex:
515 for error
in ex.errors:
518 if not click.confirm(
"Would you like to update the configuration?", default =
True):
522 if not click.confirm(
"Node is already running. Do you wish to stop the Node?", default =
True):
523 ui.errorEcho(
"If you wish to reconfigure your node, use \"coretex node stop\" command first.")
526 nodeConfig = NodeConfiguration.load()
529 nodeConfig = configureNode(advanced =
False)