Integration tests and kubernetes_monitor / k8s implementation
alesnovak-s1 committed Jan 26, 2024
1 parent c065ef0 commit d9c9779
Showing 7 changed files with 258 additions and 8 deletions.
24 changes: 24 additions & 0 deletions .github/actions/install-k8s-agent/action.yml
@@ -9,6 +9,15 @@ inputs:
scalyr_api_key:
description: "Write API key to be used by the agent."
required: true
scalyr_api_key_team_1:
  description: "Optional write API key for team 1, used by the multi-account ingestion tests."
  required: false
scalyr_api_key_team_2:
  description: "Optional write API key for team 2, used by the multi-account ingestion tests."
  required: false
scalyr_api_key_team_3:
  description: "Optional write API key for team 3, used by the multi-account ingestion tests."
  required: false
scalyr_cluster_name:
description: "Cluster name to use."
required: true
@@ -80,6 +89,21 @@ runs:
# Define api key
kubectl create secret generic scalyr-api-key --namespace scalyr --from-literal=scalyr-api-key="${{ inputs.scalyr_api_key }}"
# Create the team 1 secret only if scalyr_api_key_team_1 is set
if [ -n "${{ inputs.scalyr_api_key_team_1 }}" ]; then
  kubectl create secret generic scalyr-api-key-team-1 --namespace scalyr --from-literal=scalyr-api-key="${{ inputs.scalyr_api_key_team_1 }}"
fi
# Create the team 2 secret only if scalyr_api_key_team_2 is set
if [ -n "${{ inputs.scalyr_api_key_team_2 }}" ]; then
  kubectl create secret generic scalyr-api-key-team-2 --namespace scalyr --from-literal=scalyr-api-key="${{ inputs.scalyr_api_key_team_2 }}"
fi
# Create the team 3 secret only if scalyr_api_key_team_3 is set
if [ -n "${{ inputs.scalyr_api_key_team_3 }}" ]; then
  kubectl create secret generic scalyr-api-key-team-3 --namespace scalyr --from-literal=scalyr-api-key="${{ inputs.scalyr_api_key_team_3 }}"
fi
# Create configmap
kubectl create configmap --namespace scalyr scalyr-config \
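When debugging this action, the secrets it creates can be inspected directly. A minimal sketch, assuming kubectl already points at the test cluster and the optional team keys were provided; note that Kubernetes stores Secret data base64-encoded:

# List the API-key secrets created above in the scalyr namespace.
kubectl get secrets --namespace scalyr

# Print the decoded key of one team secret (base64 decoding is required
# because the API stores and returns Secret data base64-encoded).
kubectl get secret scalyr-api-key-team-1 --namespace scalyr \
  -o jsonpath='{.data.scalyr-api-key}' | base64 --decode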
56 changes: 56 additions & 0 deletions .github/workflows/reusable-agent-build-container-images.yml
@@ -369,9 +369,11 @@ jobs:
minikube image ls
kubectl apply -f tests/e2e/k8s_k8s_monitor/std_printer_deployment.yaml
kubectl apply -f tests/e2e/k8s_k8s_monitor/long_message_printer_deployment.yaml
kubectl apply -f tests/e2e/k8s_k8s_monitor/multiple_account_printers.yaml
kubectl wait --for=condition=ready pod -l app=std-printer
kubectl wait --for=condition=ready pod -l app=long-message-printer
kubectl wait --for=condition=ready pod -l app=multiple-account-printer
kubectl get pods -A
export APP_POD_NAME=$(kubectl get pod --namespace=default --selector=app=std-printer -o jsonpath="{.items[0].metadata.name}")
@@ -398,6 +400,9 @@ jobs:
with:
scalyr_server: "agent.scalyr.com"
scalyr_api_key: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_WRITE }}"
scalyr_api_key_team_1: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_2_WRITE }}"
scalyr_api_key_team_2: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_3_WRITE }}"
scalyr_api_key_team_3: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_4_WRITE }}"
scalyr_cluster_name: "${K8S_CLUSTER_NAME}"
scalyr_k8s_events_disable: "false"
main_yaml_path: "tests/e2e/scalyr-agent-2-daemonset.yaml"
@@ -520,6 +525,56 @@ jobs:
echo "CronJob events checks"
./scripts/cicd/scalyr-query.sh '$serverHost="'${SCALYR_AGENT_POD_NAME}'" $logfile="/var/log/scalyr-agent-2/kubernetes_events.log" $parser="k8sEvents" k8s-kind="CronJob" involvedObjectKind="CronJob" involvedObjectName="hello" watchEventType="ADDED" reason="SawCompletedJob"'
- name: Verify multi-account records have been ingested
  timeout-minutes: 14
  env:
    scalyr_api_key_team_1: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_2_WRITE }}"
    scalyr_api_key_team_2: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_3_WRITE }}"
    scalyr_api_key_team_3: "${{ secrets.CT_SCALYR_TOKEN_PROD_US_CLOUDTECH_TESTING_4_WRITE }}"
    SCALYR_AGENT_POD_NAME: "${{ env.SCALYR_AGENT_POD_NAME }}"
  run: |
    export RETRY_ATTEMPTS="1"
    function scalyr_query() {
      scalyr query --columns=message "serverHost=\"${SCALYR_AGENT_POD_NAME}\" app=\"multiple-account-printer\" $*"
    }
    function ingested_line_count() {
      API_KEY=$1
      MESSAGE=$2
      # The message is passed as a quoted full-text term in the query filter.
      scalyr_readlog_token="${API_KEY}" scalyr_query "\"${MESSAGE}\"" | wc -l
    }
    function assert_ingested_once() {
      API_KEY=$1
      MESSAGE=$2
      LINE_COUNT=$(ingested_line_count "${API_KEY}" "${MESSAGE}")
      if [ "${LINE_COUNT}" -ne 1 ]; then
        echo "Expected '${MESSAGE}' to be ingested exactly once, got ${LINE_COUNT} line(s)"
        exit 1
      fi
    }
    function assert_not_ingested() {
      API_KEY=$1
      MESSAGE=$2
      LINE_COUNT=$(ingested_line_count "${API_KEY}" "${MESSAGE}")
      if [ "${LINE_COUNT}" -ne 0 ]; then
        echo "Expected '${MESSAGE}' not to be ingested, got ${LINE_COUNT} line(s)"
        exit 1
      fi
    }
    # See tests/e2e/k8s_k8s_monitor/multiple_account_printers.yaml:16 for the following annotations:
    # log.config.scalyr.com/attributes.parser: "test-parser-1"
    # log.config.scalyr.com/team1.secret: "scalyr-api-key-team-1"
    # log.config.scalyr.com/cont1.team2.secret: "scalyr-api-key-team-2"
    # log.config.scalyr.com/cont2.team2.secret: "scalyr-api-key-team-2"
    # log.config.scalyr.com/cont2.team3.secret: "scalyr-api-key-team-3"
    # Container 1 log is ingested with the scalyr-api-key-team-2 only
    assert_ingested_once "${scalyr_api_key_team_2}" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont1"
    assert_not_ingested "${scalyr_api_key_team_1}" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont1"
    assert_not_ingested "${scalyr_api_key_team_3}" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont1"
    # Container 2 log is ingested with the scalyr-api-key-team-2 and scalyr-api-key-team-3 only
    assert_ingested_once "${scalyr_api_key_team_2}" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont2"
    assert_ingested_once "${scalyr_api_key_team_3}" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont2"
    assert_not_ingested "${scalyr_api_key_team_1}" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont2"
    # Container 3 log is ingested with the scalyr-api-key-team-1 only
    assert_ingested_once "${scalyr_api_key_team_1}" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont3"
    assert_not_ingested "${scalyr_api_key_team_2}" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont3"
    assert_not_ingested "${scalyr_api_key_team_3}" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont3"
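The `scalyr` command used above is the scalyr-tool CLI, which reads its token from the `scalyr_readlog_token` environment variable. A single assertion can be reproduced locally along these lines (a sketch; the pod name shown is hypothetical):

export scalyr_readlog_token="<team-2 API key>"
scalyr query --columns=message 'serverHost="scalyr-agent-2-xxxxx" app="multiple-account-printer" "MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:cont1"'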
- name: Notify Slack on Failure
if: ${{ failure() && github.ref_name == 'master' }}
uses: act10ns/slack@ed1309ab9862e57e9e583e51c7889486b9a00b0f # v2.0.0
@@ -530,6 +585,7 @@ jobs:
steps: ${{ toJson(steps) }}
channel: '#eng-dataset-cloud-tech'


k8s_open_metrics_monitor_tests:
name: OpenMetrics Monitor - k8s ${{ inputs.builder_name }} ${{ matrix.k8s_version.version }}-${{ matrix.k8s_version.runtime}}
runs-on: ubuntu-20.04
52 changes: 52 additions & 0 deletions scalyr_agent/builtin_monitors/kubernetes_monitor.py
@@ -3189,6 +3189,7 @@ def __get_log_config_for_container(self, cid, info, k8s_cache, base_attributes):
parser = "docker"
common_annotations = {}
container_annotations = {}
all_annotations = {}
# pod name and namespace are set to an invalid value for cases where errors occur and a log
# message is produced, so that the log message has clearly invalid values for these rather
# than just being empty
@@ -3271,6 +3272,7 @@ def __get_log_config_for_container(self, cid, info, k8s_cache, base_attributes):
# by default all annotations will be applied to all containers
# in the pod
all_annotations = pod.annotations

container_specific_annotations = False

# get any common annotations for all containers
@@ -3409,6 +3411,56 @@ def __get_log_config_for_container(self, cid, info, k8s_cache, base_attributes):
if "parser" in attrs:
result["parser"] = attrs["parser"]

# Based on pod annotations of the form {container_name}.{team}.secret={secret_name}
# we may need to add the api_keys parameter to the log config.
if container_annotations or all_annotations:

    def filter_teams(annotations):
        # Keep only annotation keys of the form "team<N>", e.g. "team1".
        return {
            team: value
            for team, value in annotations.items()
            if re.fullmatch(r"team\d+", team)
        }

    def fetch_secret(name):
        secret = k8s_cache.secret(pod_namespace, name, time.time())
        if secret:
            return secret

        self._logger.warning(
            "Failed to fetch secret '%s' for pod '%s/%s'"
            % (name, pod_namespace, pod_name),
            limit_once_per_x_secs=300,
            limit_key="k8s-fetch-secret-%s" % name,
        )
        return None

    def get_secret_api_key(secret):
        if secret is None:
            return None

        api_key = secret.data.get("scalyr-api-key")

        if not api_key:
            self._logger.warning(
                "Secret '%s/%s' does not contain a scalyr-api-key field, ignoring."
                % (pod_namespace, secret.name),
                limit_once_per_x_secs=300,
                limit_key="k8s-fetch-secret-%s" % secret.name,
            )
            return None

        # Kubernetes returns Secret "data" values base64-encoded, so decode
        # before use (requires "import base64" at the top of the module).
        return base64.b64decode(api_key).decode("utf-8")

    # Container-specific team annotations take precedence over pod-level ones.
    team_annotations = filter_teams(container_annotations) or filter_teams(all_annotations)

    api_keys = [
        get_secret_api_key(fetch_secret(team_annotations[team]["secret"]))
        for team in team_annotations.keys()
        if team_annotations.get(team).get("secret")
    ]

    # Drop entries whose secret could not be fetched or had no usable key.
    api_keys = [api_key for api_key in api_keys if api_key is not None]

    if api_keys:
        result["api_keys"] = api_keys

return result

def __get_docker_logs(self, containers, k8s_cache):
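To make the selection logic above concrete, here is a small illustrative sketch (not part of the commit) of how filter_teams resolves the test pod's annotations. The nested-dict shape is produced elsewhere by the agent's annotation parsing and is assumed here:

import re


def filter_teams(annotations):
    # Keep only top-level keys of the form "team<N>", e.g. "team1".
    return {
        team: value
        for team, value in annotations.items()
        if re.fullmatch(r"team\d+", team)
    }


# Assumed nested shape after the agent parses the pod annotations from
# tests/e2e/k8s_k8s_monitor/multiple_account_printers.yaml.
all_annotations = {
    "attributes": {"parser": "test-parser-1"},
    "team1": {"secret": "scalyr-api-key-team-1"},
    "cont1": {"team2": {"secret": "scalyr-api-key-team-2"}},
    "cont2": {
        "team2": {"secret": "scalyr-api-key-team-2"},
        "team3": {"secret": "scalyr-api-key-team-3"},
    },
}

# For cont2, container-specific team annotations win over pod-level ones:
container_annotations = all_annotations["cont2"]
print(filter_teams(container_annotations) or filter_teams(all_annotations))
# {'team2': {'secret': 'scalyr-api-key-team-2'}, 'team3': {'secret': 'scalyr-api-key-team-3'}}

# cont3 has no container-specific team annotations, so the pod-level
# "team1" entry applies:
print(filter_teams({}) or filter_teams(all_annotations))
# {'team1': {'secret': 'scalyr-api-key-team-1'}}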
8 changes: 5 additions & 3 deletions scalyr_agent/copying_manager/copying_manager.py
@@ -465,10 +465,11 @@ def config(self):

def add_log_config(self, monitor_name, log_config, force_add=False):
"""Add the log_config item to the list of paths being watched
If force_add is true and the log_config item is marked to be removed the removal will be canceled.
Otherwise, the item will be added only if it's not monitored already.
param: monitor_name - the name of the monitor adding the log config
param: log_config - a log_config object containing the path to be added
param force_add: True or force add this file and cancel any removal which
may have been scheduled before hand.
param force_add: bool, see above
We really just want to use this with Docker monitor where there is a small windows between
the container restart where the log file is not immediately removed.
returns: an updated log_config object
@@ -570,7 +571,8 @@ def update_log_config(self, monitor_name, log_config):

def remove_log_path(self, monitor_name, log_path):
"""Remove the log_path from the list of paths being watched
param: monitor_name - the name of the monitor removing the path
param: log_path - a string containing the path of the log file to remove
"""
# get the list of paths with 0 reference counts
self.__lock.acquire()
15 changes: 10 additions & 5 deletions scalyr_agent/log_watcher.py
@@ -24,11 +24,16 @@ class LogWatcher(object):
to add/remove a set of log paths
"""

def add_log_config(self, monitor_name, log_config, force_add):
"""Add the log_config item to the list of paths being watched
If force_add is true and the log_config item is marked to be removed, the removal is canceled.
Otherwise, the item is added only if it is not monitored already.
param: monitor_name - the name of the monitor adding the log config
param: log_config - a log_config object containing at least a path
param: force_add - if True, force-add this file and cancel any removal that
may have been scheduled beforehand. This is mainly useful for the Docker
monitor, where there is a small window between container restarts in which
the log file is not immediately removed.
returns: an updated log_config object
"""
pass

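For context, a sketch of how a monitor implementation would call the extended interface (illustrative; `watcher` stands for whatever LogWatcher implementation the agent hands to the monitor, and the log_config contents are hypothetical):

# Re-adding a container log shortly after a container restart: the path may
# still be scheduled for removal, so force_add=True cancels that removal
# instead of silently skipping the add.
log_config = {"path": "/var/log/containers/cont1_default_abc.log", "parser": "docker"}
log_config = watcher.add_log_config("kubernetes_monitor", log_config, force_add=True)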
66 changes: 66 additions & 0 deletions scalyr_agent/monitor_utils/k8s.py
@@ -102,6 +102,11 @@
"list": Template("/api/v1/namespaces/${namespace}/pods"),
"list-all": "/api/v1/pods",
},
"Secret": {
"single": Template("/api/v1/namespaces/${namespace}/secrets/${name}"),
"list": Template("/api/v1/namespaces/${namespace}/secrets"),
"list-all": "/api/v1/secrets"
},
"ReplicaSet": {
"single": Template("/apis/apps/v1/namespaces/${namespace}/replicasets/${name}"),
"list": Template("/apis/apps/v1/namespaces/${namespace}/replicasets"),
@@ -518,6 +523,16 @@ def __repr__(self):
return str(self.__dict__)


class Secret(object):
    """Cached representation of a Kubernetes Secret object. Note that the
    values in `data` are base64-encoded, as returned by the Kubernetes API.
    """

    def __init__(self, name, namespace, data, kind, string_data, type):
        self.name = name
        self.namespace = namespace
        self.data = data
        self.kind = kind
        self.string_data = string_data
        self.type = type

class Controller(object):
"""
General class for all cached Controller objects
@@ -1012,6 +1027,19 @@ def process_object(self, k8s, obj, query_options=None):
return result


class SecretProcessor(_K8sProcessor):
    def process_object(self, k8s, obj, query_options=None):
        """Generate a Secret object from a JSON object returned by the k8s API"""
        metadata = obj.get("metadata", {})
        kind = obj.get("kind", "")
        namespace = metadata.get("namespace", "")
        name = metadata.get("name", "")
        data = obj.get("data", {})
        string_data = obj.get("stringData", {})
        type = obj.get("type", "")

        return Secret(name, namespace, data, kind, string_data, type)


class ControllerProcessor(_K8sProcessor):
def process_object(self, k8s, obj, query_options=None):
"""Generate a Controller object from a JSON object
@@ -1292,6 +1320,10 @@ def __init__(
self._pod_processor = PodProcessor(self._controllers)
self._pods_cache = _K8sCache(self._pod_processor, "Pod")

# create the secret cache
self._secret_processor = SecretProcessor()
self._secrets_cache = _K8sCache(self._secret_processor, "Secret")

self._cluster_name = None
self._api_server_version = None
# The last time (in seconds since epoch) we updated the K8s version number via a query
@@ -1571,6 +1603,7 @@ def update_cache(self, run_state):
scalyr_logging.DEBUG_LEVEL_1, "Marking unused pods as expired"
)
self._pods_cache.mark_as_expired(current_time)
self._secrets_cache.mark_as_expired(current_time)

self._update_cluster_name(local_state.k8s)
self._update_api_server_version_if_necessary(
@@ -1614,6 +1647,39 @@ def update_cache(self, run_state):
local_state.cache_expiry_secs - fuzz_factor
)

def secret(
    self,
    namespace,
    name,
    current_time=None,
    allow_expired=False,
    ignore_k8s_api_exception=False,
):
    """Returns secret info for the secret specified by namespace and name, or None if no secret matches.
    Warning: Failure to pass current_time leads to incorrect recording of last access times, which will
    lead to these objects being refreshed prematurely (a potential source of bugs).
    Querying the secret information is thread-safe, but the returned object should
    not be written to.
    @param allow_expired: If True, an object is considered present in cache even if it is expired.
    @type allow_expired: bool
    @param ignore_k8s_api_exception: If True, k8s API exceptions are ignored during the lookup.
    @type ignore_k8s_api_exception: bool
    """
    local_state = self._state.copy_state()

    if local_state.k8s is None:
        return None

    return self._secrets_cache.lookup(
        local_state.k8s,
        current_time,
        namespace,
        name,
        kind="Secret",
        allow_expired=allow_expired,
        ignore_k8s_api_exception=ignore_k8s_api_exception,
    )

def pod(
self,
namespace,
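To make the new Secret plumbing concrete, a small illustrative sketch (not part of the commit) of a raw Secret API object flowing through SecretProcessor. The JSON payload and its values are hypothetical, and the API always returns the "data" values base64-encoded:

import base64

# Hypothetical payload, shaped like a Kubernetes Secret API response.
raw_secret = {
    "kind": "Secret",
    "type": "Opaque",
    "metadata": {"name": "scalyr-api-key-team-1", "namespace": "scalyr"},
    # "SGVsbG8=" is base64 for "Hello".
    "data": {"scalyr-api-key": "SGVsbG8="},
}

secret = SecretProcessor().process_object(k8s=None, obj=raw_secret)  # k8s is unused by this processor

print(secret.namespace, secret.name)  # scalyr scalyr-api-key-team-1
print(base64.b64decode(secret.data["scalyr-api-key"]).decode("utf-8"))  # Hello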
45 changes: 45 additions & 0 deletions tests/e2e/k8s_k8s_monitor/multiple_account_printers.yaml
@@ -0,0 +1,45 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  name: multiple-account-printer
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: multiple-account-printer
  template:
    metadata:
      labels:
        app: multiple-account-printer
      annotations:
        log.config.scalyr.com/attributes.parser: "test-parser-1"
        log.config.scalyr.com/team1.secret: "scalyr-api-key-team-1"
        log.config.scalyr.com/cont1.team2.secret: "scalyr-api-key-team-2"
        log.config.scalyr.com/cont2.team2.secret: "scalyr-api-key-team-2"
        log.config.scalyr.com/cont2.team3.secret: "scalyr-api-key-team-3"
    spec:
      containers:
        - name: cont1
          image: docker.io/library/busybox:latest
          imagePullPolicy: IfNotPresent
          command: ["/bin/sh", "-c", "echo MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:$MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME; sleep 900"]
          env:
            - name: MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME
              value: "cont1"
        - name: cont2
          image: docker.io/library/busybox:latest
          imagePullPolicy: IfNotPresent
          command: ["/bin/sh", "-c", "echo MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:$MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME; sleep 900"]
          env:
            - name: MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME
              value: "cont2"
        - name: cont3
          image: docker.io/library/busybox:latest
          imagePullPolicy: IfNotPresent
          command: ["/bin/sh", "-c", "echo MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME:$MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME; sleep 900"]
          env:
            - name: MULTIPLE_ACCOUNT_TEST_CONTAINER_NAME
              value: "cont3"
      nodeSelector:
        kubernetes.io/os: linux
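To confirm the pod annotations were applied as written, a quick check such as the following can be used (a sketch, assuming the deployment above is running in the default namespace):

kubectl get pod --namespace default -l app=multiple-account-printer \
  -o jsonpath='{.items[0].metadata.annotations}'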
