From 5c820a27e4a312dac3f294541828956287c9c472 Mon Sep 17 00:00:00 2001
From: jbeemster
Date: Wed, 24 Jan 2024 09:36:05 +0100
Subject: [PATCH] Update Collector to v3.0.1 (closes #14)

---
 README.md                   | 19 ++++++++------
 examples/confluent/main.tf  | 14 +++++++----
 examples/default/main.tf    | 15 ++++++------
 main.tf                     | 12 +++++----
 templates/config.hocon.tmpl | 49 ++++++++++++++++++++-----------------
 templates/user-data.sh.tmpl |  2 ++
 variables.tf                | 27 ++++++++++++++------
 7 files changed, 83 insertions(+), 55 deletions(-)

diff --git a/README.md b/README.md
index 7c9881d..464ba31 100644
--- a/README.md
+++ b/README.md
@@ -51,7 +51,7 @@ module "bad_1_eh_topic" {
 
 module "collector_lb" {
   source  = "snowplow-devops/lb/azurerm"
-  version = "0.1.1"
+  version = "0.2.0"
 
   name                = "collector-lb"
   resource_group_name = var.resource_group_name
@@ -71,10 +71,11 @@ module "collector_event_hub" {
 
   ingress_port = module.collector_lb.agw_backend_egress_port
 
-  good_topic_name = module.raw_eh_topic.name
-  bad_topic_name  = module.bad_1_eh_topic.name
-  kafka_brokers   = module.pipeline_eh_namespace.broker
-  kafka_password  = module.pipeline_eh_namespace.read_write_primary_connection_string
+  good_topic_name           = module.raw_eh_topic.name
+  good_topic_kafka_password = module.raw_eh_topic.read_write_primary_connection_string
+  bad_topic_name            = module.bad_1_eh_topic.name
+  bad_topic_kafka_password  = module.bad_1_eh_topic.read_write_primary_connection_string
+  kafka_brokers             = module.pipeline_eh_namespace.broker
 
   ssh_public_key   = "your-public-key-here"
   ssh_ip_allowlist = ["0.0.0.0/0"]
@@ -116,25 +117,27 @@ module "collector_event_hub" {
 
 | Name | Description | Type | Default | Required |
 |------|-------------|------|---------|:--------:|
+| [bad\_topic\_kafka\_password](#input\_bad\_topic\_kafka\_password) | Password for connection to Kafka cluster under PlainLoginModule (note: as default the EventHubs topic connection string for writing is expected) | `string` | n/a | yes |
 | [bad\_topic\_name](#input\_bad\_topic\_name) | The name of the bad Kafka topic that the collector will insert failed data into | `string` | n/a | yes |
+| [good\_topic\_kafka\_password](#input\_good\_topic\_kafka\_password) | Password for connection to Kafka cluster under PlainLoginModule (note: as default the EventHubs topic connection string for writing is expected) | `string` | n/a | yes |
 | [good\_topic\_name](#input\_good\_topic\_name) | The name of the good Kafka topic that the collector will insert good data into | `string` | n/a | yes |
 | [ingress\_port](#input\_ingress\_port) | The port that the collector will be bound to and expose over HTTP | `number` | n/a | yes |
 | [kafka\_brokers](#input\_kafka\_brokers) | The brokers to configure for access to the Kafka Cluster (note: as default the EventHubs namespace broker) | `string` | n/a | yes |
-| [kafka\_password](#input\_kafka\_password) | Password for connection to Kafka cluster under PlainLoginModule (note: as default the EventHubs namespace connection string is expected) | `string` | n/a | yes |
 | [name](#input\_name) | A name which will be pre-pended to the resources created | `string` | n/a | yes |
 | [resource\_group\_name](#input\_resource\_group\_name) | The name of the resource group to deploy the service into | `string` | n/a | yes |
 | [ssh\_public\_key](#input\_ssh\_public\_key) | The SSH public key attached for access to the servers | `string` | n/a | yes |
 | [subnet\_id](#input\_subnet\_id) | The subnet id to deploy the load balancer across | `string` | n/a | yes |
 | [accept\_limited\_use\_license](#input\_accept\_limited\_use\_license) | Acceptance of the SLULA terms (https://docs.snowplow.io/limited-use-license-1.0/) | `bool` | `false` | no |
-| [app\_version](#input\_app\_version) | App version to use. This variable facilitates dev flow, the modules may not work with anything other than the default value. | `string` | `"2.9.0"` | no |
+| [app\_version](#input\_app\_version) | App version to use. This variable facilitates dev flow, the modules may not work with anything other than the default value. | `string` | `"3.0.1"` | no |
 | [application\_gateway\_backend\_address\_pool\_ids](#input\_application\_gateway\_backend\_address\_pool\_ids) | The ID of an Application Gateway backend address pool to bind the VM scale-set to the load balancer | `list(string)` | `[]` | no |
 | [associate\_public\_ip\_address](#input\_associate\_public\_ip\_address) | Whether to assign a public ip address to this instance | `bool` | `true` | no |
+| [bad\_topic\_kafka\_username](#input\_bad\_topic\_kafka\_username) | Username for connection to Kafka cluster under PlainLoginModule (default: '$ConnectionString' which is used for EventHubs) | `string` | `"$ConnectionString"` | no |
 | [byte\_limit](#input\_byte\_limit) | The amount of bytes to buffer events before pushing them to Kafka | `number` | `1000000` | no |
 | [cookie\_domain](#input\_cookie\_domain) | Optional first party cookie domain for the collector to set cookies on (e.g. acme.com) | `string` | `""` | no |
 | [custom\_paths](#input\_custom\_paths) | Optional custom paths that the collector will respond to, typical paths to override are '/com.snowplowanalytics.snowplow/tp2', '/com.snowplowanalytics.iglu/v1' and '/r/tp2'. e.g. { "/custom/path/" : "/com.snowplowanalytics.snowplow/tp2"} | `map(string)` | `{}` | no |
+| [good\_topic\_kafka\_username](#input\_good\_topic\_kafka\_username) | Username for connection to Kafka cluster under PlainLoginModule (default: '$ConnectionString' which is used for EventHubs) | `string` | `"$ConnectionString"` | no |
 | [java\_opts](#input\_java\_opts) | Custom JAVA Options | `string` | `"-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"` | no |
 | [kafka\_source](#input\_kafka\_source) | The source providing the Kafka connectivity (def: azure\_event\_hubs) | `string` | `"azure_event_hubs"` | no |
-| [kafka\_username](#input\_kafka\_username) | Username for connection to Kafka cluster under PlainLoginModule (default: '$ConnectionString' which is used for EventHubs) | `string` | `"$ConnectionString"` | no |
 | [record\_limit](#input\_record\_limit) | The number of events to buffer before pushing them to Kafka | `number` | `500` | no |
 | [ssh\_ip\_allowlist](#input\_ssh\_ip\_allowlist) | The comma-separated list of CIDR ranges to allow SSH traffic from | `list(string)` | <pre>[<br>  "0.0.0.0/0"<br>]</pre> | no |
 | [tags](#input\_tags) | The tags to append to this resource | `map(string)` | `{}` | no |
diff --git a/examples/confluent/main.tf b/examples/confluent/main.tf
index 02633f2..3663354 100644
--- a/examples/confluent/main.tf
+++ b/examples/confluent/main.tf
@@ -46,6 +46,8 @@ module "collector_lb" {
 module "collector_event_hub" {
   source = "../.."
 
+  accept_limited_use_license = true
+
   name                = "${local.name}-collector-server"
   resource_group_name = azurerm_resource_group.rg.name
   subnet_id           = lookup(module.vnet.vnet_subnets_name_id, "pipeline1")
@@ -54,11 +56,13 @@
 
   ingress_port = module.collector_lb.agw_backend_egress_port
 
-  good_topic_name = local.good_topic_name
-  bad_topic_name  = local.bad_topic_name
-  kafka_brokers   = local.kafka_brokers
-  kafka_username  = local.kafka_username
-  kafka_password  = local.kafka_password
+  good_topic_name           = local.good_topic_name
+  good_topic_kafka_username = local.kafka_username
+  good_topic_kafka_password = local.kafka_password
+  bad_topic_name            = local.bad_topic_name
+  bad_topic_kafka_username  = local.kafka_username
+  bad_topic_kafka_password  = local.kafka_password
+  kafka_brokers             = local.kafka_brokers
 
   kafka_source = "confluent_cloud"
 
diff --git a/examples/default/main.tf b/examples/default/main.tf
index 54fff58..c9eaf91 100644
--- a/examples/default/main.tf
+++ b/examples/default/main.tf
@@ -38,7 +38,6 @@ module "bad_1_eh_topic" {
   resource_group_name = azurerm_resource_group.rg.name
 }
 
-
 module "vnet" {
   source  = "snowplow-devops/vnet/azurerm"
   version = "0.1.2"
@@ -49,10 +48,9 @@ module "vnet" {
   depends_on = [azurerm_resource_group.rg]
 }
 
-
 module "collector_lb" {
   source  = "snowplow-devops/lb/azurerm"
-  version = "0.1.1"
+  version = "0.2.0"
 
   name                = "${local.name}-clb"
   resource_group_name = azurerm_resource_group.rg.name
@@ -66,6 +64,8 @@ module "collector_lb" {
 module "collector_event_hub" {
   source = "../.."
 
+  accept_limited_use_license = true
+
   name                = "${local.name}-collector-server"
   resource_group_name = azurerm_resource_group.rg.name
   subnet_id           = lookup(module.vnet.vnet_subnets_name_id, "pipeline1")
@@ -74,10 +74,11 @@
 
   ingress_port = module.collector_lb.agw_backend_egress_port
 
-  good_topic_name = module.raw_eh_topic.name
-  bad_topic_name  = module.bad_1_eh_topic.name
-  kafka_brokers   = module.pipeline_eh_namespace.broker
-  kafka_password  = module.pipeline_eh_namespace.read_write_primary_connection_string
+  good_topic_name           = module.raw_eh_topic.name
+  good_topic_kafka_password = module.raw_eh_topic.read_write_primary_connection_string
+  bad_topic_name            = module.bad_1_eh_topic.name
+  bad_topic_kafka_password  = module.bad_1_eh_topic.read_write_primary_connection_string
+  kafka_brokers             = module.pipeline_eh_namespace.broker
 
   ssh_public_key   = local.ssh_public_key
   ssh_ip_allowlist = ["0.0.0.0/0"]
diff --git a/main.tf b/main.tf
index 86270b8..121eb07 100644
--- a/main.tf
+++ b/main.tf
@@ -115,11 +115,13 @@ locals {
     paths         = var.custom_paths
     cookie_domain = var.cookie_domain
 
-    good_topic_name = var.good_topic_name
-    bad_topic_name  = var.bad_topic_name
-    kafka_brokers   = var.kafka_brokers
-    kafka_username  = var.kafka_username
-    kafka_password  = var.kafka_password
+    good_topic_name           = var.good_topic_name
+    good_topic_kafka_username = var.good_topic_kafka_username
+    good_topic_kafka_password = var.good_topic_kafka_password
+    bad_topic_name            = var.bad_topic_name
+    bad_topic_kafka_username  = var.bad_topic_kafka_username
+    bad_topic_kafka_password  = var.bad_topic_kafka_password
+    kafka_brokers             = var.kafka_brokers
 
     byte_limit   = var.byte_limit
     record_limit = var.record_limit
diff --git a/templates/config.hocon.tmpl b/templates/config.hocon.tmpl
index b7b8d4b..e69a558 100644
--- a/templates/config.hocon.tmpl
+++ b/templates/config.hocon.tmpl
@@ -59,18 +59,36 @@ collector {
     enabled = false
   }
   streams {
-    good = ${good_topic_name}
-    bad = ${bad_topic_name}
     useIpAddressAsPartitionKey = false
-    sink {
-      enabled = kafka
+    good {
+      name = ${good_topic_name}
       brokers = "${kafka_brokers}"
-      producerConf = {"security.protocol":"SASL_SSL","sasl.mechanism":"PLAIN","sasl.jaas.config":"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${kafka_username}\" password=\"${kafka_password}\";"}
+      producerConf {
+        "client.id" = $${PRODUCER_GOOD_CLIENT_ID}
+        "security.protocol" = "SASL_SSL"
+        "sasl.mechanism" = "PLAIN"
+        "sasl.jaas.config" = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${good_topic_kafka_username}\" password=\"${good_topic_kafka_password}\";"
+      }
+      buffer {
+        byteLimit = ${byte_limit}
+        recordLimit = ${record_limit}
+        timeLimit = ${time_limit_ms}
+      }
     }
-    buffer {
-      byteLimit = ${byte_limit}
-      recordLimit = ${record_limit}
-      timeLimit = ${time_limit_ms}
+    bad {
+      name = ${bad_topic_name}
+      brokers = "${kafka_brokers}"
+      producerConf {
+        "client.id" = $${PRODUCER_BAD_CLIENT_ID}
+        "security.protocol" = "SASL_SSL"
+        "sasl.mechanism" = "PLAIN"
+        "sasl.jaas.config" = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${bad_topic_kafka_username}\" password=\"${bad_topic_kafka_password}\";"
+      }
+      buffer {
+        byteLimit = ${byte_limit}
+        recordLimit = ${record_limit}
+        timeLimit = ${time_limit_ms}
+      }
     }
   }
   telemetry {
@@ -87,16 +105,3 @@
     instanceId = $${INSTANCE_ID}
   }
 }
-akka {
-  loglevel = WARNING
-  loggers = ["akka.event.slf4j.Slf4jLogger"]
-  http.server {
-    remote-address-header = on
-    raw-request-uri-header = on
-    parsing {
-      max-uri-length = 32768
-      uri-parsing-mode = relaxed
-    }
-    max-connections = 2048
-  }
-}
diff --git a/templates/user-data.sh.tmpl b/templates/user-data.sh.tmpl
index f5b2a72..249bf88 100644
--- a/templates/user-data.sh.tmpl
+++ b/templates/user-data.sh.tmpl
@@ -19,6 +19,8 @@
 sudo docker run \
   --env JDK_JAVA_OPTIONS='${java_opts}' \
   --env ACCEPT_LIMITED_USE_LICENSE=${accept_limited_use_license} \
   --env INSTANCE_ID=$(get_instance_id) \
+  --env PRODUCER_BAD_CLIENT_ID="$${HOSTNAME}-bad" \
+  --env PRODUCER_GOOD_CLIENT_ID="$${HOSTNAME}-good" \
   snowplow/scala-stream-collector-kafka:${version} \
   --config /snowplow/config/collector.hocon
diff --git a/variables.tf b/variables.tf
index b009daf..006c988 100644
--- a/variables.tf
+++ b/variables.tf
@@ -22,7 +22,7 @@
 variable "app_version" {
   description = "App version to use. This variable facilitates dev flow, the modules may not work with anything other than the default value."
   type = string
-  default = "2.9.0"
+  default = "3.0.1"
 }
 
 variable "subnet_id" {
@@ -89,24 +89,35 @@
 variable "good_topic_name" {
   type = string
 }
 
-variable "bad_topic_name" {
-  description = "The name of the bad Kafka topic that the collector will insert failed data into"
+variable "good_topic_kafka_username" {
+  description = "Username for connection to Kafka cluster under PlainLoginModule (default: '$ConnectionString' which is used for EventHubs)"
   type = string
+  default = "$ConnectionString"
 }
 
-variable "kafka_brokers" {
-  description = "The brokers to configure for access to the Kafka Cluster (note: as default the EventHubs namespace broker)"
+variable "good_topic_kafka_password" {
+  description = "Password for connection to Kafka cluster under PlainLoginModule (note: as default the EventHubs topic connection string for writing is expected)"
+  type = string
+}
+
+variable "bad_topic_name" {
+  description = "The name of the bad Kafka topic that the collector will insert failed data into"
   type = string
 }
 
-variable "kafka_username" {
+variable "bad_topic_kafka_username" {
   description = "Username for connection to Kafka cluster under PlainLoginModule (default: '$ConnectionString' which is used for EventHubs)"
   type = string
   default = "$ConnectionString"
 }
 
-variable "kafka_password" {
-  description = "Password for connection to Kafka cluster under PlainLoginModule (note: as default the EventHubs namespace connection string is expected)"
+variable "bad_topic_kafka_password" {
+  description = "Password for connection to Kafka cluster under PlainLoginModule (note: as default the EventHubs topic connection string for writing is expected)"
+  type = string
+}
+
+variable "kafka_brokers" {
+  description = "The brokers to configure for access to the Kafka Cluster (note: as default the EventHubs namespace broker)"
   type = string
 }
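
Migration note for module consumers: the single `kafka_username` / `kafka_password` pair is replaced by per-topic credentials, so the good and bad streams can authenticate with separately scoped secrets. A minimal before/after sketch of a caller, modelled on the confluent example above (the `local.*` values and surrounding arguments are illustrative placeholders, not part of this patch):

```hcl
module "collector_event_hub" {
  source = "../.."

  accept_limited_use_license = true

  name                = "${local.name}-collector-server"
  resource_group_name = azurerm_resource_group.rg.name
  subnet_id           = lookup(module.vnet.vnet_subnets_name_id, "pipeline1")

  ingress_port = module.collector_lb.agw_backend_egress_port

  # Removed by this patch:
  #   kafka_username = local.kafka_username
  #   kafka_password = local.kafka_password
  #
  # New per-topic inputs; both streams may reuse one credential (as here and
  # in the examples), or each may be given its own.
  good_topic_name           = local.good_topic_name
  good_topic_kafka_username = local.kafka_username
  good_topic_kafka_password = local.kafka_password
  bad_topic_name            = local.bad_topic_name
  bad_topic_kafka_username  = local.kafka_username
  bad_topic_kafka_password  = local.kafka_password
  kafka_brokers             = local.kafka_brokers

  kafka_source = "confluent_cloud"

  ssh_public_key   = local.ssh_public_key
  ssh_ip_allowlist = ["0.0.0.0/0"]
}
```

For Event Hubs (the default `kafka_source`), the usernames can be left at their `"$ConnectionString"` default and only the per-topic write connection strings need to be supplied as passwords, as in the updated `examples/default/main.tf` above.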