
Remove unused node config and code (#2949)
* Remove unused config and code

* flaky test? will investigate later
lferran authored Mar 7, 2025
1 parent 32dd260 · commit 0d17c27
Showing 16 changed files with 4 additions and 252 deletions.
1 change: 0 additions & 1 deletion charts/nucliadb_ingest/templates/ingest.cm.yaml
@@ -12,7 +12,6 @@ data:
   GRPC_PORT: {{ .Values.serving.grpc | quote }}
   METRICS_PORT: {{ .Values.serving.metricsPort | quote }}
   PULL_TIME_ERROR_BACKOFF: {{ .Values.config.pull_time_error_backoff | quote }}
-  NODE_REPLICAS: {{ .Values.config.node_replicas | quote }}
   {{- if .Values.debug }}
   LOG_LEVEL: "DEBUG"
   {{- end }}
1 change: 0 additions & 1 deletion charts/nucliadb_ingest/values.yaml
@@ -29,7 +29,6 @@ envFrom:
 # Kubernetes settings
 config:
   pull_time_error_backoff: 100
-  node_replicas: 2
 
 affinity: {}
 nodeSelector: {}
2 changes: 0 additions & 2 deletions charts/nucliadb_shared/templates/nucliadb.cm.yaml
@@ -50,8 +50,6 @@ data:
   NUCLIA_HASH_SEED: {{ .Values.nuclia.nuclia_hash_seed | quote }}
   NUCLIA_PARTITIONS: {{ .Values.nuclia.nuclia_partitions | quote }}
   NUCLIADB_INGEST: {{ .Values.nucliadb.ingest }}
-  NODE_WRITER_PORT: {{ .Values.nucliadb.node_writer_port | quote }}
-  NODE_READER_PORT: {{ .Values.nucliadb.node_reader_port | quote }}
   CACHE_PUBSUB_NATS_URL: {{ toJson .Values.cache.cache_pubsub_nats_url |quote }}
   {{- with .Values.cache.cache_pubsub_nats_auth }}
   CACHE_PUBSUB_NATS_AUTH: {{ . }}
2 changes: 0 additions & 2 deletions charts/nucliadb_shared/values.yaml
@@ -58,8 +58,6 @@ nuclia:
 
 nucliadb:
   ingest: ingest-orm-grpc:8030
-  node_writer_port: 10001
-  node_reader_port: 10000
 
 transaction:
   transaction_jetstream_auth:
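Why the chart entries and the Settings fields go together: pydantic-settings populates fields from matching environment variables and silently ignores env vars with no matching field, so once node_replicas is dropped from the Settings class, a NODE_REPLICAS entry in the rendered ConfigMap is dead weight. A minimal sketch of that behavior (illustrative class name, not NucliaDB code):

import os
from pydantic_settings import BaseSettings

class IngestSettings(BaseSettings):
    # Still populated from the PULL_TIME_ERROR_BACKOFF env var.
    pull_time_error_backoff: int = 100

os.environ["PULL_TIME_ERROR_BACKOFF"] = "250"
os.environ["NODE_REPLICAS"] = "2"  # no matching field: silently ignored

print(IngestSettings().pull_time_error_backoff)  # -> 250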
3 changes: 0 additions & 3 deletions e2e/run-e2e-tests.sh
@@ -4,15 +4,12 @@ set -e
 
 /opt/nucliadb/bin/pip install -r e2e/requirements.txt
 source /opt/nucliadb/bin/activate
-export standalone_node_port=10009
 export DATA_PATH=data1
 export standalone_node_role=index
 nucliadb --http-port=8080 &
-export standalone_node_port=10010
 export DATA_PATH=data2
 export standalone_node_role=index
 nucliadb --http-port=8081 &
-export standalone_node_port=99999
 export DATA_PATH=foobar
 export standalone_node_role=worker
 nucliadb --http-port=8082 &
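With the standalone_node_port exports gone (the setting itself is deleted later in this commit), each test instance is distinguished only by its data path, node role, and HTTP port. Reconstructed from the hunk above, the script region now reads:

/opt/nucliadb/bin/pip install -r e2e/requirements.txt
source /opt/nucliadb/bin/activate
export DATA_PATH=data1
export standalone_node_role=index
nucliadb --http-port=8080 &
export DATA_PATH=data2
export standalone_node_role=index
nucliadb --http-port=8081 &
export DATA_PATH=foobar
export standalone_node_role=worker
nucliadb --http-port=8082 &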
46 changes: 1 addition & 45 deletions nucliadb/src/migrations/0020_drain_nodes_from_cluster.py
@@ -28,9 +28,6 @@
 
 import logging
 
-from nucliadb.common import datamanagers
-from nucliadb.common.cluster.rollover import rollover_kb_index
-from nucliadb.common.cluster.settings import settings as cluster_settings
 from nucliadb.migrator.context import ExecutionContext
 
 logger = logging.getLogger(__name__)
@@ -39,45 +36,4 @@
 async def migrate(context: ExecutionContext) -> None: ...
 
 
-async def migrate_kb(context: ExecutionContext, kbid: str) -> None:
-    """
-    Rollover KB shards if any of the shards are on the nodes to drain
-    """
-    drain_node_ids = cluster_settings.drain_nodes
-    if len(drain_node_ids) == 0:
-        logger.info("Skipping migration because no drain_nodes are set")
-        return
-
-    if not await kb_has_shards_on_drain_nodes(kbid, drain_node_ids):
-        logger.info(
-            "KB does not have shards on the nodes to drain, skipping rollover",
-            extra={"kbid": kbid},
-        )
-        return
-
-    logger.info("Rolling over affected KB", extra={"kbid": kbid})
-    await rollover_kb_index(context, kbid, drain_nodes=drain_node_ids)
-
-
-async def kb_has_shards_on_drain_nodes(kbid: str, drain_node_ids: list[str]) -> bool:
-    async with datamanagers.with_ro_transaction() as txn:
-        shards = await datamanagers.cluster.get_kb_shards(txn, kbid=kbid)
-    if not shards:
-        logger.warning("Shards object not found", extra={"kbid": kbid})
-        return False
-    shard_in_drain_nodes = False
-    for shard in shards.shards:
-        for replica in shard.replicas:
-            if replica.node in drain_node_ids:
-                logger.info(
-                    "Shard found in drain nodes, will rollover it",
-                    extra={
-                        "kbid": kbid,
-                        "logical_shard": shard.shard,
-                        "replica_shard_id": replica.shard.id,
-                        "node": replica.node,
-                        "drain_node_ids": drain_node_ids,
-                    },
-                )
-                shard_in_drain_nodes = True
-    return shard_in_drain_nodes
+async def migrate_kb(context: ExecutionContext, kbid: str) -> None: ...
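The drain-nodes migration is stubbed rather than deleted, presumably so the numbered migration sequence stays intact while the node-draining logic it depended on goes away (an assumption; not verified against nucliadb.migrator internals). An Ellipsis body makes the stub a valid no-op coroutine:

import asyncio

# "..." is an expression statement, so this is a complete coroutine
# definition that does nothing and returns None when awaited.
async def migrate_kb(context: object, kbid: str) -> None: ...

print(asyncio.run(migrate_kb(None, "kb1")))  # -> None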
77 changes: 0 additions & 77 deletions nucliadb/src/nucliadb/common/cluster/index_node.py

This file was deleted.

41 changes: 1 addition & 40 deletions nucliadb/src/nucliadb/common/cluster/settings.py
@@ -24,12 +24,6 @@
 from pydantic_settings import BaseSettings
 
 
-class ClusterDiscoveryMode(str, enum.Enum):
-    MANUAL = "manual"
-    KUBERNETES = "kubernetes"
-    SINGLE_NODE = "single_node"
-
-
 class StandaloneNodeRole(enum.Enum):
     ALL = "all"
     INDEX = "index"
@@ -39,53 +33,20 @@ class StandaloneNodeRole(enum.Enum):
 class Settings(BaseSettings):
     data_path: str = "./data/node"
     standalone_mode: bool = False
-    standalone_node_port: int = Field(
-        default=10009,
-        title="Standalone node port",
-        description="Port to use for standalone nodes to communication with each other through",
-    )
     standalone_node_role: StandaloneNodeRole = StandaloneNodeRole.ALL
 
-    node_replicas: int = 2
-
-    node_writer_port: int = 10000
-    node_reader_port: int = 10001
-
-    # Only for testing purposes
-    writer_port_map: dict[str, int] = {}
-    reader_port_map: dict[str, int] = {}
-
-    # Node limits
+    # Index limits
     max_shard_paragraphs: int = Field(
         default=500_000,
         title="Max shard paragraphs",
         description="Maximum number of paragraphs to target per shard",
     )
-    max_node_replicas: int = Field(
-        default=800,
-        title="Max node replicas",
-        description="Maximum number of shard replicas a single node will manage",
-    )
     max_resource_paragraphs: int = Field(
         default=50_000,
         title="Max paragraphs per resource",
         description="Maximum number of paragraphs allowed on a single resource",
     )
 
-    drain_nodes: list[str] = Field(
-        default=[],
-        title="Drain nodes",
-        description="List of node IDs to ignore when creating new shards. It is used for draining nodes from a cluster. Example: ['1bf3bfe7-e164-4a19-a4d9-41372fc15aca',]",  # noqa: E501
-    )
-
-    local_reader_threads: int = 5
-    local_writer_threads: int = 5
-
-    cluster_discovery_mode: ClusterDiscoveryMode = ClusterDiscoveryMode.KUBERNETES
-    cluster_discovery_kubernetes_namespace: str = "nucliadb"
-    cluster_discovery_kubernetes_selector: str = "appType=node"
-    cluster_discovery_manual_addresses: list[str] = []
-
     nidx_api_address: Optional[str] = Field(default=None, description="NIDX gRPC API address")
     nidx_searcher_address: Optional[str] = Field(
         default=None, description="NIDX gRPC searcher API address"
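With discovery modes, node ports, replicas, and drain_nodes removed, the nidx addresses are the only cluster endpoints left in these settings. A hypothetical direct construction, using only fields visible in the hunk above and placeholder values (keyword-argument construction is standard pydantic behavior, not a documented NucliaDB API):

from nucliadb.common.cluster.settings import Settings

settings = Settings(
    data_path="./data/node",
    standalone_mode=True,
    nidx_api_address="localhost:10000",      # placeholder address
    nidx_searcher_address="localhost:10001",  # placeholder address
)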
25 changes: 0 additions & 25 deletions nucliadb/src/nucliadb/ingest/cache.py

This file was deleted.

9 changes: 1 addition & 8 deletions nucliadb/src/nucliadb/standalone/config.py
@@ -22,7 +22,7 @@
 import os
 
 from nucliadb.common.cluster.settings import StandaloneNodeRole
-from nucliadb.standalone.settings import Settings, StandaloneDiscoveryMode
+from nucliadb.standalone.settings import Settings
 
 logger = logging.getLogger(__name__)
@@ -76,7 +76,6 @@ def config_nucliadb(nucliadb_args: Settings):
     use some specific settings.
     """
 
-    from nucliadb.common.cluster.settings import ClusterDiscoveryMode
     from nucliadb.common.cluster.settings import settings as cluster_settings
     from nucliadb.ingest.settings import settings as ingest_settings
     from nucliadb.train.settings import settings as train_settings
@@ -91,14 +90,8 @@
 
     cluster_settings.standalone_mode = True
     cluster_settings.data_path = nucliadb_args.data_path
-    cluster_settings.standalone_node_port = nucliadb_args.standalone_node_port
     cluster_settings.standalone_node_role = nucliadb_args.standalone_node_role
 
-    if nucliadb_args.cluster_discovery_mode == StandaloneDiscoveryMode.DEFAULT:
-        # default for standalone is single node
-        cluster_settings.cluster_discovery_mode = ClusterDiscoveryMode.SINGLE_NODE
-        cluster_settings.node_replicas = 1
-
     ingest_settings.nuclia_partitions = 1
     ingest_settings.replica_number = 0
     ingest_settings.partitions = ["1"]
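Net effect on config_nucliadb, reconstructed from the context lines above (a sketch of the resulting block, not the verbatim file): the standalone node port and the discovery-mode branch disappear, so standalone mode now only wires the data path and node role before the ingest defaults.

cluster_settings.standalone_mode = True
cluster_settings.data_path = nucliadb_args.data_path
cluster_settings.standalone_node_role = nucliadb_args.standalone_node_role

ingest_settings.nuclia_partitions = 1
ingest_settings.replica_number = 0
ingest_settings.partitions = ["1"]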
3 changes: 0 additions & 3 deletions nucliadb/src/nucliadb/standalone/run.py
@@ -99,10 +99,7 @@ def run():
         "Admin UI": f"http://{settings.http_host}:{settings.http_port}/admin",
         "Key-value backend": ingest_settings.driver.value,
         "Blob storage backend": storage_settings.file_backend.value,
-        "Cluster discovery mode": cluster_settings.cluster_discovery_mode.value,
-        "Node replicas": cluster_settings.node_replicas,
         "Index data path": os.path.realpath(cluster_settings.data_path),
-        "Node port": cluster_settings.standalone_node_port,
         "Auth policy": settings.auth_policy.value,
         "Node role": cluster_settings.standalone_node_role.value,
     }
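The startup summary printed by run() loses its node-cluster entries. Reconstructed from the context lines above (the enclosing variable name is not shown in this hunk), it now maps to:

{
    "Admin UI": f"http://{settings.http_host}:{settings.http_port}/admin",
    "Key-value backend": ingest_settings.driver.value,
    "Blob storage backend": storage_settings.file_backend.value,
    "Index data path": os.path.realpath(cluster_settings.data_path),
    "Auth policy": settings.auth_policy.value,
    "Node role": cluster_settings.standalone_node_role.value,
}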
11 changes: 0 additions & 11 deletions nucliadb/src/nucliadb/standalone/settings.py
@@ -30,13 +30,6 @@
 from nucliadb_utils.storages.settings import Settings as ExtendedStorageSettings
 
 
-class StandaloneDiscoveryMode(Enum):
-    DEFAULT = "default"
-    MANUAL = "manual"
-    KUBERNETES = "kubernetes"
-    SINGLE_NODE = "single_node"
-
-
 class AuthPolicy(Enum):
     UPSTREAM_NAIVE = "upstream_naive"
     UPSTREAM_AUTH_HEADER = "upstream_auth_header"
@@ -60,8 +53,6 @@ class Settings(DriverSettings, StorageSettings, ExtendedStorageSettings):
     http_port: int = pydantic.Field(default=8080, description="HTTP Port")
     ingest_grpc_port: int = pydantic.Field(default=8030, description="Ingest GRPC Port")
     train_grpc_port: int = pydantic.Field(default=8031, description="Train GRPC Port")
-    standalone_node_port: int = pydantic.Field(default=10009, description="Node GRPC Port")
-
     auth_policy: AuthPolicy = pydantic.Field(
         default=AuthPolicy.UPSTREAM_NAIVE,
         description="""Auth policy to use for http requests.
@@ -111,8 +102,6 @@ class Settings(DriverSettings, StorageSettings, ExtendedStorageSettings):
         description="JWK key used for temporary token generation and validation.",
     )
 
-    cluster_discovery_mode: StandaloneDiscoveryMode = StandaloneDiscoveryMode.DEFAULT
-
     fork: bool = pydantic.Field(default=False, description="Fork process on startup")
 
     # Standalone logging settings
31 changes: 0 additions & 31 deletions nucliadb/tests/ingest/unit/test_cache.py

This file was deleted.

(Diffs for the remaining 3 changed files not shown.)
