Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): introduce DataConnection #9323

Merged
merged 2 commits into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions google/cloud/bigtable/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ add_library(
completion_queue.h
data_client.cc
data_client.h
data_connection.cc
data_connection.h
expr.cc
expr.h
filters.h
Expand Down Expand Up @@ -177,8 +179,6 @@ add_library(
internal/connection_refresh_state.h
internal/convert_policies.cc
internal/convert_policies.h
internal/data_connection.cc
internal/data_connection.h
internal/data_connection_impl.cc
internal/data_connection_impl.h
internal/default_row_reader.cc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/bigtable/internal/data_connection.h"
#include "google/cloud/bigtable/data_connection.h"
#include "google/cloud/bigtable/internal/bigtable_stub_factory.h"
#include "google/cloud/bigtable/internal/data_connection_impl.h"
#include "google/cloud/bigtable/internal/defaults.h"
Expand All @@ -25,16 +25,15 @@

namespace google {
namespace cloud {
namespace bigtable_internal {
namespace bigtable {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

std::vector<bigtable::FailedMutation> MakeUnimplementedFailedMutations(
std::size_t n) {
std::vector<bigtable::FailedMutation> mutations;
std::vector<FailedMutation> MakeUnimplementedFailedMutations(std::size_t n) {
std::vector<FailedMutation> mutations;
mutations.reserve(n);
for (int i = 0; i != static_cast<int>(n); ++i) {
mutations.emplace_back(bigtable::FailedMutation(
mutations.emplace_back(FailedMutation(
Status(StatusCode::kUnimplemented, "not implemented"), i));
}
return mutations;
Expand All @@ -46,116 +45,111 @@ DataConnection::~DataConnection() = default;

Status DataConnection::Apply(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, bigtable::SingleRowMutation) {
std::string const&, std::string const&, SingleRowMutation) {
return Status(StatusCode::kUnimplemented, "not implemented");
}

future<Status> DataConnection::AsyncApply(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, bigtable::SingleRowMutation) {
std::string const&, std::string const&, SingleRowMutation) {
return make_ready_future(
Status(StatusCode::kUnimplemented, "not implemented"));
}

std::vector<bigtable::FailedMutation> DataConnection::BulkApply(
std::vector<FailedMutation> DataConnection::BulkApply(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, bigtable::BulkMutation mut) {
std::string const&, std::string const&, BulkMutation mut) {
return MakeUnimplementedFailedMutations(mut.size());
}

future<std::vector<bigtable::FailedMutation>> DataConnection::AsyncBulkApply(
future<std::vector<FailedMutation>> DataConnection::AsyncBulkApply(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, bigtable::BulkMutation mut) {
std::string const&, std::string const&, BulkMutation mut) {
return make_ready_future(MakeUnimplementedFailedMutations(mut.size()));
}

bigtable::RowReader DataConnection::ReadRows(
std::string const&, std::string const&,
RowReader DataConnection::ReadRows(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
bigtable::RowSet, std::int64_t, bigtable::Filter) {
return MakeRowReader(std::make_shared<StatusOnlyRowReader>(
std::string const&, std::string const&, RowSet, std::int64_t, Filter) {
return MakeRowReader(std::make_shared<bigtable_internal::StatusOnlyRowReader>(
Status(StatusCode::kUnimplemented, "not implemented")));
}

StatusOr<std::pair<bool, bigtable::Row>> DataConnection::ReadRow(
StatusOr<std::pair<bool, Row>> DataConnection::ReadRow(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, std::string, bigtable::Filter) {
std::string const&, std::string const&, std::string, Filter) {
return Status(StatusCode::kUnimplemented, "not implemented");
}

StatusOr<bigtable::MutationBranch> DataConnection::CheckAndMutateRow(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, std::string, bigtable::Filter,
StatusOr<MutationBranch> DataConnection::CheckAndMutateRow(
std::string const&, std::string const&,
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::vector<bigtable::Mutation>, std::vector<bigtable::Mutation>) {
std::string, Filter, std::vector<Mutation>, std::vector<Mutation>) {
return Status(StatusCode::kUnimplemented, "not implemented");
}

future<StatusOr<bigtable::MutationBranch>>
DataConnection::AsyncCheckAndMutateRow(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, std::string, bigtable::Filter,
future<StatusOr<MutationBranch>> DataConnection::AsyncCheckAndMutateRow(
std::string const&, std::string const&,
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::vector<bigtable::Mutation>, std::vector<bigtable::Mutation>) {
return make_ready_future<StatusOr<bigtable::MutationBranch>>(
std::string, Filter, std::vector<Mutation>, std::vector<Mutation>) {
return make_ready_future<StatusOr<MutationBranch>>(
Status(StatusCode::kUnimplemented, "not implemented"));
}

StatusOr<std::vector<bigtable::RowKeySample>> DataConnection::SampleRows(
StatusOr<std::vector<RowKeySample>> DataConnection::SampleRows(
std::string const&, std::string const&) {
return Status(StatusCode::kUnimplemented, "not implemented");
}

future<StatusOr<std::vector<bigtable::RowKeySample>>>
DataConnection::AsyncSampleRows(std::string const&, std::string const&) {
return make_ready_future<StatusOr<std::vector<bigtable::RowKeySample>>>(
future<StatusOr<std::vector<RowKeySample>>> DataConnection::AsyncSampleRows(
std::string const&, std::string const&) {
return make_ready_future<StatusOr<std::vector<RowKeySample>>>(
Status(StatusCode::kUnimplemented, "not implemented"));
}

StatusOr<bigtable::Row> DataConnection::ReadModifyWriteRow(
StatusOr<Row> DataConnection::ReadModifyWriteRow(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
google::bigtable::v2::ReadModifyWriteRowRequest) {
return Status(StatusCode::kUnimplemented, "not implemented");
}

future<StatusOr<bigtable::Row>> DataConnection::AsyncReadModifyWriteRow(
future<StatusOr<Row>> DataConnection::AsyncReadModifyWriteRow(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
google::bigtable::v2::ReadModifyWriteRowRequest) {
return make_ready_future<StatusOr<bigtable::Row>>(
return make_ready_future<StatusOr<Row>>(
Status(StatusCode::kUnimplemented, "not implemented"));
}

void DataConnection::AsyncReadRows(
std::string const&, std::string const&,
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::function<future<bool>(bigtable::Row)>,
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::function<void(Status)> on_finish, bigtable::RowSet, std::int64_t,
std::function<future<bool>(Row)>, std::function<void(Status)> on_finish,
// NOLINTNEXTLINE(performance-unnecessary-value-param)
bigtable::Filter) {
RowSet, std::int64_t, Filter) {
on_finish(Status(StatusCode::kUnimplemented, "not implemented"));
}

future<StatusOr<std::pair<bool, bigtable::Row>>> DataConnection::AsyncReadRow(
future<StatusOr<std::pair<bool, Row>>> DataConnection::AsyncReadRow(
// NOLINTNEXTLINE(performance-unnecessary-value-param)
std::string const&, std::string const&, std::string, bigtable::Filter) {
return make_ready_future<StatusOr<std::pair<bool, bigtable::Row>>>(
std::string const&, std::string const&, std::string, Filter) {
return make_ready_future<StatusOr<std::pair<bool, Row>>>(
Status(StatusCode::kUnimplemented, "not implemented"));
}

std::shared_ptr<DataConnection> MakeDataConnection(Options options) {
google::cloud::internal::CheckExpectedOptions<
CommonOptionList, GrpcOptionList, bigtable::DataPolicyOptionList>(
options, __func__);
CommonOptionList, GrpcOptionList, DataPolicyOptionList>(options,
__func__);
options = bigtable::internal::DefaultDataOptions(std::move(options));
auto background = internal::MakeBackgroundThreadsFactory(options)();
auto background =
google::cloud::internal::MakeBackgroundThreadsFactory(options)();
auto stub = bigtable_internal::CreateBigtableStub(background->cq(), options);
return std::make_shared<bigtable_internal::DataConnectionImpl>(
std::move(background), std::move(stub), std::move(options));
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable_internal
} // namespace bigtable
} // namespace cloud
} // namespace google

Expand All @@ -164,7 +158,7 @@ namespace cloud {
namespace bigtable_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

std::shared_ptr<DataConnection> MakeDataConnection(
std::shared_ptr<bigtable::DataConnection> MakeDataConnection(
std::shared_ptr<BigtableStub> stub, Options options) {
options = bigtable::internal::DefaultDataOptions(std::move(options));
auto background = internal::MakeBackgroundThreadsFactory(options)();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DATA_CONNECTION_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DATA_CONNECTION_H
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_DATA_CONNECTION_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_DATA_CONNECTION_H

#include "google/cloud/bigtable/filters.h"
#include "google/cloud/bigtable/internal/bigtable_stub.h"
Expand All @@ -32,8 +32,7 @@

namespace google {
namespace cloud {
// TODO(#8860) - Make this class public, when it is ready.
namespace bigtable_internal {
namespace bigtable {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

/**
Expand All @@ -54,65 +53,60 @@ class DataConnection {
virtual Options options() { return Options{}; }

virtual Status Apply(std::string const& app_profile_id,
std::string const& table_name,
bigtable::SingleRowMutation mut);
std::string const& table_name, SingleRowMutation mut);

virtual future<Status> AsyncApply(std::string const& app_profile_id,
std::string const& table_name,
bigtable::SingleRowMutation mut);
SingleRowMutation mut);

virtual std::vector<bigtable::FailedMutation> BulkApply(
virtual std::vector<FailedMutation> BulkApply(
std::string const& app_profile_id, std::string const& table_name,
bigtable::BulkMutation mut);
BulkMutation mut);

virtual future<std::vector<bigtable::FailedMutation>> AsyncBulkApply(
virtual future<std::vector<FailedMutation>> AsyncBulkApply(
std::string const& app_profile_id, std::string const& table_name,
bigtable::BulkMutation mut);
BulkMutation mut);

virtual bigtable::RowReader ReadRows(std::string const& app_profile_id,
std::string const& table_name,
bigtable::RowSet row_set,
std::int64_t rows_limit,
bigtable::Filter filter);
virtual RowReader ReadRows(std::string const& app_profile_id,
std::string const& table_name, RowSet row_set,
std::int64_t rows_limit, Filter filter);

virtual StatusOr<std::pair<bool, bigtable::Row>> ReadRow(
virtual StatusOr<std::pair<bool, Row>> ReadRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, bigtable::Filter filter);
std::string row_key, Filter filter);

virtual StatusOr<bigtable::MutationBranch> CheckAndMutateRow(
virtual StatusOr<MutationBranch> CheckAndMutateRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, bigtable::Filter filter,
std::vector<bigtable::Mutation> true_mutations,
std::vector<bigtable::Mutation> false_mutations);
std::string row_key, Filter filter, std::vector<Mutation> true_mutations,
std::vector<Mutation> false_mutations);

virtual future<StatusOr<bigtable::MutationBranch>> AsyncCheckAndMutateRow(
virtual future<StatusOr<MutationBranch>> AsyncCheckAndMutateRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, bigtable::Filter filter,
std::vector<bigtable::Mutation> true_mutations,
std::vector<bigtable::Mutation> false_mutations);
std::string row_key, Filter filter, std::vector<Mutation> true_mutations,
std::vector<Mutation> false_mutations);

virtual StatusOr<std::vector<bigtable::RowKeySample>> SampleRows(
virtual StatusOr<std::vector<RowKeySample>> SampleRows(
std::string const& app_profile_id, std::string const& table_name);

virtual future<StatusOr<std::vector<bigtable::RowKeySample>>> AsyncSampleRows(
virtual future<StatusOr<std::vector<RowKeySample>>> AsyncSampleRows(
std::string const& app_profile_id, std::string const& table_name);

virtual StatusOr<bigtable::Row> ReadModifyWriteRow(
virtual StatusOr<Row> ReadModifyWriteRow(
google::bigtable::v2::ReadModifyWriteRowRequest request);

virtual future<StatusOr<bigtable::Row>> AsyncReadModifyWriteRow(
virtual future<StatusOr<Row>> AsyncReadModifyWriteRow(
google::bigtable::v2::ReadModifyWriteRowRequest request);

virtual void AsyncReadRows(std::string const& app_profile_id,
std::string const& table_name,
std::function<future<bool>(bigtable::Row)> on_row,
std::function<future<bool>(Row)> on_row,
std::function<void(Status)> on_finish,
bigtable::RowSet row_set, std::int64_t rows_limit,
bigtable::Filter filter);
RowSet row_set, std::int64_t rows_limit,
Filter filter);

virtual future<StatusOr<std::pair<bool, bigtable::Row>>> AsyncReadRow(
virtual future<StatusOr<std::pair<bool, Row>>> AsyncReadRow(
std::string const& app_profile_id, std::string const& table_name,
std::string row_key, bigtable::Filter filter);
std::string row_key, Filter filter);
};

/**
Expand All @@ -136,13 +130,13 @@ class DataConnection {
* `GOOGLE_CLOUD_CPP_ENABLE_CLOG=yes` in the environment and unexpected
* options will be logged.
*
* @param opts (optional) Configure the `DataConnection` created by this
* function.
* @param options (optional) Configure the `DataConnection` created by this
* function.
*/
std::shared_ptr<DataConnection> MakeDataConnection(Options options = {});

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable_internal
} // namespace bigtable
} // namespace cloud
} // namespace google

Expand All @@ -151,12 +145,12 @@ namespace cloud {
namespace bigtable_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

std::shared_ptr<DataConnection> MakeDataConnection(
std::shared_ptr<bigtable::DataConnection> MakeDataConnection(
std::shared_ptr<BigtableStub> stub, Options options);

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable_internal
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DATA_CONNECTION_H
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_DATA_CONNECTION_H
4 changes: 2 additions & 2 deletions google/cloud/bigtable/google_cloud_cpp_bigtable.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ google_cloud_cpp_bigtable_hdrs = [
"column_family.h",
"completion_queue.h",
"data_client.h",
"data_connection.h",
"expr.h",
"filters.h",
"iam_binding.h",
Expand Down Expand Up @@ -79,7 +80,6 @@ google_cloud_cpp_bigtable_hdrs = [
"internal/common_client.h",
"internal/connection_refresh_state.h",
"internal/convert_policies.h",
"internal/data_connection.h",
"internal/data_connection_impl.h",
"internal/default_row_reader.h",
"internal/defaults.h",
Expand Down Expand Up @@ -145,6 +145,7 @@ google_cloud_cpp_bigtable_srcs = [
"client_options.cc",
"cluster_config.cc",
"data_client.cc",
"data_connection.cc",
"expr.cc",
"iam_binding.cc",
"iam_policy.cc",
Expand All @@ -167,7 +168,6 @@ google_cloud_cpp_bigtable_srcs = [
"internal/bulk_mutator.cc",
"internal/connection_refresh_state.cc",
"internal/convert_policies.cc",
"internal/data_connection.cc",
"internal/data_connection_impl.cc",
"internal/default_row_reader.cc",
"internal/defaults.cc",
Expand Down
Loading