diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableInstanceAdminClient.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableInstanceAdminClient.java
index 795e791469..793766c26a 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableInstanceAdminClient.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/BigtableInstanceAdminClient.java
@@ -523,12 +523,7 @@ public Cluster createCluster(CreateClusterRequest request) {
   public ApiFuture<Cluster> createClusterAsync(CreateClusterRequest request) {
     return ApiFutures.transform(
         stub.createClusterOperationCallable().futureCall(request.toProto(projectId)),
-        new ApiFunction<com.google.bigtable.admin.v2.Cluster, Cluster>() {
-          @Override
-          public Cluster apply(com.google.bigtable.admin.v2.Cluster proto) {
-            return Cluster.fromProto(proto);
-          }
-        },
+        Cluster::fromProto,
         MoreExecutors.directExecutor());
   }
 
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/Cluster.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/Cluster.java
index 8580fc9897..189685cfc0 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/Cluster.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/Cluster.java
@@ -177,6 +177,20 @@ public int getAutoscalingCpuPercentageTarget() {
         .getAutoscalingTargets()
         .getCpuUtilizationPercent();
   }
+  /**
+   * Get the storage utilization that the Autoscaler should be trying to achieve. This number is
+   * limited between 2560 (2.5TiB) and 5120 (5TiB) for a SSD cluster and between 8192 (8TiB) and
+   * 16384 (16TiB) for an HDD cluster; otherwise it will return INVALID_ARGUMENT error. If this
+   * value is set to 0, it will be treated as if it were set to the default value: 2560 for SSD,
+   * 8192 for HDD.
+   */
+  public int getStorageUtilizationGibPerNode() {
+    return stateProto
+        .getClusterConfig()
+        .getClusterAutoscalingConfig()
+        .getAutoscalingTargets()
+        .getStorageUtilizationGibPerNode();
+  }
 
   /**
    * The type of storage used by this cluster to serve its parent instance's tables, unless
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ClusterAutoscalingConfig.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ClusterAutoscalingConfig.java
index 1a0a135640..617618b800 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ClusterAutoscalingConfig.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/ClusterAutoscalingConfig.java
@@ -104,6 +104,21 @@ public ClusterAutoscalingConfig setCpuUtilizationTargetPercent(int cpuUtilizatio
     return this;
   }
 
+  public ClusterAutoscalingConfig setStorageUtilizationGibPerNode(
+      int storageUtilizationGibPerNode) {
+    builder.setUpdateMask(
+        FieldMaskUtil.union(
+            builder.getUpdateMask(),
+            FieldMaskUtil.fromString(
+                Cluster.class,
+                "cluster_config.cluster_autoscaling_config.autoscaling_targets.storage_utilization_gib_per_node")));
+    clusterConfigBuilder
+        .getClusterAutoscalingConfigBuilder()
+        .getAutoscalingTargetsBuilder()
+        .setStorageUtilizationGibPerNode(storageUtilizationGibPerNode);
+    return this;
+  }
+
   /** Get the minimum number of nodes to scale down to. */
   public int getMinNodes() {
     return clusterConfigBuilder
@@ -131,6 +146,20 @@ public int getCpuUtilizationTargetPercent() {
         .getCpuUtilizationPercent();
   }
 
+  /**
+   * Get the storage utilization that the Autoscaler should be trying to achieve. This number is
+   * limited between 2560 (2.5TiB) and 5120 (5TiB) for a SSD cluster and between 8192 (8TiB) and
+   * 16384 (16TiB) for an HDD cluster; otherwise it will return INVALID_ARGUMENT error. If this
+   * value is set to 0, it will be treated as if it were set to the default value: 2560 for SSD,
+   * 8192 for HDD.
+   */
+  public int getStorageUtilizationGibPerNode() {
+    return clusterConfigBuilder
+        .getClusterAutoscalingConfig()
+        .getAutoscalingTargets()
+        .getStorageUtilizationGibPerNode();
+  }
+
   /**
    * Creates the request protobuf. This method is considered an internal implementation detail and
    * not meant to be used by applications.
@@ -184,6 +213,15 @@ public boolean equals(Object o) {
                 .getClusterAutoscalingConfig()
                 .getAutoscalingTargets()
                 .getCpuUtilizationPercent())
+        && Objects.equal(
+            clusterConfigBuilder
+                .getClusterAutoscalingConfig()
+                .getAutoscalingTargets()
+                .getStorageUtilizationGibPerNode(),
+            that.clusterConfigBuilder
+                .getClusterAutoscalingConfig()
+                .getAutoscalingTargets()
+                .getStorageUtilizationGibPerNode())
         && Objects.equal(clusterId, that.clusterId)
         && Objects.equal(instanceId, that.instanceId);
   }
@@ -203,6 +241,10 @@ public int hashCode() {
             .getClusterAutoscalingConfig()
             .getAutoscalingTargets()
             .getCpuUtilizationPercent(),
+        clusterConfigBuilder
+            .getClusterAutoscalingConfig()
+            .getAutoscalingTargets()
+            .getStorageUtilizationGibPerNode(),
         clusterId,
         instanceId);
   }
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/CreateClusterRequest.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/CreateClusterRequest.java
index 65ecae8d78..851f0e677f 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/CreateClusterRequest.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/admin/v2/models/CreateClusterRequest.java
@@ -110,12 +110,13 @@ public CreateClusterRequest setScalingMode(@Nonnull StaticClusterSize staticClus
 
   /**
    * Sets the scaling mode to autoscaling by accepting an AutoscalingConfig where min nodes, max
-   * nodes, and CPU utlization percent target are set.
+   * nodes, CPU utilization percent target, and storage utilization gib per node are set.
    */
   public CreateClusterRequest setScalingMode(@Nonnull ClusterAutoscalingConfig autoscalingConfig) {
     int minNodes = autoscalingConfig.getMinNodes();
     int maxNodes = autoscalingConfig.getMaxNodes();
     int cpuTargetPercent = autoscalingConfig.getCpuUtilizationTargetPercent();
+    int storageUtilizationGibPerNode = autoscalingConfig.getStorageUtilizationGibPerNode();
 
     proto
         .getClusterBuilder()
@@ -135,6 +136,12 @@ public CreateClusterRequest setScalingMode(@Nonnull ClusterAutoscalingConfig aut
         .getClusterAutoscalingConfigBuilder()
         .getAutoscalingTargetsBuilder()
         .setCpuUtilizationPercent(cpuTargetPercent);
+    proto
+        .getClusterBuilder()
+        .getClusterConfigBuilder()
+        .getClusterAutoscalingConfigBuilder()
+        .getAutoscalingTargetsBuilder()
+        .setStorageUtilizationGibPerNode(storageUtilizationGibPerNode);
     return this;
   }
 
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableInstanceAdminClientTests.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableInstanceAdminClientTests.java
index 7e5d3a8b05..5a7c955787 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableInstanceAdminClientTests.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/BigtableInstanceAdminClientTests.java
@@ -113,7 +113,10 @@ public class BigtableInstanceAdminClientTests {
                           .setMinServeNodes(2)
                           .build())
                   .setAutoscalingTargets(
-                      AutoscalingTargets.newBuilder().setCpuUtilizationPercent(22).build()))
+                      AutoscalingTargets.newBuilder()
+                          .setCpuUtilizationPercent(22)
+                          .setStorageUtilizationGibPerNode(3000)
+                          .build()))
           .build();
 
   @Mock private BigtableInstanceAdminStub mockStub;
@@ -575,7 +578,8 @@ public void testCreateClusterAutoscaling() {
                     ClusterAutoscalingConfig.of(INSTANCE_ID, CLUSTER_ID)
                         .setMinNodes(2)
                         .setMaxNodes(10)
-                        .setCpuUtilizationTargetPercent(22))
+                        .setCpuUtilizationTargetPercent(22)
+                        .setStorageUtilizationGibPerNode(3000))
                 .setStorageType(StorageType.SSD));
     // Verify
     assertThat(actualResult).isEqualTo(Cluster.fromProto(expectedResponse));
@@ -759,6 +763,8 @@ public void testPartialUpdateCluster() {
                         "cluster_config.cluster_autoscaling_config.autoscaling_limits.min_serve_nodes")
                     .addPaths(
                         "cluster_config.cluster_autoscaling_config.autoscaling_targets.cpu_utilization_percent")
+                    .addPaths(
+                        "cluster_config.cluster_autoscaling_config.autoscaling_targets.storage_utilization_gib_per_node")
                     .build())
             .build();
 
@@ -776,6 +782,7 @@ public void testPartialUpdateCluster() {
             ClusterAutoscalingConfig.of(INSTANCE_ID, CLUSTER_ID)
                 .setMaxNodes(10)
                 .setMinNodes(2)
+                .setStorageUtilizationGibPerNode(3000)
                 .setCpuUtilizationTargetPercent(22));
 
     // Verify
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/it/BigtableInstanceAdminClientIT.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/it/BigtableInstanceAdminClientIT.java
index fcb9c36b62..01d0280d13 100644
--- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/it/BigtableInstanceAdminClientIT.java
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/admin/v2/it/BigtableInstanceAdminClientIT.java
@@ -307,7 +307,8 @@ public void createClusterWithAutoscalingTest() {
                   ClusterAutoscalingConfig.of(newInstanceId, clusterId)
                       .setMaxNodes(4)
                       .setMinNodes(1)
-                      .setCpuUtilizationTargetPercent(20));
+                      .setCpuUtilizationTargetPercent(20)
+                      .setStorageUtilizationGibPerNode(9200));
 
       Cluster cluster = client.createCluster(createClusterRequest);
       assertThat(cluster.getId()).contains(clusterId);
@@ -315,6 +316,7 @@ public void createClusterWithAutoscalingTest() {
       assertThat(cluster.getAutoscalingMinServeNodes()).isEqualTo(1);
       assertThat(cluster.getAutoscalingMaxServeNodes()).isEqualTo(4);
       assertThat(cluster.getAutoscalingCpuPercentageTarget()).isEqualTo(20);
+      assertThat(cluster.getStorageUtilizationGibPerNode()).isEqualTo(9200);
     } catch (Exception e) {
       Assert.fail("error in the test" + e.getMessage());
     } finally {
@@ -343,6 +345,7 @@ public void createClusterWithAutoscalingAndPartialUpdateTest() {
                   ClusterAutoscalingConfig.of("ignored", clusterId)
                       .setMaxNodes(4)
                       .setMinNodes(1)
+                      .setStorageUtilizationGibPerNode(2561)
                       .setCpuUtilizationTargetPercent(20));
 
       Cluster cluster = client.createCluster(createClusterRequest);
@@ -351,6 +354,14 @@ public void createClusterWithAutoscalingAndPartialUpdateTest() {
       assertThat(cluster.getAutoscalingMinServeNodes()).isEqualTo(1);
       assertThat(cluster.getAutoscalingMaxServeNodes()).isEqualTo(4);
       assertThat(cluster.getAutoscalingCpuPercentageTarget()).isEqualTo(20);
+      assertThat(cluster.getStorageUtilizationGibPerNode()).isEqualTo(2561);
+
+      Cluster retrievedCluster = client.getCluster(newInstanceId, clusterId);
+      assertThat(retrievedCluster.getId()).contains(clusterId);
+      assertThat(retrievedCluster.getAutoscalingMinServeNodes()).isEqualTo(1);
+      assertThat(retrievedCluster.getAutoscalingMaxServeNodes()).isEqualTo(4);
+      assertThat(retrievedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(20);
+      assertThat(retrievedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2561);
 
       Cluster updatedCluster =
           client.updateClusterAutoscalingConfig(
@@ -358,6 +369,13 @@ public void createClusterWithAutoscalingAndPartialUpdateTest() {
       assertThat(updatedCluster.getAutoscalingMinServeNodes()).isEqualTo(1);
       assertThat(updatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(3);
       assertThat(updatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(20);
+      assertThat(updatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2561);
+
+      Cluster retrievedUpdatedCluster = client.getCluster(newInstanceId, clusterId);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMinServeNodes()).isEqualTo(1);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(3);
+      assertThat(retrievedUpdatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(20);
+      assertThat(retrievedUpdatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2561);
 
       updatedCluster =
           client.updateClusterAutoscalingConfig(
@@ -365,6 +383,13 @@ public void createClusterWithAutoscalingAndPartialUpdateTest() {
       assertThat(updatedCluster.getAutoscalingMinServeNodes()).isEqualTo(2);
       assertThat(updatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(3);
       assertThat(updatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(20);
+      assertThat(updatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2561);
+
+      retrievedUpdatedCluster = client.getCluster(newInstanceId, clusterId);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMinServeNodes()).isEqualTo(2);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(3);
+      assertThat(retrievedUpdatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(20);
+      assertThat(retrievedUpdatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2561);
 
       updatedCluster =
           client.updateClusterAutoscalingConfig(
@@ -373,6 +398,13 @@ public void createClusterWithAutoscalingAndPartialUpdateTest() {
       assertThat(updatedCluster.getAutoscalingMinServeNodes()).isEqualTo(2);
       assertThat(updatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(3);
       assertThat(updatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(40);
+      assertThat(updatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2561);
+
+      retrievedUpdatedCluster = client.getCluster(newInstanceId, clusterId);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMinServeNodes()).isEqualTo(2);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(3);
+      assertThat(retrievedUpdatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(40);
+      assertThat(retrievedUpdatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2561);
 
       updatedCluster =
           client.updateClusterAutoscalingConfig(
@@ -382,6 +414,44 @@ public void createClusterWithAutoscalingAndPartialUpdateTest() {
       assertThat(updatedCluster.getAutoscalingMinServeNodes()).isEqualTo(2);
       assertThat(updatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(5);
       assertThat(updatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(45);
+      assertThat(updatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2561);
+
+      retrievedUpdatedCluster = client.getCluster(newInstanceId, clusterId);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMinServeNodes()).isEqualTo(2);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(5);
+      assertThat(retrievedUpdatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(45);
+      assertThat(retrievedUpdatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2561);
+
+      updatedCluster =
+          client.updateClusterAutoscalingConfig(
+              ClusterAutoscalingConfig.of(newInstanceId, clusterId)
+                  .setStorageUtilizationGibPerNode(2777));
+      assertThat(updatedCluster.getAutoscalingMinServeNodes()).isEqualTo(2);
+      assertThat(updatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(5);
+      assertThat(updatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(45);
+      assertThat(updatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2777);
+
+      retrievedUpdatedCluster = client.getCluster(newInstanceId, clusterId);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMinServeNodes()).isEqualTo(2);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(5);
+      assertThat(retrievedUpdatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(45);
+      assertThat(retrievedUpdatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2777);
+
+      updatedCluster =
+          client.updateClusterAutoscalingConfig(
+              ClusterAutoscalingConfig.of(newInstanceId, clusterId)
+                  // testing default case
+                  .setStorageUtilizationGibPerNode(0));
+      assertThat(updatedCluster.getAutoscalingMinServeNodes()).isEqualTo(2);
+      assertThat(updatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(5);
+      assertThat(updatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(45);
+      assertThat(updatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2560);
+
+      retrievedUpdatedCluster = client.getCluster(newInstanceId, clusterId);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMinServeNodes()).isEqualTo(2);
+      assertThat(retrievedUpdatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(5);
+      assertThat(retrievedUpdatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(45);
+      assertThat(retrievedUpdatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(2560);
     } catch (Exception e) {
       Assert.fail("error in the test: " + e.getMessage());
     } finally {
@@ -414,6 +484,7 @@ public void createClusterWithManualScalingTest() {
       assertThat(cluster.getAutoscalingMaxServeNodes()).isEqualTo(0);
       assertThat(cluster.getAutoscalingMinServeNodes()).isEqualTo(0);
       assertThat(cluster.getAutoscalingCpuPercentageTarget()).isEqualTo(0);
+      assertThat(cluster.getStorageUtilizationGibPerNode()).isEqualTo(0);
     } catch (Exception e) {
       Assert.fail("error in the test: " + e.getMessage());
     } finally {
@@ -447,16 +518,19 @@ private void basicClusterOperationTestHelper(String targetInstanceId, String tar
         ClusterAutoscalingConfig.of(targetInstanceId, targetClusterId)
             .setMinNodes(1)
             .setMaxNodes(4)
+            .setStorageUtilizationGibPerNode(2877)
             .setCpuUtilizationTargetPercent(40);
     Cluster cluster = client.updateClusterAutoscalingConfig(autoscalingConfig);
     assertThat(cluster.getAutoscalingMaxServeNodes()).isEqualTo(4);
     assertThat(cluster.getAutoscalingMinServeNodes()).isEqualTo(1);
     assertThat(cluster.getAutoscalingCpuPercentageTarget()).isEqualTo(40);
+    assertThat(cluster.getStorageUtilizationGibPerNode()).isEqualTo(2877);
 
     Cluster updatedCluster = client.disableClusterAutoscaling(targetInstanceId, targetClusterId, 3);
     assertThat(updatedCluster.getServeNodes()).isEqualTo(3);
     assertThat(updatedCluster.getAutoscalingMaxServeNodes()).isEqualTo(0);
     assertThat(updatedCluster.getAutoscalingMinServeNodes()).isEqualTo(0);
     assertThat(updatedCluster.getAutoscalingCpuPercentageTarget()).isEqualTo(0);
+    assertThat(updatedCluster.getStorageUtilizationGibPerNode()).isEqualTo(0);
   }
 }