From d9c9779fb46bb52b4bebd0b02cae37af83c966b6 Mon Sep 17 00:00:00 2001 From: "ales.novak" Date: Fri, 26 Jan 2024 14:21:16 +0100 Subject: [PATCH] Integration tests and kuberentes_monitor / k8s implementation --- .github/actions/install-k8s-agent/action.yml | 24 +++++++ .../reusable-agent-build-container-images.yml | 56 ++++++++++++++++ .../builtin_monitors/kubernetes_monitor.py | 52 +++++++++++++++ .../copying_manager/copying_manager.py | 8 ++- scalyr_agent/log_watcher.py | 15 +++-- scalyr_agent/monitor_utils/k8s.py | 66 +++++++++++++++++++ .../multiple_account_printers.yaml | 45 +++++++++++++ 7 files changed, 258 insertions(+), 8 deletions(-) create mode 100644 tests/e2e/k8s_k8s_monitor/multiple_account_printers.yaml diff --git a/.github/actions/install-k8s-agent/action.yml b/.github/actions/install-k8s-agent/action.yml index 030bea37b0..8578ee8759 100644 --- a/.github/actions/install-k8s-agent/action.yml +++ b/.github/actions/install-k8s-agent/action.yml @@ -9,6 +9,15 @@ inputs: scalyr_api_key: description: "Write API key to be used by the agent." required: true + scalyr_api_key_team_1: + description: "Write API key to be used by the agent." + required: false + scalyr_api_key_team_2: + description: "Write API key to be used by the agent." + required: false + scalyr_api_key_team_3: + description: "Write API key to be used by the agent." + required: false scalyr_cluster_name: description: "Cluster name to use." required: true @@ -80,6 +89,21 @@ runs: # Define api key kubectl create secret generic scalyr-api-key --namespace scalyr --from-literal=scalyr-api-key="${{ inputs.scalyr_api_key }}" + + # Create a secret if the scalyr_api_key_team_1 is set + if [ ! -z "${{ inputs.scalyr_api_key_team_1 }}" ]; then + kubectl create secret generic scalyr-api-key-team-1 --namespace scalyr --from-literal=scalyr-api-key="${{ inputs.scalyr_api_key_team_1 }}" + fi + + # Create a secret if the scalyr_api_key_team_2 is set + if [ ! 
-z "${{ inputs.scalyr_api_key_team_2 }}" ]; then + kubectl create secret generic scalyr-api-key-team-2 --namespace scalyr --from-literal=scalyr-api-key="${{ inputs.scalyr_api_key_team_2 }}" + fi + + # Create a secret if the scalyr_api_key_team_3 is set + if [ ! -z "${{ inputs.scalyr_api_key_team_3 }}" ]; then + kubectl create secret generic scalyr-api-key-team-3 --namespace scalyr --from-literal=scalyr-api-key="${{ inputs.scalyr_api_key_team_3 }}" + fi # Create configmap kubectl create configmap --namespace scalyr scalyr-config \ diff --git a/.github/workflows/reusable-agent-build-container-images.yml b/.github/workflows/reusable-agent-build-container-images.yml index 9a1aff86c8..1895822d7f 100644 --- a/.github/workflows/reusable-agent-build-container-images.yml +++ b/.github/workflows/reusable-agent-build-container-images.yml @@ -369,9 +369,11 @@ jobs: minikube image ls kubectl apply -f tests/e2e/k8s_k8s_monitor/std_printer_deployment.yaml kubectl apply -f tests/e2e/k8s_k8s_monitor/long_message_printer_deployment.yaml + kubectl apply -f tests/e2e/k8s_k8s_monitor/multiple_account_printers.yaml kubectl wait --for=condition=ready pod -l app=std-printer kubectl wait --for=condition=ready pod -l app=long-message-printer + kubectl wait --for=condition=ready pod -l app=multiple-account-printer kubectl get pods -A export APP_POD_NAME=$(kubectl get pod --namespace=default --selector=app=std-printer -o jsonpath="{.items[0].metadata.name}") @@ -398,6 +400,9 @@ jobs: with: scalyr_server: "agent.scalyr.com" scalyr_api_key: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_WRITE }}" + scalyr_api_key_team_1: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_2_WRITE }}" + scalyr_api_key_team_2: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_3_WRITE }}" + scalyr_api_key_team_3: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_4_WRITE }}" scalyr_cluster_name: "${K8S_CLUSTER_NAME}" scalyr_k8s_events_disable: "false" main_yaml_path: 
"tests/e2e/scalyr-agent-2-daemonset.yaml" @@ -520,6 +525,56 @@ jobs: echo "CronJob events checks" ./scripts/cicd/scalyr-query.sh '$serverHost="'${SCALYR_AGENT_POD_NAME}'" $logfile="/var/log/scalyr-agent-2/kubernetes_events.log" $parser="k8sEvents" k8s-kind="CronJob" involvedObjectKind="CronJob" involvedObjectName="hello" watchEventType="ADDED" reason="SawCompletedJob"' + - name: Verify multiaccount records have been ingested + timeout-minutes: 14 + env: + scalyr_api_key_team_1: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_2_WRITE }}" + scalyr_api_key_team_2: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_3_WRITE }}" + scalyr_api_key_team_3: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_4_WRITE }}" + SCALYR_AGENT_POD_NAME: "${{ env.SCALYR_AGENT_POD_NAME }}" + run: | + export RETRY_ATTEMPTS="1" + + function scalyr_query() { + scalyr query --columns=message "serverHost=\"${SCALYR_AGENT_POD_NAME}\" app=\"multiple-account-printer\" $@" + } + + function ingested_line_count() { + API_KEY=$1 + MESSAGE=$2 + scalyr_readlog_token=${API_KEY} scalyr_query "${MESSAGE}" | wc -l + } + + function assert_ingested_once() { + API_KEY=$1 + MESSAGE=$2 + LINE_COUNT=`scalyr_readlog_token=${API_KEY} scalyr_query "${MESSAGE}" | wc -l` + } + + function assert_not_ingested() { + API_KEY=$1 + MESSAGE=$2 + scalyr_readlog_token=${API_KEY} scalyr_query "${MESSAGE}" && { echo -e $ERR_MSG; exit 1; } || echo -e $SUCCESS_MSG + } + + # See tests/e2e/k8s_k8s_monitor/multiple_account_printers.yaml:16 for the following annotations: + # log.config.scalyr.com/attributes.parser: "test-parser-1" + # log.config.scalyr.com/team1.secret: "scalyr-api-key-team-1" + # log.config.scalyr.com/cont1.team2.secret: "scalyr-api-key-team-2" + # log.config.scalyr.com/cont2.team2.secret: "scalyr-api-key-team-2" + # log.config.scalyr.com/cont2.team3.secret: "scalyr-api-key-team-3" + + # Container 1 log is ingested with the scalyr-api-key-team-2 only + # (asserted by the first query below) + + # Container 2 log is ingested with the scalyr-api-key-team-2 and scalyr-api-key-team-3 only + + # Container 3 log is ingested with the scalyr-api-key-team-1 only + + + scalyr_readlog_token=$scalyr_api_key_team_2 ./scripts/cicd/scalyr-query.sh '$serverHost="'${SCALYR_AGENT_POD_NAME}'" app="multiple-account-printer" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont1' + scalyr_readlog_token=$scalyr_api_key_team_2 ./scripts/cicd/scalyr-query.sh '$serverHost="'${SCALYR_AGENT_POD_NAME}'" app="multiple-account-printer" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont2' + - name: Notify Slack on Failure if: ${{ failure() && github.ref_name == 'master' }} uses: act10ns/slack@ed1309ab9862e57e9e583e51c7889486b9a00b0f # v2.0.0 with: status: ${{ job.status }} steps: ${{ toJson(steps) }} channel: '#eng-dataset-cloud-tech' + k8s_open_metrics_monitor_tests: name: OpenMetrics Monitor - k8s ${{ inputs.builder_name }} ${{ matrix.k8s_version.version }}-${{ matrix.k8s_version.runtime}} runs-on: ubuntu-20.04 diff --git a/scalyr_agent/builtin_monitors/kubernetes_monitor.py b/scalyr_agent/builtin_monitors/kubernetes_monitor.py index 72aca729e0..4d33f21e43 100644 --- a/scalyr_agent/builtin_monitors/kubernetes_monitor.py +++ b/scalyr_agent/builtin_monitors/kubernetes_monitor.py @@ -3189,6 +3189,7 @@ def __get_log_config_for_container(self, cid, info, k8s_cache, base_attributes): parser = "docker" common_annotations = {} container_annotations = {} + all_annotations = {} # pod name and namespace are set to an invalid value for cases where errors occur and a log # message is produced, so that the log message has clearly invalid values for these rather # than just being empty @@ -3271,6 +3272,7 @@ def __get_log_config_for_container(self, cid, info, k8s_cache, base_attributes): # by default all annotations will be applied to all containers # in the pod all_annotations = pod.annotations + container_specific_annotations = False # get any common annotations for all containers @@ -3409,6 +3411,56 @@ def __get_log_config_for_container(self, cid, 
info, k8s_cache, base_attributes): if "parser" in attrs: result["parser"] = attrs["parser"] + # Based on the pod annotations in a format {container_name}.{team}.secret={secret_name} + # we might want to add api_keys parameter + if container_annotations or all_annotations: + def filter_teams(annotations): + return { + team: value + for team, value in annotations.items() + if re.fullmatch("team\\d+", team) + } + + def fetch_secret(name): + secret = k8s_cache.secret(pod_namespace, name, time.time()) + if secret: + return secret + + self._logger.warning( + "Failed to fetch secret '%s' for pod '%s/%s'" + % (name, pod_namespace, pod_name), + limit_once_per_x_secs=300, + limit_key="k8s-fetch-secret-%s" % name, + ) + + def get_secret_api_key(secret): + api_key = secret.data.get("scalyr-api-key") if secret else None + + if secret and not api_key: + self._logger.warning( + "Secret '%s/%s' does not contain a scalyr-api-key field, ignoring." + % (pod_namespace, secret.name), + limit_once_per_x_secs=300, + limit_key="k8s-fetch-secret-%s" % secret.name + ) + + return api_key + + team_annotations = filter_teams(container_annotations) or filter_teams(all_annotations) + + api_keys = [ + get_secret_api_key( + fetch_secret( + team_annotations[team]["secret"] + ) + ) + for team in team_annotations.keys() + if team_annotations.get(team).get("secret") + ] + + if api_keys: + result["api_keys"] = list(filter(lambda api_key: api_key is not None, api_keys)) + return result def __get_docker_logs(self, containers, k8s_cache): diff --git a/scalyr_agent/copying_manager/copying_manager.py b/scalyr_agent/copying_manager/copying_manager.py index 2cd8351f89..ab532e4bfc 100644 --- a/scalyr_agent/copying_manager/copying_manager.py +++ b/scalyr_agent/copying_manager/copying_manager.py @@ -465,10 +465,11 @@ def config(self): def add_log_config(self, monitor_name, log_config, force_add=False): """Add the log_config item to the list of paths being watched + If force_add is true and the log_config item is marked to be removed the removal will 
be canceled. + Otherwise, the item will be added only if it's not monitored already. param: monitor_name - the name of the monitor adding the log config param: log_config - a log_config object containing the path to be added - param force_add: True or force add this file and cancel any removal which - may have been scheduled before hand. + param force_add: bool, see above We really just want to use this with Docker monitor where there is a small window between the container restart where the log file is not immediately removed. returns: an updated log_config object @@ -570,7 +571,8 @@ def update_log_config(self, monitor_name, log_config): def remove_log_path(self, monitor_name, log_path): """Remove the log_path from the list of paths being watched - params: log_path - a string containing the path to the file no longer being watched + param: monitor_name - the name of the monitor removing the path + param: log_path - a string containing path of the log file to remove """ # get the list of paths with 0 reference counts self.__lock.acquire() diff --git a/scalyr_agent/log_watcher.py b/scalyr_agent/log_watcher.py index 5cdaaed1e1..f3286b8060 100644 --- a/scalyr_agent/log_watcher.py +++ b/scalyr_agent/log_watcher.py @@ -24,11 +24,16 @@ class LogWatcher(object): to add/remove a set of log paths """ - def add_log_config(self, monitor_name, log_config): - """Add the path specified by the log_config to the list of paths being watched - param: monitor_name - the name of the monitor adding the log_config - param: log_config - a log_config object containing at least a path - returns: the log_config variable with updated path and default information + def add_log_config(self, monitor_name, log_config, force_add=False): + """Add the log_config item to the list of paths being watched + If force_add is true and the log_config item is marked to be removed the removal will be canceled. + Otherwise, the item will be added only if it's not monitored already. 
+ param: monitor_name - the name of the monitor adding the log config + param: log_config - a log_config object containing the path to be added + param force_add: bool, see above + We really just want to use this with Docker monitor where there is a small window between + the container restart where the log file is not immediately removed. + returns: an updated log_config object """ pass diff --git a/scalyr_agent/monitor_utils/k8s.py b/scalyr_agent/monitor_utils/k8s.py index b3d621c7e2..2b5da4f0d1 100644 --- a/scalyr_agent/monitor_utils/k8s.py +++ b/scalyr_agent/monitor_utils/k8s.py @@ -102,6 +102,11 @@ "list": Template("/api/v1/namespaces/${namespace}/pods"), "list-all": "/api/v1/pods", }, + "Secret": { + "single": Template("/api/v1/namespaces/${namespace}/secrets/${name}"), + "list": Template("/api/v1/namespaces/${namespace}/secrets"), + "list-all": "/api/v1/secrets" + }, "ReplicaSet": { "single": Template("/apis/apps/v1/namespaces/${namespace}/replicasets/${name}"), "list": Template("/apis/apps/v1/namespaces/${namespace}/replicasets"), @@ -518,6 +523,16 @@ def __repr__(self): return str(self.__dict__) +class Secret(object): + def __init__(self, name, namespace, data, kind, string_data, type): + self.name = name + self.namespace = namespace + self.data = data + self.kind = kind + self.string_data = string_data + self.type = type + + class Controller(object): """ General class for all cached Controller objects @@ -1012,6 +1027,19 @@ def process_object(self, k8s, obj, query_options=None): return result +class SecretProcessor(_K8sProcessor): + def process_object(self, k8s, obj, query_options=None): + metadata = obj.get("metadata", {}) + kind = obj.get("kind", "") + namespace = metadata.get("namespace", "") + name = metadata.get("name", "") + data = obj.get("data", {}) + string_data = obj.get("stringData", {}) + type = obj.get("type", "") + + return Secret(name, namespace, data, kind, string_data, type) + + class ControllerProcessor(_K8sProcessor): def 
process_object(self, k8s, obj, query_options=None): """Generate a Controller object from a JSON object @@ -1292,6 +1320,10 @@ def __init__( self._pod_processor = PodProcessor(self._controllers) self._pods_cache = _K8sCache(self._pod_processor, "Pod") + # create the secret cache + self._secret_processor = SecretProcessor() + self._secrets_cache = _K8sCache(self._secret_processor, "Secret") + self._cluster_name = None self._api_server_version = None # The last time (in seconds since epoch) we updated the K8s version number via a query @@ -1571,6 +1603,7 @@ def update_cache(self, run_state): scalyr_logging.DEBUG_LEVEL_1, "Marking unused pods as expired" ) self._pods_cache.mark_as_expired(current_time) + self._secrets_cache.mark_as_expired(current_time) self._update_cluster_name(local_state.k8s) self._update_api_server_version_if_necessary( @@ -1614,6 +1647,39 @@ def update_cache(self, run_state): local_state.cache_expiry_secs - fuzz_factor ) + def secret( + self, + namespace, + name, + current_time=None, + allow_expired=False + ): + """Returns the Secret specified by namespace and name or None if no secret matches. + + Warning: Failure to pass current_time leads to incorrect recording of last access times, which will + lead to these objects being refreshed prematurely (potential source of bugs) + + Querying the secret information is thread-safe, but the returned object should + not be written to. + + @param allow_expired: If True, an object is considered present in cache even if it is expired. 
+ @type allow_expired: bool + """ + local_state = self._state.copy_state() + + if local_state.k8s is None: + return + + return self._secrets_cache.lookup( + local_state.k8s, + current_time, + namespace, + name, + kind="Secret", + allow_expired=allow_expired, + ignore_k8s_api_exception=True, + ) + def pod( self, namespace, diff --git a/tests/e2e/k8s_k8s_monitor/multiple_account_printers.yaml b/tests/e2e/k8s_k8s_monitor/multiple_account_printers.yaml new file mode 100644 index 0000000000..48f52596d8 --- /dev/null +++ b/tests/e2e/k8s_k8s_monitor/multiple_account_printers.yaml @@ -0,0 +1,45 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: multiple-account-printer + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: multiple-account-printer + template: + metadata: + labels: + app: multiple-account-printer + annotations: + log.config.scalyr.com/attributes.parser: "test-parser-1" + log.config.scalyr.com/team1.secret: "scalyr-api-key-team-1" + log.config.scalyr.com/cont1.team2.secret: "scalyr-api-key-team-2" + log.config.scalyr.com/cont2.team2.secret: "scalyr-api-key-team-2" + log.config.scalyr.com/cont2.team3.secret: "scalyr-api-key-team-3" + spec: + containers: + - name: cont1 + image: docker.io/library/busybox:latest + imagePullPolicy: IfNotPresent + command: ["/bin/sh", "-c", "echo MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:$MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME; sleep 900"] + env: + - name: MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME + value: "cont1" + - name: cont2 + image: docker.io/library/busybox:latest + imagePullPolicy: IfNotPresent + command: ["/bin/sh", "-c", "echo MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:$MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME; sleep 900"] + env: + - name: MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME + value: "cont2" + - name: cont3 + image: docker.io/library/busybox:latest + imagePullPolicy: IfNotPresent + command: ["/bin/sh", "-c", "echo MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:$MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME; 
sleep 900"] + env: + - name: MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME + value: "cont3" + nodeSelector: + kubernetes.io/os: linux