Skip to content

Commit

Permalink
docs: Add samples and tests for ingestion from Kafka sources (googlea…
Browse files Browse the repository at this point in the history
…pis#2315)

* docs: Add samples and tests for ingestion from Kafka sources

* docs: Styles fixes for samples/tests
  • Loading branch information
michaelpri10 authored Jan 23, 2025
1 parent 5e80b57 commit eea603b
Show file tree
Hide file tree
Showing 4 changed files with 355 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub;

// [START pubsub_create_topic_with_aws_msk_ingestion]

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithAwsMskIngestionExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
// AWS MSK ingestion settings.
String clusterArn = "cluster-arn";
String mskTopic = "msk-topic";
String awsRoleArn = "aws-role-arn";
String gcpServiceAccount = "gcp-service-account";

createTopicWithAwsMskIngestionExample(
projectId, topicId, clusterArn, mskTopic, awsRoleArn, gcpServiceAccount);
}

public static void createTopicWithAwsMskIngestionExample(
String projectId,
String topicId,
String clusterArn,
String mskTopic,
String awsRoleArn,
String gcpServiceAccount)
throws IOException {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);

IngestionDataSourceSettings.AwsMsk awsMsk =
IngestionDataSourceSettings.AwsMsk.newBuilder()
.setClusterArn(clusterArn)
.setTopic(mskTopic)
.setAwsRoleArn(awsRoleArn)
.setGcpServiceAccount(gcpServiceAccount)
.build();
IngestionDataSourceSettings ingestionDataSourceSettings =
IngestionDataSourceSettings.newBuilder().setAwsMsk(awsMsk).build();

Topic topic =
topicAdminClient.createTopic(
Topic.newBuilder()
.setName(topicName.toString())
.setIngestionDataSourceSettings(ingestionDataSourceSettings)
.build());

System.out.println("Created topic with AWS MSK ingestion settings: " + topic.getAllFields());
}
}
}
// [END pubsub_create_topic_with_aws_msk_ingestion]
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub;

// [START pubsub_create_topic_with_azure_event_hubs_ingestion]

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithAzureEventHubsIngestionExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
// Azure Event Hubs ingestion settings.
String resourceGroup = "resource-group";
String namespace = "namespace";
String eventHub = "event-hub";
String clientId = "client-id";
String tenantId = "tenant-id";
String subscriptionId = "subscription-id";
String gcpServiceAccount = "gcp-service-account";

createTopicWithAzureEventHubsIngestionExample(
projectId,
topicId,
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount);
}

public static void createTopicWithAzureEventHubsIngestionExample(
String projectId,
String topicId,
String resourceGroup,
String namespace,
String eventHub,
String clientId,
String tenantId,
String subscriptionId,
String gcpServiceAccount)
throws IOException {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);

IngestionDataSourceSettings.AzureEventHubs azureEventHubs =
IngestionDataSourceSettings.AzureEventHubs.newBuilder()
.setResourceGroup(resourceGroup)
.setNamespace(namespace)
.setEventHub(eventHub)
.setClientId(clientId)
.setTenantId(tenantId)
.setSubscriptionId(subscriptionId)
.setGcpServiceAccount(gcpServiceAccount)
.build();
IngestionDataSourceSettings ingestionDataSourceSettings =
IngestionDataSourceSettings.newBuilder().setAzureEventHubs(azureEventHubs).build();

Topic topic =
topicAdminClient.createTopic(
Topic.newBuilder()
.setName(topicName.toString())
.setIngestionDataSourceSettings(ingestionDataSourceSettings)
.build());

System.out.println(
"Created topic with Azure Event Hubs ingestion settings: " + topic.getAllFields());
}
}
}
// [END pubsub_create_topic_with_azure_event_hubs_ingestion]
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub;

// [START pubsub_create_topic_with_confluent_cloud_ingestion]

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithConfluentCloudIngestionExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
// Confluent Cloud ingestion settings.
String bootstrapServer = "bootstrap-server";
String clusterId = "cluster-id";
String confluentTopic = "confluent-topic";
String identityPoolId = "identity-pool-id";
String gcpServiceAccount = "gcp-service-account";

createTopicWithConfluentCloudIngestionExample(
projectId,
topicId,
bootstrapServer,
clusterId,
confluentTopic,
identityPoolId,
gcpServiceAccount);
}

public static void createTopicWithConfluentCloudIngestionExample(
String projectId,
String topicId,
String bootstrapServer,
String clusterId,
String confluentTopic,
String identityPoolId,
String gcpServiceAccount)
throws IOException {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);

IngestionDataSourceSettings.ConfluentCloud confluentCloud =
IngestionDataSourceSettings.ConfluentCloud.newBuilder()
.setBootstrapServer(bootstrapServer)
.setClusterId(clusterId)
.setTopic(confluentTopic)
.setIdentityPoolId(identityPoolId)
.setGcpServiceAccount(gcpServiceAccount)
.build();
IngestionDataSourceSettings ingestionDataSourceSettings =
IngestionDataSourceSettings.newBuilder().setConfluentCloud(confluentCloud).build();

Topic topic =
topicAdminClient.createTopic(
Topic.newBuilder()
.setName(topicName.toString())
.setIngestionDataSourceSettings(ingestionDataSourceSettings)
.build());

System.out.println(
"Created topic with Confluent Cloud ingestion settings: " + topic.getAllFields());
}
}
}
// [END pubsub_create_topic_with_confluent_cloud_ingestion]
104 changes: 102 additions & 2 deletions samples/snippets/src/test/java/pubsub/AdminIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public class AdminIT {
private static final String kinesisIngestionTopicId = "kinesis-ingestion-topic-" + _suffix;
private static final String cloudStorageIngestionTopicId =
"cloud-storage-ingestion-topic-" + _suffix;
private static final String awsMskIngestionTopicId = "aws-msk-ingestion-topic-" + _suffix;
private static final String confluentCloudIngestionTopicId =
"confluent-cloud-ingestion-topic-" + _suffix;
private static final String azureEventHubsIngestionTopicId =
"azure-event-hubs-ingestion-topic-" + _suffix;
private static final String pullSubscriptionId = "iam-pull-subscription-" + _suffix;
private static final String pushSubscriptionId = "iam-push-subscription-" + _suffix;
private static final String orderedSubscriptionId = "iam-ordered-subscription-" + _suffix;
Expand All @@ -66,6 +71,9 @@ public class AdminIT {
"java_samples_data_set" + _suffix.replace("-", "_");
private static final String bigquerySubscriptionId = "iam-bigquery-subscription-" + _suffix;
private static final String bigqueryTableId = "java_samples_table_" + _suffix;
private static final String gcpServiceAccount =
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";
// AWS Kinesis ingestion settings.
private static final String streamArn =
"arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name";
private static final String consumerArn =
Expand All @@ -75,20 +83,41 @@ public class AdminIT {
"arn:aws:kinesis:us-west-2:111111111111:stream/fake-stream-name/"
+ "consumer/consumer-2:2222222222";
private static final String awsRoleArn = "arn:aws:iam::111111111111:role/fake-role-name";
private static final String gcpServiceAccount =
"fake-service-account@fake-gcp-project.iam.gserviceaccount.com";
// GCS ingestion settings.
private static final String cloudStorageBucket = "pubsub-cloud-storage-bucket";
private static final String cloudStorageInputFormat = "text";
private static final String cloudStorageTextDelimiter = ",";
private static final String cloudStorageMatchGlob = "**.txt";
private static final String cloudStorageMinimumObjectCreateTime = "1970-01-01T00:00:01Z";
private static final String cloudStorageMinimumObjectCreateTimeSeconds = "seconds: 1";
// AWS MSK ingestion settings.
String clusterArn =
"arn:aws:kafka:us-east-1:111111111111:cluster/fake-cluster-name/11111111-1111-1";
String mskTopic = "fake-msk-topic-name";
// Confluent Cloud ingestion settings.
String bootstrapServer = "fake-bootstrap-server-id.us-south1.gcp.confluent.cloud:9092";
String clusterId = "fake-cluster-id";
String confluentTopic = "fake-confluent-topic-name";
String identityPoolId = "fake-pool-id";
// Azure Event Hubs ingestion settings.
String resourceGroup = "fake-resource-group";
String namespace = "fake-namespace";
String eventHub = "fake-event-hub";
String clientId = "11111111-1111-1111-1111-111111111111";
String tenantId = "22222222-2222-2222-2222-222222222222";
String subscriptionId = "33333333-3333-3333-3333-333333333333";

private static final TopicName topicName = TopicName.of(projectId, topicId);
private static final TopicName kinesisIngestionTopicName =
TopicName.of(projectId, kinesisIngestionTopicId);
private static final TopicName cloudStorageIngestionTopicName =
TopicName.of(projectId, cloudStorageIngestionTopicId);
private static final TopicName awsMskIngestionTopicName =
TopicName.of(projectId, awsMskIngestionTopicId);
private static final TopicName confluentCloudIngestionTopicName =
TopicName.of(projectId, confluentCloudIngestionTopicId);
private static final TopicName azureEventHubsIngestionTopicName =
TopicName.of(projectId, azureEventHubsIngestionTopicId);
private static final SubscriptionName pullSubscriptionName =
SubscriptionName.of(projectId, pullSubscriptionId);
private static final SubscriptionName pushSubscriptionName =
Expand Down Expand Up @@ -361,5 +390,76 @@ public void testAdmin() throws Exception {
// Test delete Cloud Storage ingestion topic.
DeleteTopicExample.deleteTopicExample(projectId, cloudStorageIngestionTopicId);
assertThat(bout.toString()).contains("Deleted topic.");

bout.reset();
// Test create topic with AWS MSK ingestion settings.
CreateTopicWithAwsMskIngestionExample.createTopicWithAwsMskIngestionExample(
projectId,
awsMskIngestionTopicId,
clusterArn,
mskTopic,
awsRoleArn,
gcpServiceAccount);
assertThat(bout.toString())
.contains("google.pubsub.v1.Topic.name=" + awsMskIngestionTopicName.toString());
assertThat(bout.toString()).contains(clusterArn);
assertThat(bout.toString()).contains(mskTopic);
assertThat(bout.toString()).contains(awsRoleArn);
assertThat(bout.toString()).contains(gcpServiceAccount);

bout.reset();
// Test delete AWS MSK ingestion topic.
DeleteTopicExample.deleteTopicExample(projectId, awsMskIngestionTopicId);
assertThat(bout.toString()).contains("Deleted topic.");

bout.reset();
// Test create topic with Confluent Cloud ingestion settings.
CreateTopicWithConfluentCloudIngestionExample.createTopicWithConfluentCloudIngestionExample(
projectId,
confluentCloudIngestionTopicId,
bootstrapServer,
clusterId,
confluentTopic,
identityPoolId,
gcpServiceAccount);
assertThat(bout.toString())
.contains("google.pubsub.v1.Topic.name=" + confluentCloudIngestionTopicName.toString());
assertThat(bout.toString()).contains(bootstrapServer);
assertThat(bout.toString()).contains(clusterId);
assertThat(bout.toString()).contains(confluentTopic);
assertThat(bout.toString()).contains(identityPoolId);
assertThat(bout.toString()).contains(gcpServiceAccount);

bout.reset();
// Test delete Confluent Cloud ingestion topic.
DeleteTopicExample.deleteTopicExample(projectId, confluentCloudIngestionTopicId);
assertThat(bout.toString()).contains("Deleted topic.");

bout.reset();
// Test create topic with Azure Event Hubs ingestion settings.
CreateTopicWithAzureEventHubsIngestionExample.createTopicWithAzureEventHubsIngestionExample(
projectId,
azureEventHubsIngestionTopicId,
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount);
assertThat(bout.toString()).contains(
"google.pubsub.v1.Topic.name=" + azureEventHubsIngestionTopicName.toString());
assertThat(bout.toString()).contains(resourceGroup);
assertThat(bout.toString()).contains(namespace);
assertThat(bout.toString()).contains(eventHub);
assertThat(bout.toString()).contains(clientId);
assertThat(bout.toString()).contains(tenantId);
assertThat(bout.toString()).contains(subscriptionId);
assertThat(bout.toString()).contains(gcpServiceAccount);

bout.reset();
// Test delete Azure Event Hubs ingestion topic.
DeleteTopicExample.deleteTopicExample(projectId, azureEventHubsIngestionTopicId);
assertThat(bout.toString()).contains("Deleted topic.");
}
}

0 comments on commit eea603b

Please sign in to comment.