Skip to content

Commit

Permalink
loki_out: add stuctured_metadata_map_keys
Browse files Browse the repository at this point in the history
* Adds stuctured_metadata_map_keys config to dynamically populate stuctured_metadata from a map
* Add docker-compose to test loki backend

Signed-off-by: Greg Eales <0x006EA1E5@gmail.com>
  • Loading branch information
0x006EA1E5 committed Jan 23, 2025
1 parent 81f62b9 commit 560e444
Show file tree
Hide file tree
Showing 8 changed files with 510 additions and 15 deletions.
12 changes: 12 additions & 0 deletions docker_compose/loki-grafana-structured_metadata_map/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
### Description

This directory has a docker-compose file and its
configuration required to run:

1) A fluentbit installation with a dummy input, and Loki output configured for `structured_metadata_map_keys`
3) A Loki installation
4) A grafana installation with a default Loki datasource

To run this, execute:

$ docker-compose up --force-recreate -d
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
service:
log_level: debug

pipeline:
inputs:
- name: dummy
tag: logs
dummy: |
{
"message": "simple log generated",
"logger": "my.logger",
"level": "INFO",
"hostname": "localhost",
"my_map_of_attributes_1": {
"key_1": "hello, world!",
"key_2": "goodbye, world!"
},
"my_map_of_maps_1": {
"root_key": {
"sub_key_1": "hello, world!",
"sub_key_2": "goodbye, world!"
}
}
}
outputs:
- name: loki
match: logs
host: loki
remove_keys: hostname,my_map_of_attributes_1,my_map_of_maps_1
label_keys: $level,$logger
labels: service_name=test
structured_metadata: $hostname
structured_metadata_map_keys: $my_map_of_attributes_1,$my_map_of_maps_1['root_key']
line_format: key_value
drop_single_key: on
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# config file version
apiVersion: 1

# list of datasources that should be deleted from the database
deleteDatasources:
- name: Prometheus
orgId: 1

# list of datasources to insert/update depending
# whats available in the database
datasources:
- name: Loki
type: loki
access: proxy
orgId: 1
url: http://loki:3100
basicAuth: false
isDefault: true
version: 1
editable: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
auth_enabled: false

server:
http_listen_port: 3100
grpc_listen_port: 9096

common:
instance_addr: 127.0.0.1
path_prefix: /tmp/loki
storage:
filesystem:
chunks_directory: /tmp/loki/chunks
rules_directory: /tmp/loki/rules
replication_factor: 1
ring:
kvstore:
store: inmemory

query_range:
results_cache:
cache:
embedded_cache:
enabled: true
max_size_mb: 100

schema_config:
configs:
- from: 2020-10-24
store: tsdb
object_store: filesystem
schema: v13
index:
prefix: index_
period: 24h

ruler:
alertmanager_url: http://localhost:9093

# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#
# Statistics help us better understand how Loki is used, and they show us performance
# levels for most users. This helps us prioritize features and documentation.
# For more information on what's sent, look at
# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
# Refer to the buildReport method to see what goes into a report.
#
# If you would like to disable reporting, uncomment the following lines:
#analytics:
# reporting_enabled: false
limits_config:
allow_structured_metadata: true
volume_enabled: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
services:
fluentbit:
image: fluent/fluent-bit:latest
depends_on:
- loki
container_name: fluentbit
command: /fluent-bit/bin/fluent-bit -c /etc/fluent-bit_loki_out-structured_metadata_map.yaml
ports:
- 2021:2021
networks:
- loki-network
volumes:
- ./config/fluent-bit_loki_out-structured_metadata_map.yaml:/etc/fluent-bit_loki_out-structured_metadata_map.yaml

grafana:
image: grafana/grafana:11.4.0
depends_on:
- loki
- fluentbit
ports:
- 3000:3000
volumes:
- ./config/grafana/provisioning:/etc/grafana/provisioning
networks:
- loki-network
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin

loki:
image: grafana/loki:2.9.2
command: -config.file=/etc/loki/loki-config.yaml
networks:
- loki-network
ports:
- 3100:3100
volumes:
- ./config/loki-config.yaml:/etc/loki/loki-config.yaml

networks:
loki-network:
driver: bridge
148 changes: 143 additions & 5 deletions plugins/out_loki/loki.c
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,13 @@ static void flb_loki_kv_exit(struct flb_loki *ctx)
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
}
mk_list_foreach_safe(head, tmp, &ctx->structured_metadata_map_keys_list) {
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* unlink and destroy */
mk_list_del(&kv->_head);
flb_loki_kv_destroy(kv);
Expand Down Expand Up @@ -416,6 +423,94 @@ static void pack_kv(struct flb_loki *ctx,
}
}

/*
* Similar to pack_kv above, except will only use msgpack_objects of type
* MSGPACK_OBJECT_MAP, and will iterate over the keys adding each entry as a
* separate item. Non-string map values are serialised to JSON, as Loki requires
* all values to be strings.
*/
static void pack_maps(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
msgpack_object *map,
struct flb_mp_map_header *mh,
struct mk_list *list)
{
struct mk_list *head;
struct flb_loki_kv *kv;

msgpack_object *start_key;
msgpack_object *out_key;
msgpack_object *out_val;

msgpack_object_map accessed_map;
uint32_t accessed_map_index;
msgpack_object_kv accessed_map_kv;

char *accessed_map_val_json;

mk_list_foreach(head, list) {
/* get the flb_loki_kv for this iteration of the loop */
kv = mk_list_entry(head, struct flb_loki_kv, _head);

/* record accessor key/value pair */
if (kv->ra_key != NULL && kv->ra_val == NULL) {

/* try to get the value for the record accessor */
if (flb_ra_get_kv_pair(kv->ra_key, *map, &start_key, &out_key, &out_val)
== MSGPACK_UNPACK_CONTINUE) {

/*
* we require the value to be a map, or it doesn't make sense as
* this is adding a map's key / values
*/
if (out_val->type != MSGPACK_OBJECT_MAP || out_val->via.map.size <= 0) {
flb_plg_debug(ctx->ins, "No valid map data found for key %s",
kv->ra_key->pattern);
}
else {
accessed_map = out_val->via.map;

/* for each entry in the accessed map... */
for (accessed_map_index = 0; accessed_map_index < accessed_map.size;
accessed_map_index++) {

/* get the entry */
accessed_map_kv = accessed_map.ptr[accessed_map_index];

/* Pack the key and value */
flb_mp_map_header_append(mh);

pack_label_key(mp_pck, (char*) accessed_map_kv.key.via.str.ptr,
accessed_map_kv.key.via.str.size);
/*
* Does this need optimising? For example, to handle
* bool as non-JSON?
*/
if (accessed_map_kv.val.type == MSGPACK_OBJECT_STR) {
msgpack_pack_str_with_body(mp_pck,
accessed_map_kv.val.via.str.ptr,
accessed_map_kv.val.via.str.size);
}
/*
* convert value to JSON, as Loki expects a string value
*/
else {
accessed_map_val_json = flb_msgpack_to_json_str(1024,
&accessed_map_kv.val);
if (accessed_map_val_json) {
msgpack_pack_str_with_body(mp_pck, accessed_map_val_json,
strlen(accessed_map_val_json));
flb_free(accessed_map_val_json);
}
}
}
}
}
}
}
}

static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
msgpack_packer *mp_pck,
char *tag, int tag_len,
Expand All @@ -424,7 +519,17 @@ static flb_sds_t pack_structured_metadata(struct flb_loki *ctx,
struct flb_mp_map_header mh;
/* Initialize dynamic map header */
flb_mp_map_header_init(&mh, mp_pck);
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
if (ctx->structured_metadata_map_keys) {
pack_maps(ctx, mp_pck, tag, tag_len, map, &mh,
&ctx->structured_metadata_map_keys_list);
}
/*
* explicit structured_metadata entries override
* structured_metadata_map_keys entries
* */
if (ctx->structured_metadata) {
pack_kv(ctx, mp_pck, tag, tag_len, map, &mh, &ctx->structured_metadata_list);
}
flb_mp_map_header_end(&mh);
return 0;
}
Expand Down Expand Up @@ -788,6 +893,7 @@ static int parse_labels(struct flb_loki *ctx)

flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);
flb_loki_kv_init(&ctx->structured_metadata_map_keys_list);

if (ctx->structured_metadata) {
ret = parse_kv(ctx, ctx->structured_metadata, &ctx->structured_metadata_list, &ra_used);
Expand All @@ -796,6 +902,28 @@ static int parse_labels(struct flb_loki *ctx)
}
}

/* Append structured metadata map keys set in the configuration */
if (ctx->structured_metadata_map_keys) {
mk_list_foreach(head, ctx->structured_metadata_map_keys) {
entry = mk_list_entry(head, struct flb_slist_entry, _head);
if (entry->str[0] != '$') {
flb_plg_error(ctx->ins,
"invalid structured metadata map key, the name must start "
"with '$'");
return -1;
}

ret = flb_loki_kv_append(ctx, &ctx->structured_metadata_map_keys_list,
entry->str, NULL);
if (ret == -1) {
return -1;
}
else if (ret > 0) {
ra_used++;
}
}
}

if (ctx->labels) {
ret = parse_kv(ctx, ctx->labels, &ctx->labels_list, &ra_used);
if (ret == -1) {
Expand Down Expand Up @@ -971,6 +1099,7 @@ static struct flb_loki *loki_config_create(struct flb_output_instance *ins,
ctx->ins = ins;
flb_loki_kv_init(&ctx->labels_list);
flb_loki_kv_init(&ctx->structured_metadata_list);
flb_loki_kv_init(&ctx->structured_metadata_map_keys_list);

/* Register context with plugin instance */
flb_output_set_context(ins, ctx);
Expand Down Expand Up @@ -1539,12 +1668,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
while ((ret = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ||
ctx->structured_metadata_map_keys ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, NULL);
}
}
Expand Down Expand Up @@ -1575,12 +1705,13 @@ static flb_sds_t loki_compose_payload(struct flb_loki *ctx,
msgpack_pack_str_body(&mp_pck, "values", 6);
msgpack_pack_array(&mp_pck, 1);

msgpack_pack_array(&mp_pck, ctx->structured_metadata ? 3 : 2);
msgpack_pack_array(&mp_pck, ctx->structured_metadata ||
ctx->structured_metadata_map_keys ? 3 : 2);

/* Append the timestamp */
pack_timestamp(&mp_pck, &log_event.timestamp);
pack_record(ctx, &mp_pck, log_event.body, dynamic_tenant_id);
if (ctx->structured_metadata) {
if (ctx->structured_metadata || ctx->structured_metadata_map_keys) {
pack_structured_metadata(ctx, &mp_pck, tag, tag_len, log_event.body);
}
}
Expand Down Expand Up @@ -1905,6 +2036,13 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata),
"optional structured metadata fields for API requests."
},

{
FLB_CONFIG_MAP_CLIST, "structured_metadata_map_keys", NULL,
0, FLB_TRUE, offsetof(struct flb_loki, structured_metadata_map_keys),
"optional structured metadata fields, as derived dynamically from configured maps "
"keys, for API requests."
},

{
FLB_CONFIG_MAP_BOOL, "auto_kubernetes_labels", "false",
Expand Down
Loading

0 comments on commit 560e444

Please sign in to comment.