From 46f9935acbcd4a8c6b9a9455235802955ca20752 Mon Sep 17 00:00:00 2001 From: Shubhashree Hebbar Date: Mon, 3 Dec 2018 08:47:28 +0530 Subject: [PATCH 1/4] delete and batch insert to staging V2 in cassandra --- .../cassandra/accessor/StagingAccessor.java | 6 ++ .../impl/CassandraAssignmentsRepository.java | 42 +++++++++++-- .../accessor/StagingAccessorITest.java | 59 +++++++++++++++++++ .../CassandraAssignmentsRepositoryTest.java | 35 +++++++++-- 4 files changed, 134 insertions(+), 8 deletions(-) create mode 100644 modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessorITest.java diff --git a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessor.java b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessor.java index 427fc32bf..2ca186629 100644 --- a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessor.java +++ b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessor.java @@ -33,4 +33,10 @@ public interface StagingAccessor { @Query("insert into staging_v2(time, type, exep , msg) values(?, ?, ? , ?)") BoundStatement batchInsertBy(UUID time, String type, String exception, String message); + @Query("select * from staging_v2 limit ?") + ResultSet batchSelectBy(int batchSize); + + @Query("delete from staging_v2 where time = ?") + ResultSet deleteBy(UUID timeUUID); + } \ No newline at end of file diff --git a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java index fa05c5bef..d406d93c8 100644 --- a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java +++ b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java @@ -16,10 +16,7 @@ package com.intuit.wasabi.repository.cassandra.impl; import com.codahale.metrics.annotation.Timed; -import com.datastax.driver.core.BatchStatement; -import com.datastax.driver.core.BoundStatement; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Session; +import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.core.exceptions.ReadTimeoutException; import com.datastax.driver.core.exceptions.UnavailableException; @@ -703,6 +700,29 @@ public void pushAssignmentsToStaging(String type, String exception, Collection getBatchPayloadsFromStaging(int batchSize){ + + Map payloads = null; + + try { + ResultSet result = stagingAccessor.batchSelectBy(batchSize); + payloads = getPayloadsFromCassandraResult(result); + } catch (ReadTimeoutException | UnavailableException | NoHostAvailableException e){ + throw new RepositoryException("Could not get the payloads from staging", e); + } + return payloads; + } + + public void deleteFromStaging(UUID timeUUID){ + + try{ + stagingAccessor.deleteBy(timeUUID); + } catch (UnavailableException | NoHostAvailableException e){ + throw new RepositoryException("Could not delete the payloads from staging", e); + } + } + + @Override public void updateBucketAssignmentCount(Experiment experiment, Assignment assignment, boolean countUp) { Optional labelOptional = Optional.ofNullable(assignment.getBucketLabel()); @@ -791,4 +811,18 @@ public Map getBucketAssignmentCountsInParallel( } return assignmentCountsBuilder.build(); } + + private Map getPayloadsFromCassandraResult(ResultSet result){ + List resultRows = null; + Map payloads = new HashMap<>(); + if(result != null && !result.isExhausted()){ + resultRows = result.all(); + for(Row row : resultRows){ + UUID time = row.get("time", UUID.class); + String payload = row.getString("msg"); + payloads.put(time.toString(), payload); + } + } + return payloads; + } } \ No newline at end of file diff --git a/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessorITest.java b/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessorITest.java new file mode 100644 index 000000000..66b7c174f --- /dev/null +++ b/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/accessor/StagingAccessorITest.java @@ -0,0 +1,59 @@ +/******************************************************************************* + * Copyright 2016 Intuit + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package com.intuit.wasabi.repository.cassandra.accessor; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.mapping.Result; +import com.intuit.wasabi.repository.cassandra.IntegrationTestBase; +import com.intuit.wasabi.repository.cassandra.pojo.Experiment; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.*; + +import static org.junit.Assert.assertEquals; + +public class StagingAccessorITest extends IntegrationTestBase { +// static StagingAccessor accessor; +// +// @BeforeClass +// public static void setup() { +// IntegrationTestBase.setup(); +// if (accessor != null) return; +// accessor = manager.createAccessor(StagingAccessor.class); +// } +// +// +// @Test +// public void testBatchSelectBy(){ +// +// int batchSize = 20; +// String message = "message"; +// int i; +// for(i=0 ; i<100 ; i++){ +// accessor.insertBy("test","test", message+i); +// } +// +// ResultSet resultSet = accessor.batchSelectBy(batchSize); +// assertEquals(batchSize, resultSet.all().size()); +// } +// +// @Test +// public void testDeleteBy(){ +// accessor.deleteBy(UUID.fromString("0f698e10-f45a-11e8-baac-779486c46c22")); +// } +} \ No newline at end of file diff --git a/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepositoryTest.java b/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepositoryTest.java index b6f376b7d..59297fdbb 100644 --- a/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepositoryTest.java +++ b/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepositoryTest.java @@ -15,10 +15,7 @@ *******************************************************************************/ package com.intuit.wasabi.repository.cassandra.impl; -import com.datastax.driver.core.BatchStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.Session; +import com.datastax.driver.core.*; import com.datastax.driver.core.exceptions.ReadTimeoutException; import com.datastax.driver.core.exceptions.WriteTimeoutException; import com.datastax.driver.mapping.MappingManager; @@ -89,6 +86,7 @@ import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.any; @@ -551,6 +549,35 @@ public void testRemoveIndexExperimentsToUserWriteException() { APPLICATION_NAME); } + @Test + public void testGetBatchPayloadsFromStaging(){ + + int i; + String message = "message"; + String time = "time"; + List mockRows = new ArrayList(); + for(i=0;i<10;i++){ + mockRows.add(mock(Row.class,message+i)); + long ms = new Date().getTime(); + when(mockRows.get(i).get("time", UUID.class)).thenReturn(new UUID(ms+i, ms+i+1)); + when(mockRows.get(i).getString("msg")).thenReturn(message+i); + } + + ResultSet resultSet = mock(ResultSet.class); + when(stagingAccessor.batchSelectBy(anyInt())).thenReturn(resultSet); + when(resultSet.all()).thenReturn(mockRows); + + Map payloads = repository.getBatchPayloadsFromStaging(10); + assertThat(payloads.keySet().size(), is(10)); + } + + @Test() + public void testDeleteFromStaging(){ + + when(stagingAccessor.deleteBy(any())).thenReturn(null); + repository.deleteFromStaging(UUID.randomUUID()); + } + @Test public void testPushAssignmentToStaging() { repository.pushAssignmentToStaging("type", "string1", "string2"); From 20e86d2d558c7f11508b01a7a535d1dcff0d4505 Mon Sep 17 00:00:00 2001 From: Shubhashree Hebbar Date: Mon, 3 Dec 2018 10:09:39 +0530 Subject: [PATCH 2/4] modified return of batch reading --- .../cassandra/impl/CassandraAssignmentsRepository.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java index d406d93c8..d5dbd1e1b 100644 --- a/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java +++ b/modules/repository-datastax/src/main/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepository.java @@ -700,9 +700,9 @@ public void pushAssignmentsToStaging(String type, String exception, Collection getBatchPayloadsFromStaging(int batchSize){ + public Map getBatchPayloadsFromStaging(int batchSize){ - Map payloads = null; + Map payloads = null; try { ResultSet result = stagingAccessor.batchSelectBy(batchSize); @@ -812,15 +812,15 @@ public Map getBucketAssignmentCountsInParallel( return assignmentCountsBuilder.build(); } - private Map getPayloadsFromCassandraResult(ResultSet result){ + private Map getPayloadsFromCassandraResult(ResultSet result){ List resultRows = null; - Map payloads = new HashMap<>(); + Map payloads = new HashMap<>(); if(result != null && !result.isExhausted()){ resultRows = result.all(); for(Row row : resultRows){ UUID time = row.get("time", UUID.class); String payload = row.getString("msg"); - payloads.put(time.toString(), payload); + payloads.put(time, payload); } } return payloads; From 54d430757982bcd9f3821eff94643a57cc6d900d Mon Sep 17 00:00:00 2001 From: Shubhashree Hebbar Date: Mon, 3 Dec 2018 10:29:44 +0530 Subject: [PATCH 3/4] modified test cases --- .../cassandra/impl/CassandraAssignmentsRepositoryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepositoryTest.java b/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepositoryTest.java index 59297fdbb..e0c21da9e 100644 --- a/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepositoryTest.java +++ b/modules/repository-datastax/src/test/java/com/intuit/wasabi/repository/cassandra/impl/CassandraAssignmentsRepositoryTest.java @@ -567,7 +567,7 @@ public void testGetBatchPayloadsFromStaging(){ when(stagingAccessor.batchSelectBy(anyInt())).thenReturn(resultSet); when(resultSet.all()).thenReturn(mockRows); - Map payloads = repository.getBatchPayloadsFromStaging(10); + Map payloads = repository.getBatchPayloadsFromStaging(10); assertThat(payloads.keySet().size(), is(10)); } From b3c8e85339e605e69e2b495c5fe66cace84e8242 Mon Sep 17 00:00:00 2001 From: Shubhashree Hebbar Date: Tue, 4 Dec 2018 09:20:38 +0530 Subject: [PATCH 4/4] added missing columns to alter table script --- .../cassandra/migration/V028__Alter_Experiment_Add_Columns.cql | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/repository-datastax/src/main/resources/com/intuit/wasabi/repository/impl/cassandra/migration/V028__Alter_Experiment_Add_Columns.cql b/modules/repository-datastax/src/main/resources/com/intuit/wasabi/repository/impl/cassandra/migration/V028__Alter_Experiment_Add_Columns.cql index 46648f586..12d874b0a 100644 --- a/modules/repository-datastax/src/main/resources/com/intuit/wasabi/repository/impl/cassandra/migration/V028__Alter_Experiment_Add_Columns.cql +++ b/modules/repository-datastax/src/main/resources/com/intuit/wasabi/repository/impl/cassandra/migration/V028__Alter_Experiment_Add_Columns.cql @@ -2,3 +2,5 @@ alter TABLE experiment ADD hypothesis_is_correct varchar; alter TABLE experiment ADD results varchar; +alter TABLE experiment ADD experiment_type text; +alter TABLE experiment ADD source_url text; \ No newline at end of file