Skip to content

Commit

Permalink
ARROW-385: Refactors metric system
Browse files Browse the repository at this point in the history
Arrow has some support for metrics, but the metrics registry is by default
not configured to export values. It also forces user to use yammer/codahale
metrics library instead of the library of their choice.

To allow for integration with other metrics system, replace it with a notification
mechanism to alert user on allocation.
  • Loading branch information
laurentgo committed Nov 22, 2016
1 parent f082b17 commit 83020f1
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 209 deletions.
7 changes: 0 additions & 7 deletions java/memory/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,6 @@
<name>Arrow Memory</name>

<dependencies>

<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.1</version>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
Expand Down
31 changes: 2 additions & 29 deletions java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,16 @@
*/
package io.netty.buffer;

import java.util.concurrent.atomic.AtomicLong;

/**
* A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts.
*/
public class LargeBuffer extends MutableWrappedByteBuf {

private final AtomicLong hugeBufferSize;
private final AtomicLong hugeBufferCount;

private final int initCap;

public LargeBuffer(ByteBuf buffer, AtomicLong hugeBufferSize, AtomicLong hugeBufferCount) {
public LargeBuffer(ByteBuf buffer) {
super(buffer);
initCap = buffer.capacity();
this.hugeBufferCount = hugeBufferCount;
this.hugeBufferSize = hugeBufferSize;
}

@Override
public ByteBuf copy(int index, int length) {
return new LargeBuffer(buffer.copy(index, length), hugeBufferSize, hugeBufferCount);
return new LargeBuffer(buffer.copy(index, length));
}

@Override
public boolean release() {
return release(1);
}

@Override
public boolean release(int decrement) {
boolean released = unwrap().release(decrement);
if (released) {
hugeBufferSize.addAndGet(-initCap);
hugeBufferCount.decrementAndGet();
}
return released;
}

}
151 changes: 67 additions & 84 deletions java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,24 @@
*/
package io.netty.buffer;

import io.netty.util.internal.StringUtil;
import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.arrow.memory.OutOfMemoryException;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import io.netty.util.internal.StringUtil;

/**
* The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers.
*/
public class PooledByteBufAllocatorL {
private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator");
private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");

private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;


public static final String METRIC_PREFIX = "drill.allocator.";

private final MetricRegistry registry;
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
private final AtomicLong normalBufferSize = new AtomicLong(0);
Expand All @@ -51,8 +43,7 @@ public class PooledByteBufAllocatorL {
private final InnerAllocator allocator;
public final UnsafeDirectLittleEndian empty;

public PooledByteBufAllocatorL(MetricRegistry registry) {
this.registry = registry;
public PooledByteBufAllocatorL() {
allocator = new InnerAllocator();
empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
}
Expand All @@ -70,13 +61,56 @@ public int getChunkSize() {
return allocator.chunkSize;
}

private class InnerAllocator extends PooledByteBufAllocator {
public long getHugeBufferSize() {
return hugeBufferSize.get();
}

public long getHugeBufferCount() {
return hugeBufferCount.get();
}

public long getNormalBufferSize() {
return normalBufferSize.get();
}

public long getNormalBufferCount() {
return normalBufferSize.get();
}

private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
private final long initialCapacity;
private final AtomicLong count;
private final AtomicLong size;

private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) {
super(buf);
this.initialCapacity = buf.capacity();
this.count = count;
this.size = size;
}

private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, AtomicLong size) {
super(buf);
this.initialCapacity = buf.capacity();
this.count = count;
this.size = size;
}

@Override
public boolean release(int decrement) {
boolean released = super.release(decrement);
if (released) {
count.decrementAndGet();
size.addAndGet(-initialCapacity);
}
return released;
}

}

private class InnerAllocator extends PooledByteBufAllocator {
private final PoolArena<ByteBuffer>[] directArenas;
private final MemoryStatusThread statusThread;
private final Histogram largeBuffersHist;
private final Histogram normalBuffersHist;
private final int chunkSize;

public InnerAllocator() {
Expand All @@ -98,50 +132,6 @@ public InnerAllocator() {
} else {
statusThread = null;
}
removeOldMetrics();

registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
@Override
public Long getValue() {
return normalBufferSize.get();
}
});

registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
@Override
public Long getValue() {
return normalBufferCount.get();
}
});

registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
@Override
public Long getValue() {
return hugeBufferSize.get();
}
});

registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
@Override
public Long getValue() {
return hugeBufferCount.get();
}
});

largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");

}


private synchronized void removeOldMetrics() {
registry.removeMatching(new MetricFilter() {
@Override
public boolean matches(String name, Metric metric) {
return name.startsWith("drill.allocator.");
}

});
}

private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
Expand All @@ -154,27 +144,30 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa
// This is beyond chunk size so we'll allocate separately.
ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);

final long capacity = buf.capacity();

hugeBufferCount.incrementAndGet();
hugeBufferSize.addAndGet(buf.capacity());
largeBuffersHist.update(buf.capacity());
// logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
hugeBufferSize.addAndGet(capacity);

// logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, hugeBufferSize);
} else {
// within chunk, use arena.
ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
fail();
}

normalBuffersHist.update(buf.capacity());
if (ASSERT_ENABLED) {
normalBufferSize.addAndGet(buf.capacity());
normalBufferCount.incrementAndGet();
if (!ASSERT_ENABLED) {
return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf);
}

return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount,
normalBufferSize);
final long capacity = buf.capacity();

normalBufferCount.incrementAndGet();
normalBufferSize.addAndGet(capacity);

return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
}

} else {
Expand All @@ -184,9 +177,10 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa

private UnsupportedOperationException fail() {
return new UnsupportedOperationException(
"Arrow requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
"Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
}

@Override
public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
newDirectBuffer(initialCapacity, maxCapacity);
Expand Down Expand Up @@ -215,9 +209,8 @@ private void validate(int initialCapacity, int maxCapacity) {
private class MemoryStatusThread extends Thread {

public MemoryStatusThread() {
super("memory-status-logger");
super("allocation.logger");
this.setDaemon(true);
this.setName("allocation.logger");
}

@Override
Expand All @@ -229,12 +222,11 @@ public void run() {
} catch (InterruptedException e) {
return;
}

}
}

}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(directArenas.length);
Expand All @@ -260,13 +252,4 @@ public String toString() {


}

public static final boolean ASSERT_ENABLED;

static {
boolean isAssertEnabled = false;
assert isAssertEnabled = true;
ASSERT_ENABLED = isAssertEnabled;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package io.netty.buffer;

import io.netty.util.internal.PlatformDependent;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -32,43 +30,33 @@
* The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs to abstract away the
* Netty classes and underlying Netty memory management.
*/
public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
public class UnsafeDirectLittleEndian extends WrappedByteBuf {
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);

public final long id = ID_GENERATOR.incrementAndGet();
private final AbstractByteBuf wrapped;
private final long memoryAddress;

private final AtomicLong bufferCount;
private final AtomicLong bufferSize;
private final long initCap;

UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
this(buf, true, null, null);
this(buf, true);
}

UnsafeDirectLittleEndian(LargeBuffer buf) {
this(buf, true, null, null);
this(buf, true);
}

UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
this(buf, true, bufferCount, bufferSize);
UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) {
this(buf, true);

}

private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) {
private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
super(buf);
if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
throw new IllegalStateException("Arrow only runs on LittleEndian systems.");
}

this.bufferCount = bufferCount;
this.bufferSize = bufferSize;

// initCap is used if we're tracking memory release. If we're in non-debug mode, we'll skip this.
this.initCap = ASSERT_ENABLED ? buf.capacity() : -1;

this.wrapped = buf;
this.memoryAddress = buf.memoryAddress();
}
Expand Down Expand Up @@ -244,16 +232,6 @@ public boolean release() {
return release(1);
}

@Override
public boolean release(int decrement) {
final boolean released = super.release(decrement);
if (ASSERT_ENABLED && released && bufferCount != null && bufferSize != null) {
bufferCount.decrementAndGet();
bufferSize.addAndGet(-initCap);
}
return released;
}

@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
wrapped.checkIndex(index, length);
Expand Down
Loading

0 comments on commit 83020f1

Please sign in to comment.