diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/Ksql.java b/ksqldb-cli/src/main/java/io/confluent/ksql/Ksql.java index 5fc4c52c7514..b2a5c4f028dc 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/Ksql.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/Ksql.java @@ -112,7 +112,11 @@ void run() { options.getOutputFormat(), restClient) ) { - cli.runInteractively(); + if (options.getExecute() != null) { + cli.runCommand(options.getExecute()); + } else { + cli.runInteractively(); + } } } } diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java index cf1c845eff09..89cff4d8db13 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Cli.java @@ -22,6 +22,7 @@ import io.confluent.ksql.cli.console.cmd.RemoteServerSpecificCommand; import io.confluent.ksql.cli.console.cmd.RequestPipeliningCommand; import io.confluent.ksql.parser.DefaultKsqlParser; +import io.confluent.ksql.parser.KsqlParser; import io.confluent.ksql.parser.KsqlParser.ParsedStatement; import io.confluent.ksql.parser.SqlBaseParser; import io.confluent.ksql.parser.SqlBaseParser.PrintTopicContext; @@ -73,6 +74,8 @@ public class Cli implements KsqlRequestExecutor, Closeable { private static final int MAX_RETRIES = 10; + private static final KsqlParser KSQL_PARSER = new DefaultKsqlParser(); + private static final ClassHandlerMap2 STATEMENT_HANDLERS = HandlerMaps .forClass(StatementContext.class) @@ -179,19 +182,40 @@ private RestResponse makeKsqlRequest( throw new KsqlRestClientException("Failed to execute request " + ksql); } + public void runCommand(final String command) { + RemoteServerSpecificCommand.validateClient(terminal.writer(), restClient); + try { + // Handles the RUN SCRIPT command if found + if (!terminal.maybeHandleCliSpecificCommands(command)) { + handleLine(command); + } + } catch (final EndOfFileException exception) { + // Ignore - only used by runInteractively() to exit the CLI + } catch (final Exception exception) { + LOGGER.error("An error occurred while running a command. Error = " + + exception.getMessage(), exception); + + terminal.printError(ErrorMessageUtil.buildErrorMessage(exception), + exception.toString()); + } + + terminal.flush(); + } + public void runInteractively() { displayWelcomeMessage(); RemoteServerSpecificCommand.validateClient(terminal.writer(), restClient); boolean eof = false; while (!eof) { try { - handleLine(readLine()); + handleLine(nextNonCliCommand()); } catch (final EndOfFileException exception) { // EOF is fine, just terminate the REPL terminal.writer().println("Exiting ksqlDB."); eof = true; } catch (final Exception exception) { - LOGGER.error("", exception); + LOGGER.error("An error occurred while running a command. Error = " + + exception.getMessage(), exception); terminal.printError(ErrorMessageUtil.buildErrorMessage(exception), exception.toString()); } @@ -262,18 +286,19 @@ void handleLine(final String line) { * @return The parsed, logical line. * @throws EndOfFileException If there is no more input available from the user. */ - private String readLine() { + private String nextNonCliCommand() { while (true) { try { - final String result = terminal.readLine(); + final String line = terminal.nextNonCliCommand(); + // A 'dumb' terminal (the kind used at runtime if a 'system' terminal isn't available) will // return null on EOF and user interrupt, instead of throwing the more fine-grained // exceptions. This null-check helps ensure that, upon encountering EOF, even a 'dumb' // terminal will be able to exit intelligently. - if (result == null) { + if (line == null) { throw new EndOfFileException(); } else { - return result.trim(); + return line.trim(); } } catch (final UserInterruptException exception) { // User hit ctrl-C, just clear the current line and try again. @@ -284,8 +309,7 @@ private String readLine() { } private void handleStatements(final String line) { - final List statements = - new DefaultKsqlParser().parse(line); + final List statements = KSQL_PARSER.parse(line); final StringBuilder consecutiveStatements = new StringBuilder(); for (final ParsedStatement parsed : statements) { diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java index 4f349a07a40a..f16d2a15c9dd 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java @@ -41,6 +41,8 @@ public class Options { private static final String PASSWORD_OPTION = "--password"; private static final String PASSWORD_SHORT_OPTION = "-p"; private static final String OUTPUT_FORMAT_OPTION_NAME = "--output"; + private static final String EXECUTE_OPTION = "--execute"; + private static final String EXECUTE_SHORT_OPTION = "-e"; // Only here so that the help message generated by Help.help() is accurate @Inject @@ -111,6 +113,12 @@ public class Options { + "defaults to TABULAR)") private String outputFormat = OutputFormat.TABULAR.name(); + @SuppressWarnings("unused") // Accessed via reflection + @Option( + name = {EXECUTE_OPTION, EXECUTE_SHORT_OPTION}, + description = "Execute one or more SQL statements and quit.") + private String execute = null; + public static Options parse(final String...args) throws IOException { final SingleCommand optionsParser = SingleCommand.singleCommand(Options.class); @@ -189,4 +197,14 @@ public Optional getUserNameAndPassword() { return Optional.of(BasicCredentials.of(userName, password)); } + + public String getExecute() { + if (execute == null || execute.isEmpty()) { + return execute; + } + + // Append a colon if not specified + final char lastChar = execute.charAt(execute.length() - 1); + return (lastChar != ';') ? execute + ";" : execute; + } } diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java index 600a4c78200e..2de4eefec422 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/console/Console.java @@ -306,7 +306,7 @@ public Map getCliSpecificCommands() { return cliSpecificCommands; } - public String readLine() { + public String nextNonCliCommand() { String line; do { @@ -905,7 +905,7 @@ public void addRows(final List> fields) { } } - private boolean maybeHandleCliSpecificCommands(final String line) { + public boolean maybeHandleCliSpecificCommands(final String line) { if (line == null) { return false; } diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/KsqlTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/KsqlTest.java index 50a7d48d5f27..c21088fa1ccb 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/KsqlTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/KsqlTest.java @@ -72,6 +72,27 @@ public void setUp() { when(cliBuilder.build(any(), any(), any(), any())).thenReturn(cli); } + @Test + public void shouldRunInteractively() { + // When: + ksql.run(); + + // Then: + verify(cli).runInteractively(); + } + + @Test + public void shouldRunNonInteractiveCommandWhenExecuteOptionIsUsed() { + // Given: + when(options.getExecute()).thenReturn("this is a command"); + + // When: + ksql.run(); + + // Then: + verify(cli).runCommand("this is a command"); + } + @Test public void shouldBuildClientWithCorrectServerAddress() { // Given: diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java index 909a1e410afd..5033cef99851 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/CliTest.java @@ -139,8 +139,8 @@ public class CliTest { KsqlConstants.defaultSinkWindowChangeLogAdditionalRetention + 1) .build(); - @ClassRule - public static final TemporaryFolder TMP = new TemporaryFolder(); + @Rule + public final TemporaryFolder TMP = new TemporaryFolder(); @ClassRule public static final RuleChain CHAIN = RuleChain @@ -731,7 +731,7 @@ public void testRunInteractively() { } @Test - public void shouldPrintErrorIfCantConnectToRestServer() throws Exception { + public void shouldPrintErrorIfCantConnectToRestServerOnRunInteractively() throws Exception { givenRunInteractivelyWillExit(); final KsqlRestClient mockRestClient = givenMockRestClient(); @@ -745,6 +745,21 @@ public void shouldPrintErrorIfCantConnectToRestServer() throws Exception { containsString("Please ensure that the URL provided is for an active KSQL server.")); } + @Test + public void shouldPrintErrorIfCantConnectToRestServerOnRunCommand() throws Exception { + givenRunInteractivelyWillExit(); + + final KsqlRestClient mockRestClient = givenMockRestClient(); + when(mockRestClient.getServerInfo()) + .thenThrow(new KsqlRestClientException("Boom", new IOException(""))); + + new Cli(1L, 1L, mockRestClient, console) + .runCommand("this is a command"); + + assertThat(terminal.getOutputString(), + containsString("Please ensure that the URL provided is for an active KSQL server.")); + } + @Test public void shouldRegisterRemoteCommand() { assertThat(console.getCliSpecificCommands().get("server"), @@ -946,7 +961,7 @@ public void shouldHandleSetPropertyAsPartOfMultiStatementLine() { } @Test - public void shouldRunScript() throws Exception { + public void shouldRunScriptOnRunInteractively() throws Exception { // Given: final File scriptFile = TMP.newFile("script.sql"); Files.write(scriptFile.toPath(), ("" @@ -965,6 +980,22 @@ public void shouldRunScript() throws Exception { containsString("Created query with ID CSAS_SHOULDRUNSCRIPT")); } + @Test + public void shouldRunScriptOnRunCommand() throws Exception { + // Given: + final File scriptFile = TMP.newFile("script.sql"); + Files.write(scriptFile.toPath(), ("" + + "CREATE STREAM shouldRunScript AS SELECT * FROM " + ORDER_DATA_PROVIDER.kstreamName() + ";" + + "").getBytes(StandardCharsets.UTF_8)); + + // When: + localCli.runCommand("run script '" + scriptFile.getAbsolutePath() + "'"); + + // Then: + assertThat(terminal.getOutputString(), + containsString("Created query with ID CSAS_SHOULDRUNSCRIPT")); + } + @Test public void shouldUpdateCommandSequenceNumber() throws Exception { // Given: @@ -1101,7 +1132,7 @@ private void givenRequestPipelining(final String setting) { private void runCliSpecificCommand(final String command) { when(lineSupplier.get()).thenReturn(command).thenReturn(""); - console.readLine(); + console.nextNonCliCommand(); } private void givenRunInteractivelyWillExit() { diff --git a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java index 926417e6461d..39184b5f7eae 100644 --- a/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java +++ b/ksqldb-cli/src/test/java/io/confluent/ksql/cli/console/ConsoleTest.java @@ -1566,7 +1566,7 @@ public void shouldExecuteCliCommands() { .thenReturn("not a CLI command;"); // When: - console.readLine(); + console.nextNonCliCommand(); // Then: verify(cliCommand).execute(eq(ImmutableList.of()), any()); @@ -1580,7 +1580,7 @@ public void shouldExecuteCliCommandWithArgsTrimmingWhiteSpace() { .thenReturn("not a CLI command;"); // When: - console.readLine(); + console.nextNonCliCommand(); // Then: verify(cliCommand).execute(eq(ImmutableList.of("Arg0", "Arg1")), any()); @@ -1594,7 +1594,7 @@ public void shouldExecuteCliCommandWithQuotedArgsContainingSpaces() { .thenReturn("not a CLI command;"); // When: - console.readLine(); + console.nextNonCliCommand(); // Then: verify(cliCommand).execute(eq(ImmutableList.of("Arg0", "Arg 1")), any()); @@ -1608,7 +1608,7 @@ public void shouldSupportOtherWhitespaceBetweenCliCommandAndArgs() { .thenReturn("not a CLI command;"); // When: - console.readLine(); + console.nextNonCliCommand(); // Then: verify(cliCommand).execute(eq(ImmutableList.of("Arg0", "Arg 1")), any()); @@ -1622,7 +1622,7 @@ public void shouldSupportCmdBeingTerminatedWithSemiColon() { .thenReturn("not a CLI command;"); // When: - console.readLine(); + console.nextNonCliCommand(); // Then: verify(cliCommand).execute(eq(ImmutableList.of("Arg0")), any()); @@ -1636,7 +1636,7 @@ public void shouldSupportCmdBeingTerminatedWithSemiColonAndWhitespace() { .thenReturn("not a CLI command;"); // When: - console.readLine(); + console.nextNonCliCommand(); // Then: verify(cliCommand).execute(eq(ImmutableList.of("Arg0")), any()); @@ -1650,7 +1650,7 @@ public void shouldSupportCmdWithQuotedArgBeingTerminatedWithSemiColon() { .thenReturn("not a CLI command;"); // When: - console.readLine(); + console.nextNonCliCommand(); // Then: verify(cliCommand).execute(eq(ImmutableList.of("Arg0")), any()); @@ -1664,7 +1664,7 @@ public void shouldFailIfCommandNameIsQuoted() { .thenReturn("not a CLI command;"); // When: - console.readLine(); + console.nextNonCliCommand(); // Then: verify(cliCommand, never()).execute(any(), any()); @@ -1678,7 +1678,7 @@ public void shouldSwallowCliCommandLines() { .thenReturn("not a CLI command;"); // When: - final String result = console.readLine(); + final String result = console.nextNonCliCommand(); // Then: assertThat(result, is("not a CLI command;")); @@ -1692,7 +1692,7 @@ public void shouldSwallowCliCommandLinesEvenWithWhiteSpace() { .thenReturn("not a CLI command;"); // When: - final String result = console.readLine(); + final String result = console.nextNonCliCommand(); // Then: assertThat(result, is("not a CLI command;"));