Skip to content

Commit

Permalink
[C++] Other fixes for ratification (apache#190)
Browse files Browse the repository at this point in the history
* Rename sqlClient to sql_client on test_app_cli.cc

* Add missing parameters on PreparedStatement constructor

* Add NOTE to PreparedStatement destructor

* Parse PreparedStatement's dataset and parameters schema when constructing

* Rename Flight SQL Actions constants

* Remove unnecessary 'using' keyword on server.h

* Clean up header files and includes

* Rename getters for schemas on PreparedStatement and make them const

* Handle possible protobuf parsing errors on server.cc

* Move CreateStatementQueryTicket implementation to sql namespace
  • Loading branch information
rafael-telles committed Dec 3, 2021
1 parent 8095b6e commit b26bfc4
Show file tree
Hide file tree
Showing 18 changed files with 179 additions and 177 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/flight/flight_sql/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@

#pragma once

#include "arrow/flight/flight_sql/client.h"
#include <arrow/flight/flight_sql/client.h>
75 changes: 38 additions & 37 deletions cpp/src/arrow/flight/flight_sql/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
#include <arrow/util/logging.h>
#include <google/protobuf/any.pb.h>

#include <memory>
#include <utility>
namespace pb = arrow::flight::protocol;

namespace arrow {
namespace flight {
Expand All @@ -41,17 +40,21 @@ FlightSqlClient::FlightSqlClient(std::unique_ptr<FlightClient> client)
: impl_(internal::FlightClientImpl_Create(std::move(client))) {}

FlightSqlClient::PreparedStatement::PreparedStatement(
std::shared_ptr<internal::FlightClientImpl> client, const std::string& query,
pb::sql::ActionCreatePreparedStatementResult& prepared_statement_result,
std::shared_ptr<internal::FlightClientImpl> client, std::string handle,
std::shared_ptr<Schema> dataset_schema, std::shared_ptr<Schema> parameter_schema,
FlightCallOptions options)
: client_(client),
: client_(std::move(client)),
options_(std::move(options)),
prepared_statement_result_(std::move(prepared_statement_result)),
handle_(std::move(handle)),
dataset_schema_(std::move(dataset_schema)),
parameter_schema_(std::move(parameter_schema)),
is_closed_(false) {}

FlightSqlClient::~FlightSqlClient() = default;

FlightSqlClient::PreparedStatement::~PreparedStatement() {
if (IsClosed()) return;

const Status status = Close();
if (!status.ok()) {
ARROW_LOG(ERROR) << "Failed to delete PreparedStatement: " << status.ToString();
Expand Down Expand Up @@ -290,8 +293,28 @@ FlightSqlClient::Prepare(const FlightCallOptions& options, const std::string& qu
return Status::Invalid("Unable to unpack ActionCreatePreparedStatementResult");
}

return std::make_shared<PreparedStatement>(impl_, query, prepared_statement_result,
options);
const std::string& serialized_dataset_schema =
prepared_statement_result.dataset_schema();
const std::string& serialized_parameter_schema =
prepared_statement_result.parameter_schema();

std::shared_ptr<Schema> dataset_schema;
if (!serialized_dataset_schema.empty()) {
io::BufferReader dataset_schema_reader(serialized_dataset_schema);
ipc::DictionaryMemo in_memo;
ARROW_ASSIGN_OR_RAISE(dataset_schema, ReadSchema(&dataset_schema_reader, &in_memo));
}
std::shared_ptr<Schema> parameter_schema;
if (!serialized_parameter_schema.empty()) {
io::BufferReader parameter_schema_reader(serialized_parameter_schema);
ipc::DictionaryMemo in_memo;
ARROW_ASSIGN_OR_RAISE(parameter_schema,
ReadSchema(&parameter_schema_reader, &in_memo));
}
auto handle = prepared_statement_result.prepared_statement_handle();

return std::make_shared<PreparedStatement>(impl_, handle, dataset_schema,
parameter_schema, options);
}

arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::PreparedStatement::Execute() {
Expand All @@ -301,8 +324,7 @@ arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::PreparedStatement::E

pb::sql::CommandPreparedStatementQuery execute_query_command;

execute_query_command.set_prepared_statement_handle(
prepared_statement_result_.prepared_statement_handle());
execute_query_command.set_prepared_statement_handle(handle_);

google::protobuf::Any any;
any.PackFrom(execute_query_command);
Expand Down Expand Up @@ -336,8 +358,7 @@ arrow::Result<int64_t> FlightSqlClient::PreparedStatement::ExecuteUpdate() {
}

pb::sql::CommandPreparedStatementUpdate command;
command.set_prepared_statement_handle(
prepared_statement_result_.prepared_statement_handle());
command.set_prepared_statement_handle(handle_);
const FlightDescriptor& descriptor = GetFlightDescriptorForCommand(command);
std::unique_ptr<FlightStreamWriter> writer;
std::unique_ptr<FlightMetadataReader> reader;
Expand Down Expand Up @@ -377,26 +398,12 @@ Status FlightSqlClient::PreparedStatement::SetParameters(

bool FlightSqlClient::PreparedStatement::IsClosed() { return is_closed_; }

arrow::Result<std::shared_ptr<Schema>>
FlightSqlClient::PreparedStatement::GetResultSetSchema() {
auto& args = prepared_statement_result_.dataset_schema();
std::shared_ptr<Buffer> schema_buffer = std::make_shared<Buffer>(args);

io::BufferReader reader(schema_buffer);

ipc::DictionaryMemo in_memo;
return ReadSchema(&reader, &in_memo);
std::shared_ptr<Schema> FlightSqlClient::PreparedStatement::dataset_schema() const {
return dataset_schema_;
}

arrow::Result<std::shared_ptr<Schema>>
FlightSqlClient::PreparedStatement::GetParameterSchema() {
auto& args = prepared_statement_result_.parameter_schema();
std::shared_ptr<Buffer> schema_buffer = std::make_shared<Buffer>(args);

io::BufferReader reader(schema_buffer);

ipc::DictionaryMemo in_memo;
return ReadSchema(&reader, &in_memo);
std::shared_ptr<Schema> FlightSqlClient::PreparedStatement::parameter_schema() const {
return parameter_schema_;
}

Status FlightSqlClient::PreparedStatement::Close() {
Expand All @@ -405,8 +412,7 @@ Status FlightSqlClient::PreparedStatement::Close() {
}
google::protobuf::Any command;
pb::sql::ActionClosePreparedStatementRequest request;
request.set_prepared_statement_handle(
prepared_statement_result_.prepared_statement_handle());
request.set_prepared_statement_handle(handle_);

command.PackFrom(request);

Expand All @@ -431,11 +437,6 @@ arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetSqlInfo(
return GetFlightInfoForCommand(*impl_, options, command);
}

arrow::Result<std::unique_ptr<FlightInfo>> FlightSqlClient::GetSqlInfo(
const FlightCallOptions& options, const std::vector<pb::sql::SqlInfo>& sql_info) {
return GetSqlInfo(options, reinterpret_cast<const std::vector<int>&>(sql_info));
}

} // namespace sql
} // namespace flight
} // namespace arrow
37 changes: 18 additions & 19 deletions cpp/src/arrow/flight/flight_sql/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@

#pragma once

#include <arrow/flight/Flight.pb.h>
#include <arrow/flight/client.h>
#include <arrow/flight/flight_sql/FlightSql.pb.h>
#include <arrow/flight/types.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <google/protobuf/message.h>

namespace pb = arrow::flight::protocol;
#include <memory>
#include <string>

namespace arrow {
namespace flight {
Expand Down Expand Up @@ -159,13 +157,6 @@ class ARROW_EXPORT FlightSqlClient {
arrow::Result<std::unique_ptr<FlightInfo>> GetSqlInfo(const FlightCallOptions& options,
const std::vector<int>& sql_info);

/// \brief Request a list of SQL information.
/// \param[in] options RPC-layer hints for this call.
/// \param[in] sql_info the SQL info required.
/// \return The FlightInfo describing where to access the dataset.
arrow::Result<std::unique_ptr<FlightInfo>> GetSqlInfo(
const FlightCallOptions& options, const std::vector<pb::sql::SqlInfo>& sql_info);

/// \brief Create a prepared statement object.
/// \param[in] options RPC-layer hints for this call.
/// \param[in] query The query that will be executed.
Expand All @@ -178,21 +169,29 @@ class ARROW_EXPORT FlightSqlClient {
class PreparedStatement {
std::shared_ptr<internal::FlightClientImpl> client_;
FlightCallOptions options_;
pb::sql::ActionCreatePreparedStatementResult prepared_statement_result_;
std::string handle_;
std::shared_ptr<Schema> dataset_schema_;
std::shared_ptr<Schema> parameter_schema_;
std::shared_ptr<RecordBatch> parameter_binding_;
bool is_closed_;

public:
/// \brief Constructor for the PreparedStatement class.
/// \param[in] query The query that will be executed.
PreparedStatement(
std::shared_ptr<internal::FlightClientImpl> client_, const std::string& query,
pb::sql::ActionCreatePreparedStatementResult& prepared_statement_result,
FlightCallOptions options);
/// \param[in] client Client object used to make the RPC requests.
/// \param[in] handle Handle for this prepared statement.
/// \param[in] dataset_schema Schema of the resulting dataset.
/// \param[in] parameter_schema Schema of the parameters (if any).
/// \param[in] options RPC-layer hints for this call.
PreparedStatement(std::shared_ptr<internal::FlightClientImpl> client,
std::string handle, std::shared_ptr<Schema> dataset_schema,
std::shared_ptr<Schema> parameter_schema,
FlightCallOptions options);

/// \brief Default destructor for the PreparedStatement class.
/// The destructor will call the Close method from the class in order,
/// to send a request to close the PreparedStatement.
/// NOTE: It is best to explicitly close the PreparedStatement, otherwise
/// errors can't be caught.
~PreparedStatement();

/// \brief Executes the prepared statement query on the server.
Expand All @@ -205,11 +204,11 @@ class ARROW_EXPORT FlightSqlClient {

/// \brief Retrieve the parameter schema from the query.
/// \return The parameter schema from the query.
arrow::Result<std::shared_ptr<Schema>> GetParameterSchema();
std::shared_ptr<Schema> parameter_schema() const;

/// \brief Retrieve the ResultSet schema from the query.
/// \return The ResultSet schema from the query.
arrow::Result<std::shared_ptr<Schema>> GetResultSetSchema();
std::shared_ptr<Schema> dataset_schema() const;

/// \brief Set a RecordBatch that contains the parameters that will be bind.
/// \param parameter_binding_ The parameters that will be bind.
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/flight/flight_sql/client_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <arrow/flight/types.h>

#include <memory>

namespace arrow {
namespace flight {

Expand Down
9 changes: 4 additions & 5 deletions cpp/src/arrow/flight/flight_sql/client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ TEST(TestFlightSqlClient, TestPreparedStatementExecuteParameterBinding) {

ASSERT_OK_AND_ASSIGN(auto prepared_statement, sql_client.Prepare(call_options, query));

ASSERT_OK_AND_ASSIGN(auto parameter_schema, prepared_statement->GetParameterSchema());
auto parameter_schema = prepared_statement->parameter_schema();

arrow::Int64Builder int_builder;
ASSERT_OK(int_builder.Append(1));
Expand Down Expand Up @@ -459,10 +459,9 @@ TEST(TestFlightSqlClient, TestGetSqlInfo) {
auto client_mock = std::make_shared<internal::FlightClientImpl>();
FlightSqlClient sql_client(client_mock);

std::vector<pb::sql::SqlInfo> sql_info{
pb::sql::SqlInfo::FLIGHT_SQL_SERVER_NAME,
pb::sql::SqlInfo::FLIGHT_SQL_SERVER_VERSION,
pb::sql::SqlInfo::FLIGHT_SQL_SERVER_ARROW_VERSION};
std::vector<int> sql_info{pb::sql::SqlInfo::FLIGHT_SQL_SERVER_NAME,
pb::sql::SqlInfo::FLIGHT_SQL_SERVER_VERSION,
pb::sql::SqlInfo::FLIGHT_SQL_SERVER_ARROW_VERSION};
pb::sql::CommandGetSqlInfo command;

for (const auto& info : sql_info) command.add_info(info);
Expand Down
14 changes: 6 additions & 8 deletions cpp/src/arrow/flight/flight_sql/example/sqlite_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/flight/flight_sql/example/sqlite_server.h"

#include <arrow/api.h>
#include <arrow/flight/flight_sql/example/sqlite_server.h>
#include <arrow/flight/flight_sql/example/sqlite_statement.h>
#include <arrow/flight/flight_sql/example/sqlite_statement_batch_reader.h>
#include <arrow/flight/flight_sql/example/sqlite_tables_schema_batch_reader.h>
#include <arrow/flight/flight_sql/server.h>
#include <sqlite3.h>

#include <boost/algorithm/string.hpp>
Expand All @@ -25,12 +29,6 @@
#include <boost/uuid/uuid_io.hpp>
#include <sstream>

#include "arrow/api.h"
#include "arrow/flight/flight_sql/example/sqlite_statement.h"
#include "arrow/flight/flight_sql/example/sqlite_statement_batch_reader.h"
#include "arrow/flight/flight_sql/example/sqlite_tables_schema_batch_reader.h"
#include "arrow/flight/flight_sql/server.h"

namespace arrow {
namespace flight {
namespace sql {
Expand Down
11 changes: 6 additions & 5 deletions cpp/src/arrow/flight/flight_sql/example/sqlite_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

#pragma once

#include <arrow/api.h>
#include <arrow/flight/flight_sql/example/sqlite_statement.h>
#include <arrow/flight/flight_sql/example/sqlite_statement_batch_reader.h>
#include <arrow/flight/flight_sql/server.h>
#include <sqlite3.h>

#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <map>

#include "arrow/api.h"
#include "arrow/flight/flight_sql/example/sqlite_statement.h"
#include "arrow/flight/flight_sql/example/sqlite_statement_batch_reader.h"
#include "arrow/flight/flight_sql/server.h"
#include <memory>
#include <string>

namespace arrow {
namespace flight {
Expand Down
7 changes: 2 additions & 5 deletions cpp/src/arrow/flight/flight_sql/example/sqlite_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/flight/flight_sql/example/sqlite_statement.h"

#include <arrow/flight/flight_sql/example/sqlite_server.h>
#include <arrow/flight/flight_sql/example/sqlite_statement.h>
#include <sqlite3.h>

#include <boost/algorithm/string.hpp>

#include "arrow/api.h"
#include "arrow/flight/flight_sql/example/sqlite_server.h"

namespace arrow {
namespace flight {
namespace sql {
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/arrow/flight/flight_sql/example/sqlite_statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

#pragma once

#include <arrow/type_fwd.h>
#include <sqlite3.h>

#include "arrow/api.h"
#include <memory>
#include <string>

namespace arrow {
namespace flight {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/flight/flight_sql/example/sqlite_statement_batch_reader.h"

#include <arrow/api.h>
#include <arrow/flight/flight_sql/example/sqlite_statement.h>
#include <arrow/flight/flight_sql/example/sqlite_statement_batch_reader.h>
#include <sqlite3.h>

#include "arrow/api.h"
#include "arrow/flight/flight_sql/example/sqlite_statement.h"

#define STRING_BUILDER_CASE(TYPE_CLASS, STMT, COLUMN) \
case TYPE_CLASS##Type::type_id: { \
int bytes = sqlite3_column_bytes(STMT, COLUMN); \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

#pragma once

#include <arrow/flight/flight_sql/example/sqlite_statement.h>
#include <arrow/record_batch.h>
#include <sqlite3.h>

#include "arrow/api.h"
#include "arrow/flight/flight_sql/example/sqlite_statement.h"
#include <memory>

namespace arrow {
namespace flight {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/flight/flight_sql/example/sqlite_tables_schema_batch_reader.h"
#include <arrow/flight/flight_sql/example/sqlite_tables_schema_batch_reader.h>

#include <arrow/flight/flight_sql/server.h>
#include <arrow/ipc/writer.h>
Expand All @@ -24,8 +24,8 @@

#include <sstream>

#include "arrow/flight/flight_sql/example/sqlite_server.h"
#include "arrow/flight/flight_sql/example/sqlite_statement.h"
#include <arrow/flight/flight_sql/example/sqlite_server.h>
#include <arrow/flight/flight_sql/example/sqlite_statement.h>

namespace arrow {
namespace flight {
Expand Down
Loading

0 comments on commit b26bfc4

Please sign in to comment.