Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new and initialize commands for migration tool #6988

Merged
merged 10 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions ksqldb-tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,26 @@
<version>${io.confluent.ksql.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-common</artifactId>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-common</artifactId>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-rest-client</artifactId>
</dependency>
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-rest-client</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.tools.migrations;

import java.util.Map;

public class MigrationConfig {

public static final String KSQL_MIGRATIONS_STREAM_NAME = "ksql.migrations.stream.name";
public static final String KSQL_MIGRATIONS_TABLE_NAME = "ksql.migrations.table.name";
public static final String KSQL_MIGRATIONS_STREAM_TOPIC_NAME = "ksql.migrations.stream.topic.name";
public static final String KSQL_MIGRATIONS_TABLE_TOPIC_NAME = "ksql.migrations.table.topic.name";
public static final String KSQL_MIGRATIONS_TOPIC_REPLICAS = "ksql.migrations.topic.replicas";
public static final String KSQL_SERVER_URL = "ksql.server.url";

private static Map<String, String> configs;

public MigrationConfig(final Map<String, String> configs) {
this.configs = configs;
}

public String getKsqlMigrationsStreamName() {
return configs.getOrDefault(KSQL_MIGRATIONS_STREAM_NAME, "migration_events");
}

public String getKsqlMigrationsTableName() {
return configs.getOrDefault(KSQL_MIGRATIONS_TABLE_NAME, "schema_version");
}

public String getKsqlMigrationsStreamTopicName() {
return configs.getOrDefault(
KSQL_MIGRATIONS_STREAM_TOPIC_NAME, "default_ksql_migration_events");
}

public String getKsqlMigrationsTableTopicName() {
return configs.getOrDefault(
KSQL_MIGRATIONS_TABLE_TOPIC_NAME, "default_ksql_schema_version");
}

public String getKsqlMigrationsTopicReplicas() {
return configs.getOrDefault(
KSQL_MIGRATIONS_TOPIC_REPLICAS, "1");
}

public String getKsqlServerUrl() {
return configs.getOrDefault(
KSQL_SERVER_URL, "http://localhost:8088");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.tools.migrations.commands.ApplyMigrationCommand;
import io.confluent.ksql.tools.migrations.commands.CleanMigrationsCommand;
import io.confluent.ksql.tools.migrations.commands.CreateMigrationCommand;
import io.confluent.ksql.tools.migrations.commands.InitializeMigrationCommand;
import io.confluent.ksql.tools.migrations.commands.MigrationInfoCommand;
import io.confluent.ksql.tools.migrations.commands.NewMigrationCommand;
import io.confluent.ksql.tools.migrations.commands.ValidateMigrationsCommand;
Expand All @@ -41,7 +42,8 @@
ApplyMigrationCommand.class,
MigrationInfoCommand.class,
CleanMigrationsCommand.class,
ValidateMigrationsCommand.class
ValidateMigrationsCommand.class,
InitializeMigrationCommand.class
},
defaultCommand = Help.class
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.tools.migrations.commands;

import com.github.rvesse.airline.annotations.Command;
import io.confluent.ksql.properties.PropertiesUtil;
import io.confluent.ksql.rest.client.KsqlRestClient;
import io.confluent.ksql.rest.client.RestResponse;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import java.io.File;
import java.util.Collections;
import java.util.Optional;

@Command(
name = "initialize",
description = "Initializes the schema metadata (stream and table)."
)
public class InitializeMigrationCommand extends BaseCommand {

private String createEventStream(final String name, final String topic, final String replicas) {
return "CREATE STREAM " + name + " (\n"
+ " version_key STRING KEY,\n"
+ " version STRING,\n"
+ " name STRING,\n"
+ " state STRING, \n"
+ " checksum STRING,\n"
+ " started_on STRING,\n"
+ " completed_on STRING,\n"
+ " previous STRING\n"
+ ") WITH ( \n"
+ " KAFKA_TOPIC='" + topic + "',\n"
+ " VALUE_FORMAT='JSON',\n"
+ " PARTITIONS=1,\n"
+ " REPLICAS= " + replicas + " \n"
+ ");\n";
}

private String createVersionTable(final String name, final String topic) {
return "CREATE TABLE " + name + "\n"
+ " WITH (\n"
+ " KAFKA_TOPIC='" + topic + "'\n"
+ " )\n"
+ " AS SELECT \n"
+ " version_key, \n"
+ " latest_by_offset(version) as version, \n"
+ " latest_by_offset(name) AS name, \n"
+ " latest_by_offset(state) AS state, \n"
+ " latest_by_offset(checksum) AS checksum, \n"
+ " latest_by_offset(started_on) AS started_on, \n"
+ " latest_by_offset(completed_on) AS completed_on, \n"
+ " latest_by_offset(previous) AS previous\n"
+ " FROM migration_events \n"
+ " GROUP BY version_key;\n";
}

@Override
public void run() {
final MigrationConfig properties = new MigrationConfig(
PropertiesUtil.loadProperties(new File("ksql-migrations.properties")));
final String eventStreamCommand = createEventStream(
properties.getKsqlMigrationsStreamName(),
properties.getKsqlMigrationsStreamTopicName(),
properties.getKsqlMigrationsTopicReplicas()
);
final String versionTableCommand = createVersionTable(
properties.getKsqlMigrationsTableName(),
properties.getKsqlMigrationsTableTopicName()
);
if (dryRun) {
System.out.println(eventStreamCommand);
System.out.println(versionTableCommand);
} else {
final KsqlRestClient client = KsqlRestClient.create(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason we can't use the actual Java client here? Unless we can think of downsides, it'd be great to dogfood. Plus we'd like to eventually migrate the CLI to use the Java client (and away from KsqlRestClient) as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, but for now KsqlRestClient is used in MigrationConfig to query the /info endpoint. This is temporary until the Java client supports this functionality.

properties.getKsqlServerUrl(),
Collections.EMPTY_MAP,
Collections.EMPTY_MAP,
Optional.empty()
);
final RestResponse<KsqlEntityList> streamResponse =
client.makeKsqlRequest(eventStreamCommand);
final RestResponse<KsqlEntityList> tableResponse =
client.makeKsqlRequest(versionTableCommand);

if (streamResponse.isSuccessful()) {
System.out.println("Successfully created migration stream");
} else {
System.out.println("Failed to create migration stream: "
+ streamResponse.getErrorMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should sanity-check the error message the user gets if they try calling initialize twice (i.e., the stream already exists). It may make sense to check for the existence of the stream/table ahead of issuing the create calls and print a more targeted error message in that case. The targeted error message could mention the clean command which can be used if the user wants to start over with migrations. (This command could also help get them out of situations where for whatever reason the stream was created but the table wasn't.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

if (tableResponse.isSuccessful()) {
System.out.println("Successfully created migration table");
} else {
System.out.println("Failed to create migration table: "
+ tableResponse.getErrorMessage());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import com.github.rvesse.airline.annotations.Arguments;
import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.restrictions.Required;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

@Command(
name = "new",
Expand All @@ -31,7 +35,11 @@ public class NewMigrationCommand extends BaseCommand {

@Override
public void run() {
throw new UnsupportedOperationException();
try {
Files.createDirectories(Paths.get(projectPath));
new File(projectPath + "/ksql-migrations.properties").createNewFile();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new config should have some default properties, and perhaps commented properties names in case the user wants to customize it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 but I was going to suggest this as a follow-up. I think we should update the new command to take in the ksqlDB server URL, which we can then pre-populate into the config file. This is the only required config in the config file, so we definitely want it.

I don't feel strongly about whether we populate some of the other configs with defaults or not. Could be nice for some of the ones we expect users are more likely to want to change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but I'll leave this as a follow-up

} catch (IOException e) {
e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't have to be in this PR, but we should make sure we sanity-check the error messages users get if the tool doesn't have permissions to create the files/directories, or if file/directories already exist. We should also sanity check the tool's behavior (do we overwrite existing files, etc).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 not only that, we should return correct error codes (using System.exit) if necessary so that it can be properly piped into scripts. I think getting error messages correct is a huge part of making this tool successful.

cc @colinhicks @spena for their opinion on how this should work.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd also recommend avoiding error stacks in the output that users see (logs can have the error messages). This turns people off from these tools and makes them look very rough around the edges.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, very important to return the correct error codes from the process. This is relevant for CI, too, where a non-zero code is expected to signal job step failure. We want to avoid forcing users to otherwise detect failure by parsing the output.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1. I also usually avoid putting everything into one try/catch when I want to know what failed. This way I can display something like:

$ ksql-migrations new /unknown-dir/migrations-project
Creating directory: <project-dir>
Failed creating directory: "Directory /unknown-dir does not exist"

Which then you can return know what error code to return, like System.exit(2). This number will let user know what's wrong when writing scripts.

When working with user tools, it is always recommended to display a good error message and an error status. They will thank you a lot if they write their on bash scripts to automate this tool.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. For now, they all have the same error code, but after more parts of the tool are complete we can go through the errors and categorize them.

}
}

}