Skip to content

Commit

Permalink
ingestionEngine creation failure tes
Browse files Browse the repository at this point in the history
Signed-off-by: Yupeng Fu <yupeng@uber.com>
  • Loading branch information
yupeng9 committed Jan 26, 2025
1 parent c38c302 commit f35df02
Showing 1 changed file with 32 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;

public class IngestionEngineTests extends EngineTestCase {

Expand Down Expand Up @@ -122,6 +124,36 @@ public void testRecovery() throws IOException {
waitForResults(ingestionEngine, 4);
}

public void testCreationFailure() throws IOException {
// Simulate an error scenario
Store mockStore = mock(Store.class);
doThrow(new IOException("Simulated IOException")).when(mockStore).readLastCommittedSegmentsInfo();

final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages);
EngineConfig engineConfig = config(
indexSettings,
store,
createTempDir(),
NoMergePolicy.INSTANCE,
null,
null,
globalCheckpoint::get
);
// overwrite the config with ingestion engine settings
String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}";
MapperService mapperService = createMapperService(mapping);
engineConfig = config(engineConfig, () -> new DocumentMapperForType(mapperService.documentMapper(), null));
try {
new IngestionEngine(engineConfig, consumerFactory);
fail("Expected EngineException to be thrown");
} catch (EngineException e) {
assertEquals("failed to create engine", e.getMessage());
assertTrue(e.getCause() instanceof IOException);
}
}

private IngestionEngine buildIngestionEngine(AtomicLong globalCheckpoint, Store store, IndexSettings settings) throws IOException {
FakeIngestionSource.FakeIngestionConsumerFactory consumerFactory = new FakeIngestionSource.FakeIngestionConsumerFactory(messages);
EngineConfig engineConfig = config(settings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
Expand Down

0 comments on commit f35df02

Please sign in to comment.