Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-274: Add NullableMapVector to support nullable maps #128

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 16 additions & 39 deletions java/vector/src/main/codegen/templates/MapWriters.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@
*/

<@pp.dropOutputFile />
<#list ["Single"] as mode>
<#list ["Nullable", "Single"] as mode>
<@pp.changeOutputFile name="/org/apache/arrow/vector/complex/impl/${mode}MapWriter.java" />
<#assign index = "idx()">
<#if mode == "Single">
<#assign containerClass = "MapVector" />
<#assign index = "idx()">
<#else>
<#assign containerClass = "RepeatedMapVector" />
<#assign index = "currentChildIndex">
<#assign containerClass = "NullableMapVector" />
</#if>

<#include "/@includes/license.ftl" />
Expand All @@ -49,9 +48,13 @@ public class ${mode}MapWriter extends AbstractFieldWriter {

protected final ${containerClass} container;
private final Map<String, FieldWriter> fields = Maps.newHashMap();
<#if mode == "Repeated">private int currentChildIndex = 0;</#if>

public ${mode}MapWriter(${containerClass} container) {
<#if mode == "Single">
if (container instanceof NullableMapVector) {
throw new IllegalArgumentException("Invalid container: " + container);
}
</#if>
this.container = container;
}

Expand All @@ -75,12 +78,12 @@ public MapWriter map(String name) {
FieldWriter writer = fields.get(name.toLowerCase());
if(writer == null){
int vectorCount=container.size();
MapVector vector = container.addOrGet(name, MinorType.MAP, MapVector.class);
NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
writer = new PromotableWriter(vector, container);
if(vectorCount != container.size()) {
writer.allocate();
}
writer.setPosition(${index});
writer.setPosition(idx());
fields.put(name.toLowerCase(), writer);
}
return writer;
Expand Down Expand Up @@ -117,40 +120,12 @@ public ListWriter list(String name) {
if (container.size() > vectorCount) {
writer.allocate();
}
writer.setPosition(${index});
writer.setPosition(idx());
fields.put(name.toLowerCase(), writer);
}
return writer;
}

<#if mode == "Repeated">
public void start() {
// update the repeated vector to state that there is current+1 objects.
final RepeatedMapHolder h = new RepeatedMapHolder();
final RepeatedMapVector map = (RepeatedMapVector) container;
final RepeatedMapVector.Mutator mutator = map.getMutator();

// Make sure that the current vector can support the end position of this list.
if(container.getValueCapacity() <= idx()) {
mutator.setValueCount(idx()+1);
}

map.getAccessor().get(idx(), h);
if (h.start >= h.end) {
container.getMutator().startNewValue(idx());
}
currentChildIndex = container.getMutator().add(idx());
for(final FieldWriter w : fields.values()) {
w.setPosition(currentChildIndex);
}
}


public void end() {
// noop
}
<#else>

public void setValueCount(int count) {
container.getMutator().setValueCount(count);
}
Expand All @@ -165,14 +140,16 @@ public void setPosition(int index) {

@Override
public void start() {
<#if mode == "Single">
<#else>
container.getMutator().setIndexDefined(idx());
</#if>
}

@Override
public void end() {
}

</#if>

<#list vv.types as type><#list type.minor as minor>
<#assign lowerName = minor.class?uncap_first />
<#if lowerName == "int" ><#assign lowerName = "integer" /></#if>
Expand Down Expand Up @@ -204,7 +181,7 @@ public void end() {
if (currentVector == null || currentVector != vector) {
vector.allocateNewSafe();
}
writer.setPosition(${index});
writer.setPosition(idx());
fields.put(name.toLowerCase(), writer);
}
return writer;
Expand Down
2 changes: 2 additions & 0 deletions java/vector/src/main/codegen/templates/UnionListWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,13 @@ public void start() {
vector.getMutator().setNotNull(idx());
offsets.getMutator().setSafe(idx() + 1, nextOffset);
writer.setPosition(nextOffset);
writer.start();
}

@Override
public void end() {
// if (inMap) {
writer.end();
inMap = false;
final int nextOffset = offsets.getAccessor().get(idx() + 1);
offsets.getMutator().setSafe(idx() + 1, nextOffset + 1);
Expand Down
6 changes: 3 additions & 3 deletions java/vector/src/main/codegen/templates/UnionVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class UnionVector implements FieldVector {
MapVector internalMap;
UInt1Vector typeVector;

private MapVector mapVector;
private NullableMapVector mapVector;
private ListVector listVector;

private FieldReader reader;
Expand Down Expand Up @@ -127,10 +127,10 @@ public List<BufferBacked> getFieldInnerVectors() {
throw new UnsupportedOperationException();
}

public MapVector getMap() {
public NullableMapVector getMap() {
if (mapVector == null) {
int vectorCount = internalMap.size();
mapVector = internalMap.addOrGet("map", MinorType.MAP, MapVector.class);
mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class);
if (internalMap.size() > vectorCount) {
mapVector.allocateNew();
if (callBack != null) {
Expand Down
2 changes: 1 addition & 1 deletion java/vector/src/main/codegen/templates/UnionWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void endList() {

private MapWriter getMapWriter() {
if (mapWriter == null) {
mapWriter = new SingleMapWriter(data.getMap());
mapWriter = new NullableMapWriter(data.getMap());
mapWriter.setPosition(idx());
writers.add(mapWriter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.arrow.vector;

public interface NullableVector extends ValueVector{
public interface NullableVector extends ValueVector {

ValueVector getValuesVector();
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<Ar
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
List<ArrowVectorType> expectedBuffers = vector.getField().getTypeLayout().getVectorTypes();
if (fieldBuffers.size() != expectedBuffers.size()) {
throw new IllegalArgumentException("wrong number of buffers for field " + vector.getField() + ". found: " + fieldBuffers);
throw new IllegalArgumentException(String.format(
"wrong number of buffers for field %s in vector %s. found: %s",
vector.getField(), vector.getClass().getSimpleName(), fieldBuffers));
}
buffers.addAll(fieldBuffers);
for (FieldVector child : vector.getChildrenFromFields()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,20 @@
package org.apache.arrow.vector.complex;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BaseDataValueVector;
import org.apache.arrow.vector.BaseValueVector;
import org.apache.arrow.vector.BufferBacked;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.ComplexHolder;
import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType.Tuple;
Expand All @@ -51,24 +46,20 @@

import io.netty.buffer.ArrowBuf;

public class MapVector extends AbstractMapVector implements FieldVector {
public class MapVector extends AbstractMapVector {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);

private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this);
private final SingleMapReaderImpl reader = new SingleMapReaderImpl(this);
private final Accessor accessor = new Accessor();
private final Mutator mutator = new Mutator();
int valueCount;

// TODO: validity vector
private final List<BufferBacked> innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList());

public MapVector(String name, BufferAllocator allocator, CallBack callBack){
public MapVector(String name, BufferAllocator allocator, CallBack callBack) {
super(name, allocator, callBack);
}

@Override
public FieldReader getReader() {
//return new SingleMapReaderImpl(MapVector.this);
return reader;
}

Expand Down Expand Up @@ -126,16 +117,16 @@ public int getBufferSizeFor(final int valueCount) {

@Override
public ArrowBuf[] getBuffers(boolean clear) {
int expectedSize = getBufferSize();
int actualSize = super.getBufferSize();

Preconditions.checkArgument(expectedSize == actualSize, expectedSize + " != " + actualSize);
// int expectedSize = getBufferSize();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove this method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do.

// int actualSize = super.getBufferSize();
//
// Preconditions.checkArgument(expectedSize == actualSize, expectedSize + " != " + actualSize);
return super.getBuffers(clear);
}

@Override
public TransferPair getTransferPair(BufferAllocator allocator) {
return new MapTransferPair(this, name, allocator);
return new MapTransferPair(this, new MapVector(name, allocator, callBack), false);
}

@Override
Expand All @@ -145,18 +136,14 @@ public TransferPair makeTransferPair(ValueVector to) {

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator) {
return new MapTransferPair(this, ref, allocator);
return new MapTransferPair(this, new MapVector(ref, allocator, callBack), false);
}

protected static class MapTransferPair implements TransferPair{
private final TransferPair[] pairs;
private final MapVector from;
private final MapVector to;

public MapTransferPair(MapVector from, String name, BufferAllocator allocator) {
this(from, new MapVector(name, allocator, from.callBack), false);
}

public MapTransferPair(MapVector from, MapVector to) {
this(from, to, true);
}
Expand Down Expand Up @@ -335,7 +322,6 @@ public void close() {
super.close();
}

@Override
public void initializeChildrenFromFields(List<Field> children) {
for (Field field : children) {
MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
Expand All @@ -344,25 +330,9 @@ public void initializeChildrenFromFields(List<Field> children) {
}
}

@Override

public List<FieldVector> getChildrenFromFields() {
return getChildren();
}

@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers);
// TODO: something with fieldNode?
}

@Override
public List<ArrowBuf> getFieldBuffers() {
return BaseDataValueVector.unload(getFieldInnerVectors());
}

@Override
public List<BufferBacked> getFieldInnerVectors() {
return innerVectors;
}

}
Loading