diff --git a/plugins/filter_aws/aws.c b/plugins/filter_aws/aws.c index 521945048da..a7b09de1764 100644 --- a/plugins/filter_aws/aws.c +++ b/plugins/filter_aws/aws.c @@ -34,6 +34,7 @@ #include #include +#include #include #include @@ -104,6 +105,30 @@ static void expose_aws_meta(struct flb_filter_aws *ctx) } } +static void create_kubernetes_upstream(struct flb_filter_aws *ctx, struct flb_config *config) { + ctx->kubernetes_upstream = NULL; + + ctx->tls = flb_tls_create(ctx->tls_verify, + ctx->tls_debug, + ctx->tls_vhost, + ctx->tls_ca_path, + ctx->tls_ca_file, + NULL, NULL, NULL); + if (!ctx->tls) { + flb_plg_error(ctx->ins, "tls creation failed in creating k8s upstream in aws plugin"); + } + + /* Create an Upstream context */ + ctx->kubernetes_upstream = flb_upstream_create(config, + flb_strdup(FLB_API_HOST), + FLB_API_PORT, + FLB_IO_TLS, + ctx->tls); + if (!ctx->kubernetes_upstream) { + flb_plg_error(ctx->ins, "kubernetes upstream connection initialization error in aws plugin"); + } +} + static int cb_aws_init(struct flb_filter_instance *f_ins, struct flb_config *config, void *data) @@ -165,6 +190,12 @@ static int cb_aws_init(struct flb_filter_instance *f_ins, /* Remove async flag from upstream */ ctx->ec2_upstream->flags &= ~(FLB_IO_ASYNC); + /*Create kubernetes upstream to query k8s api to define the platform type*/ + if (ctx->enable_entity && strncmp(ctx->entity_type, FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0) { + create_kubernetes_upstream(ctx, config); + } + ctx->kube_token_create = 0; + /* Retrieve metadata */ ret = get_ec2_metadata(ctx); if (ret < 0) { @@ -178,7 +209,6 @@ static int cb_aws_init(struct flb_filter_instance *f_ins, else { expose_aws_meta(ctx); } - flb_filter_set_context(f_ins, ctx); return 0; } @@ -370,6 +400,267 @@ static int get_vpc_metadata(struct flb_filter_aws *ctx) return ret; } +static void get_cluster_from_environment(struct flb_filter_aws *ctx) +{ + if(ctx->cluster == NULL) { + char* cluster_name = getenv("CLUSTER_NAME"); + if(cluster_name) { + ctx->cluster = flb_strdup(cluster_name); + ctx->cluster_len = strlen(cluster_name); + ctx->new_keys++; + } else { + free(cluster_name); + } + flb_plg_debug(ctx->ins, "Cluster name is %s.", ctx->cluster); + } +} + +static int file_to_buffer(const char *path, + char **out_buf, size_t *out_size) +{ + int ret; + char *buf; + ssize_t bytes; + FILE *fp; + struct stat st; + + if (!(fp = fopen(path, "r"))) { + return -1; + } + + ret = stat(path, &st); + if (ret == -1) { + flb_errno(); + fclose(fp); + return -1; + } + + buf = flb_calloc(1, (st.st_size + 1)); + if (!buf) { + flb_errno(); + fclose(fp); + return -1; + } + + bytes = fread(buf, st.st_size, 1, fp); + if (bytes < 1) { + flb_free(buf); + fclose(fp); + return -1; + } + + fclose(fp); + + *out_buf = buf; + *out_size = st.st_size; + + return 0; +} + +/* Set K8s Authorization Token and get HTTP Auth Header */ +static int get_http_auth_header(struct flb_filter_aws *ctx) +{ + int ret; + char *temp; + char *tk = NULL; + size_t tk_size = 0; + + ret = file_to_buffer(FLB_KUBE_TOKEN, &tk, &tk_size); + if (ret == -1) { + flb_plg_warn(ctx->ins, "cannot open %s", FLB_KUBE_TOKEN); + } + flb_plg_info(ctx->ins, " token updated", FLB_KUBE_TOKEN); + ctx->kube_token_create = time(NULL); + + /* Token */ + if (ctx->token != NULL) { + flb_free(ctx->token); + } + ctx->token = tk; + ctx->token_len = tk_size; + + /* HTTP Auth Header */ + if (ctx->auth == NULL) { + ctx->auth = flb_malloc(tk_size + 32); + } + else if (ctx->auth_len < tk_size + 32) { + temp = flb_realloc(ctx->auth, tk_size + 32); + if (temp == NULL) { + flb_free(ctx->auth); + ctx->auth = NULL; + return -1; + } + ctx->auth = temp; + } + + if (!ctx->auth) { + return -1; + } + ctx->auth_len = snprintf(ctx->auth, tk_size + 32, + "Bearer %s", + tk); + + return 0; +} + +/* Refresh HTTP Auth Header if K8s Authorization Token is expired */ +static int refresh_token_if_needed(struct flb_filter_aws *ctx) +{ + int expired = 0; + int ret; + + if (ctx->kube_token_create > 0) { + if (time(NULL) > ctx->kube_token_create + ctx->kube_token_ttl) { + expired = FLB_TRUE; + } + } + + if (expired || ctx->kube_token_create == 0) { + ret = get_http_auth_header(ctx); + if (ret == -1) { + flb_plg_warn(ctx->ins, "failed to set http auth header"); + return -1; + } + } + + return 0; +} + +/* Gather metadata from HTTP Request, + * this could send out HTTP Request to KUBE Server API + */ +static int get_meta_info_from_request(struct flb_filter_aws *ctx, + struct flb_upstream *upstream, + const char *namespace, + const char *resource_type, + const char *resource_name, + char **buffer, size_t *size, + int *root_type, + char* uri) +{ + struct flb_http_client *c; + struct flb_upstream_conn *u_conn; + int ret; + size_t b_sent; + int packed; + + if (!upstream) { + return -1; + } + + u_conn = flb_upstream_conn_get(upstream); + + if (!u_conn) { + flb_plg_error(ctx->ins, "kubelet upstream connection error"); + return -1; + } + + ret = refresh_token_if_needed(ctx); + if (ret == -1) { + flb_plg_error(ctx->ins, "failed to refresh token"); + return -1; + } + + /* Compose HTTP Client request*/ + c = flb_http_client(u_conn, FLB_HTTP_GET, + uri, + NULL, 0, NULL, 0, NULL, 0); + flb_http_buffer_size(c, ctx->buffer_size); + + flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10); + flb_http_add_header(c, "Connection", 10, "close", 5); + if (ctx->auth_len > 0) { + flb_http_add_header(c, "Authorization", 13, ctx->auth, ctx->auth_len); + } + + ret = flb_http_do(c, &b_sent); + flb_plg_debug(ctx->ins, "Request (ns=%s, %s=%s) http_do=%i, " + "HTTP Status: %i", + namespace, resource_type, resource_name, ret, c->resp.status); + + if (ret != 0 || c->resp.status != 200) { + if (c->resp.payload_size > 0) { + flb_plg_debug(ctx->ins, "HTTP response\n%s", + c->resp.payload); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + return -1; + } + + packed = flb_pack_json(c->resp.payload, c->resp.payload_size, + buffer, size, root_type); + + /* release resources */ + flb_http_client_destroy(c); + flb_upstream_conn_release(u_conn); + + return packed; + +} + +/* Gather metadata from API Server */ +static int get_api_server_configmap(struct flb_filter_aws *ctx, + const char *namespace, const char *configmap, + char **out_buf, size_t *out_size) +{ + int ret; + int packed = -1; + int root_type; + char uri[1024]; + char *buf; + size_t size; + + *out_buf = NULL; + *out_size = 0; + + if (packed == -1) { + + ret = snprintf(uri, sizeof(uri) - 1, FLB_KUBE_API_CONFIGMAP_FMT, namespace, + configmap); + + if (ret == -1) { + return -1; + } + flb_plg_debug(ctx->ins, + "Send out request to API Server for configmap information in aws plugin"); + packed = get_meta_info_from_request(ctx, ctx->kubernetes_upstream, namespace, FLB_KUBE_CONFIGMAP, configmap, + &buf, &size, &root_type, uri); + } + + /* validate pack */ + if (packed == -1) { + return -1; + } + + *out_buf = buf; + *out_size = size; + + return 0; +} + +static void get_platform(struct flb_filter_aws *ctx) +{ + if (ctx->platform == NULL) { + char *config_buf = NULL; + size_t config_size; + int ret; + ret = get_api_server_configmap(ctx, KUBE_SYSTEM_NAMESPACE,AWS_AUTH_CONFIG_MAP, + &config_buf, &config_size); + if (ret == -1) { + ctx->platform = flb_strdup(NATIVE_KUBERNETES_PLATFORM); + } else { + ctx->platform = flb_strdup(EKS_PLATFORM); + } + ctx->platform_len = strlen(ctx->platform); + ctx->new_keys++; + flb_plg_debug(ctx->ins, "Platform type is %s.", ctx->platform); + if(config_buf) { + flb_free(config_buf); + } + } +} + /* * Makes a call to IMDS to set get the values of all metadata fields. * It can be called repeatedly if some metadata calls initially do not succeed. @@ -468,6 +759,7 @@ static int get_ec2_metadata(struct flb_filter_aws *ctx) } if (ctx->enable_entity) { + if (strncmp(ctx->entity_type , FLB_FILTER_ENTITY_TYPE_SERVICE, FLB_FILTER_ENTITY_TYPE_SERVICE_LEN) == 0) { if (!ctx->account_id) { ret = get_metadata_by_key(ctx, FLB_FILTER_AWS_IMDS_ACCOUNT_ID_PATH, &ctx->account_id, &ctx->account_id_len, @@ -491,6 +783,13 @@ static int get_ec2_metadata(struct flb_filter_aws *ctx) } else { ctx->new_keys++; } + } + ctx->cluster = NULL; + ctx->platform = NULL; + if (strncmp(ctx->entity_type , FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0) { + get_cluster_from_environment(ctx); + get_platform(ctx); + } } ctx->metadata_retrieved = FLB_TRUE; @@ -654,16 +953,7 @@ static int cb_aws_filter(const void *data, size_t bytes, ctx->hostname, ctx->hostname_len); } - if (ctx->enable_entity && ctx->instance_id != NULL && ctx->account_id != NULL) { - // Pack instance ID with entity prefix for further processing - msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN); - msgpack_pack_str_body(&tmp_pck, - FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY, - FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN); - msgpack_pack_str(&tmp_pck, ctx->instance_id_len); - msgpack_pack_str_body(&tmp_pck, - ctx->instance_id, ctx->instance_id_len); - + if (ctx->enable_entity && ctx->account_id != NULL ) { // Pack account ID with entity prefix for further processing msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_ENTITY_ACCOUNT_ID_KEY_LEN); msgpack_pack_str_body(&tmp_pck, @@ -672,7 +962,38 @@ static int cb_aws_filter(const void *data, size_t bytes, msgpack_pack_str(&tmp_pck, ctx->account_id_len); msgpack_pack_str_body(&tmp_pck, ctx->account_id, ctx->account_id_len); + + if (ctx->instance_id != NULL && strncmp(ctx->entity_type , FLB_FILTER_ENTITY_TYPE_SERVICE, FLB_FILTER_ENTITY_TYPE_SERVICE_LEN) == 0) { + // Pack instance ID with entity prefix for further processing + msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN); + msgpack_pack_str_body(&tmp_pck, + FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY, + FLB_FILTER_AWS_ENTITY_INSTANCE_ID_KEY_LEN); + msgpack_pack_str(&tmp_pck, ctx->instance_id_len); + msgpack_pack_str_body(&tmp_pck, + ctx->instance_id, ctx->instance_id_len); + } + + if (ctx->cluster != NULL && ctx->platform != NULL && strncmp(ctx->entity_type , FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0 ) { + // Pack cluster name with entity prefix for further processing + msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_ENTITY_CLUSTER_KEY_LEN); + msgpack_pack_str_body(&tmp_pck, + FLB_FILTER_AWS_ENTITY_CLUSTER_KEY, + FLB_FILTER_AWS_ENTITY_CLUSTER_KEY_LEN); + msgpack_pack_str(&tmp_pck, ctx->cluster_len); + msgpack_pack_str_body(&tmp_pck, + ctx->cluster, ctx->cluster_len); + // Pack platform with entity prefix for further processing + msgpack_pack_str(&tmp_pck, FLB_FILTER_AWS_ENTITY_PLATFORM_KEY_LEN); + msgpack_pack_str_body(&tmp_pck, + FLB_FILTER_AWS_ENTITY_PLATFORM_KEY, + FLB_FILTER_AWS_ENTITY_PLATFORM_KEY_LEN); + msgpack_pack_str(&tmp_pck, ctx->platform_len); + msgpack_pack_str_body(&tmp_pck, + ctx->platform, ctx->platform_len); + } } + } msgpack_unpacked_destroy(&result); @@ -724,6 +1045,27 @@ static void flb_filter_aws_destroy(struct flb_filter_aws *ctx) flb_sds_destroy(ctx->hostname); } + if(ctx->tls) { + flb_tls_destroy(ctx->tls); + } + + if(ctx->kubernetes_upstream) { + flb_upstream_destroy(ctx->kubernetes_upstream); + } + + if(ctx->cluster) { + flb_sds_destroy(ctx->cluster); + } + + if(ctx->platform) { + flb_sds_destroy(ctx->platform); + } + + if(ctx->entity_type){ + flb_sds_destroy(ctx->entity_type); + } + flb_free(ctx->token); + flb_free(ctx->auth); flb_free(ctx); } @@ -792,6 +1134,59 @@ static struct flb_config_map config_map[] = { "Enable entity prefix for fields used for constructing entity." "This currently only affects instance ID" }, + { + FLB_CONFIG_MAP_STR, "entity_type", "service", + 0, FLB_TRUE, offsetof(struct flb_filter_aws, entity_type), + "Defines the type of entity and adds related entity fields." + "Possible values Service or Resource" + }, + { + FLB_CONFIG_MAP_TIME, "kube_token_ttl", "10m", + 0, FLB_TRUE, offsetof(struct flb_filter_aws, kube_token_ttl), + "kubernetes token ttl, until it is reread from the token file. Default: 10m" + }, + /* Buffer size for HTTP Client when reading responses from API Server */ + { + FLB_CONFIG_MAP_SIZE, "buffer_size", "32K", + 0, FLB_TRUE, offsetof(struct flb_filter_aws, buffer_size), + "buffer size to receive response from API server", + }, + + /* TLS: set debug 'level' */ + { + FLB_CONFIG_MAP_INT, "tls.debug", "0", + 0, FLB_TRUE, offsetof(struct flb_filter_aws, tls_debug), + "set TLS debug level: 0 (no debug), 1 (error), " + "2 (state change), 3 (info) and 4 (verbose)" + }, + + /* TLS: enable verification */ + { + FLB_CONFIG_MAP_BOOL, "tls.verify", "true", + 0, FLB_TRUE, offsetof(struct flb_filter_aws, tls_verify), + "enable or disable verification of TLS peer certificate" + }, + + /* TLS: set tls.vhost feature */ + { + FLB_CONFIG_MAP_STR, "tls.vhost", NULL, + 0, FLB_TRUE, offsetof(struct flb_filter_aws, tls_vhost), + "set optional TLS virtual host" + }, + + /* Kubernetes TLS: CA file */ + { + FLB_CONFIG_MAP_STR, "kube_ca_file", FLB_KUBE_CA, + 0, FLB_TRUE, offsetof(struct flb_filter_aws, tls_ca_file), + "Kubernetes TLS CA file" + }, + + /* Kubernetes TLS: CA certs path */ + { + FLB_CONFIG_MAP_STR, "kube_ca_path", NULL, + 0, FLB_TRUE, offsetof(struct flb_filter_aws, tls_ca_path), + "Kubernetes TLS ca path" + }, {0} }; diff --git a/plugins/filter_aws/aws.h b/plugins/filter_aws/aws.h index 332f0f3fc9a..7d0ed7b5319 100644 --- a/plugins/filter_aws/aws.h +++ b/plugins/filter_aws/aws.h @@ -67,6 +67,40 @@ #define FLB_FILTER_AWS_ENTITY_ACCOUNT_ID_KEY_LEN 21 #define FLB_FILTER_AWS_HOSTNAME_KEY "hostname" #define FLB_FILTER_AWS_HOSTNAME_KEY_LEN 8 +#define FLB_FILTER_AWS_ENTITY_PLATFORM_KEY "aws_entity_platform" +#define FLB_FILTER_AWS_ENTITY_PLATFORM_KEY_LEN 19 +#define FLB_FILTER_AWS_ENTITY_CLUSTER_KEY "aws_entity_cluster" +#define FLB_FILTER_AWS_ENTITY_CLUSTER_KEY_LEN 18 + +/* + * Possible entity type values for aws plugin + */ +#define FLB_FILTER_ENTITY_TYPE_RESOURCE "resource" +#define FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN 8 +#define FLB_FILTER_ENTITY_TYPE_SERVICE "service" +#define FLB_FILTER_ENTITY_TYPE_SERVICE_LEN 7 +/* + * Possible cluster platform values for aws plugin + */ +#define NATIVE_KUBERNETES_PLATFORM "k8s" +#define EKS_PLATFORM "eks" + +/* + * Configmap used for verifying whether if FluentBit is + * on EKS or native Kubernetes + */ +#define KUBE_SYSTEM_NAMESPACE "kube-system" +#define AWS_AUTH_CONFIG_MAP "aws-auth" + +/* Kubernetes API server info */ +#define FLB_API_HOST "kubernetes.default.svc" +#define FLB_API_PORT 443 +#define FLB_API_TLS FLB_TRUE + +#define FLB_KUBE_TOKEN "/var/run/secrets/kubernetes.io/serviceaccount/token" +#define FLB_KUBE_CA "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" +#define FLB_KUBE_CONFIGMAP "configmap" +#define FLB_KUBE_API_CONFIGMAP_FMT "/api/v1/namespaces/%s/configmaps/%s" struct flb_filter_aws { /* upstream connection to ec2 IMDS */ @@ -121,6 +155,24 @@ struct flb_filter_aws { */ int enable_entity; + /* + * Defines the type of entity. + * Possible values resource or service + */ + flb_sds_t entity_type; + + char *cluster; + int cluster_len; + char *platform; + int platform_len; + + /* + * This connection is used for calling Kubernetes configmaps + * endpoint so pod association can determine the environment. + * Example: EKS or Native Kubernetes. + */ + struct flb_upstream *kubernetes_upstream; + /* number of new keys added by this plugin */ int new_keys; @@ -131,6 +183,32 @@ struct flb_filter_aws { /* Filter plugin instance reference */ struct flb_filter_instance *ins; + + /* HTTP Client Setup */ + size_t buffer_size; + + /* Pre-formatted HTTP Authorization header value */ + char *auth; + size_t auth_len; + + /* Command to get Kubernetes Authorization Token */ + int kube_token_create; + int kube_token_ttl; + + /* Kubernetes Token from FLB_KUBE_TOKEN file */ + char *token; + size_t token_len; + + struct flb_tls *tls; + + /* TLS CA certificate file */ + char *tls_ca_path; + char *tls_ca_file; + int tls_debug; + int tls_verify; + /* TLS virtual host (optional), set by configmap */ + flb_sds_t tls_vhost; + }; #endif diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index fb330726afb..233f55d6613 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -238,6 +238,62 @@ static int entity_add_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush return -1; } +static int entity_add_resource_key_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream, int *offset) { + char ts[KEY_ATTRIBUTES_MAX_LEN]; + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"keyAttributes\":{",0)) { + goto error; + } + if(stream->entity->key_attributes->platform != NULL && strlen(stream->entity->key_attributes->platform) != 0) { + if (strncmp(stream->entity->key_attributes->platform, EKS_PLATFORM, 3) == 0) { + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"Type\":\"AWS::Resource\"",0)) { + goto error; + } + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"ResourceType\":\"","AWS::EKS::Cluster","\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } else if (strncmp(stream->entity->key_attributes->platform, NATIVE_KUBERNETES_PLATFORM, 3) == 0) { + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"Type\":\"Resource\"",0)) { + goto error; + } + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"ResourceType\":\"","K8s::Cluster","\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + } + if(stream->entity->key_attributes->cluster_name != NULL && strlen(stream->entity->key_attributes->cluster_name) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"Identifier\":\"",stream->entity->key_attributes->cluster_name,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if(stream->entity->key_attributes->account_id != NULL && strlen(stream->entity->key_attributes->account_id) != 0) { + if (!snprintf(ts,KEY_ATTRIBUTES_MAX_LEN, ",%s%s%s","\"AwsAccountId\":\"",stream->entity->key_attributes->account_id,"\"")) { + goto error; + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) { + goto error; + } + } + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "}", 1)) { + goto error; + } + return 0; +error: + return -1; +} + static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream,int *offset) { char ts[ATTRIBUTES_MAX_LEN]; if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, @@ -369,18 +425,30 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, // If we are missing the service name, the entity will get rejected by the frontend anyway // so do not emit entity unless service name is filled. If we are missing account ID // it is considered not having sufficient information for entity therefore we should drop the entity. - if(ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes != NULL && stream->entity->key_attributes->name != NULL && stream->entity->key_attributes->account_id != NULL) { + if(ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes != NULL && strncmp(ctx->entity_type, FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0 && stream->entity->key_attributes->platform != NULL && stream->entity->key_attributes->cluster_name != NULL && stream->entity->key_attributes->account_id != NULL) { + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"entity\":{", 10)) { + goto error; + } + ret = entity_add_resource_key_attributes(ctx,buf,stream,offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize Resource Entity KeyAttributes"); + goto error; + } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - "\"entity\":{", 10)) { + "},", 2)) { goto error; } - - if(stream->entity->key_attributes != NULL) { - ret = entity_add_key_attributes(ctx,buf,stream,offset); - if (ret < 0) { - flb_plg_error(ctx->ins, "Failed to initialize Entity KeyAttributes"); + } + else if (ctx->add_entity && stream->entity != NULL && stream->entity->key_attributes != NULL && stream->entity->key_attributes->name != NULL && stream->entity->key_attributes->account_id != NULL) { + if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, + "\"entity\":{", 10)) { goto error; - } + } + ret = entity_add_key_attributes(ctx,buf,stream,offset); + if (ret < 0) { + flb_plg_error(ctx->ins, "Failed to initialize Entity KeyAttributes"); + goto error; } if(stream->entity->attributes != NULL) { ret = entity_add_attributes(ctx,buf,stream,offset); @@ -390,10 +458,9 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, } } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - "},", 2)) { + "},", 2)) { goto error; } - } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, @@ -1126,13 +1193,31 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map } entity->key_attributes->account_id = flb_strndup(val.via.str.ptr, val.via.str.size); } + if(strncmp(key.via.str.ptr, "aws_entity_platform",key.via.str.size ) == 0 ) { + if(entity->key_attributes->platform == NULL) { + entity->root_filter_count++; + } else { + flb_free(entity->key_attributes->platform); + } + entity->key_attributes->platform = flb_strndup(val.via.str.ptr, val.via.str.size); + } + if(strncmp(key.via.str.ptr, "aws_entity_cluster",key.via.str.size ) == 0 ) { + if(entity->key_attributes->cluster_name == NULL) { + entity->root_filter_count++; + } else { + flb_free(entity->key_attributes->cluster_name); + } + entity->key_attributes->cluster_name = flb_strndup(val.via.str.ptr, val.via.str.size); + } } - if(entity->key_attributes->name == NULL && entity->attributes->name_source == NULL &&entity->attributes->workload != NULL) { - entity->key_attributes->name = flb_strndup(entity->attributes->workload, strlen(entity->attributes->workload)); - entity->attributes->name_source = flb_strndup("K8sWorkload", 11); - } - if(entity->key_attributes->environment == NULL) { - entity->key_attributes->environment = find_fallback_environment(ctx, entity); + if (strncmp(ctx->entity_type, FLB_FILTER_ENTITY_TYPE_SERVICE, FLB_FILTER_ENTITY_TYPE_SERVICE_LEN) == 0) { + if(entity->key_attributes->name == NULL && entity->attributes->name_source == NULL &&entity->attributes->workload != NULL) { + entity->key_attributes->name = flb_strndup(entity->attributes->workload, strlen(entity->attributes->workload)); + entity->attributes->name_source = flb_strndup("K8sWorkload", 11); + } + if(entity->key_attributes->environment == NULL) { + entity->key_attributes->environment = find_fallback_environment(ctx, entity); + } } } @@ -1150,11 +1235,13 @@ void update_or_create_entity(struct flb_cloudwatch *ctx, struct log_stream *stre } memset(stream->entity->key_attributes, 0, sizeof(entity_key_attributes)); - stream->entity->attributes = flb_malloc(sizeof(entity_attributes)); - if (stream->entity->attributes == NULL) { - return; + if (strncmp(ctx->entity_type , FLB_FILTER_ENTITY_TYPE_SERVICE, FLB_FILTER_ENTITY_TYPE_SERVICE_LEN) == 0) { + stream->entity->attributes = flb_malloc(sizeof(entity_attributes)); + if (stream->entity->attributes == NULL) { + return; + } + memset(stream->entity->attributes, 0, sizeof(entity_attributes)); } - memset(stream->entity->attributes, 0, sizeof(entity_attributes)); stream->entity->filter_count = 0; stream->entity->root_filter_count = 0; stream->entity->service_name_found = 0; @@ -1242,7 +1329,7 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, map = root.via.array.ptr[1]; map_size = map.via.map.size; - if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + if( ctx->add_entity && ( ctx->kubernete_metadata_enabled || strncmp(ctx->entity_type, FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0 )) { msgpack_sbuffer_init(&filtered_sbuf); msgpack_unpacked_init(&modified_unpacked); } @@ -1251,7 +1338,7 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag); goto error; } - if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + if(ctx->add_entity && ( ctx->kubernete_metadata_enabled || strncmp(ctx->entity_type, FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0 )) { update_or_create_entity(ctx,stream,map); // Prepare a buffer to pack the modified map if(stream->entity != NULL && (stream->entity->root_filter_count > 0 || stream->entity->filter_count > 0)) { @@ -1384,7 +1471,7 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, if (ret == 0) { i++; } - if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + if(ctx->add_entity && ( ctx->kubernete_metadata_enabled || strncmp(ctx->entity_type, FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0 )) { msgpack_sbuffer_destroy(&filtered_sbuf); msgpack_unpacked_destroy(&modified_unpacked); } @@ -1403,7 +1490,7 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, error: msgpack_unpacked_destroy(&result); - if(ctx->kubernete_metadata_enabled && ctx->add_entity) { + if(ctx->add_entity && ( ctx->kubernete_metadata_enabled || strncmp(ctx->entity_type, FLB_FILTER_ENTITY_TYPE_RESOURCE, FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN) == 0 )) { msgpack_sbuffer_destroy(&filtered_sbuf); msgpack_unpacked_destroy(&modified_unpacked); } diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.h b/plugins/out_cloudwatch_logs/cloudwatch_api.h index b8be9d2d39b..a8ead2d7cc1 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.h @@ -48,6 +48,19 @@ #define AWS_ENTITY_PREFIX "aws_entity" #define AWS_ENTITY_PREFIX_LEN 10 +/* + * Possible entity type values for aws plugin + */ +#define FLB_FILTER_ENTITY_TYPE_RESOURCE "resource" +#define FLB_FILTER_ENTITY_TYPE_RESOURCE_LEN 8 +#define FLB_FILTER_ENTITY_TYPE_SERVICE "service" +#define FLB_FILTER_ENTITY_TYPE_SERVICE_LEN 7 +/* + * Possible cluster platform values for aws plugin + */ +#define NATIVE_KUBERNETES_PLATFORM "k8s" +#define EKS_PLATFORM "eks" + #include "cloudwatch_logs.h" void cw_flush_destroy(struct cw_flush *buf); diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c index 0b85be26e01..41129e1f5c1 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c @@ -516,6 +516,8 @@ void entity_destroy(entity *entity) { flb_free(entity->key_attributes->name); flb_free(entity->key_attributes->type); flb_free(entity->key_attributes->account_id); + flb_free(entity->key_attributes->platform); + flb_free(entity->key_attributes->cluster_name); flb_free(entity->key_attributes); } flb_free(entity); @@ -676,6 +678,12 @@ static struct flb_config_map config_map[] = { "add entity to PutLogEvent calls" }, + { + FLB_CONFIG_MAP_STR, "entity_type", "service", + 0, FLB_TRUE, offsetof(struct flb_cloudwatch, entity_type), + "store the entity type. Possible values resource or service" + }, + /* EOF */ {0} }; diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h index 037b366b53e..de95dd6a7c3 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h @@ -49,6 +49,8 @@ typedef struct entity_key_attributes { char *name; char *environment; char *account_id; + char *cluster_name; + char *platform; }entity_key_attributes; /* Attributes used for CloudWatch Entity object @@ -194,6 +196,7 @@ struct flb_cloudwatch { int kubernete_metadata_enabled; int add_entity; + char *entity_type; }; void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx);