Skip to content

Commit

Permalink
feat(migrations): implement create command (#7133)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia authored Mar 2, 2021
1 parent 986cbe3 commit c91e23c
Show file tree
Hide file tree
Showing 6 changed files with 373 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,49 +15,154 @@

package io.confluent.ksql.tools.migrations.commands;

import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getAllVersions;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePathForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getFilePrefixForVersion;
import static io.confluent.ksql.tools.migrations.util.MigrationsDirectoryUtil.getMigrationsDirFromConfigFile;

import com.github.rvesse.airline.annotations.Arguments;
import com.github.rvesse.airline.annotations.Command;
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.annotations.help.Examples;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.tools.migrations.MigrationException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Command(
name = "create",
description = "Create a migration file with <desc> as description, which will be used to "
+ "apply the next schema version."
description = "Create a blank migration file with <description> as description, which "
+ "can then be edited and applied as the next schema version."
)
@Examples(
examples = "$ ksql-migrations create add_users",
examples = "$ ksql-migrations create Add_users",
descriptions = "Creates a new migrations file for adding a users table to ksqlDB "
+ "(e.g. V000002__Add_users.sql)"
)
public class CreateMigrationCommand extends BaseCommand {

private static final Logger LOGGER = LoggerFactory.getLogger(CreateMigrationCommand.class);

private static final String INVALID_FILENAME_CHARS_PATTERN = "\\s|/|\\\\|:|\\*|\\?|\"|<|>|\\|";

@Option(
name = {"-v", "--version"},
description = "the schema version to initialize, defaults to the next"
+ " schema version."
+ " schema version based on existing migration files."
)
private int version;

@Required
@Arguments(
title = "desc",
title = "description",
description = "The description for the migration."
)
private String description;

@Override
protected int command() {
throw new UnsupportedOperationException();
return command(getMigrationsDirFromConfigFile(configFile));
}

@VisibleForTesting
int command(final String migrationsDir) {
if (!validateVersionDoesNotAlreadyExist(migrationsDir) || !validateDescriptionNotEmpty()) {
return 1;
}

try {
final int newVersion = version != 0 ? version : getLatestVersion(migrationsDir) + 1;
createMigrationsFile(newVersion, migrationsDir);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return 1;
}

return 0;
}

@Override
protected Logger getLogger() {
return LOGGER;
}

/**
* @return true if validation succeeds, else false
*/
private boolean validateVersionDoesNotAlreadyExist(final String migrationsDir) {
// no explicit version was specified, nothing to verify
// (airline actually can't distinguish between explicit 0 and nothing specified,
// but we won't worry about this edge case for now)
if (version == 0) {
return true;
}

final Optional<String> existingFile;
try {
existingFile = getFilePathForVersion(String.valueOf(version), migrationsDir);
} catch (MigrationException e) {
LOGGER.error(e.getMessage());
return false;
}

if (existingFile.isPresent()) {
LOGGER.error("Found existing migrations file for version {}: {}",
version, existingFile.get());
return false;
}

return true;
}

/**
* @return true if validation succeeds, else false
*/
private boolean validateDescriptionNotEmpty() {
if (description.isEmpty()) {
LOGGER.error("Description cannot be empty.");
return false;
}
return true;
}

private void createMigrationsFile(final int newVersion, final String migrationsDir) {
if (newVersion <= 0) {
throw new MigrationException("Invalid version file version: " + newVersion
+ ". Version must be a positive integer.");
}
if (newVersion > 999999) {
throw new MigrationException("Invalid version file version: " + newVersion
+ ". Version must fit into a six-digit integer.");
}

final String filename = getNewFileName(newVersion, description);
final String filePath = Paths.get(migrationsDir, filename).toString();
try {
LOGGER.info("Creating file: " + filePath);
final boolean result = new File(filePath).createNewFile();
if (!result) {
throw new IllegalStateException("File should not exist");
}
} catch (IOException | IllegalStateException e) {
throw new MigrationException(String.format(
"Failed to create file %s: %s", filePath, e.getMessage()));
}
}

private static int getLatestVersion(final String migrationsDir) {
final List<Integer> allVersions = getAllVersions(migrationsDir);
return allVersions.size() != 0 ? allVersions.get(allVersions.size() - 1) : 0;
}

private static String getNewFileName(final int newVersion, final String description) {
final String versionPrefix = getFilePrefixForVersion(String.valueOf(newVersion));
final String descriptionSuffix = description.replaceAll(INVALID_FILENAME_CHARS_PATTERN, "_");
return versionPrefix + "__" + descriptionSuffix + ".sql";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,26 @@
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MigrationsDirectoryUtil {

public static final String MIGRATIONS_DIR = "migrations";
public static final String MIGRATIONS_CONFIG_FILE = "ksql-migrations.properties";

private static final Pattern MIGRATION_FILE_MATCHER = Pattern.compile("V([0-9]{6})__(.+)\\.sql");

private static final Logger LOGGER = LoggerFactory.getLogger(MigrationsDirectoryUtil.class);

private MigrationsDirectoryUtil() {
}

Expand All @@ -50,11 +57,15 @@ public static String getMigrationsDirFromConfigFile(final String configFilePath)
return parentDir.resolve(MIGRATIONS_DIR).toString();
}

public static String getFilePrefixForVersion(final String version) {
return "V" + StringUtils.leftPad(version, 6, "0");
}

public static Optional<String> getFilePathForVersion(
final String version,
final String migrationsDir
) {
final String prefix = "V" + StringUtils.leftPad(version, 6, "0");
final String prefix = getFilePrefixForVersion(version);

final File directory = new File(migrationsDir);
if (!directory.isDirectory()) {
Expand Down Expand Up @@ -97,44 +108,63 @@ public static String computeHashForFile(final String filename) {
}
}

public static String getNameFromMigrationFilePath(final String filename) {
return filename
.substring(filename.indexOf("__") + 2, filename.indexOf(".sql"))
.replace('_', ' ');
}

public static int getVersionFromMigrationFilePath(final String filename) {
final Matcher matcher = Pattern.compile("V([0-9]{6})__.*\\.sql").matcher(filename);
if (matcher.find()) {
final int version = Integer.parseInt(matcher.group(1));
if (version > 0) {
return version;
} else {
throw new MigrationException("Version number must be positive - found " + filename);
}
} else {
throw new MigrationException(
"File path does not match expected pattern V<six digit number>__<name>.sql: " + filename);
}
/**
* @return all migration file versions in sorted order
*/
public static List<Integer> getAllVersions(final String migrationsDir) {
return getAllMigrations(migrationsDir).stream()
.map(Migration::getVersion)
.collect(Collectors.toList());
}

/**
* @return all migration files in sorted order
*/
public static List<Migration> getAllMigrations(final String migrationsDir) {
final File directory = new File(migrationsDir);
if (!directory.isDirectory()) {
throw new MigrationException(migrationsDir + " is not a directory.");
}

final String[] names = directory.list();
if (names == null) {
final String[] allNames;
try {
allNames = directory.list();
} catch (SecurityException e) {
throw new MigrationException("Failed to retrieve files from " + migrationsDir
+ ": " + e.getMessage());
}
if (allNames == null) {
throw new MigrationException("Failed to retrieve files from " + migrationsDir);
}

return Arrays.stream(names)
final List<String> filenames = Arrays.stream(allNames)
.sorted()
.map(name -> new Migration(
getVersionFromMigrationFilePath(name),
getNameFromMigrationFilePath(name),
migrationsDir + "/" + name))
.filter(name -> !new File(name).isDirectory())
.collect(Collectors.toList());

final List<Migration> migrations = new ArrayList<>();
for (final String filename : filenames) {
final Matcher matcher = MIGRATION_FILE_MATCHER.matcher(filename);
if (!matcher.find()) {
LOGGER.warn("Skipping file does not match expected migration file pattern "
+ "'V<six digit number>__<name>.sql': {}", filename);
continue;
}

final int version = Integer.parseInt(matcher.group(1));
if (version <= 0) {
throw new MigrationException(
"Migration file versions must be positive. Found: " + filename);
}

final String description = matcher.group(2).replace('_', ' ');

migrations.add(new Migration(
version,
description,
migrationsDir + "/" + filename
));
}

return migrations;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ public void testApply() throws IOException {
// Migration file
createMigrationFile(
1,
"foo",
MigrationsDirectoryUtil.getMigrationsDirFromConfigFile(configFilePath),
"foo FOO fO0",
configFilePath,
"CREATE STREAM FOO (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='DELIMITED');"
);
createMigrationFile(
2,
"bar",
MigrationsDirectoryUtil.getMigrationsDirFromConfigFile(configFilePath),
"bar_bar_BAR",
configFilePath,
"CREATE STREAM BAR (A STRING) WITH (KAFKA_TOPIC='BAR', PARTITIONS=1, VALUE_FORMAT='DELIMITED');"
);

Expand All @@ -125,23 +125,23 @@ public void testApply() throws IOException {
final List<StreamedRow> current = makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';");
assertThatEventually(() -> current.size(), is(2));
assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(1), is("2"));
assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(2), is("bar"));
assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(2), is("bar bar BAR"));
assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(3), is("MIGRATED"));
assertThatEventually(() -> current.get(1).getRow().get().getColumns().get(7), is("1"));

// verify version 1
final List<StreamedRow> version1 = makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='1';");
assertThatEventually(() -> version1.size(), is(2));
assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(1), is("1"));
assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(2), is("foo"));
assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(2), is("foo FOO fO0"));
assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(3), is("MIGRATED"));
assertThatEventually(() -> version1.get(1).getRow().get().getColumns().get(7), is("<none>"));

// verify version 2
final List<StreamedRow> version2 = makeKsqlQuery("SELECT * FROM migration_schema_versions WHERE VERSION_KEY='CURRENT';");
assertThatEventually(() -> version2.size(), is(2));
assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(1), is("2"));
assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(2), is("bar"));
assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(2), is("bar bar BAR"));
assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(3), is("MIGRATED"));
assertThatEventually(() -> version2.get(1).getRow().get().getColumns().get(7), is("1"));
}
Expand Down Expand Up @@ -272,14 +272,25 @@ private boolean isKey(final FieldInfo fieldInfo) {
private static void createMigrationFile(
final int version,
final String name,
final String migrationsDir,
final String configFilePath,
final String content
) throws IOException {
final String filePath = migrationsDir
+ String.format("/V00000%d__%s.sql", version, name.replace(' ', '_'));
assertThat(new File(filePath).createNewFile(), is(true));
PrintWriter out = new PrintWriter(filePath, Charset.defaultCharset().name());
out.println(content);
out.close();
// use `create` to create empty file
final int status = MIGRATIONS_CLI.parse("--config-file", configFilePath,
"create", name, "-v", String.valueOf(version)).run();
assertThat(status, is(0));

// validate file created
final File filePath = new File(Paths.get(
MigrationsDirectoryUtil.getMigrationsDirFromConfigFile(configFilePath),
String.format("/V00000%d__%s.sql", version, name.replace(' ', '_'))
).toString());
assertThat(filePath.exists(), is(true));
assertThat(filePath.isDirectory(), is(false));

// write contents to file
try (PrintWriter out = new PrintWriter(filePath, Charset.defaultCharset().name())) {
out.println(content);
}
}
}
Loading

0 comments on commit c91e23c

Please sign in to comment.