Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements an ArrowRootAllocationProvider SPI #1040

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Adds documentation and tests for ArrowRootAllocationProvider spi
  • Loading branch information
drewfarris committed Jan 9, 2025
commit 4ddbc49096e11feb9fb6de2d0f3e52eb938e9410
98 changes: 88 additions & 10 deletions src/main/java/emissary/spi/ArrowRootAllocatorProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,114 @@

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;

/**
* Provides a central class for obtaining references to the Arrow root memory allocator. Activate this by including it
* in the list of classes in
*
* <pre>
* META - INF / services / emissary.spi.InitializationProvider
* </pre>
*
* Classes wishing to get a reference to the Arrow root allocator should call the {@link #getArrowRootAllocator()}. They
* are free to create child allocators as needed, but they are responsible for managing any buffers created using either
* the root or a chile allocator and calling
*
* <pre>
* close()
* </pre>
*
* on them when they are no longer needed. The {@link #shutdown()} method will automatically close any child allocators
* created, but will throw an {@link java.lang.IllegalStateException} if there are allocated buffers that have not been
* closed (potentially leaking memory). Provides debug and trace level logging for detailed behavior.
*/
public class ArrowRootAllocatorProvider implements InitializationProvider {
private static final Logger logger = LoggerFactory.getLogger(ArrowRootAllocatorProvider.class);

private static final Object initalizationLock = new Object();
private static final Object allocatorLock = new Object();
private static BufferAllocator arrowRootAllocator = null;

@Override
public void initialize() {
synchronized (initalizationLock) {
logger.trace("Waiting for allocator lock in initialize()");
synchronized (allocatorLock) {
logger.debug("Creating new Arrow root allocator");

// creates a RootAllocator with the default memory settings, we may consider implementing a limit here
// that is set via a system property here instead.
arrowRootAllocator = new RootAllocator();

logger.trace("Releasing allocator lock in initialize()");
}
}

/** Shuts down the root allocator and any child allocators */
@Override
public void shutdown() {
synchronized (initalizationLock) {
logger.trace("Waiting for allocator lock in shutdown()");
synchronized (allocatorLock) {
logger.trace("Closing Arrow allocators");
Collection<BufferAllocator> children = arrowRootAllocator.getChildAllocators();
if (children.isEmpty()) {
logger.trace("Root allocator has no children to close");
} else {
if (logger.isTraceEnabled()) {
logger.trace("Attempting to clode {} child allocators", children.size());
}
for (BufferAllocator child : children) {
if (logger.isDebugEnabled()) {
logger.debug("Shutting down child allocator: {}", child.getName());
}
try {
child.close();
if (logger.isTraceEnabled()) {
logger.trace("Successfully closed child allocator {}", child.getName());
}
} catch (IllegalStateException e) {
// it's ok to catch this, another IllegalStateException will be thrown when closing the root allocator.
logger.warn("IllegalStateException when closing child allocator {}, message: {}", child.getName(), e.getMessage());
}
}
}

logger.trace("Closing root allocator");
arrowRootAllocator.close();
logger.debug("Successfully closed root allocator");
arrowRootAllocator = null;
logger.trace("Releasing allocator lock in shutdown()");
}
InitializationProvider.super.shutdown();
}

/**
* Obtain a reference to the arrow root allocator. Any buffers or child allocators allocated using this instance must be
*
* <pre>
* close()
* </pre>
*
* 'd once they are no longer used.
*
* @return the Arrow root allocator
*/
public static BufferAllocator getArrowRootAllocator() {
synchronized (initalizationLock) {
if (arrowRootAllocator == null) {
throw new IllegalStateException("Arrow Root Allocator has not been initalized by the " +
"ArrowRootAllocatorProvider or is already shutdown, is emissary.spi.ArrowRootAllocatorProver " +
"listed in META-INF/services/emissary.spi.InitalizationProvider?");
} else {
return arrowRootAllocator;
logger.trace("Waiting for allocator lock in getArrowRootAllocator()");
synchronized (allocatorLock) {
try {
if (arrowRootAllocator == null) {
throw new IllegalStateException("Arrow Root Allocator has not been initialized by the " +
"ArrowRootAllocatorProvider or is already shutdown, is emissary.spi.ArrowRootAllocatorProver " +
"listed in META-INF/services/emissary.spi.InitializationProvider?");
} else {
logger.trace("Returning root allocator");
return arrowRootAllocator;
}
} finally {
logger.trace("Releasing allocator lock in getArrowRootAllocator()");
}
}
}
Expand Down
80 changes: 79 additions & 1 deletion src/test/java/emissary/spi/ArrowRootAllocatorProviderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,107 @@

import emissary.test.core.junit5.UnitTest;

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

/**
* Tests various ArrowRootAllocatorProvider scenarios and demonstrates expected behavior when conditions related to
* failing to close various Arrow resources occurs.
*/
public class ArrowRootAllocatorProviderTest extends UnitTest {
/** shutdown is clean if no memory has been allocated and no child allocators have been created */
@Test
public void testArrowRootAllocatorProvider() {
ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocator = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocator);
provider.shutdown();
}

/** creating a buffer and not closing it will cause a leak */
@Test
public void testArrowRootAllocatorShutdownLeak() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
ArrowBuf buffer = allocatorOne.buffer(1024);
assertThrows(IllegalStateException.class, provider::shutdown,
"expected IllegalStateException attempting to shutdown allocator with allocated buffer open");
}

/**
* creating a child allocator and not closing it before the root allocator provider is shutdown is OK, as long as that
* child allocator doesn't have any open buffers. The root allocator provider attempts to shut down all children.
*/
@Test
public void testArrowRootAllocatorShutdownChildClean() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048);
assertNotNull(allocatorChild);
}

/**
* creating a child allocator and not closing its buffers before the root allocator provider is shutdown should fail
* when the root allocator provider attempts to shut down all children.
*/
@Test
public void testArrowRootAllocatorShutdownChildLeak() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048);
allocatorChild.buffer(1024);
assertNotNull(allocatorChild);
assertThrows(IllegalStateException.class, provider::shutdown,
"expected IllegalStateException attempting to shutdown allocator with child allocator open");
}

/** both allocated buffers and child allocators must be closed before the root allocator can be shutdown cleanly */
@Test
public void testArrowRootAllocatorShutdownAfterProperClose() {
final ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
BufferAllocator allocatorChild = allocatorOne.newChildAllocator("child", 1024, 2048);
ArrowBuf buffer = allocatorChild.buffer(1024);
buffer.close();
allocatorChild.close();
provider.shutdown();
}

/** the root allocator can't be obtained after shutdown */
@Test()
public void testArrowRootAllocatorProviderAfterShutdown() {
ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocatorOne = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocatorOne);
provider.shutdown();
assertThrows(IllegalStateException.class, ArrowRootAllocatorProvider::getArrowRootAllocator, "expected IllegalStateException");
assertThrows(IllegalStateException.class, ArrowRootAllocatorProvider::getArrowRootAllocator,
"expected IllegalStateException attempting to get an allocator after shutdown");
}

/** the root allocator won't allocate after shutdown */
@Test
public void testArrowRootAllocatorProviderAllocateAfterShutdown() {
ArrowRootAllocatorProvider provider = new ArrowRootAllocatorProvider();
provider.initialize();
BufferAllocator allocator = ArrowRootAllocatorProvider.getArrowRootAllocator();
assertNotNull(allocator);
provider.shutdown();
assertThrows(IllegalStateException.class, () -> allocator.buffer(1024),
"expected IllegalStateException attempting to allocate after provider is shutdown");
}
}

Loading