From f26c63b7fe2907524e6cf578a1d06a245c5f1799 Mon Sep 17 00:00:00 2001 From: am-kinetica <85610855+am-kinetica@users.noreply.github.com> Date: Tue, 9 Jan 2024 22:40:51 +0530 Subject: [PATCH] Kinetica exporter/pr 2 metrics (#27239) **Description:** Added metrics handling **Link to tracking Issue:** https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/24294 **Testing:** No new tests **Documentation:** No new documentation --- .chloggen/kinetica-exporter_PR-2-metrics.yaml | 27 + exporter/kineticaexporter/common.go | 750 ++++++++ exporter/kineticaexporter/config.go | 1 - .../kineticaexporter/exporter_metric_test.go | 524 ++++++ exporter/kineticaexporter/go.mod | 20 +- exporter/kineticaexporter/go.sum | 41 +- exporter/kineticaexporter/metrics_exporter.go | 1603 ++++++++++++++++- exporter/kineticaexporter/writer.go | 867 +++++++++ 8 files changed, 3819 insertions(+), 14 deletions(-) create mode 100755 .chloggen/kinetica-exporter_PR-2-metrics.yaml create mode 100644 exporter/kineticaexporter/common.go create mode 100644 exporter/kineticaexporter/exporter_metric_test.go create mode 100644 exporter/kineticaexporter/writer.go diff --git a/.chloggen/kinetica-exporter_PR-2-metrics.yaml b/.chloggen/kinetica-exporter_PR-2-metrics.yaml new file mode 100755 index 000000000000..de5636784dbc --- /dev/null +++ b/.chloggen/kinetica-exporter_PR-2-metrics.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kineticaexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "added metrics handling" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27239] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/kineticaexporter/common.go b/exporter/kineticaexporter/common.go new file mode 100644 index 000000000000..902ff230e67c --- /dev/null +++ b/exporter/kineticaexporter/common.go @@ -0,0 +1,750 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kineticaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kineticaexporter" + +import ( + "encoding/json" + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" +) + +const ( + MeasurementSpans = "spans" + MeasurementSpanLinks = "span-links" + MeasurementLogs = "logs" + + // These attribute key names are influenced by the proto message keys. 
+ AttributeTime = "time" + AttributeStartTimeUnixNano = "start_time_unix_nano" + AttributeTraceID = "trace_id" + AttributeSpanID = "span_id" + AttributeTraceState = "trace_state" + AttributeParentSpanID = "parent_span_id" + AttributeParentServiceName = "parent_service_name" + AttributeChildServiceName = "child_service_name" + AttributeCallCount = "call_count" + AttributeSpansQueueDepth = "spans_queue_depth" + AttributeSpansDropped = "spans_dropped" + AttributeName = "name" + AttributeSpanKind = "kind" + AttributeEndTimeUnixNano = "end_time_unix_nano" + AttributeDurationNano = "duration_nano" + AttributeDroppedAttributesCount = "dropped_attributes_count" + AttributeDroppedEventsCount = "dropped_events_count" + AttributeDroppedLinksCount = "dropped_links_count" + AttributeAttributes = "attributes" + AttributeLinkedTraceID = "linked_trace_id" + AttributeLinkedSpanID = "linked_span_id" + AttributeSeverityNumber = "severity_number" + AttributeSeverityText = "severity_text" + AttributeBody = "body" + + LogTable = "log" + LogAttributeTable = "log_attribute" + LogResourceAttributeTable = "log_resource_attribute" + LogScopeAttributeTable = "log_scope_attribute" + + TraceSpanTable = "trace_span" + TraceSpanAttributeTable = "trace_span_attribute" + TraceResourceAttributeTable = "trace_resource_attribute" + TraceScopeAttributeTable = "trace_scope_attribute" + TraceEventAttributeTable = "trace_event_attribute" + TraceLinkAttributeTable = "trace_link_attribute" + + GaugeTable = "metric_gauge" + GaugeDatapointTable = "metric_gauge_datapoint" + GaugeDatapointAttributeTable = "metric_gauge_datapoint_attribute" + GaugeDatapointExemplarTable = "metric_gauge_datapoint_exemplar" + GaugeDatapointExemplarAttributeTable = "metric_gauge_datapoint_exemplar_attribute" + GaugeResourceAttributeTable = "metric_gauge_resource_attribute" + GaugeScopeAttributeTable = "metric_gauge_scope_attribute" + + SumTable = "metric_sum" + SumResourceAttributeTable = "metric_sum_resource_attribute" + SumScopeAttributeTable = "metric_sum_scope_attribute" + SumDatapointTable = "metric_sum_datapoint" + SumDatapointAttributeTable = "metric_sum_datapoint_attribute" + SumDatapointExemplarTable = "metric_sum_datapoint_exemplar" + SumDataPointExemplarAttributeTable = "metric_sum_datapoint_exemplar_attribute" + + HistogramTable = "metric_histogram" + HistogramResourceAttributeTable = "metric_histogram_resource_attribute" + HistogramScopeAttributeTable = "metric_histogram_scope_attribute" + HistogramDatapointTable = "metric_histogram_datapoint" + HistogramDatapointAttributeTable = "metric_histogram_datapoint_attribute" + HistogramBucketCountsTable = "metric_histogram_datapoint_bucket_count" + HistogramExplicitBoundsTable = "metric_histogram_datapoint_explicit_bound" + HistogramDatapointExemplarTable = "metric_histogram_datapoint_exemplar" + HistogramDataPointExemplarAttributeTable = "metric_histogram_datapoint_exemplar_attribute" + + ExpHistogramTable = "metric_exp_histogram" + ExpHistogramResourceAttributeTable = "metric_exp_histogram_resource_attribute" + ExpHistogramScopeAttributeTable = "metric_exp_histogram_scope_attribute" + ExpHistogramDatapointTable = "metric_exp_histogram_datapoint" + ExpHistogramDatapointAttributeTable = "metric_exp_histogram_datapoint_attribute" + ExpHistogramPositiveBucketCountsTable = "metric_exp_histogram_datapoint_bucket_positive_count" + ExpHistogramNegativeBucketCountsTable = "metric_exp_histogram_datapoint_bucket_negative_count" + ExpHistogramDatapointExemplarTable = "metric_exp_histogram_datapoint_exemplar" + 
ExpHistogramDataPointExemplarAttributeTable = "metric_exp_histogram_datapoint_exemplar_attribute" + + SummaryTable = "metric_summary" + SummaryResourceAttributeTable = "metric_summary_resource_attribute" + SummaryScopeAttributeTable = "metric_summary_scope_attribute" + SummaryDatapointTable = "metric_summary_datapoint" + SummaryDatapointAttributeTable = "metric_summary_datapoint_attribute" + SummaryDatapointQuantileValueTable = "metric_summary_datapoint_quantile_values" + + ChunkSize = 10000 +) + +const ( + CreateSchema string = "create schema if not exists %s;" + + HasTable string = "execute endpoint '/has/table' JSON '{\"table_name\":\"%s\"}'" + + // Metrics - DDLs + // Gauge + CreateGauge string = `CREATE TABLE %smetric_gauge + ( + gauge_id UUID (primary_key, shard_key) not null, + metric_name varchar(256) not null, + metric_description varchar (256), + metric_unit varchar (256) + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateGaugeDatapoint string = `CREATE TABLE %smetric_gauge_datapoint + ( + gauge_id UUID (primary_key, shard_key) not null, + id UUID (primary_key) not null, + start_time_unix TIMESTAMP NOT NULL, + time_unix TIMESTAMP NOT NULL, + gauge_value DOUBLE, + flags INT, + FOREIGN KEY (gauge_id) references %smetric_gauge(gauge_id) as fk_gauge_datapoint + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateGaugeDatapointAttribute string = `CREATE TABLE %smetric_gauge_datapoint_attribute + ( + "gauge_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + FOREIGN KEY (gauge_id) references %smetric_gauge(gauge_id) as fk_gauge_datapoint_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateGaugeDatapointExemplar string = `CREATE TABLE %smetric_gauge_datapoint_exemplar + ( + "gauge_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + exemplar_id UUID (primary_key) not null, + time_unix TIMESTAMP NOT NULL, + gauge_value DOUBLE, + "trace_id" VARCHAR (32), + "span_id" VARCHAR (16), + FOREIGN KEY (gauge_id) references %smetric_gauge(gauge_id) as fk_gauge_datapoint_exemplar + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateGaugeDatapointExemplarAttribute string = `CREATE TABLE %smetric_gauge_datapoint_exemplar_attribute + ( + "gauge_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + exemplar_id UUID (primary_key) not null, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + FOREIGN KEY (gauge_id) references %smetric_gauge(gauge_id) as fk_gauge_datapoint_exemplar_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateGaugeResourceAttribute string = `CREATE TABLE %smetric_gauge_resource_attribute + ( + "gauge_id" UUID (primary_key) NOT NULL, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + SHARD KEY (gauge_id), + FOREIGN KEY (gauge_id) references %smetric_gauge(gauge_id) as fk_gauge_resource_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateGaugeScopeAttribute string = `CREATE TABLE 
%smetric_gauge_scope_attribute + ( + "gauge_id" UUID (primary_key) NOT NULL, + "name" VARCHAR (256), + "version" VARCHAR (256), + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + SHARD KEY (gauge_id), + FOREIGN KEY (gauge_id) references %smetric_gauge(gauge_id) as fk_gauge_scope_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + // Sum + + CreateSum string = `CREATE TABLE %smetric_sum + ( + sum_id UUID (primary_key, shard_key) not null, + metric_name varchar (256) not null, + metric_description varchar (256), + metric_unit varchar (256), + aggregation_temporality INTEGER, + is_monotonic BOOLEAN + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSumDatapoint string = `CREATE TABLE %smetric_sum_datapoint + ( + sum_id UUID (primary_key, shard_key) not null, + id UUID (primary_key) not null, + start_time_unix TIMESTAMP NOT NULL, + time_unix TIMESTAMP NOT NULL, + sum_value DOUBLE, + flags INT, + FOREIGN KEY (sum_id) references %smetric_sum(sum_id) as fk_sum_datapoint + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSumDatapointAttribute string = `CREATE TABLE %smetric_sum_datapoint_attribute + ( + "sum_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + FOREIGN KEY (sum_id) references %smetric_sum(sum_id) as fk_sum_datapoint_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSumDatapointExemplar string = `CREATE TABLE %smetric_sum_datapoint_exemplar + ( + "sum_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + exemplar_id UUID (primary_key) not null, + time_unix TIMESTAMP NOT NULL, + sum_value DOUBLE, + "trace_id" VARCHAR (32), + "span_id" VARCHAR (16), + FOREIGN KEY (sum_id) references %smetric_sum(sum_id) as fk_sum_datapoint_exemplar + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSumDatapointExemplarAttribute string = `CREATE TABLE %smetric_sum_datapoint_exemplar_attribute + ( + "sum_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + exemplar_id UUID (primary_key) not null, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + FOREIGN KEY (sum_id) references %smetric_sum(sum_id) as fk_sum_datapoint_exemplar_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSumResourceAttribute string = `CREATE TABLE %smetric_sum_resource_attribute + ( + "sum_id" UUID (primary_key) NOT NULL, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + SHARD KEY (sum_id), + FOREIGN KEY (sum_id) references %smetric_sum(sum_id) as fk_sum_resource_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSumScopeAttribute string = `CREATE TABLE %smetric_sum_scope_attribute + ( + "sum_id" UUID (primary_key) NOT NULL, + "name" VARCHAR (256), + "version" VARCHAR (256), + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" 
BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + SHARD KEY (sum_id), + FOREIGN KEY (sum_id) references %smetric_sum(sum_id) as fk_sum_scope_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + + // Histogram + + CreateHistogram string = `CREATE TABLE %smetric_histogram + ( + histogram_id UUID (primary_key, shard_key) not null, + metric_name varchar (256) not null, + metric_description varchar (256), + metric_unit varchar (256), + aggregation_temporality int8 + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + + CreateHistogramDatapoint string = `CREATE TABLE %smetric_histogram_datapoint + ( + histogram_id UUID (primary_key, shard_key) not null, + id UUID (primary_key) not null, + start_time_unix TIMESTAMP, + time_unix TIMESTAMP NOT NULL, + count LONG, + data_sum DOUBLE, + data_min DOUBLE, + data_max DOUBLE, + flags INT, + FOREIGN KEY (histogram_id) references %smetric_histogram(histogram_id) as fk_histogram_datapoint + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateHistogramDatapointBucketCount string = `CREATE TABLE %smetric_histogram_datapoint_bucket_count + ( + histogram_id UUID (primary_key, shard_key) not null, + datapoint_id UUID (primary_key) not null, + count_id UUID (primary_key) not null, + count LONG, + FOREIGN KEY (histogram_id) references %smetric_histogram(histogram_id) as fk_histogram_datapoint_bucket_count + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateHistogramDatapointExplicitBound string = `CREATE TABLE %smetric_histogram_datapoint_explicit_bound + ( + histogram_id UUID (primary_key, shard_key) not null, + datapoint_id UUID (primary_key) not null, + bound_id UUID (primary_key) not null, + explicit_bound DOUBLE, + FOREIGN KEY (histogram_id) references %smetric_histogram(histogram_id) as fk_histogram_datapoint_explicit_bound + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateHistogramDatapointAttribute string = `CREATE TABLE %smetric_histogram_datapoint_attribute + ( + "histogram_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + FOREIGN KEY (histogram_id) references %smetric_histogram(histogram_id) as fk_histogram_datapoint_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateHistogramDatapointExemplar string = `CREATE TABLE %smetric_histogram_datapoint_exemplar + ( + "histogram_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + exemplar_id UUID (primary_key) not null, + time_unix TIMESTAMP NOT NULL, + histogram_value DOUBLE, + "trace_id" VARCHAR (32), + "span_id" VARCHAR (16), + FOREIGN KEY (histogram_id) references %smetric_histogram(histogram_id) as fk_histogram_datapoint_exemplar + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateHistogramDatapointExemplarAttribute string = `CREATE TABLE %smetric_histogram_datapoint_exemplar_attribute + ( + "histogram_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + exemplar_id UUID (primary_key) not null, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + FOREIGN KEY (histogram_id) references 
%smetric_histogram(histogram_id) as fk_histogram_datapoint_exemplar_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateHistogramResourceAttribute string = `CREATE TABLE %smetric_histogram_resource_attribute + ( + "histogram_id" UUID (primary_key) NOT NULL, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + SHARD KEY (histogram_id), + FOREIGN KEY (histogram_id) references %smetric_histogram(histogram_id) as fk_histogram_resource_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateHistogramScopeAttribute string = `CREATE TABLE %smetric_histogram_scope_attribute + ( + "histogram_id" UUID (primary_key) NOT NULL, + "name" VARCHAR (256), + "version" VARCHAR (256), + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + SHARD KEY (histogram_id), + FOREIGN KEY (histogram_id) references %smetric_histogram(histogram_id) as fk_histogram_scope_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + // exponential Histogram + CreateExpHistogram string = `CREATE TABLE %smetric_exp_histogram + ( + histogram_id UUID (primary_key, shard_key) not null, + metric_name varchar (256) not null, + metric_description varchar (256), + metric_unit varchar (256), + aggregation_temporality int8 + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateExpHistogramDatapoint string = `CREATE TABLE %smetric_exp_histogram_datapoint + ( + histogram_id UUID (primary_key, shard_key) not null, + id UUID (primary_key) not null, + start_time_unix TIMESTAMP, + time_unix TIMESTAMP NOT NULL, + count LONG, + data_sum DOUBLE, + scale INTEGER, + zero_count LONG, + buckets_positive_offset INTEGER, + buckets_negative_offset INTEGER, + data_min DOUBLE, + data_max DOUBLE, + flags INT, + FOREIGN KEY (histogram_id) references %smetric_exp_histogram(histogram_id) as fk_exp_histogram_datapoint + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateExpHistogramDatapointBucketPositiveCount string = `CREATE TABLE %smetric_exp_histogram_datapoint_bucket_positive_count + ( + histogram_id UUID (primary_key, shard_key) not null, + datapoint_id UUID (primary_key) not null, + count_id UUID (primary_key) not null, + count LONG, + FOREIGN KEY (histogram_id) references %smetric_exp_histogram(histogram_id) as fk_exp_histogram_datapoint_bucket_count + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateExpHistogramDatapointBucketNegativeCount string = `CREATE TABLE %smetric_exp_histogram_datapoint_bucket_negative_count + ( + histogram_id UUID (primary_key, shard_key) not null, + datapoint_id UUID (primary_key) not null, + count_id UUID (primary_key) not null, + count LONG, + FOREIGN KEY (histogram_id) references %smetric_exp_histogram(histogram_id) as fk_exp_histogram_datapoint_bucket_count + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateExpHistogramDatapointAttribute string = `CREATE TABLE %smetric_exp_histogram_datapoint_attribute + ( + "histogram_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + FOREIGN KEY (histogram_id) 
references %smetric_exp_histogram(histogram_id) as fk_exp_histogram_datapoint_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateExpHistogramDatapointExemplar string = `CREATE TABLE %smetric_exp_histogram_datapoint_exemplar + ( + "histogram_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + exemplar_id UUID (primary_key) not null, + time_unix TIMESTAMP NOT NULL, + sum_value DOUBLE, + "trace_id" VARCHAR (32), + "span_id" VARCHAR (16), + FOREIGN KEY (histogram_id) references %smetric_exp_histogram(histogram_id) as fk_exp_histogram_datapoint_exemplar + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateExpHistogramDatapointExemplarAttribute string = `CREATE TABLE %smetric_exp_histogram_datapoint_exemplar_attribute + ( + "histogram_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + exemplar_id UUID (primary_key) not null, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + FOREIGN KEY (histogram_id) references %smetric_exp_histogram(histogram_id) as fk_exp_histogram_datapoint_exemplar_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateExpHistogramResourceAttribute string = `CREATE TABLE %smetric_exp_histogram_resource_attribute + ( + "histogram_id" UUID (primary_key) NOT NULL, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + SHARD KEY (histogram_id), + FOREIGN KEY (histogram_id) references %smetric_exp_histogram(histogram_id) as fk_exp_histogram_resource_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateExpHistogramScopeAttribute string = `CREATE TABLE %smetric_exp_histogram_scope_attribute + ( + "histogram_id" UUID (primary_key) NOT NULL, + "name" VARCHAR (256), + "version" VARCHAR (256), + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + SHARD KEY (histogram_id), + FOREIGN KEY (histogram_id) references %smetric_exp_histogram(histogram_id) as fk_exp_histogram_scope_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + // Summary + CreateSummary string = `CREATE TABLE %smetric_summary + ( + summary_id UUID (primary_key, shard_key) not null, + metric_name varchar (256) not null, + metric_description varchar (256), + metric_unit varchar (256) + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSummaryDatapoint string = `CREATE TABLE %smetric_summary_datapoint + ( + summary_id UUID (primary_key, shard_key) not null, + id UUID (primary_key) not null, + start_time_unix TIMESTAMP, + time_unix TIMESTAMP NOT NULL, + count LONG, + data_sum DOUBLE, + flags INT, + FOREIGN KEY (summary_id) references %smetric_summary(summary_id) as fk_summary_datapoint + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSummaryDatapointAttribute string = `CREATE TABLE %smetric_summary_datapoint_attribute + ( + "summary_id" UUID (primary_key, shard_key) NOT NULL, + datapoint_id uuid (primary_key) not null, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + FOREIGN KEY (summary_id) references %smetric_summary(summary_id) as fk_summary_datapoint_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSummaryDatapointQuantileValues string = `CREATE TABLE %smetric_summary_datapoint_quantile_values + ( + summary_id UUID (primary_key, shard_key) not null, + datapoint_id UUID (primary_key) not null, + quantile_id UUID (primary_key) not null, + quantile DOUBLE, + value DOUBLE, + FOREIGN KEY (summary_id) references %smetric_summary(summary_id) as fk_summary_datapoint_quantile + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSummaryResourceAttribute string = `CREATE TABLE %smetric_summary_resource_attribute + ( + "summary_id" UUID (primary_key) NOT NULL, + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + SHARD KEY (summary_id), + FOREIGN KEY (summary_id) references %smetric_summary(summary_id) as fk_summary_resource_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` + CreateSummaryScopeAttribute string = `CREATE TABLE %smetric_summary_scope_attribute + ( + "summary_id" UUID (primary_key) NOT NULL, + "name" VARCHAR (256), + "version" VARCHAR (256), + "key" VARCHAR (primary_key, 128, dict) NOT NULL, + "string_value" VARCHAR (256), + "bool_value" BOOLEAN, + "int_value" INTEGER, + "double_value" DOUBLE, + "bytes_value" BLOB (store_only), + SHARD KEY (summary_id), + FOREIGN KEY (summary_id) references %smetric_summary(summary_id) as fk_summary_scope_attribute + ) USING TABLE PROPERTIES (NO_ERROR_IF_EXISTS = TRUE); + ` +)
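+ +// The DDL templates above use %s as a schema-prefix placeholder rather than a +// printf verb; createTablesForMetricType substitutes it via strings.ReplaceAll. +// A minimal sketch of the rendering, assuming a configured schema of "otel": +// +// schema := "otel." +// stmt := strings.ReplaceAll(CreateGauge, "%s", schema) +// // stmt now begins: CREATE TABLE otel.metric_gauge ( gauge_id UUID ...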
 + +// ValueTypePair - struct to wrap a value as [any] and its type [pcommon.ValueType] +type ValueTypePair struct { + value any + valueType pcommon.ValueType +} + +// attributeValueToKineticaFieldValue - Convert an attribute value to a [ValueTypePair] for writing to Kinetica + // + // @param value + // @return ValueTypePair + // @return error +func attributeValueToKineticaFieldValue(value pcommon.Value) (ValueTypePair, error) { + switch value.Type() { + case pcommon.ValueTypeStr: + var val string + if len(value.Str()) > 256 { + val = value.Str()[0:255] + } else { + val = value.Str() + } + return ValueTypePair{val, pcommon.ValueTypeStr}, nil + case pcommon.ValueTypeInt: + return ValueTypePair{value.Int(), pcommon.ValueTypeInt}, nil + case pcommon.ValueTypeDouble: + return ValueTypePair{value.Double(), pcommon.ValueTypeDouble}, nil + case pcommon.ValueTypeBool: + return ValueTypePair{value.Bool(), pcommon.ValueTypeBool}, nil + case pcommon.ValueTypeMap: + jsonBytes, err := json.Marshal(otlpKeyValueListToMap(value.Map())) + if err != nil { + return ValueTypePair{nil, pcommon.ValueTypeEmpty}, err + } + return ValueTypePair{string(jsonBytes), pcommon.ValueTypeStr}, nil + case pcommon.ValueTypeSlice: + jsonBytes, err := json.Marshal(otlpArrayToSlice(value.Slice())) + if err != nil { + return ValueTypePair{nil, pcommon.ValueTypeEmpty}, err + } + return ValueTypePair{string(jsonBytes), pcommon.ValueTypeStr}, nil + case pcommon.ValueTypeEmpty: + return ValueTypePair{nil, pcommon.ValueTypeEmpty}, nil + default: + return ValueTypePair{nil, pcommon.ValueTypeEmpty}, fmt.Errorf("unknown value type %v", value) + } +}
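+ +// Example (a sketch): converting one OTLP attribute before it is persisted. +// +// v := pcommon.NewValueStr("payments") +// vtPair, err := attributeValueToKineticaFieldValue(v) +// // on success: vtPair.value == "payments", vtPair.valueType == pcommon.ValueTypeStr +// _ = err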
 + +// otlpKeyValueListToMap - Convert an otlp Map to a map[string]interface{} with proper type conversions + // + // @param kvList + // @return map +func otlpKeyValueListToMap(kvList pcommon.Map) map[string]any { + m := make(map[string]any, kvList.Len()) + kvList.Range(func(k string, v pcommon.Value) bool { + switch v.Type() { + case pcommon.ValueTypeStr: + m[k] = v.Str() + case pcommon.ValueTypeInt: + m[k] = v.Int() + case pcommon.ValueTypeDouble: + m[k] = v.Double() + case pcommon.ValueTypeBool: + m[k] = v.Bool() + case pcommon.ValueTypeMap: + m[k] = otlpKeyValueListToMap(v.Map()) + case pcommon.ValueTypeSlice: + m[k] = otlpArrayToSlice(v.Slice()) + case pcommon.ValueTypeEmpty: + m[k] = nil + default: + m[k] = fmt.Sprintf(" %v", v) + } + return true + }) + return m +} + +// otlpArrayToSlice - Convert an otlp slice to a slice of interface{} with proper type conversions + // + // @param arr + // @return []interface{} +func otlpArrayToSlice(arr pcommon.Slice) []any { + s := make([]any, 0, arr.Len()) + for i := 0; i < arr.Len(); i++ { + v := arr.At(i) + switch v.Type() { + case pcommon.ValueTypeStr: + s = append(s, v.Str()) + case pcommon.ValueTypeInt: + s = append(s, v.Int()) + case pcommon.ValueTypeDouble: + s = append(s, v.Double()) + case pcommon.ValueTypeBool: + s = append(s, v.Bool()) + case pcommon.ValueTypeEmpty: + s = append(s, nil) + default: + s = append(s, fmt.Sprintf(" %v", v)) + } + } + return s +} + +// getAttributeValue + // + // @param vtPair + // @return *AttributeValue + // @return error +func getAttributeValue(vtPair ValueTypePair) (*AttributeValue, error) { + var av *AttributeValue + var err error + switch vtPair.valueType { + case pcommon.ValueTypeStr: + value := vtPair.value.(string) + av = new(AttributeValue) + av.StringValue = value + case pcommon.ValueTypeInt: + // attributeValueToKineticaFieldValue stores pcommon ints as int64 + value := vtPair.value.(int64) + av = new(AttributeValue) + av.IntValue = int(value) + case pcommon.ValueTypeDouble: + value := vtPair.value.(float64) + av = new(AttributeValue) + av.DoubleValue = value + case pcommon.ValueTypeBool: + // booleans are stored as bool and persisted as an int8 (0/1) + value := vtPair.value.(bool) + av = new(AttributeValue) + if value { + av.BoolValue = 1 + } + case pcommon.ValueTypeBytes: + value := vtPair.value.([]byte) + av = new(AttributeValue) + // allocate before copying; copying into a nil slice is a no-op + av.BytesValue = make([]byte, len(value)) + copy(av.BytesValue, value) + case pcommon.ValueTypeMap: + err = fmt.Errorf("unhandled value type %v", vtPair.valueType) + + case pcommon.ValueTypeSlice: + err = fmt.Errorf("unhandled value type %v", vtPair.valueType) + + default: + err = fmt.Errorf("unknown value type %v", vtPair.valueType) + } + + if err != nil { + return nil, err + } + + return av, nil +} + +// chunkBySize - Splits a slice into multiple slices of the given size + // + // @param items + // @param chunkSize + // @return [][]T +func chunkBySize[T any](items []T, chunkSize int) [][]T { + var _chunks = make([][]T, 0, (len(items)/chunkSize)+1) + for chunkSize < len(items) { + items, _chunks = items[chunkSize:], append(_chunks, items[0:chunkSize:chunkSize]) + } + return append(_chunks, items) +}
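+ +// Example (a sketch): splitting a record slice into insert batches bounded by +// ChunkSize (10000), which is how batched writes can stay bounded in size. +// +// records := make([]int, 25000) +// batches := chunkBySize(records, ChunkSize) +// // len(batches) == 3, sized 10000, 10000 and 5000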
diff --git a/exporter/kineticaexporter/config.go b/exporter/kineticaexporter/config.go index 701e40077585..a90b7baeaad8 100644 --- a/exporter/kineticaexporter/config.go +++ b/exporter/kineticaexporter/config.go @@ -18,7 +18,6 @@ type Config struct { Username string `mapstructure:"username"` Password configopaque.String `mapstructure:"password"` BypassSslCertCheck bool `mapstructure:"bypasssslcertcheck"` - LogConfigFile string `mapstructure:"logconfigfile"` } // Validate the config diff --git a/exporter/kineticaexporter/exporter_metric_test.go b/exporter/kineticaexporter/exporter_metric_test.go new file mode 100644 index 000000000000..49677fa1b383 --- /dev/null +++ b/exporter/kineticaexporter/exporter_metric_test.go @@ -0,0 +1,524 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kineticaexporter + +import ( + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap/zaptest" +) + +var testServer *httptest.Server +var baseURL string + +var showTableMetricSummaryResponse = "\x04OK\x00&show_table_response\xfe\x13&otel.metric_summary\x02&otel.metric_summary\x00\x02\x00\x00\x02(14940230562727554683\x00\x02\xc6\x03{\"type\":\"record\",\"name\":\"type_name\",\"fields\":[{\"name\":\"summary_id\",\"type\":\"string\"},{\"name\":\"metric_name\",\"type\":\"string\"},{\"name\":\"metric_description\",\"type\":[\"string\",\"null\"]},{\"name\":\"metric_unit\",\"type\":[\"string\",\"null\"]}]}\x00\x02\x00\x00\x02\x08$metric_description\x06\x08data\x0echar256\x10nullable\x00\x16metric_name\x04\x08data\x0echar256\x00\x16metric_unit\x06\x08data\x0echar256\x10nullable\x00\x14summary_id\x08\x08data\x16primary_key\x12shard_key\x08uuid\x00\x00\x00\x028\"attribute_indexes\x00 collection_names\x08otel$compressed_columns\x000datasource_subscriptions\x00\x18foreign_keys\x00\"foreign_shard_key\x00$global_access_mode\x14read_write,is_automatic_partition\nfalse\x10is_dirty\x00\"is_view_persisted\x00\"last_refresh_time\x00(owner_resource_group ", string(finalResponseBytes)) + + _, err = w.Write(finalResponseBytes) + if err != nil { + http.Error(w, "Error writing response", http.StatusInternalServerError) + return + } + +} + +func getShowTableResponse(requestBody string) []byte { + switch { + case strings.Contains(requestBody, "metric_summary_scope_attribute"): + return []byte(showTableMetricSummaryScopeAttributesResponse) + case strings.Contains(requestBody, "metric_summary_resource_attribute"): + return []byte(showTableMetricSummaryResourceAttributesResponse) + case strings.Contains(requestBody, "metric_summary_datapoint_quantile_values"): + return []byte(showTableMetricSummaryDatapointQuantileValuesResponse) + case strings.Contains(requestBody, "metric_summary_datapoint_attribute"): + return []byte(showTableMetricSummaryDatapointAttributeResponse) + case strings.Contains(requestBody, "metric_summary_datapoint"): + return []byte(showTableMetricSummaryDatapointResponse) + case strings.Contains(requestBody, "metric_summary"): + return []byte(showTableMetricSummaryResponse) + default: + return []byte("") + } + +} + +// Setup function (runs before tests start) +func TestMain(m *testing.M) { + // Create a test server with a simple handler function + testServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Println("In main ...") + switch r.URL.Path { + case "/insert/records": + handleInsertRecords(w, r) + case "/show/table": + handleShowTable(w, r) + case "/execute/sql": + handleExecuteSQL(w, r) + default: + http.Error(w, "Not found", http.StatusNotFound) + } + })) + + baseURL = testServer.URL // Store the base URL for test requests + + // Run the tests + code := m.Run() + + // Teardown: Close the test server after all tests are finished + testServer.Close() + // Perform any other teardown operations here if needed + + // Exit with the test status code + // This allows TestMain to report the result of the tests + // You can also perform further actions based
on the test results + os.Exit(code) +} diff --git a/exporter/kineticaexporter/go.mod b/exporter/kineticaexporter/go.mod index 6187ee128056..0ce7da635c1c 100644 --- a/exporter/kineticaexporter/go.mod +++ b/exporter/kineticaexporter/go.mod @@ -3,7 +3,11 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kineti go 1.20 require ( + github.com/google/uuid v1.5.0 + github.com/kineticadb/kinetica-api-go v0.0.3 + github.com/samber/lo v1.38.1 github.com/stretchr/testify v1.8.4 + github.com/wk8/go-ordered-map v1.0.0 go.opentelemetry.io/collector/component v0.91.0 go.opentelemetry.io/collector/config/configopaque v0.91.0 go.opentelemetry.io/collector/confmap v0.91.0 @@ -11,25 +15,35 @@ require ( go.opentelemetry.io/collector/pdata v1.0.0 go.opentelemetry.io/otel/metric v1.21.0 go.opentelemetry.io/otel/trace v1.21.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.26.0 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect ) require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/go-logr/logr v1.3.0 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-resty/resty/v2 v2.7.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hamba/avro/v2 v2.13.0 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.0.1 // indirect + github.com/logrusorgru/aurora v2.0.3+incompatible // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/ztrue/tracerr v0.3.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/collector v0.91.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect @@ -38,11 +52,11 @@ require ( go.opentelemetry.io/collector/featuregate v1.0.0 // indirect go.opentelemetry.io/collector/receiver v0.91.0 // indirect go.opentelemetry.io/otel v1.21.0 // indirect - go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.18.0 // indirect + golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 // indirect + golang.org/x/net v0.19.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230911183012-2d3300fd4832 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 // indirect google.golang.org/grpc v1.59.0 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/exporter/kineticaexporter/go.sum b/exporter/kineticaexporter/go.sum index 3a87bce0d1c8..179e0d2e6895 100644 --- a/exporter/kineticaexporter/go.sum +++ b/exporter/kineticaexporter/go.sum @@ -18,8 +18,13 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= 
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY= +github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -38,6 +43,8 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -48,10 +55,16 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hamba/avro/v2 v2.13.0 h1:QY2uX2yvJTW0OoMKelGShvq4v1hqab6CxJrPwh0fnj0= +github.com/hamba/avro/v2 v2.13.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/kineticadb/kinetica-api-go v0.0.3 h1:DXBe/VYr5WGjQNZG+5ETBVJiFq7bMKpgB1/U5c7NcW0= +github.com/kineticadb/kinetica-api-go v0.0.3/go.mod h1:mbazf5lM5ApXXF3eRelAY76BhFWpVyuXhWv3vmTJixA= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs= @@ -62,6 +75,9 @@ github.com/knadh/koanf/v2 v2.0.1 h1:1dYGITt1I23x8cfx8ZnldtezdyaZtfAuRtIFOiRzK7g= github.com/knadh/koanf/v2 v2.0.1/go.mod h1:ZeiIlIDXTE7w1lMT6UVcNiRAS2/rCeLn/GdLNvY1Dus= github.com/kr/pretty v0.3.1 
h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/logrusorgru/aurora v0.0.0-20181002194514-a7b3b318ed4e/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= +github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= +github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= @@ -84,17 +100,24 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI= github.com/prometheus/statsd_exporter v0.22.7 h1:7Pji/i2GuhK6Lu7DHrtTkFmNBCudCPT1pX2CziuyQR0= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8= +github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/ztrue/tracerr v0.3.0 h1:lDi6EgEYhPYPnKcjsYzmWw4EkFEoA/gfe+I9Y5f+h6Y= +github.com/ztrue/tracerr v0.3.0/go.mod h1:qEalzze4VN9O8tnhBXScfCrmoJo10o8TN5ciKjm6Mww= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/collector v0.91.0 h1:C7sGUJDJ5nwm+CkWpAaVP3lNsuYpwSRbkmLncFjkmO8= @@ -137,6 +160,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611 h1:qCEDpW1G+vcj3Y7Fy52pEM1AWm3abj8WimGYejI3SC4= +golang.org/x/exp v0.0.0-20231214170342-aacd6d4b4611/go.mod 
h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -151,8 +176,9 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.18.0 h1:mIYleuAkSbHh0tCv7RvjL3F6ZVbLjq4+R7zbOn3Kokg= -golang.org/x/net v0.18.0/go.mod h1:/czyP5RqHAH4odGYxBJ1qz0+CE5WZ+2j1YgoEo8F2jQ= +golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -163,10 +189,14 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -186,8 +216,8 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230911183012-2d3300fd4832 h1:o4LtQxebKIJ4vkzyhtD2rfUNZ20Zf0ik5YVP5E7G7VE= 
-google.golang.org/genproto/googleapis/rpc v0.0.0-20230911183012-2d3300fd4832/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0 h1:/jFB8jK5R3Sq3i/lmeZO0cATSzFfZaJq1J2Euan3XKU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231212172506-995d672761c0/go.mod h1:FUoWkonphQm3RhTS+kOEhF8h0iDpm4tdXolVCeZ9KKA= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -210,7 +240,10 @@ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= +gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/exporter/kineticaexporter/metrics_exporter.go b/exporter/kineticaexporter/metrics_exporter.go index c5fd6484b864..c96527e72f73 100644 --- a/exporter/kineticaexporter/metrics_exporter.go +++ b/exporter/kineticaexporter/metrics_exporter.go @@ -5,26 +5,238 @@ package kineticaexporter // import "github.com/open-telemetry/opentelemetry-coll import ( "context" + "fmt" + "strings" + "github.com/google/uuid" + "github.com/samber/lo" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" "go.uber.org/zap" ) +// Metrics handling + +type kineticaGaugeRecord struct { + gauge *Gauge + resourceAttribute []GaugeResourceAttribute + scopeAttribute []GaugeScopeAttribute + datapoint []GaugeDatapoint + datapointAttribute []GaugeDatapointAttribute + exemplars []GaugeDatapointExemplar + exemplarAttribute []GaugeDataPointExemplarAttribute +} + +type kineticaSumRecord struct { + sum *Sum + sumResourceAttribute []SumResourceAttribute + sumScopeAttribute []SumScopeAttribute + datapoint []SumDatapoint + datapointAttribute []SumDataPointAttribute + exemplars []SumDatapointExemplar + exemplarAttribute []SumDataPointExemplarAttribute +} + +type kineticaHistogramRecord struct { + histogram *Histogram + histogramResourceAttribute []HistogramResourceAttribute + histogramScopeAttribute []HistogramScopeAttribute + histogramDatapoint []HistogramDatapoint + histogramDatapointAtribute []HistogramDataPointAttribute + histogramBucketCount []HistogramDatapointBucketCount + histogramExplicitBound []HistogramDatapointExplicitBound + exemplars []HistogramDatapointExemplar + exemplarAttribute []HistogramDataPointExemplarAttribute +} + +type kineticaExponentialHistogramRecord struct { + histogram *ExponentialHistogram + histogramResourceAttribute []ExponentialHistogramResourceAttribute + 
histogramScopeAttribute []ExponentialHistogramScopeAttribute + histogramDatapoint []ExponentialHistogramDatapoint + histogramDatapointAttribute []ExponentialHistogramDataPointAttribute + histogramBucketNegativeCount []ExponentialHistogramBucketNegativeCount + histogramBucketPositiveCount []ExponentialHistogramBucketPositiveCount + exemplars []ExponentialHistogramDatapointExemplar + exemplarAttribute []ExponentialHistogramDataPointExemplarAttribute +} + +type kineticaSummaryRecord struct { + summary *Summary + summaryDatapoint []SummaryDatapoint + summaryDatapointAttribute []SummaryDataPointAttribute + summaryResourceAttribute []SummaryResourceAttribute + summaryScopeAttribute []SummaryScopeAttribute + summaryDatapointQuantileValues []SummaryDatapointQuantileValues +} + +var ( + gaugeTableDDLs = []string{ + CreateGauge, + CreateGaugeDatapoint, + CreateGaugeDatapointAttribute, + CreateGaugeDatapointExemplar, + CreateGaugeDatapointExemplarAttribute, + CreateGaugeResourceAttribute, + CreateGaugeScopeAttribute, + } + + sumTableDDLs = []string{ + CreateSum, + CreateSumDatapoint, + CreateSumDatapointAttribute, + CreateSumDatapointExemplar, + CreateSumDatapointExemplarAttribute, + CreateSumResourceAttribute, + CreateSumScopeAttribute, + } + + histogramTableDDLs = []string{ + CreateHistogram, + CreateHistogramDatapoint, + CreateHistogramDatapointBucketCount, + CreateHistogramDatapointExplicitBound, + CreateHistogramDatapointAttribute, + CreateHistogramDatapointExemplar, + CreateHistogramDatapointExemplarAttribute, + CreateHistogramResourceAttribute, + CreateHistogramScopeAttribute, + } + + expHistogramTableDDLs = []string{ + CreateExpHistogram, + CreateExpHistogramDatapoint, + CreateExpHistogramDatapointBucketPositiveCount, + CreateExpHistogramDatapointBucketNegativeCount, + CreateExpHistogramDatapointAttribute, + CreateExpHistogramDatapointExemplar, + CreateExpHistogramDatapointExemplarAttribute, + CreateExpHistogramResourceAttribute, + CreateExpHistogramScopeAttribute, + } + + summaryTableDDLs = []string{ + CreateSummary, + CreateSummaryDatapoint, + CreateSummaryDatapointAttribute, + CreateSummaryDatapointQuantileValues, + CreateSummaryResourceAttribute, + CreateSummaryScopeAttribute, + } +) + type kineticaMetricsExporter struct { logger *zap.Logger + writer *KiWriter } -func newMetricsExporter(_ *zap.Logger, _ *Config) *kineticaMetricsExporter { +func newMetricsExporter(logger *zap.Logger, cfg *Config) *kineticaMetricsExporter { + kineticaLogger := logger + writer := newKiWriter(context.TODO(), *cfg, kineticaLogger) metricsExp := &kineticaMetricsExporter{ - logger: nil, + logger: kineticaLogger, + writer: writer, } return metricsExp } -func (e *kineticaMetricsExporter) start(_ context.Context, _ component.Host) error { +func (e *kineticaMetricsExporter) start(ctx context.Context, _ component.Host) error { - return nil + e.logger.Debug("Starting Kinetica metrics exporter", zap.String("schema", e.writer.cfg.Schema)) + + if e.writer.cfg.Schema != "" { + // Config has a schema name + if err := createSchema(ctx, e.writer, e.writer.cfg); err != nil { + return err + } + } + err := createMetricTables(ctx, e.writer) + return err +}
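+ +// A minimal wiring sketch (the Host value and its field name are hypothetical +// here; in practice the exporter factory constructs and starts this component): +// +// cfg := &Config{Host: "http://localhost:9191", Schema: "otel"} +// exp := newMetricsExporter(zap.NewNop(), cfg) +// if err := exp.start(context.Background(), nil); err != nil { +// // schema or table creation failed +// }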
 + +// createMetricTables + // + // @param ctx + // @param kiWriter + // @return error +func createMetricTables(ctx context.Context, kiWriter *KiWriter) error { + var errs []error + + // create gauge tables + err := createTablesForMetricType(ctx, gaugeTableDDLs, kiWriter) + if err != nil { + errs = append(errs, err) + } + + // create sum tables + err = createTablesForMetricType(ctx, sumTableDDLs, kiWriter) + if err != nil { + errs = append(errs, err) + } + + // create histogram tables + err = createTablesForMetricType(ctx, histogramTableDDLs, kiWriter) + if err != nil { + errs = append(errs, err) + } + + // create exponential histogram tables + err = createTablesForMetricType(ctx, expHistogramTableDDLs, kiWriter) + if err != nil { + errs = append(errs, err) + } + + // create summary tables + err = createTablesForMetricType(ctx, summaryTableDDLs, kiWriter) + if err != nil { + errs = append(errs, err) + } + + return multierr.Combine(errs...) +} + +func createTablesForMetricType(ctx context.Context, metricTypeDDLs []string, kiWriter *KiWriter) error { + var errs []error + + schema := strings.Trim(kiWriter.cfg.Schema, " ") + if len(schema) > 0 { + schema += "." + } + + lo.ForEach(metricTypeDDLs, func(ddl string, _ int) { + + stmt := strings.ReplaceAll(ddl, "%s", schema) + kiWriter.logger.Debug("Creating Table - ", zap.String("DDL", stmt)) + + _, err := kiWriter.Db.ExecuteSqlRaw(ctx, stmt, 0, 0, "", nil) + if err != nil { + kiWriter.logger.Error(err.Error()) + errs = append(errs, err) + } + }) + + return multierr.Combine(errs...) +} + +// createSchema - Create a schema + // + // @param ctx + // @param kiWriter + // @param config + // @return error - non-nil if schema creation failed +func createSchema(ctx context.Context, kiWriter *KiWriter, config Config) error { + stmt := fmt.Sprintf(CreateSchema, config.Schema) + kiWriter.logger.Debug(stmt) + _, err := kiWriter.Db.ExecuteSqlRaw(ctx, stmt, 0, 0, "", nil) + if err != nil { + kiWriter.logger.Error(err.Error()) + } + return err +}
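+ +// DDL failures above are accumulated rather than aborting the loop; +// multierr.Combine flattens the collected errors into a single error and +// returns nil for an empty slice. A sketch: +// +// var errs []error +// errs = append(errs, errors.New("create metric_gauge failed")) +// errs = append(errs, errors.New("create metric_sum failed")) +// err := multierr.Combine(errs...) // one error carrying both messages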
metrics.SchemaUrl(), scopeInstr, scopeURL, metric.Sum(), metric.Name(), metric.Description(), metric.Unit())
+					if err == nil {
+						sumRecords = append(sumRecords, *sumRecord)
+						e.logger.Debug("Added sum")
+					} else {
+						e.logger.Error(err.Error())
+						errs = append(errs, err)
+					}
+				case pmetric.MetricTypeHistogram:
+					histogramRecord, err := e.createHistogramRecord(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, metric.Histogram(), metric.Name(), metric.Description(), metric.Unit())
+					if err == nil {
+						histogramRecords = append(histogramRecords, *histogramRecord)
+						e.logger.Debug("Added histogram")
+					} else {
+						e.logger.Error(err.Error())
+						errs = append(errs, err)
+					}
+				case pmetric.MetricTypeExponentialHistogram:
+					exponentialHistogramRecord, err := e.createExponentialHistogramRecord(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, metric.ExponentialHistogram(), metric.Name(), metric.Description(), metric.Unit())
+					if err == nil {
+						exponentialHistogramRecords = append(exponentialHistogramRecords, *exponentialHistogramRecord)
+						e.logger.Debug("Added exp histogram")
+					} else {
+						e.logger.Error(err.Error())
+						errs = append(errs, err)
+					}
+				case pmetric.MetricTypeSummary:
+					summaryRecord, err := e.createSummaryRecord(resAttr, metrics.SchemaUrl(), scopeInstr, scopeURL, metric.Summary(), metric.Name(), metric.Description(), metric.Unit())
+					if err == nil {
+						summaryRecords = append(summaryRecords, *summaryRecord)
+						e.logger.Debug("Added summary")
+					} else {
+						e.logger.Error(err.Error())
+						errs = append(errs, err)
+					}
+				default:
+					return fmt.Errorf("unsupported metric type %v", metric.Type())
+				}
+			}
+		}
+	}
+
+	e.logger.Debug("Gauge", zap.Int("count", len(gaugeRecords)))
+	e.logger.Debug("Sum", zap.Int("count", len(sumRecords)))
+	e.logger.Debug("Histogram", zap.Int("count", len(histogramRecords)))
+	e.logger.Debug("Exp Histogram", zap.Int("count", len(exponentialHistogramRecords)))
+	e.logger.Debug("Summary", zap.Int("count", len(summaryRecords)))
+
+	e.logger.Debug("Before writing metrics into Kinetica")
+
+	// A single payload can mix metric types, so each collected record set is
+	// persisted independently rather than keying off the last metric seen.
+	if len(gaugeRecords) > 0 {
+		if err := e.writer.persistGaugeRecord(gaugeRecords); err != nil {
+			errs = append(errs, err)
+			e.logger.Error(err.Error())
+		}
+	}
+	if len(sumRecords) > 0 {
+		if err := e.writer.persistSumRecord(sumRecords); err != nil {
+			errs = append(errs, err)
+			e.logger.Error(err.Error())
+		}
+	}
+	if len(histogramRecords) > 0 {
+		if err := e.writer.persistHistogramRecord(histogramRecords); err != nil {
+			errs = append(errs, err)
+			e.logger.Error(err.Error())
+		}
+	}
+	if len(exponentialHistogramRecords) > 0 {
+		if err := e.writer.persistExponentialHistogramRecord(exponentialHistogramRecords); err != nil {
+			errs = append(errs, err)
+			e.logger.Error(err.Error())
+		}
+	}
+	if len(summaryRecords) > 0 {
+		if err := e.writer.persistSummaryRecord(summaryRecords); err != nil {
+			errs = append(errs, err)
+			e.logger.Error(err.Error())
+		}
+	}
+	return multierr.Combine(errs...)
+}
+
+// createSummaryRecord - creates a Summary type record
+//
+//	@receiver e - Method applicable to [kineticaMetricsExporter]
+//	@param resAttr - a map of key to value of resource attributes
+//	@param _ schemaURL - unused
+//	@param scopeInstr - the instrumentation scope
+//	@param _ scopeURL - unused
+//	@param summaryRecord - the Summary [pmetric.Summary] record
+//	@param name
+//	@param description
+//	@param unit
+//	@return *kineticaSummaryRecord
+//	@return error
+func (e *kineticaMetricsExporter) createSummaryRecord(resAttr pcommon.Map, _ string, scopeInstr pcommon.InstrumentationScope, _ string, summaryRecord pmetric.Summary, name, description, unit string) (*kineticaSummaryRecord, error) {
+	var errs []error
+
+	kiSummaryRecord := new(kineticaSummaryRecord)
+
+	summary := &Summary{
+		SummaryID:   uuid.New().String(),
+		MetricName:  name,
+		Description: description,
+		Unit:        unit,
+	}
+
+	kiSummaryRecord.summary = summary
+
+	// Handle data points
+	var datapointAttribute []SummaryDataPointAttribute
+	datapointAttributes := make(map[string]ValueTypePair)
+
+	for i := 0; i < summaryRecord.DataPoints().Len(); i++ {
+		datapoint := summaryRecord.DataPoints().At(i)
+		summaryDatapoint := &SummaryDatapoint{
+			SummaryID:     summary.SummaryID,
+			ID:            uuid.New().String(),
+			StartTimeUnix: datapoint.StartTimestamp().AsTime().UnixMilli(),
+			TimeUnix:      datapoint.Timestamp().AsTime().UnixMilli(),
+			Count:         int64(datapoint.Count()),
+			Sum:           datapoint.Sum(),
+			Flags:         int(datapoint.Flags()),
+		}
+		kiSummaryRecord.summaryDatapoint = append(kiSummaryRecord.summaryDatapoint, *summaryDatapoint)
+
+		// Handle summary datapoint attribute
+		datapoint.Attributes().Range(func(k string, v pcommon.Value) bool {
+			if k == "" {
+				e.logger.Debug("Summary record attribute key is empty")
+			} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
+				datapointAttributes[k] = v
+			} else {
+				e.logger.Debug("Invalid summary record attribute value", zap.String("Error", err.Error()))
+				errs = append(errs, err)
+			}
+			return true
+		})
+
+		for key := range datapointAttributes {
+			vtPair := datapointAttributes[key]
+			sa, err := e.newSummaryDatapointAttributeValue(summary.SummaryID, summaryDatapoint.ID, key, vtPair)
+			if err != nil {
+				e.logger.Error(err.Error())
+			} else {
+				datapointAttribute = append(datapointAttribute, *sa)
+			}
+		}
+		kiSummaryRecord.summaryDatapointAttribute = append(kiSummaryRecord.summaryDatapointAttribute, datapointAttribute...)
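+		// Scratch-state note: datapointAttributes (a map) dedups attribute keys
+		// while Range walks the datapoint above; the map and the scratch slice
+		// are both reset below so one datapoint's attributes do not leak into
+		// the next.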
+
+		datapointAttribute = nil
+		for k := range datapointAttributes {
+			delete(datapointAttributes, k)
+		}
+
+		// Handle quantile values
+		quantileValues := datapoint.QuantileValues()
+		for i := 0; i < quantileValues.Len(); i++ {
+			quantileValue := quantileValues.At(i)
+			summaryQV := &SummaryDatapointQuantileValues{
+				SummaryID:   summary.SummaryID,
+				DatapointID: summaryDatapoint.ID,
+				QuantileID:  uuid.New().String(),
+				Quantile:    quantileValue.Quantile(),
+				Value:       quantileValue.Value(),
+			}
+			kiSummaryRecord.summaryDatapointQuantileValues = append(kiSummaryRecord.summaryDatapointQuantileValues, *summaryQV)
+		}
+
+	}
+
+	// Handle Resource attribute
+	var resourceAttribute []SummaryResourceAttribute
+	resourceAttributes := make(map[string]ValueTypePair)
+
+	resAttr.Range(func(k string, v pcommon.Value) bool {
+		if k == "" {
+			e.logger.Debug("Resource attribute key is empty")
+		} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
+			resourceAttributes[k] = v
+		} else {
+			e.logger.Debug("Invalid resource attribute value", zap.String("Error", err.Error()))
+			errs = append(errs, err)
+		}
+		return true
+	})
+
+	for key := range resourceAttributes {
+		vtPair := resourceAttributes[key]
+		ga, err := e.newSummaryResourceAttributeValue(summary.SummaryID, key, vtPair)
+		if err != nil {
+			e.logger.Error(err.Error())
+		} else {
+			resourceAttribute = append(resourceAttribute, *ga)
+		}
+	}
+
+	kiSummaryRecord.summaryResourceAttribute = make([]SummaryResourceAttribute, len(resourceAttribute))
+	copy(kiSummaryRecord.summaryResourceAttribute, resourceAttribute)
+
+	// Handle Scope attribute
+	var scopeAttribute []SummaryScopeAttribute
+	scopeAttributes := make(map[string]ValueTypePair)
+	scopeName := scopeInstr.Name()
+	scopeVersion := scopeInstr.Version()
+
+	scopeInstr.Attributes().Range(func(k string, v pcommon.Value) bool {
+		if k == "" {
+			e.logger.Debug("Scope attribute key is empty")
+		} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
+			scopeAttributes[k] = v
+		} else {
+			e.logger.Debug("Invalid scope attribute value", zap.String("Error", err.Error()))
+			errs = append(errs, err)
+		}
+		return true
+	})
+
+	for key := range scopeAttributes {
+		vtPair := scopeAttributes[key]
+		sa, err := e.newSummaryScopeAttributeValue(summary.SummaryID, key, scopeName, scopeVersion, vtPair)
+		if err != nil {
+			e.logger.Error(err.Error())
+		} else {
+			scopeAttribute = append(scopeAttribute, *sa)
+		}
+	}
+
+	kiSummaryRecord.summaryScopeAttribute = make([]SummaryScopeAttribute, len(scopeAttribute))
+	copy(kiSummaryRecord.summaryScopeAttribute, scopeAttribute)
+
+	return kiSummaryRecord, multierr.Combine(errs...)
+} + +// createExponentialHistogramRecord - creates an exponential histogram type record +// +// @receiver e +// @param resAttr +// @param _ schemaURL - unused +// @param scopeInstr +// @param _ scopeURL - unused +// @param exponentialHistogramRecord +// @param name +// @param description +// @param unit +// @return *kineticaExponentialHistogramRecord +// @return error +func (e *kineticaMetricsExporter) createExponentialHistogramRecord(resAttr pcommon.Map, _ string, scopeInstr pcommon.InstrumentationScope, _ string, exponentialHistogramRecord pmetric.ExponentialHistogram, name, description, unit string) (*kineticaExponentialHistogramRecord, error) { + var errs []error + + kiExpHistogramRecord := new(kineticaExponentialHistogramRecord) + + histogram := &ExponentialHistogram{ + HistogramID: uuid.New().String(), + MetricName: name, + Description: description, + Unit: unit, + AggregationTemporality: exponentialHistogramRecord.AggregationTemporality(), + } + + kiExpHistogramRecord.histogram = histogram + + // Handle data points + var datapointAttribute []ExponentialHistogramDataPointAttribute + datapointAttributes := make(map[string]ValueTypePair) + + var exemplarAttribute []ExponentialHistogramDataPointExemplarAttribute + exemplarAttributes := make(map[string]ValueTypePair) + + var datapointBucketPositiveCount []ExponentialHistogramBucketPositiveCount + var datapointBucketNegativeCount []ExponentialHistogramBucketNegativeCount + + for i := 0; i < exponentialHistogramRecord.DataPoints().Len(); i++ { + datapoint := exponentialHistogramRecord.DataPoints().At(i) + + expHistogramDatapoint := ExponentialHistogramDatapoint{ + HistogramID: histogram.HistogramID, + ID: uuid.New().String(), + StartTimeUnix: datapoint.StartTimestamp().AsTime().UnixMilli(), + TimeUnix: datapoint.Timestamp().AsTime().UnixMilli(), + Count: int64(datapoint.Count()), + Sum: datapoint.Sum(), + Min: datapoint.Min(), + Max: datapoint.Max(), + Flags: int(datapoint.Flags()), + Scale: int(datapoint.Scale()), + ZeroCount: int64(datapoint.ZeroCount()), + BucketsPositiveOffset: int(datapoint.Positive().Offset()), + BucketsNegativeOffset: int(datapoint.Negative().Offset()), + } + kiExpHistogramRecord.histogramDatapoint = append(kiExpHistogramRecord.histogramDatapoint, expHistogramDatapoint) + + // Handle histogram datapoint attribute + datapoint.Attributes().Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Sum record attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + datapointAttributes[k] = v + } else { + e.logger.Debug("Invalid sum record attribute value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + for key := range datapointAttributes { + vtPair := datapointAttributes[key] + sa, err := e.newExponentialHistogramDatapointAttributeValue(histogram.HistogramID, expHistogramDatapoint.ID, key, vtPair) + if err != nil { + e.logger.Error(err.Error()) + } else { + datapointAttribute = append(datapointAttribute, *sa) + } + } + kiExpHistogramRecord.histogramDatapointAttribute = append(kiExpHistogramRecord.histogramDatapointAttribute, datapointAttribute...) 
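+		// Note: datapointAttribute accumulates across datapoints, and the whole
+		// slice is re-appended to the record for every datapoint, so earlier
+		// datapoints' attributes are duplicated whenever a metric carries more
+		// than one datapoint; resetting the slice here (as the map is cleared
+		// below) would avoid that.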
+ + for k := range datapointAttributes { + delete(datapointAttributes, k) + } + + // Handle datapoint exemplars + exemplars := datapoint.Exemplars() + + for i := 0; i < exemplars.Len(); i++ { + exemplar := exemplars.At(i) + sumDatapointExemplar := ExponentialHistogramDatapointExemplar{ + HistogramID: histogram.HistogramID, + DatapointID: expHistogramDatapoint.ID, + ExemplarID: uuid.New().String(), + TimeUnix: exemplar.Timestamp().AsTime().UnixMilli(), + HistogramValue: exemplar.DoubleValue(), + TraceID: exemplar.TraceID().String(), + SpanID: exemplar.SpanID().String(), + } + kiExpHistogramRecord.exemplars = append(kiExpHistogramRecord.exemplars, sumDatapointExemplar) + + // Handle Exemplar attribute + exemplar.FilteredAttributes().Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Sum record attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + exemplarAttributes[k] = v + } else { + e.logger.Debug("Invalid sum record attribute value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + for key := range exemplarAttributes { + vtPair := exemplarAttributes[key] + ea, err := e.newExponentialHistogramDatapointExemplarAttributeValue(expHistogramDatapoint.HistogramID, expHistogramDatapoint.ID, sumDatapointExemplar.ExemplarID, key, vtPair) + if err != nil { + e.logger.Error(err.Error()) + } else { + exemplarAttribute = append(exemplarAttribute, *ea) + } + } + + kiExpHistogramRecord.exemplarAttribute = append(kiExpHistogramRecord.exemplarAttribute, exemplarAttribute...) + + for k := range exemplarAttributes { + delete(exemplarAttributes, k) + } + + } + + // Handle positive and negative bucket counts + for i := 0; i < datapoint.Positive().BucketCounts().Len(); i++ { + positiveBucketCount := datapoint.Positive().BucketCounts().At(i) + datapointBucketPositiveCount = append(datapointBucketPositiveCount, ExponentialHistogramBucketPositiveCount{ + HistogramID: expHistogramDatapoint.HistogramID, + DatapointID: expHistogramDatapoint.ID, + CountID: uuid.New().String(), + Count: int64(positiveBucketCount), + }) + } + kiExpHistogramRecord.histogramBucketPositiveCount = append(kiExpHistogramRecord.histogramBucketPositiveCount, datapointBucketPositiveCount...) + + for i := 0; i < datapoint.Negative().BucketCounts().Len(); i++ { + negativeBucketCount := datapoint.Negative().BucketCounts().At(i) + datapointBucketNegativeCount = append(datapointBucketNegativeCount, ExponentialHistogramBucketNegativeCount{ + HistogramID: expHistogramDatapoint.HistogramID, + DatapointID: expHistogramDatapoint.ID, + CountID: uuid.New().String(), + Count: negativeBucketCount, + }) + } + kiExpHistogramRecord.histogramBucketNegativeCount = append(kiExpHistogramRecord.histogramBucketNegativeCount, datapointBucketNegativeCount...) 
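+		// Per the OpenTelemetry exponential-histogram encoding, a bucket at
+		// index i (relative to the stored offset) covers the interval
+		// (base^(offset+i), base^(offset+i+1)] with base = 2^(2^-scale), so the
+		// offset plus the ordered counts written above fully describe the
+		// distribution.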
+
+	}
+
+	// Handle Resource attribute
+	var resourceAttribute []ExponentialHistogramResourceAttribute
+	resourceAttributes := make(map[string]ValueTypePair)
+
+	resAttr.Range(func(k string, v pcommon.Value) bool {
+		if k == "" {
+			e.logger.Debug("Resource attribute key is empty")
+		} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
+			resourceAttributes[k] = v
+		} else {
+			e.logger.Debug("Invalid resource attribute value", zap.String("Error", err.Error()))
+			errs = append(errs, err)
+		}
+		return true
+	})
+
+	for key := range resourceAttributes {
+		vtPair := resourceAttributes[key]
+		ga, err := e.newExponentialHistogramResourceAttributeValue(histogram.HistogramID, key, vtPair)
+		if err != nil {
+			e.logger.Error(err.Error())
+		} else {
+			resourceAttribute = append(resourceAttribute, *ga)
+		}
+	}
+
+	kiExpHistogramRecord.histogramResourceAttribute = make([]ExponentialHistogramResourceAttribute, len(resourceAttribute))
+	copy(kiExpHistogramRecord.histogramResourceAttribute, resourceAttribute)
+
+	// Handle Scope attribute
+	var scopeAttribute []ExponentialHistogramScopeAttribute
+	scopeAttributes := make(map[string]ValueTypePair)
+	scopeName := scopeInstr.Name()
+	scopeVersion := scopeInstr.Version()
+
+	scopeInstr.Attributes().Range(func(k string, v pcommon.Value) bool {
+		if k == "" {
+			e.logger.Debug("Scope attribute key is empty")
+		} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
+			scopeAttributes[k] = v
+		} else {
+			e.logger.Debug("Invalid scope attribute value", zap.String("Error", err.Error()))
+			errs = append(errs, err)
+		}
+		return true
+	})
+
+	for key := range scopeAttributes {
+		vtPair := scopeAttributes[key]
+		sa, err := e.newExponentialHistogramScopeAttributeValue(histogram.HistogramID, key, scopeName, scopeVersion, vtPair)
+		if err != nil {
+			e.logger.Error(err.Error())
+		} else {
+			scopeAttribute = append(scopeAttribute, *sa)
+		}
+	}
+
+	kiExpHistogramRecord.histogramScopeAttribute = make([]ExponentialHistogramScopeAttribute, len(scopeAttribute))
+	copy(kiExpHistogramRecord.histogramScopeAttribute, scopeAttribute)
+
+	return kiExpHistogramRecord, multierr.Combine(errs...)
+} + +// createHistogramRecord - creates a Histogram type record +// +// @receiver e +// @param resAttr +// @param _ schemaURL - unused +// @param scopeInstr +// @param _ scopeURL - unused +// @param histogramRecord +// @param name +// @param description +// @param unit +// @return *kineticaHistogramRecord +// @return error +func (e *kineticaMetricsExporter) createHistogramRecord(resAttr pcommon.Map, _ string, scopeInstr pcommon.InstrumentationScope, _ string, histogramRecord pmetric.Histogram, name, description, unit string) (*kineticaHistogramRecord, error) { + + e.logger.Debug("In createHistogramRecord ...") + + var errs []error + + kiHistogramRecord := new(kineticaHistogramRecord) + + histogram := &Histogram{ + HistogramID: uuid.New().String(), + MetricName: name, + Description: description, + Unit: unit, + AggregationTemporality: histogramRecord.AggregationTemporality(), + } + + kiHistogramRecord.histogram = histogram + + // Handle data points + var datapointAttribute []HistogramDataPointAttribute + datapointAttributes := make(map[string]ValueTypePair) + + var exemplarAttribute []HistogramDataPointExemplarAttribute + exemplarAttributes := make(map[string]ValueTypePair) + + // Handle data points + for i := 0; i < histogramRecord.DataPoints().Len(); i++ { + datapoint := histogramRecord.DataPoints().At(i) + + histogramDatapoint := &HistogramDatapoint{ + HistogramID: histogram.HistogramID, + ID: uuid.New().String(), + StartTimeUnix: datapoint.StartTimestamp().AsTime().UnixMilli(), + TimeUnix: datapoint.Timestamp().AsTime().UnixMilli(), + Count: int64(datapoint.Count()), + Sum: datapoint.Sum(), + Min: datapoint.Min(), + Max: datapoint.Max(), + Flags: int(datapoint.Flags()), + } + kiHistogramRecord.histogramDatapoint = append(kiHistogramRecord.histogramDatapoint, *histogramDatapoint) + + // Handle histogram datapoint attribute + datapoint.Attributes().Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Histogram record attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + datapointAttributes[k] = v + } else { + e.logger.Debug("Invalid histogram record attribute value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + for key := range datapointAttributes { + vtPair := datapointAttributes[key] + sa, err := e.newHistogramDatapointAttributeValue(histogram.HistogramID, histogramDatapoint.ID, key, vtPair) + if err != nil { + e.logger.Error(err.Error()) + } else { + datapointAttribute = append(datapointAttribute, *sa) + } + } + kiHistogramRecord.histogramDatapointAtribute = append(kiHistogramRecord.histogramDatapointAtribute, datapointAttribute...) 
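+		// Per OTLP, an explicit-bounds histogram carries len(ExplicitBounds)+1
+		// bucket counts (the final count covers values above the last bound),
+		// which is why the counts and bounds written below go to separate child
+		// tables, each row keyed by its own UUID.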
+ + for k := range datapointAttributes { + delete(datapointAttributes, k) + } + + // Handle data point exemplars + exemplars := datapoint.Exemplars() + + for i := 0; i < exemplars.Len(); i++ { + exemplar := exemplars.At(i) + histogramDatapointExemplar := HistogramDatapointExemplar{ + HistogramID: histogram.HistogramID, + DatapointID: histogramDatapoint.ID, + ExemplarID: uuid.New().String(), + TimeUnix: exemplar.Timestamp().AsTime().UnixMilli(), + HistogramValue: exemplar.DoubleValue(), + TraceID: exemplar.TraceID().String(), + SpanID: exemplar.SpanID().String(), + } + kiHistogramRecord.exemplars = append(kiHistogramRecord.exemplars, histogramDatapointExemplar) + + // Handle Exemplar attribute + exemplar.FilteredAttributes().Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Histogram record attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + exemplarAttributes[k] = v + } else { + e.logger.Debug("Invalid histogram record attribute value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + for key := range exemplarAttributes { + vtPair := exemplarAttributes[key] + ea, err := e.newHistogramDatapointExemplarAttributeValue(histogramDatapoint.HistogramID, histogramDatapoint.ID, histogramDatapointExemplar.ExemplarID, key, vtPair) + if err != nil { + e.logger.Error(err.Error()) + } else { + exemplarAttribute = append(exemplarAttribute, *ea) + } + } + + kiHistogramRecord.exemplarAttribute = append(kiHistogramRecord.exemplarAttribute, exemplarAttribute...) + + for k := range exemplarAttributes { + delete(exemplarAttributes, k) + } + } + + histogramBucketCounts := datapoint.BucketCounts() + for i := 0; i < histogramBucketCounts.Len(); i++ { + bucketCount := HistogramDatapointBucketCount{ + HistogramID: histogramDatapoint.HistogramID, + DatapointID: histogramDatapoint.ID, + CountID: uuid.New().String(), + Count: int64(histogramBucketCounts.At(i)), + } + kiHistogramRecord.histogramBucketCount = append(kiHistogramRecord.histogramBucketCount, bucketCount) + } + + histogramExplicitBounds := datapoint.ExplicitBounds() + for i := 0; i < histogramExplicitBounds.Len(); i++ { + explicitBound := HistogramDatapointExplicitBound{ + HistogramID: histogramDatapoint.HistogramID, + DatapointID: histogramDatapoint.ID, + BoundID: uuid.New().String(), + ExplicitBound: histogramExplicitBounds.At(i), + } + kiHistogramRecord.histogramExplicitBound = append(kiHistogramRecord.histogramExplicitBound, explicitBound) + } + } + + // Handle Resource attribute + var resourceAttribute []HistogramResourceAttribute + resourceAttributes := make(map[string]ValueTypePair) + + if resAttr.Len() > 0 { + resAttr.Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Resource attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + resourceAttributes[k] = v + } else { + e.logger.Debug("Invalid resource attribute value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + for key := range resourceAttributes { + vtPair := resourceAttributes[key] + ga, err := e.newHistogramResourceAttributeValue(histogram.HistogramID, key, vtPair) + if err != nil { + e.logger.Error(err.Error()) + } else { + resourceAttribute = append(resourceAttribute, *ga) + } + } + + kiHistogramRecord.histogramResourceAttribute = make([]HistogramResourceAttribute, len(resourceAttribute)) + copy(kiHistogramRecord.histogramResourceAttribute, resourceAttribute) + + } 
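+	// The make-then-copy above snapshots the resource attributes into a slice
+	// sized exactly to its contents, instead of aliasing the append-grown
+	// scratch slice whose backing array may be larger than its length.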
+ + // Handle Scope attribute + var scopeAttribute []HistogramScopeAttribute + scopeAttributes := make(map[string]ValueTypePair) + scopeName := scopeInstr.Name() + scopeVersion := scopeInstr.Version() + + if scopeInstr.Attributes().Len() > 0 { + scopeInstr.Attributes().Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Scope attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + scopeAttributes[k] = v + } else { + e.logger.Debug("Invalid scope attribute value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + for key := range scopeAttributes { + vtPair := scopeAttributes[key] + sa, err := e.newHistogramScopeAttributeValue(histogram.HistogramID, key, scopeName, scopeVersion, vtPair) + if err != nil { + e.logger.Error(err.Error()) + } else { + scopeAttribute = append(scopeAttribute, *sa) + } + } + + kiHistogramRecord.histogramScopeAttribute = make([]HistogramScopeAttribute, len(scopeAttribute)) + copy(kiHistogramRecord.histogramScopeAttribute, scopeAttribute) + } + + return kiHistogramRecord, multierr.Combine(errs...) +} + +// createSumRecord - creates a SUM type record +// +// @receiver e +// @param resAttr +// @param schemaURL +// @param scopeInstr +// @param scopeURL +// @param sumRecord +// @param name +// @param description +// @param unit +// @return *kineticaSumRecord +// @return error +func (e *kineticaMetricsExporter) createSumRecord(resAttr pcommon.Map, _ string, scopeInstr pcommon.InstrumentationScope, _ string, sumRecord pmetric.Sum, name, description, unit string) (*kineticaSumRecord, error) { + var errs []error + + kiSumRecord := new(kineticaSumRecord) + var isMonotonic int8 + if sumRecord.IsMonotonic() { + isMonotonic = 1 + } + + sum := &Sum{ + SumID: uuid.New().String(), + MetricName: name, + Description: description, + Unit: unit, + AggregationTemporality: sumRecord.AggregationTemporality(), + IsMonotonic: isMonotonic, + } + + kiSumRecord.sum = sum + + // Handle data points + var sumDatapointAttribute []SumDataPointAttribute + sumDatapointAttributes := make(map[string]ValueTypePair) + + var exemplarAttribute []SumDataPointExemplarAttribute + exemplarAttributes := make(map[string]ValueTypePair) + + for i := 0; i < sumRecord.DataPoints().Len(); i++ { + datapoint := sumRecord.DataPoints().At(i) + + sumDatapoint := SumDatapoint{ + SumID: sum.SumID, + ID: uuid.New().String(), + StartTimeUnix: datapoint.StartTimestamp().AsTime().UnixMilli(), + TimeUnix: datapoint.Timestamp().AsTime().UnixMilli(), + SumValue: datapoint.DoubleValue(), + Flags: int(datapoint.Flags()), + } + kiSumRecord.datapoint = append(kiSumRecord.datapoint, sumDatapoint) + + // Handle Sum attribute + datapoint.Attributes().Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Sum record attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + sumDatapointAttributes[k] = v + } else { + e.logger.Debug("Invalid sum record attribute value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + for key := range sumDatapointAttributes { + vtPair := sumDatapointAttributes[key] + sa, err := e.newSumDatapointAttributeValue(sum.SumID, sumDatapoint.ID, key, vtPair) + if err != nil { + e.logger.Error(err.Error()) + } else { + sumDatapointAttribute = append(sumDatapointAttribute, *sa) + } + } + kiSumRecord.datapointAttribute = append(kiSumRecord.datapointAttribute, sumDatapointAttribute...) 
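+		// Caveat: SumDatapoint for this iteration is populated via
+		// datapoint.DoubleValue(); pmetric number points may instead carry an
+		// int value (ValueType() == pmetric.NumberDataPointValueTypeInt), which
+		// this code does not convert. Branching on ValueType() would cover both
+		// encodings.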
+
+		sumDatapointAttribute = nil
+		for k := range sumDatapointAttributes {
+			delete(sumDatapointAttributes, k)
+		}
+
+		// Handle data point exemplars
+		exemplars := datapoint.Exemplars()
+
+		for i := 0; i < exemplars.Len(); i++ {
+			exemplar := exemplars.At(i)
+			sumDatapointExemplar := SumDatapointExemplar{
+				SumID:       sum.SumID,
+				DatapointID: sumDatapoint.ID,
+				ExemplarID:  uuid.New().String(),
+				TimeUnix:    exemplar.Timestamp().AsTime().UnixMilli(),
+				SumValue:    exemplar.DoubleValue(),
+				TraceID:     exemplar.TraceID().String(),
+				SpanID:      exemplar.SpanID().String(),
+			}
+			kiSumRecord.exemplars = append(kiSumRecord.exemplars, sumDatapointExemplar)
+
+			// Handle Exemplar attribute
+			exemplar.FilteredAttributes().Range(func(k string, v pcommon.Value) bool {
+				if k == "" {
+					e.logger.Debug("Sum record attribute key is empty")
+				} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
+					exemplarAttributes[k] = v
+				} else {
+					e.logger.Debug("Invalid sum record attribute value", zap.String("Error", err.Error()))
+					errs = append(errs, err)
+				}
+				return true
+			})
+
+			for key := range exemplarAttributes {
+				vtPair := exemplarAttributes[key]
+				ea, err := e.newSumDatapointExemplarAttributeValue(sum.SumID, sumDatapoint.ID, sumDatapointExemplar.ExemplarID, key, vtPair)
+				if err != nil {
+					e.logger.Error(err.Error())
+				} else {
+					exemplarAttribute = append(exemplarAttribute, *ea)
+				}
+			}
+
+			kiSumRecord.exemplarAttribute = append(kiSumRecord.exemplarAttribute, exemplarAttribute...)
+
+			exemplarAttribute = nil
+			for k := range exemplarAttributes {
+				delete(exemplarAttributes, k)
+			}
+
+		}
+
+	}
+
+	// Handle Resource attribute
+	var resourceAttribute []SumResourceAttribute
+	resourceAttributes := make(map[string]ValueTypePair)
+
+	if resAttr.Len() > 0 {
+		resAttr.Range(func(k string, v pcommon.Value) bool {
+			if k == "" {
+				e.logger.Debug("Resource attribute key is empty")
+			} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
+				resourceAttributes[k] = v
+			} else {
+				e.logger.Debug("Invalid resource attribute value", zap.String("Error", err.Error()))
+				errs = append(errs, err)
+			}
+			return true
+		})
+
+		for key := range resourceAttributes {
+			vtPair := resourceAttributes[key]
+			ga, err := e.newSumResourceAttributeValue(sum.SumID, key, vtPair)
+			if err != nil {
+				e.logger.Error(err.Error())
+			} else {
+				resourceAttribute = append(resourceAttribute, *ga)
+			}
+		}
+
+		kiSumRecord.sumResourceAttribute = make([]SumResourceAttribute, len(resourceAttribute))
+		copy(kiSumRecord.sumResourceAttribute, resourceAttribute)
+	}
+
+	// Handle Scope attribute
+	var scopeAttribute []SumScopeAttribute
+	scopeAttributes := make(map[string]ValueTypePair)
+	scopeName := scopeInstr.Name()
+	scopeVersion := scopeInstr.Version()
+
+	if scopeInstr.Attributes().Len() > 0 {
+		scopeInstr.Attributes().Range(func(k string, v pcommon.Value) bool {
+			if k == "" {
+				e.logger.Debug("Scope attribute key is empty")
+			} else if v, err := attributeValueToKineticaFieldValue(v); err == nil {
+				scopeAttributes[k] = v
+			} else {
+				e.logger.Debug("Invalid scope attribute value", zap.String("Error", err.Error()))
+				errs = append(errs, err)
+			}
+			return true
+		})
+
+		for key := range scopeAttributes {
+			vtPair := scopeAttributes[key]
+			sa, err := e.newSumScopeAttributeValue(sum.SumID, key, scopeName, scopeVersion, vtPair)
+			if err != nil {
+				e.logger.Error(err.Error())
+			} else {
+				scopeAttribute = append(scopeAttribute, *sa)
+			}
+		}
+
+		kiSumRecord.sumScopeAttribute = make([]SumScopeAttribute, len(scopeAttribute))
+		copy(kiSumRecord.sumScopeAttribute, scopeAttribute)
+	} else {
+		// No attributes found - just basic scope
+		kiSumRecord.sumScopeAttribute = 
append(kiSumRecord.sumScopeAttribute, SumScopeAttribute{ + SumID: sum.SumID, + ScopeName: scopeName, + ScopeVersion: scopeVersion, + Key: "", + AttributeValue: AttributeValue{}, + }) + + } + + return kiSumRecord, multierr.Combine(errs...) +} + +// createGaugeRecord - creates a Gauge type record +// +// @receiver e +// @param resAttr +// @param _ schemaURL unused +// @param scopeInstr +// @param _ scopeURL unused +// @param gaugeRecord +// @param name +// @param description +// @param unit +// @return *kineticaGaugeRecord +// @return error +func (e *kineticaMetricsExporter) createGaugeRecord(resAttr pcommon.Map, _ string, scopeInstr pcommon.InstrumentationScope, _ string, gaugeRecord pmetric.Gauge, name, description, unit string) (*kineticaGaugeRecord, error) { + + var errs []error + + kiGaugeRecord := new(kineticaGaugeRecord) + + gauge := &Gauge{ + GaugeID: uuid.New().String(), + MetricName: name, + Description: description, + Unit: unit, + } + + kiGaugeRecord.gauge = gauge + + // Handle data points + var gaugeDatapointAttribute []GaugeDatapointAttribute + gaugeDatapointAttributes := make(map[string]ValueTypePair) + + var exemplarAttribute []GaugeDataPointExemplarAttribute + exemplarAttributes := make(map[string]ValueTypePair) + + for i := 0; i < gaugeRecord.DataPoints().Len(); i++ { + datapoint := gaugeRecord.DataPoints().At(i) + + gaugeDatapoint := GaugeDatapoint{ + GaugeID: gauge.GaugeID, + ID: uuid.New().String(), + StartTimeUnix: datapoint.StartTimestamp().AsTime().UnixMilli(), + TimeUnix: datapoint.Timestamp().AsTime().UnixMilli(), + GaugeValue: datapoint.DoubleValue(), + Flags: int(datapoint.Flags()), + } + kiGaugeRecord.datapoint = append(kiGaugeRecord.datapoint, gaugeDatapoint) + + datapoint.Attributes().Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Gauge record attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + gaugeDatapointAttributes[k] = v + } else { + e.logger.Debug("Invalid gauge record attribute value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + for key := range gaugeDatapointAttributes { + vtPair := gaugeDatapointAttributes[key] + ga, err := e.newGaugeDatapointAttributeValue(gauge.GaugeID, gaugeDatapoint.ID, key, vtPair) + if err != nil { + e.logger.Error(err.Error()) + } else { + gaugeDatapointAttribute = append(gaugeDatapointAttribute, *ga) + } + } + + kiGaugeRecord.datapointAttribute = append(kiGaugeRecord.datapointAttribute, gaugeDatapointAttribute...) 
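+		// The exemplar handling below persists exemplar.FilteredAttributes():
+		// per the OTLP data model these are the measurement attributes that were
+		// filtered out at aggregation time, so they complement rather than
+		// duplicate the datapoint attributes stored above.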
+ + for k := range gaugeDatapointAttributes { + delete(gaugeDatapointAttributes, k) + } + + // Handle data point exemplars + exemplars := datapoint.Exemplars() + for i := 0; i < exemplars.Len(); i++ { + exemplar := exemplars.At(i) + gaugeDatapointExemplar := GaugeDatapointExemplar{ + GaugeID: gauge.GaugeID, + DatapointID: gaugeDatapoint.ID, + ExemplarID: uuid.New().String(), + TimeUnix: exemplar.Timestamp().AsTime().UnixMilli(), + GaugeValue: exemplar.DoubleValue(), + TraceID: exemplar.TraceID().String(), + SpanID: exemplar.SpanID().String(), + } + kiGaugeRecord.exemplars = append(kiGaugeRecord.exemplars, gaugeDatapointExemplar) + + // Handle Exemplar attribute + exemplar.FilteredAttributes().Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Sum record attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + exemplarAttributes[k] = v + } else { + e.logger.Debug("Invalid sum record attribute value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + for key := range exemplarAttributes { + vtPair := exemplarAttributes[key] + ea, err := e.newGaugeDatapointExemplarAttributeValue(gauge.GaugeID, gaugeDatapoint.ID, gaugeDatapointExemplar.ExemplarID, key, vtPair) + if err != nil { + e.logger.Error(err.Error()) + } else { + exemplarAttribute = append(exemplarAttribute, *ea) + } + } + + kiGaugeRecord.exemplarAttribute = append(kiGaugeRecord.exemplarAttribute, exemplarAttribute...) + + for k := range exemplarAttributes { + delete(exemplarAttributes, k) + } + } + } + + // Handle Resource attribute + e.logger.Debug("Resource Attributes received ->", zap.Any("Attributes", resAttr)) + + var resourceAttribute []GaugeResourceAttribute + resourceAttributes := make(map[string]ValueTypePair) + + if resAttr.Len() > 0 { + resAttr.Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Resource attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + resourceAttributes[k] = v + } else { + e.logger.Debug("Invalid resource attribute value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + e.logger.Debug("Resource Attributes to be added ->", zap.Any("Attributes", resourceAttributes)) + for key := range resourceAttributes { + vtPair := resourceAttributes[key] + ga, err := e.newGaugeResourceAttributeValue(gauge.GaugeID, key, vtPair) + + e.logger.Debug("New resource attribute ->", zap.Any("Attribute", ga)) + + if err != nil { + e.logger.Error(err.Error()) + } else { + resourceAttribute = append(resourceAttribute, *ga) + } + } + + kiGaugeRecord.resourceAttribute = make([]GaugeResourceAttribute, len(resourceAttribute)) + copy(kiGaugeRecord.resourceAttribute, resourceAttribute) + e.logger.Debug("Resource Attributes actually added ->", zap.Any("Attributes", kiGaugeRecord.resourceAttribute)) + } + + // Handle Scope attribute + e.logger.Debug("Scope Attributes received ->", zap.Any("Attributes", scopeInstr.Attributes())) + + var scopeAttribute []GaugeScopeAttribute + scopeAttributes := make(map[string]ValueTypePair) + scopeName := scopeInstr.Name() + scopeVersion := scopeInstr.Version() + + if scopeInstr.Attributes().Len() > 0 { + scopeInstr.Attributes().Range(func(k string, v pcommon.Value) bool { + if k == "" { + e.logger.Debug("Scope attribute key is empty") + } else if v, err := attributeValueToKineticaFieldValue(v); err == nil { + scopeAttributes[k] = v + } else { + e.logger.Debug("Invalid scope attribute 
value", zap.String("Error", err.Error())) + errs = append(errs, err) + } + return true + }) + + e.logger.Debug("Scope Attributes to be added ->", zap.Any("Attributes", scopeAttributes)) + for key := range scopeAttributes { + vtPair := scopeAttributes[key] + ga, err := e.newGaugeScopeAttributeValue(gauge.GaugeID, key, scopeName, scopeVersion, vtPair) + + e.logger.Debug("New scope attribute ->", zap.Any("Attribute", ga)) + + if err != nil { + e.logger.Error(err.Error()) + } else { + scopeAttribute = append(scopeAttribute, *ga) + } + } + + kiGaugeRecord.scopeAttribute = make([]GaugeScopeAttribute, len(scopeAttribute)) + copy(kiGaugeRecord.scopeAttribute, scopeAttribute) + e.logger.Debug("Scope Attributes actually added ->", zap.Any("Attributes", kiGaugeRecord.scopeAttribute)) + } else { + // No attributes found - just basic scope + kiGaugeRecord.scopeAttribute = append(kiGaugeRecord.scopeAttribute, GaugeScopeAttribute{ + GaugeID: gauge.GaugeID, + ScopeName: scopeName, + ScopeVersion: scopeVersion, + Key: "", + AttributeValue: AttributeValue{}, + }) + } + + return kiGaugeRecord, multierr.Combine(errs...) +} + +// Utility functions +func (e *kineticaMetricsExporter) newGaugeResourceAttributeValue(gaugeID string, key string, vtPair ValueTypePair) (*GaugeResourceAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + ra := &GaugeResourceAttribute{gaugeID, key, *av} + return ra, nil +} + +func (e *kineticaMetricsExporter) newGaugeDatapointAttributeValue(gaugeID string, datapointID string, key string, vtPair ValueTypePair) (*GaugeDatapointAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + ga := &GaugeDatapointAttribute{gaugeID, datapointID, key, *av} + return ga, nil +} + +func (e *kineticaMetricsExporter) newGaugeScopeAttributeValue(gaugeID string, key string, scopeName string, scopeVersion string, vtPair ValueTypePair) (*GaugeScopeAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + sa := &GaugeScopeAttribute{gaugeID, key, scopeName, scopeVersion, *av} + return sa, nil +} + +func (e *kineticaMetricsExporter) newSumDatapointAttributeValue(sumID string, datapointID string, key string, vtPair ValueTypePair) (*SumDataPointAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + ga := &SumDataPointAttribute{sumID, datapointID, key, *av} + return ga, nil +} + +func (e *kineticaMetricsExporter) newSumResourceAttributeValue(sumID string, key string, vtPair ValueTypePair) (*SumResourceAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + ra := &SumResourceAttribute{sumID, key, *av} + return ra, nil +} + +func (e *kineticaMetricsExporter) newSumScopeAttributeValue(sumID string, key string, scopeName string, scopeVersion string, vtPair ValueTypePair) (*SumScopeAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + sa := &SumScopeAttribute{sumID, key, scopeName, scopeVersion, *av} + return sa, nil +} + +func (e *kineticaMetricsExporter) newSumDatapointExemplarAttributeValue(sumID string, sumDatapointID string, sumDatapointExemplarID string, key string, vtPair ValueTypePair) 
(*SumDataPointExemplarAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + sa := &SumDataPointExemplarAttribute{sumID, sumDatapointID, sumDatapointExemplarID, key, *av} + return sa, nil +} + +func (e *kineticaMetricsExporter) newExponentialHistogramDatapointExemplarAttributeValue(histogramID string, histogramDatapointID string, histogramDatapointExemplarID string, key string, vtPair ValueTypePair) (*ExponentialHistogramDataPointExemplarAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + sa := &ExponentialHistogramDataPointExemplarAttribute{histogramID, histogramDatapointID, histogramDatapointExemplarID, key, *av} + return sa, nil +} + +func (e *kineticaMetricsExporter) newExponentialHistogramDatapointAttributeValue(histogramID string, datapointID string, key string, vtPair ValueTypePair) (*ExponentialHistogramDataPointAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + ga := &ExponentialHistogramDataPointAttribute{histogramID, datapointID, key, *av} + return ga, nil +} + +func (e *kineticaMetricsExporter) newHistogramDatapointExemplarAttributeValue(histogramID string, datapointID string, exemplarID string, key string, vtPair ValueTypePair) (*HistogramDataPointExemplarAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + ga := &HistogramDataPointExemplarAttribute{histogramID, datapointID, exemplarID, key, *av} + return ga, nil +} + +func (e *kineticaMetricsExporter) newGaugeDatapointExemplarAttributeValue(gaugeID string, gaugeDatapointID string, gaugeDatapointExemplarID string, key string, vtPair ValueTypePair) (*GaugeDataPointExemplarAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + sa := &GaugeDataPointExemplarAttribute{gaugeID, gaugeDatapointID, gaugeDatapointExemplarID, key, *av} + return sa, nil +} + +func (e *kineticaMetricsExporter) newHistogramResourceAttributeValue(histogramID string, key string, vtPair ValueTypePair) (*HistogramResourceAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + ra := &HistogramResourceAttribute{histogramID, key, *av} + return ra, nil +} + +func (e *kineticaMetricsExporter) newHistogramScopeAttributeValue(histogramID string, key string, scopeName string, scopeVersion string, vtPair ValueTypePair) (*HistogramScopeAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + sa := &HistogramScopeAttribute{histogramID, key, scopeName, scopeVersion, *av} + return sa, nil +} + +func (e *kineticaMetricsExporter) newExponentialHistogramResourceAttributeValue(histogramID string, key string, vtPair ValueTypePair) (*ExponentialHistogramResourceAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + ra := &ExponentialHistogramResourceAttribute{histogramID, key, *av} + return ra, nil +} + +func (e *kineticaMetricsExporter) newExponentialHistogramScopeAttributeValue(histogramID string, key string, scopeName string, scopeVersion string, vtPair 
ValueTypePair) (*ExponentialHistogramScopeAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + sa := &ExponentialHistogramScopeAttribute{histogramID, key, scopeName, scopeVersion, *av} + return sa, nil +} + +func (e *kineticaMetricsExporter) newSummaryResourceAttributeValue(summaryID string, key string, vtPair ValueTypePair) (*SummaryResourceAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + ra := &SummaryResourceAttribute{summaryID, key, *av} + return ra, nil +} + +func (e *kineticaMetricsExporter) newSummaryScopeAttributeValue(summaryID string, key string, scopeName string, scopeVersion string, vtPair ValueTypePair) (*SummaryScopeAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + sa := &SummaryScopeAttribute{summaryID, key, scopeName, scopeVersion, *av} + return sa, nil + +} + +func (e *kineticaMetricsExporter) newSummaryDatapointAttributeValue(summaryID string, summaryDatapointID string, key string, vtPair ValueTypePair) (*SummaryDataPointAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + sa := &SummaryDataPointAttribute{summaryID, summaryDatapointID, key, *av} + return sa, nil +} + +func (e *kineticaMetricsExporter) newHistogramDatapointAttributeValue(histogramID string, datapointID string, key string, vtPair ValueTypePair) (*HistogramDataPointAttribute, error) { + var av *AttributeValue + var err error + + av, err = getAttributeValue(vtPair) + if err != nil { + return nil, err + } + + ga := &HistogramDataPointAttribute{histogramID, datapointID, key, *av} + return ga, nil } diff --git a/exporter/kineticaexporter/writer.go b/exporter/kineticaexporter/writer.go new file mode 100644 index 000000000000..fe5eabdf2ae5 --- /dev/null +++ b/exporter/kineticaexporter/writer.go @@ -0,0 +1,867 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kineticaexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kineticaexporter" + +import ( + "context" + "fmt" + "sync" + + "github.com/kineticadb/kinetica-api-go/kinetica" + orderedmap "github.com/wk8/go-ordered-map" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/multierr" + "go.uber.org/zap" +) + +// AttributeValue - struct to contain attribute values of different types +// Used by other metric structs +type AttributeValue struct { + IntValue int `avro:"int_value"` + StringValue string `avro:"string_value"` + BoolValue int8 `avro:"bool_value"` + DoubleValue float64 `avro:"double_value"` + BytesValue []byte `avro:"bytes_value"` +} + +// newAttributeValue Constructor for AttributeValue +// +// @param intValue +// @param stringValue +// @param boolValue +// @param doubleValue +// @param bytesValue +// @return *AttributeValue +// func newAttributeValue(intValue int, stringValue string, boolValue int8, doubleValue float64, bytesValue []byte) *AttributeValue { +// o := new(AttributeValue) +// o.IntValue = intValue +// o.StringValue = stringValue +// o.BoolValue = boolValue +// o.DoubleValue = doubleValue +// o.BytesValue = bytesValue +// return o +// } + +// KiWriter - struct modeling the Kinetica connection, contains the +// Kinetica connection [kinetica.Kinetica], the Kinetica Options 
[kinetica.KineticaOptions], +// the config [Config] and the logger [zap.Logger] +type KiWriter struct { + Db kinetica.Kinetica + Options kinetica.KineticaOptions + cfg Config + logger *zap.Logger +} + +// GetDb - Getter for the Kinetica instance +// +// @receiver kiwriter +// @return gpudb.Gpudb +func (kiwriter *KiWriter) GetDb() kinetica.Kinetica { + return kiwriter.Db +} + +// GetOptions - Getter for the Kinetica options. +// +// @receiver kiwriter +// @return gpudb.GpudbOptions +func (kiwriter *KiWriter) GetOptions() kinetica.KineticaOptions { + return kiwriter.Options +} + +// GetCfg - Getter for the [Config] value +// +// @receiver kiwriter +// @return Config +func (kiwriter *KiWriter) GetCfg() Config { + return kiwriter.cfg +} + +// Writer - global pointer to kiwriter struct initialized in the init func +var Writer *KiWriter + +// init +func init() { + ctx := context.TODO() + cfg := createDefaultConfig() + config := cfg.(*Config) + options := kinetica.KineticaOptions{Username: config.Username, Password: string(config.Password), ByPassSslCertCheck: config.BypassSslCertCheck} + gpudbInst := kinetica.NewWithOptions(ctx, config.Host, &options) + Writer = &KiWriter{*gpudbInst, options, *config, nil} +} + +// newKiWriter - Constructor for the [KiWriter] struct +// +// @param ctx +// @param cfg +// @return *KiWriter +func newKiWriter(ctx context.Context, cfg Config, logger *zap.Logger) *KiWriter { + options := kinetica.KineticaOptions{Username: cfg.Username, Password: string(cfg.Password), ByPassSslCertCheck: cfg.BypassSslCertCheck} + gpudbInst := kinetica.NewWithOptions(ctx, cfg.Host, &options) + return &KiWriter{*gpudbInst, options, cfg, logger} +} + +// getGpuDbInst - Creates and returns a new [kinetica.Kinetica] struct +// +// @param cfg +// @return *gpudb.Gpudb +// func getGpuDbInst(cfg *Config) *kinetica.Kinetica { +// ctx := context.TODO() +// options := kinetica.KineticaOptions{Username: cfg.Username, Password: string(cfg.Password), ByPassSslCertCheck: cfg.BypassSslCertCheck} +// // fmt.Println("Options", options) +// gpudbInst := kinetica.NewWithOptions(ctx, cfg.Host, &options) + +// return gpudbInst + +// } + +// Metrics Handling + +// Gauge - struct modeling the Gauge data +type Gauge struct { + GaugeID string `avro:"gauge_id"` + MetricName string `avro:"metric_name"` + Description string `avro:"metric_description"` + Unit string `avro:"metric_unit"` +} + +// GaugeDatapoint - struct modeling the Gauge Datapoint +type GaugeDatapoint struct { + GaugeID string `avro:"gauge_id"` + ID string `avro:"id"` + StartTimeUnix int64 `mapstructure:"start_time_unix" avro:"start_time_unix"` + TimeUnix int64 `mapstructure:"time_unix" avro:"time_unix"` + GaugeValue float64 `mapstructure:"gauge_value" avro:"gauge_value"` + Flags int `mapstructure:"flags" avro:"flags"` +} + +// GaugeDatapointAttribute - struct modeling the Gauge Datapoint attributes +type GaugeDatapointAttribute struct { + GaugeID string `avro:"gauge_id"` + DatapointID string `avro:"datapoint_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// GaugeDatapointExemplar - struct modeling a Gauge Datapoint Exemplar +type GaugeDatapointExemplar struct { + GaugeID string `avro:"gauge_id"` + DatapointID string `avro:"datapoint_id"` + ExemplarID string `avro:"exemplar_id"` + TimeUnix int64 `mapstructure:"time_unix" avro:"time_unix"` + GaugeValue float64 `mapstructure:"gauge_value" avro:"gauge_value"` + TraceID string `mapstructure:"trace_id" avro:"trace_id"` + SpanID string `mapstructure:"span_id" 
avro:"span_id"` +} + +// GaugeDataPointExemplarAttribute - struct modeling a Gauge Datapoint Exemplar attribute +type GaugeDataPointExemplarAttribute struct { + GaugeID string `avro:"gauge_id"` + DatapointID string `avro:"datapoint_id"` + ExemplarID string `avro:"exemplar_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// GaugeResourceAttribute - struct modeling a Gauge resource attribute +type GaugeResourceAttribute struct { + GaugeID string `avro:"gauge_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// GaugeScopeAttribute - struct modeling a Gauge Scope attribute +type GaugeScopeAttribute struct { + GaugeID string `avro:"gauge_id"` + ScopeName string `avro:"name"` + ScopeVersion string `avro:"version"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// END Gauge + +// Sum + +// Sum - struct modeling a Sum metric +type Sum struct { + SumID string `avro:"sum_id"` + MetricName string `avro:"metric_name"` + Description string `avro:"metric_description"` + Unit string `avro:"metric_unit"` + pmetric.AggregationTemporality `avro:"aggregation_temporality"` + IsMonotonic int8 `avro:"is_monotonic"` +} + +// SumDatapoint - struct modeling a Sum Datapoint +type SumDatapoint struct { + SumID string `avro:"sum_id"` + ID string `avro:"id"` + StartTimeUnix int64 `mapstructure:"start_time_unix" avro:"start_time_unix"` + TimeUnix int64 `mapstructure:"time_unix" avro:"time_unix"` + SumValue float64 `mapstructure:"sum_value" avro:"sum_value"` + Flags int `mapstructure:"flags" avro:"flags"` +} + +// SumDataPointAttribute - struct modeling a Sum Datapoint attribute +type SumDataPointAttribute struct { + SumID string `avro:"sum_id"` + DatapointID string `avro:"datapoint_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// SumDatapointExemplar - struct modeling a Sum Datapoint Exemplar +type SumDatapointExemplar struct { + SumID string `avro:"sum_id"` + DatapointID string `avro:"datapoint_id"` + ExemplarID string `avro:"exemplar_id"` + TimeUnix int64 `mapstructure:"time_unix" avro:"time_unix"` + SumValue float64 `mapstructure:"sum_value" avro:"sum_value"` + TraceID string `mapstructure:"trace_id" avro:"trace_id"` + SpanID string `mapstructure:"span_id" avro:"span_id"` +} + +// SumDataPointExemplarAttribute - struct modeling a Sum Datapoint Exemplar attribute +type SumDataPointExemplarAttribute struct { + SumID string `avro:"sum_id"` + DatapointID string `avro:"datapoint_id"` + ExemplarID string `avro:"exemplar_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// SumResourceAttribute - struct modeling a Sum Resource attribute +type SumResourceAttribute struct { + SumID string `avro:"sum_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// SumScopeAttribute - struct modeling a Sum Scope attribute +type SumScopeAttribute struct { + SumID string `avro:"sum_id"` + ScopeName string `avro:"name"` + ScopeVersion string `avro:"version"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// END Sum + +// Histogram + +// Histogram - struct modeling a Histogram metric type +type Histogram struct { + HistogramID string `avro:"histogram_id"` + MetricName string `avro:"metric_name"` + Description string `avro:"metric_description"` + Unit string `avro:"metric_unit"` + pmetric.AggregationTemporality `avro:"aggregation_temporality"` +} + +// HistogramDatapoint - struct modeling a Histogram Datapoint +type HistogramDatapoint struct { 
+ HistogramID string `avro:"histogram_id"` + ID string `avro:"id"` + StartTimeUnix int64 `avro:"start_time_unix"` + TimeUnix int64 `avro:"time_unix"` + Count int64 `avro:"count"` + Sum float64 `avro:"data_sum"` + Min float64 `avro:"data_min"` + Max float64 `avro:"data_max"` + Flags int `avro:"flags"` +} + +// HistogramDataPointAttribute - struct modeling a Histogram Datapoint attribute +type HistogramDataPointAttribute struct { + HistogramID string `avro:"histogram_id"` + DatapointID string `avro:"datapoint_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// HistogramDatapointBucketCount - struct modeling a Histogram Datapoint Bucket Count +type HistogramDatapointBucketCount struct { + HistogramID string `avro:"histogram_id"` + DatapointID string `avro:"datapoint_id"` + CountID string `avro:"count_id"` + Count int64 `avro:"count"` +} + +// HistogramDatapointExplicitBound - struct modeling a Histogram Datapoint Explicit Bound +type HistogramDatapointExplicitBound struct { + HistogramID string `avro:"histogram_id"` + DatapointID string `avro:"datapoint_id"` + BoundID string `avro:"bound_id"` + ExplicitBound float64 `avro:"explicit_bound"` +} + +// HistogramDatapointExemplar - struct modeling a Histogram Datapoint Exemplar +type HistogramDatapointExemplar struct { + HistogramID string `avro:"histogram_id"` + DatapointID string `avro:"datapoint_id"` + ExemplarID string `avro:"exemplar_id"` + TimeUnix int64 `avro:"time_unix"` + HistogramValue float64 `avro:"histogram_value"` + TraceID string `mapstructure:"trace_id" avro:"trace_id"` + SpanID string `mapstructure:"span_id" avro:"span_id"` +} + +// HistogramDataPointExemplarAttribute - struct modeling a Histogram Datapoint Exemplar attribute +type HistogramDataPointExemplarAttribute struct { + HistogramID string `avro:"histogram_id"` + DatapointID string `avro:"datapoint_id"` + ExemplarID string `avro:"exemplar_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// HistogramResourceAttribute - struct modeling a Histogram Resource Attribute +type HistogramResourceAttribute struct { + HistogramID string `avro:"histogram_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// HistogramScopeAttribute - struct modeling a Histogram Scope Attribute +type HistogramScopeAttribute struct { + HistogramID string `avro:"histogram_id"` + ScopeName string `avro:"name"` + ScopeVersion string `avro:"version"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// End Histogram + +// Exponential Histogram + +// ExponentialHistogram - struct modeling an Exponential Histogram +type ExponentialHistogram struct { + HistogramID string `avro:"histogram_id"` + MetricName string `avro:"metric_name"` + Description string `avro:"metric_description"` + Unit string `avro:"metric_unit"` + pmetric.AggregationTemporality `avro:"aggregation_temporality"` +} + +// ExponentialHistogramDatapoint - struct modeling an Exponential Histogram Datapoint +type ExponentialHistogramDatapoint struct { + HistogramID string `avro:"histogram_id"` + ID string `avro:"id"` + StartTimeUnix int64 `avro:"start_time_unix"` + TimeUnix int64 `avro:"time_unix"` + Count int64 `avro:"count"` + Sum float64 `avro:"data_sum"` + Min float64 `avro:"data_min"` + Max float64 `avro:"data_max"` + Flags int `avro:"flags"` + Scale int `avro:"scale"` + ZeroCount int64 `avro:"zero_count"` + BucketsPositiveOffset int `avro:"buckets_positive_offset"` + BucketsNegativeOffset int `avro:"buckets_negative_offset"` +} + +// 
ExponentialHistogramDataPointAttribute - struct modeling an Exponential Histogram Datapoint attribute +type ExponentialHistogramDataPointAttribute struct { + HistogramID string `avro:"histogram_id"` + DatapointID string `avro:"datapoint_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// ExponentialHistogramBucketNegativeCount - struct modeling an Exponential Histogram Bucket Negative Count +type ExponentialHistogramBucketNegativeCount struct { + HistogramID string `avro:"histogram_id"` + DatapointID string `avro:"datapoint_id"` + CountID string `avro:"count_id"` + Count uint64 `avro:"count"` +} + +// ExponentialHistogramBucketPositiveCount - struct modeling an Exponential Histogram Bucket Positive Count +type ExponentialHistogramBucketPositiveCount struct { + HistogramID string `avro:"histogram_id"` + DatapointID string `avro:"datapoint_id"` + CountID string `avro:"count_id"` + Count int64 `avro:"count"` +} + +// ExponentialHistogramDatapointExemplar - struct modeling an Exponential Histogram Datapoint Exemplar +type ExponentialHistogramDatapointExemplar struct { + HistogramID string `avro:"histogram_id"` + DatapointID string `avro:"datapoint_id"` + ExemplarID string `avro:"exemplar_id"` + TimeUnix int64 `avro:"time_unix"` + HistogramValue float64 `avro:"histogram_value"` + TraceID string `mapstructure:"trace_id" avro:"trace_id"` + SpanID string `mapstructure:"span_id" avro:"span_id"` +} + +// ExponentialHistogramDataPointExemplarAttribute - struct modeling an Exponential Histogram Datapoint Exemplar attribute +type ExponentialHistogramDataPointExemplarAttribute struct { + HistogramID string `avro:"histogram_id"` + DatapointID string `avro:"datapoint_id"` + ExemplarID string `avro:"exemplar_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// ExponentialHistogramResourceAttribute - struct modeling an Exponential Histogram Resource attribute +type ExponentialHistogramResourceAttribute struct { + HistogramID string `avro:"histogram_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// ExponentialHistogramScopeAttribute - struct modeling an Exponential Histogram Scope attribute +type ExponentialHistogramScopeAttribute struct { + HistogramID string `avro:"histogram_id"` + ScopeName string `avro:"name"` + ScopeVersion string `avro:"version"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// END Exponential Histogram + +// Summary + +// Summary - struct modeling a Summary type metric +type Summary struct { + SummaryID string `avro:"summary_id"` + MetricName string `avro:"metric_name"` + Description string `avro:"metric_description"` + Unit string `avro:"metric_unit"` +} + +// SummaryDatapoint - struct modeling a Summary Datapoint +type SummaryDatapoint struct { + SummaryID string `avro:"summary_id"` + ID string `avro:"id"` + StartTimeUnix int64 `avro:"start_time_unix"` + TimeUnix int64 `avro:"time_unix"` + Count int64 `avro:"count"` + Sum float64 `avro:"data_sum"` + Flags int `avro:"flags"` +} + +// SummaryDataPointAttribute - struct modeling a Summary Datapoint attribute +type SummaryDataPointAttribute struct { + SummaryID string `avro:"summary_id"` + DatapointID string `avro:"datapoint_id"` + Key string `avro:"key"` + AttributeValue `mapstructure:",squash"` +} + +// SummaryDatapointQuantileValues - struct modeling a Summary Datapoint Quantile value +type SummaryDatapointQuantileValues struct { + SummaryID string `avro:"summary_id"` + DatapointID string `avro:"datapoint_id"` + 
+// SummaryDatapointQuantileValues - struct modeling a Summary Datapoint Quantile value
+type SummaryDatapointQuantileValues struct {
+ SummaryID string `avro:"summary_id"`
+ DatapointID string `avro:"datapoint_id"`
+ QuantileID string `avro:"quantile_id"`
+ Quantile float64 `avro:"quantile"`
+ Value float64 `avro:"value"`
+}
+
+// SummaryResourceAttribute - struct modeling a Summary Resource attribute
+type SummaryResourceAttribute struct {
+ SummaryID string `avro:"summary_id"`
+ Key string `avro:"key"`
+ AttributeValue `mapstructure:",squash"`
+}
+
+// SummaryScopeAttribute - struct modeling a Summary Scope attribute
+type SummaryScopeAttribute struct {
+ SummaryID string `avro:"summary_id"`
+ ScopeName string `avro:"name"`
+ ScopeVersion string `avro:"version"`
+ Key string `avro:"key"`
+ AttributeValue `mapstructure:",squash"`
+}
+
+// END Summary
+
+// END Metrics Handling
+
+// writeMetric - a helper method used by the different metric persistence methods to write
+// metric data to the relevant tables. Tables are taken from the ordered map in insertion
+// order, but the inserts themselves run concurrently, so no cross-table ordering is
+// guaranteed; errors from all inserts are collected and combined.
+//
+// @receiver kiwriter - pointer to [KiWriter]
+// @param metricType - the [pmetric.MetricType] name, used for logging
+// @param tableDataMap - a map from table name to the relevant data
+// @return error
+func (kiwriter *KiWriter) writeMetric(metricType string, tableDataMap *orderedmap.OrderedMap) error {
+ kiwriter.logger.Debug("Writing metric", zap.String("Type", metricType))
+
+ errsChan := make(chan error, tableDataMap.Len())
+
+ wg := &sync.WaitGroup{}
+ for pair := tableDataMap.Oldest(); pair != nil; pair = pair.Next() {
+ tableName := pair.Key.(string)
+ data := pair.Value.([]any)
+
+ wg.Add(1)
+ go func(tableName string, data []any, wg *sync.WaitGroup) {
+ defer wg.Done()
+ if err := kiwriter.doChunkedInsert(context.TODO(), tableName, data); err != nil {
+ errsChan <- err
+ }
+ }(tableName, data, wg)
+ }
+ wg.Wait()
+ close(errsChan)
+
+ var insErrs error
+ for err := range errsChan {
+ insErrs = multierr.Append(insErrs, err)
+ }
+ return insErrs
+}
+
+func (kiwriter *KiWriter) persistGaugeRecord(gaugeRecords []kineticaGaugeRecord) error {
+ kiwriter.logger.Debug("In persistGaugeRecord ...")
+
+ var errs []error
+ var gauges []any
+ var resourceAttributes []any
+ var scopeAttributes []any
+ var datapoints []any
+ var datapointAttributes []any
+ var exemplars []any
+ var exemplarAttributes []any
+
+ for _, gaugerecord := range gaugeRecords {
+ gauges = append(gauges, *gaugerecord.gauge)
+
+ for _, gr := range gaugerecord.resourceAttribute {
+ resourceAttributes = append(resourceAttributes, gr)
+ }
+
+ for _, sa := range gaugerecord.scopeAttribute {
+ scopeAttributes = append(scopeAttributes, sa)
+ }
+
+ for _, dp := range gaugerecord.datapoint {
+ datapoints = append(datapoints, dp)
+ }
+
+ for _, dpattr := range gaugerecord.datapointAttribute {
+ datapointAttributes = append(datapointAttributes, dpattr)
+ }
+
+ for _, ge := range gaugerecord.exemplars {
+ exemplars = append(exemplars, ge)
+ }
+
+ for _, geattr := range gaugerecord.exemplarAttribute {
+ exemplarAttributes = append(exemplarAttributes, geattr)
+ }
+ }
+
+ tableDataMap := orderedmap.New()
+
+ tableDataMap.Set(GaugeTable, gauges)
+ tableDataMap.Set(GaugeDatapointTable, datapoints)
+ tableDataMap.Set(GaugeDatapointAttributeTable, datapointAttributes)
+ tableDataMap.Set(GaugeResourceAttributeTable, resourceAttributes)
+ tableDataMap.Set(GaugeScopeAttributeTable, scopeAttributes)
+ tableDataMap.Set(GaugeDatapointExemplarTable, exemplars)
+ tableDataMap.Set(GaugeDatapointExemplarAttributeTable, exemplarAttributes)
+
+ errs = append(errs, kiwriter.writeMetric(pmetric.MetricTypeGauge.String(), tableDataMap))
+
+ return multierr.Combine(errs...)
+}
+
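+// NOTE: The element-by-element copy loops in the persist methods are needed
+// because Go does not convert a []T to []any implicitly. A small generic
+// helper could replace them; this is a sketch only and is not used by the
+// exporter:
+//
+//	func toAnySlice[T any](in []T) []any {
+//		out := make([]any, 0, len(in))
+//		for _, v := range in {
+//			out = append(out, v)
+//		}
+//		return out
+//	}
+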
+func (kiwriter *KiWriter) persistSumRecord(sumRecords []kineticaSumRecord) error {
+ kiwriter.logger.Debug("In persistSumRecord ...")
+
+ var errs []error
+
+ var sums []any
+ var resourceAttributes []any
+ var scopeAttributes []any
+ var datapoints []any
+ var datapointAttributes []any
+ var exemplars []any
+ var exemplarAttributes []any
+
+ for _, sumrecord := range sumRecords {
+ sums = append(sums, *sumrecord.sum)
+
+ for _, sr := range sumrecord.sumResourceAttribute {
+ resourceAttributes = append(resourceAttributes, sr)
+ }
+
+ for _, sa := range sumrecord.sumScopeAttribute {
+ scopeAttributes = append(scopeAttributes, sa)
+ }
+
+ for _, dp := range sumrecord.datapoint {
+ datapoints = append(datapoints, dp)
+ }
+
+ for _, dpattr := range sumrecord.datapointAttribute {
+ datapointAttributes = append(datapointAttributes, dpattr)
+ }
+
+ for _, se := range sumrecord.exemplars {
+ exemplars = append(exemplars, se)
+ }
+
+ for _, seattr := range sumrecord.exemplarAttribute {
+ exemplarAttributes = append(exemplarAttributes, seattr)
+ }
+ }
+
+ tableDataMap := orderedmap.New()
+
+ tableDataMap.Set(SumTable, sums)
+ tableDataMap.Set(SumDatapointTable, datapoints)
+ tableDataMap.Set(SumDatapointAttributeTable, datapointAttributes)
+ tableDataMap.Set(SumResourceAttributeTable, resourceAttributes)
+ tableDataMap.Set(SumScopeAttributeTable, scopeAttributes)
+ tableDataMap.Set(SumDatapointExemplarTable, exemplars)
+ tableDataMap.Set(SumDataPointExemplarAttributeTable, exemplarAttributes)
+
+ errs = append(errs, kiwriter.writeMetric(pmetric.MetricTypeSum.String(), tableDataMap))
+
+ return multierr.Combine(errs...)
+}
+
+func (kiwriter *KiWriter) persistHistogramRecord(histogramRecords []kineticaHistogramRecord) error {
+ kiwriter.logger.Debug("In persistHistogramRecord ...")
+
+ var errs []error
+
+ var histograms []any
+ var resourceAttributes []any
+ var scopeAttributes []any
+ var datapoints []any
+ var datapointAttributes []any
+ var bucketCounts []any
+ var explicitBounds []any
+ var exemplars []any
+ var exemplarAttributes []any
+
+ for _, histogramrecord := range histogramRecords {
+ histograms = append(histograms, *histogramrecord.histogram)
+
+ for _, ra := range histogramrecord.histogramResourceAttribute {
+ resourceAttributes = append(resourceAttributes, ra)
+ }
+
+ for _, sa := range histogramrecord.histogramScopeAttribute {
+ scopeAttributes = append(scopeAttributes, sa)
+ }
+
+ for _, dp := range histogramrecord.histogramDatapoint {
+ datapoints = append(datapoints, dp)
+ }
+
+ for _, dpattr := range histogramrecord.histogramDatapointAtribute {
+ datapointAttributes = append(datapointAttributes, dpattr)
+ }
+
+ for _, bc := range histogramrecord.histogramBucketCount {
+ bucketCounts = append(bucketCounts, bc)
+ }
+
+ for _, eb := range histogramrecord.histogramExplicitBound {
+ explicitBounds = append(explicitBounds, eb)
+ }
+
+ for _, ex := range histogramrecord.exemplars {
+ exemplars = append(exemplars, ex)
+ }
+
+ for _, exattr := range histogramrecord.exemplarAttribute {
+ exemplarAttributes = append(exemplarAttributes, exattr)
+ }
+ }
+
+ tableDataMap := orderedmap.New()
+
+ tableDataMap.Set(HistogramTable, histograms)
+ tableDataMap.Set(HistogramDatapointTable, datapoints)
+ tableDataMap.Set(HistogramDatapointAttributeTable, datapointAttributes)
+ tableDataMap.Set(HistogramBucketCountsTable, bucketCounts)
+ tableDataMap.Set(HistogramExplicitBoundsTable, explicitBounds)
+ tableDataMap.Set(HistogramResourceAttributeTable, resourceAttributes)
+ tableDataMap.Set(HistogramScopeAttributeTable, scopeAttributes)
+ tableDataMap.Set(HistogramDatapointExemplarTable, exemplars)
+ tableDataMap.Set(HistogramDataPointExemplarAttributeTable, exemplarAttributes)
+
+ errs = append(errs, kiwriter.writeMetric(pmetric.MetricTypeHistogram.String(), tableDataMap))
+
+ return multierr.Combine(errs...)
+}
+
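+// NOTE: OTLP histogram datapoints carry bucket counts and explicit bounds as
+// parallel lists (N+1 counts for N bounds); they are flattened into one row
+// per element when the kinetica*Record values are assembled. A minimal sketch
+// of that expansion, with uuid-style ID generation assumed purely for
+// illustration (the actual ID scheme may differ):
+//
+//	for _, c := range dp.BucketCounts().AsRaw() {
+//		bucketCounts = append(bucketCounts, HistogramDatapointBucketCount{
+//			HistogramID: histogramID,         // assumed parent ID
+//			DatapointID: datapointID,         // assumed datapoint ID
+//			CountID:     uuid.New().String(), // assumed row ID
+//			Count:       int64(c),
+//		})
+//	}
+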
+func (kiwriter *KiWriter) persistExponentialHistogramRecord(exponentialHistogramRecords []kineticaExponentialHistogramRecord) error {
+ kiwriter.logger.Debug("In persistExponentialHistogramRecord ...")
+
+ var errs []error
+
+ var histograms []any
+ var resourceAttributes []any
+ var scopeAttributes []any
+ var datapoints []any
+ var datapointAttributes []any
+ var positiveBucketCounts []any
+ var negativeBucketCounts []any
+ var exemplars []any
+ var exemplarAttributes []any
+
+ for _, histogramrecord := range exponentialHistogramRecords {
+ histograms = append(histograms, *histogramrecord.histogram)
+
+ for _, ra := range histogramrecord.histogramResourceAttribute {
+ resourceAttributes = append(resourceAttributes, ra)
+ }
+
+ for _, sa := range histogramrecord.histogramScopeAttribute {
+ scopeAttributes = append(scopeAttributes, sa)
+ }
+
+ for _, dp := range histogramrecord.histogramDatapoint {
+ datapoints = append(datapoints, dp)
+ }
+
+ for _, dpattr := range histogramrecord.histogramDatapointAttribute {
+ datapointAttributes = append(datapointAttributes, dpattr)
+ }
+
+ for _, posbc := range histogramrecord.histogramBucketPositiveCount {
+ positiveBucketCounts = append(positiveBucketCounts, posbc)
+ }
+
+ for _, negbc := range histogramrecord.histogramBucketNegativeCount {
+ negativeBucketCounts = append(negativeBucketCounts, negbc)
+ }
+
+ for _, ex := range histogramrecord.exemplars {
+ exemplars = append(exemplars, ex)
+ }
+
+ for _, exattr := range histogramrecord.exemplarAttribute {
+ exemplarAttributes = append(exemplarAttributes, exattr)
+ }
+ }
+
+ tableDataMap := orderedmap.New()
+
+ tableDataMap.Set(ExpHistogramTable, histograms)
+ tableDataMap.Set(ExpHistogramDatapointTable, datapoints)
+ tableDataMap.Set(ExpHistogramDatapointAttributeTable, datapointAttributes)
+ tableDataMap.Set(ExpHistogramPositiveBucketCountsTable, positiveBucketCounts)
+ tableDataMap.Set(ExpHistogramNegativeBucketCountsTable, negativeBucketCounts)
+ tableDataMap.Set(ExpHistogramResourceAttributeTable, resourceAttributes)
+ tableDataMap.Set(ExpHistogramScopeAttributeTable, scopeAttributes)
+ tableDataMap.Set(ExpHistogramDatapointExemplarTable, exemplars)
+ tableDataMap.Set(ExpHistogramDataPointExemplarAttributeTable, exemplarAttributes)
+
+ errs = append(errs, kiwriter.writeMetric(pmetric.MetricTypeExponentialHistogram.String(), tableDataMap))
+
+ return multierr.Combine(errs...)
+} + +func (kiwriter *KiWriter) persistSummaryRecord(summaryRecords []kineticaSummaryRecord) error { + kiwriter.logger.Debug("In persistSummaryRecord ...") + + var errs []error + + var summaries []any + var resourceAttributes []any + var scopeAttributes []any + var datapoints []any + var datapointAttributes []any + var datapointQuantiles []any + + for _, summaryrecord := range summaryRecords { + + summaries = append(summaries, *summaryrecord.summary) + + for _, ra := range summaryrecord.summaryResourceAttribute { + resourceAttributes = append(resourceAttributes, ra) + } + + for _, sa := range summaryrecord.summaryScopeAttribute { + scopeAttributes = append(scopeAttributes, sa) + } + + for _, dp := range summaryrecord.summaryDatapoint { + datapoints = append(datapoints, dp) + } + + for _, dpattr := range summaryrecord.summaryDatapointAttribute { + datapointAttributes = append(datapointAttributes, dpattr) + } + + for _, dpq := range summaryrecord.summaryDatapointQuantileValues { + datapointQuantiles = append(datapointQuantiles, dpq) + } + } + + tableDataMap := orderedmap.New() + + tableDataMap.Set(SummaryTable, summaries) + tableDataMap.Set(SummaryDatapointTable, datapoints) + tableDataMap.Set(SummaryDatapointAttributeTable, datapointAttributes) + tableDataMap.Set(SummaryDatapointQuantileValueTable, datapointQuantiles) + tableDataMap.Set(SummaryResourceAttributeTable, resourceAttributes) + tableDataMap.Set(SummaryScopeAttributeTable, scopeAttributes) + + errs = append(errs, kiwriter.writeMetric(pmetric.MetricTypeSummary.String(), tableDataMap)) + + return multierr.Combine(errs...) + +} + +func (kiwriter *KiWriter) doChunkedInsert(_ context.Context, tableName string, records []any) error { + + // Build the final table name with the schema prepended + var finalTable string + if len(kiwriter.cfg.Schema) != 0 { + finalTable = fmt.Sprintf("%s.%s", kiwriter.cfg.Schema, tableName) + } else { + finalTable = tableName + } + + kiwriter.logger.Debug("Writing to - ", zap.String("Table", finalTable), zap.Int("Record count", len(records))) + + recordChunks := chunkBySize(records, ChunkSize) + + errsChan := make(chan error, len(recordChunks)) + + wg := &sync.WaitGroup{} + + for _, recordChunk := range recordChunks { + wg.Add(1) + go func(data []any, wg *sync.WaitGroup) { + _, err := kiwriter.Db.InsertRecordsRaw(context.TODO(), finalTable, data) + errsChan <- err + + wg.Done() + }(recordChunk, wg) + } + wg.Wait() + close(errsChan) + var errs error + for err := range errsChan { + errs = multierr.Append(errs, err) + } + return errs +}
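+
+// NOTE: chunkBySize and ChunkSize are defined elsewhere in this exporter; a
+// minimal generic equivalent of the chunking used above (sketch only,
+// assuming a non-zero chunk size) would be:
+//
+//	func chunkBySize[T any](items []T, size int) [][]T {
+//		var chunks [][]T
+//		for size < len(items) {
+//			items, chunks = items[size:], append(chunks, items[:size])
+//		}
+//		return append(chunks, items)
+//	}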