From 3fa1fa0ff33d1a6e2f2b35219d8705c9b758dbd5 Mon Sep 17 00:00:00 2001
From: Igor Bernstein <igorbernstein@google.com>
Date: Tue, 18 Sep 2018 18:52:16 -0400
Subject: [PATCH] Bigtable: limit mutation sizes in the client to avoid
 overloading the server (#3695)

* Bigtable: limit mutation sizes in the client to avoid overloading the server

* address feedback
---
 .../bigtable/data/v2/models/Mutation.java     | 43 +++++++++++++------
 .../bigtable/data/v2/models/MutationTest.java | 36 ++++++++++++++++
 2 files changed, 66 insertions(+), 13 deletions(-)

diff --git a/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Mutation.java b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Mutation.java
index fde6a5f4e49e..943ecffadc27 100644
--- a/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Mutation.java
+++ b/google-cloud-clients/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Mutation.java
@@ -15,6 +15,7 @@
  */
 package com.google.cloud.bigtable.data.v2.models;
 
+import com.google.api.core.InternalApi;
 import com.google.bigtable.v2.Mutation.DeleteFromColumn;
 import com.google.bigtable.v2.Mutation.DeleteFromFamily;
 import com.google.bigtable.v2.Mutation.DeleteFromRow;
@@ -36,11 +37,19 @@
  * encapsulate a list of mutations that will to be applied to a single row.
  */
 public final class Mutation implements MutationApi<Mutation>, Serializable {
-  private static final long serialVersionUID = 5893216644683374339L;
+  private static final long serialVersionUID = 5893216644683374340L;
+
+  @InternalApi("Visible for testing")
+  static final int MAX_MUTATIONS = 100_000;
+  @InternalApi("Visible for testing")
+  static final int MAX_BYTE_SIZE = 200 * 1024 * 1024;
 
   private transient ImmutableList.Builder<com.google.bigtable.v2.Mutation> mutations =
       ImmutableList.builder();
 
+  private int numMutations;
+  private long byteSize;
+
   public static Mutation create() {
     return new Mutation();
   }
@@ -95,7 +104,7 @@ public Mutation setCell(
     Preconditions.checkNotNull(value, "value can't be null.");
     Preconditions.checkArgument(timestamp != -1, "Serverside timestamps are not supported");
 
-    com.google.bigtable.v2.Mutation mutation =
+    addMutation(
         com.google.bigtable.v2.Mutation.newBuilder()
             .setSetCell(
                 SetCell.newBuilder()
@@ -104,9 +113,8 @@ public Mutation setCell(
                     .setTimestampMicros(timestamp)
                     .setValue(value)
                     .build())
-            .build();
+            .build());
 
-    mutations.add(mutation);
     return this;
   }
 
@@ -161,9 +169,8 @@ public Mutation deleteCells(
         throw new IllegalArgumentException("Unknown end bound: " + timestampRange.getEndBound());
     }
 
-    com.google.bigtable.v2.Mutation mutation =
-        com.google.bigtable.v2.Mutation.newBuilder().setDeleteFromColumn(builder.build()).build();
-    mutations.add(mutation);
+    addMutation(
+        com.google.bigtable.v2.Mutation.newBuilder().setDeleteFromColumn(builder.build()).build());
 
     return this;
   }
@@ -172,26 +179,36 @@ public Mutation deleteCells(
   public Mutation deleteFamily(@Nonnull String familyName) {
     Validations.validateFamily(familyName);
 
-    com.google.bigtable.v2.Mutation mutation =
+    addMutation(
         com.google.bigtable.v2.Mutation.newBuilder()
             .setDeleteFromFamily(DeleteFromFamily.newBuilder().setFamilyName(familyName).build())
-            .build();
-    mutations.add(mutation);
+            .build());
 
     return this;
   }
 
   @Override
   public Mutation deleteRow() {
-    com.google.bigtable.v2.Mutation mutation =
+    addMutation(
         com.google.bigtable.v2.Mutation.newBuilder()
             .setDeleteFromRow(DeleteFromRow.getDefaultInstance())
-            .build();
-    mutations.add(mutation);
+            .build());
 
     return this;
   }
 
+  private void addMutation(com.google.bigtable.v2.Mutation mutation) {
+    Preconditions.checkState(numMutations + 1 <= MAX_MUTATIONS,
+        "Too many mutations per row");
+    Preconditions.checkState(byteSize + mutation.getSerializedSize() <= MAX_BYTE_SIZE,
+        "Byte size of mutations is too large");
+
+    numMutations++;
+    byteSize += mutation.getSerializedSize();
+
+    mutations.add(mutation);
+  }
+
   private static ByteString wrapByteString(String str) {
     if (str == null) {
       return null;
diff --git a/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/MutationTest.java b/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/MutationTest.java
index f6a9954d3aaa..8ba5b5cbddc9 100644
--- a/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/MutationTest.java
+++ b/google-cloud-clients/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/MutationTest.java
@@ -35,6 +35,7 @@
 
 @RunWith(JUnit4.class)
 public class MutationTest {
+
   private Mutation mutation;
 
   @Before
@@ -176,4 +177,39 @@ public void serializationTest() throws IOException, ClassNotFoundException {
     Mutation actual = (Mutation) ois.readObject();
     assertThat(actual.getMutations()).isEqualTo(expected.getMutations());
   }
+
+  @Test
+  public void tooManyMutationsTest() {
+    Mutation mutation = Mutation.create();
+
+    for (int i = 0; i < Mutation.MAX_MUTATIONS; i++) {
+      mutation.setCell("f", "", "");
+    }
+
+    Exception actualError = null;
+
+    try {
+      mutation.setCell("f", "", "");
+    } catch (Exception e) {
+      actualError = e;
+    }
+
+    assertThat(actualError).isInstanceOf(IllegalStateException.class);
+  }
+
+  @Test
+  public void tooLargeRequest() {
+    Mutation mutation = Mutation.create();
+
+    Exception actualError = null;
+
+    try {
+      mutation.setCell("f", ByteString.copyFromUtf8(""),
+          ByteString.copyFrom(new byte[Mutation.MAX_BYTE_SIZE]));
+    } catch (Exception e) {
+      actualError = e;
+    }
+
+    assertThat(actualError).isInstanceOf(IllegalStateException.class);
+  }
 }