Skip to content

Commit

Permalink
delete and batch insert to staging V2 in cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubhashree Hebbar authored and Shubhashree Hebbar committed Dec 3, 2018
1 parent 6afa076 commit 46f9935
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,10 @@ public interface StagingAccessor {
@Query("insert into staging_v2(time, type, exep , msg) values(?, ?, ? , ?)")
BoundStatement batchInsertBy(UUID time, String type, String exception, String message);

@Query("select * from staging_v2 limit ?")
ResultSet batchSelectBy(int batchSize);

@Query("delete from staging_v2 where time = ?")
ResultSet deleteBy(UUID timeUUID);

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
package com.intuit.wasabi.repository.cassandra.impl;

import com.codahale.metrics.annotation.Timed;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.ReadTimeoutException;
import com.datastax.driver.core.exceptions.UnavailableException;
Expand Down Expand Up @@ -703,6 +700,29 @@ public void pushAssignmentsToStaging(String type, String exception, Collection<S
}
}

public Map<String, String> getBatchPayloadsFromStaging(int batchSize){

Map<String, String> payloads = null;

try {
ResultSet result = stagingAccessor.batchSelectBy(batchSize);
payloads = getPayloadsFromCassandraResult(result);
} catch (ReadTimeoutException | UnavailableException | NoHostAvailableException e){
throw new RepositoryException("Could not get the payloads from staging", e);
}
return payloads;
}

public void deleteFromStaging(UUID timeUUID){

try{
stagingAccessor.deleteBy(timeUUID);
} catch (UnavailableException | NoHostAvailableException e){
throw new RepositoryException("Could not delete the payloads from staging", e);
}
}


@Override
public void updateBucketAssignmentCount(Experiment experiment, Assignment assignment, boolean countUp) {
Optional<Bucket.Label> labelOptional = Optional.ofNullable(assignment.getBucketLabel());
Expand Down Expand Up @@ -791,4 +811,18 @@ public Map<Experiment.ID, AssignmentCounts> getBucketAssignmentCountsInParallel(
}
return assignmentCountsBuilder.build();
}

private Map<String, String> getPayloadsFromCassandraResult(ResultSet result){
List<Row> resultRows = null;
Map<String, String> payloads = new HashMap<>();
if(result != null && !result.isExhausted()){
resultRows = result.all();
for(Row row : resultRows){
UUID time = row.get("time", UUID.class);
String payload = row.getString("msg");
payloads.put(time.toString(), payload);
}
}
return payloads;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*******************************************************************************
* Copyright 2016 Intuit
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package com.intuit.wasabi.repository.cassandra.accessor;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.mapping.Result;
import com.intuit.wasabi.repository.cassandra.IntegrationTestBase;
import com.intuit.wasabi.repository.cassandra.pojo.Experiment;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.*;

import static org.junit.Assert.assertEquals;

public class StagingAccessorITest extends IntegrationTestBase {
// static StagingAccessor accessor;
//
// @BeforeClass
// public static void setup() {
// IntegrationTestBase.setup();
// if (accessor != null) return;
// accessor = manager.createAccessor(StagingAccessor.class);
// }
//
//
// @Test
// public void testBatchSelectBy(){
//
// int batchSize = 20;
// String message = "message";
// int i;
// for(i=0 ; i<100 ; i++){
// accessor.insertBy("test","test", message+i);
// }
//
// ResultSet resultSet = accessor.batchSelectBy(batchSize);
// assertEquals(batchSize, resultSet.all().size());
// }
//
// @Test
// public void testDeleteBy(){
// accessor.deleteBy(UUID.fromString("0f698e10-f45a-11e8-baac-779486c46c22"));
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
*******************************************************************************/
package com.intuit.wasabi.repository.cassandra.impl;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.ReadTimeoutException;
import com.datastax.driver.core.exceptions.WriteTimeoutException;
import com.datastax.driver.mapping.MappingManager;
Expand Down Expand Up @@ -89,6 +86,7 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
Expand Down Expand Up @@ -551,6 +549,35 @@ public void testRemoveIndexExperimentsToUserWriteException() {
APPLICATION_NAME);
}

@Test
public void testGetBatchPayloadsFromStaging(){

int i;
String message = "message";
String time = "time";
List<Row> mockRows = new ArrayList<Row>();
for(i=0;i<10;i++){
mockRows.add(mock(Row.class,message+i));
long ms = new Date().getTime();
when(mockRows.get(i).get("time", UUID.class)).thenReturn(new UUID(ms+i, ms+i+1));
when(mockRows.get(i).getString("msg")).thenReturn(message+i);
}

ResultSet resultSet = mock(ResultSet.class);
when(stagingAccessor.batchSelectBy(anyInt())).thenReturn(resultSet);
when(resultSet.all()).thenReturn(mockRows);

Map<String, String> payloads = repository.getBatchPayloadsFromStaging(10);
assertThat(payloads.keySet().size(), is(10));
}

@Test()
public void testDeleteFromStaging(){

when(stagingAccessor.deleteBy(any())).thenReturn(null);
repository.deleteFromStaging(UUID.randomUUID());
}

@Test
public void testPushAssignmentToStaging() {
repository.pushAssignmentToStaging("type", "string1", "string2");
Expand Down

0 comments on commit 46f9935

Please sign in to comment.