diff --git a/java/vector/src/main/codegen/templates/MapWriters.java b/java/vector/src/main/codegen/templates/MapWriters.java index 8a8983a1497cc..7f319a9ca34d8 100644 --- a/java/vector/src/main/codegen/templates/MapWriters.java +++ b/java/vector/src/main/codegen/templates/MapWriters.java @@ -17,14 +17,13 @@ */ <@pp.dropOutputFile /> -<#list ["Single"] as mode> +<#list ["Nullable", "Single"] as mode> <@pp.changeOutputFile name="/org/apache/arrow/vector/complex/impl/${mode}MapWriter.java" /> +<#assign index = "idx()"> <#if mode == "Single"> <#assign containerClass = "MapVector" /> -<#assign index = "idx()"> <#else> -<#assign containerClass = "RepeatedMapVector" /> -<#assign index = "currentChildIndex"> +<#assign containerClass = "NullableMapVector" /> <#include "/@includes/license.ftl" /> @@ -49,9 +48,13 @@ public class ${mode}MapWriter extends AbstractFieldWriter { protected final ${containerClass} container; private final Map fields = Maps.newHashMap(); - <#if mode == "Repeated">private int currentChildIndex = 0; public ${mode}MapWriter(${containerClass} container) { + <#if mode == "Single"> + if (container instanceof NullableMapVector) { + throw new IllegalArgumentException("Invalid container: " + container); + } + this.container = container; } @@ -75,12 +78,12 @@ public MapWriter map(String name) { FieldWriter writer = fields.get(name.toLowerCase()); if(writer == null){ int vectorCount=container.size(); - MapVector vector = container.addOrGet(name, MinorType.MAP, MapVector.class); + NullableMapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class); writer = new PromotableWriter(vector, container); if(vectorCount != container.size()) { writer.allocate(); } - writer.setPosition(${index}); + writer.setPosition(idx()); fields.put(name.toLowerCase(), writer); } return writer; @@ -117,40 +120,12 @@ public ListWriter list(String name) { if (container.size() > vectorCount) { writer.allocate(); } - writer.setPosition(${index}); + writer.setPosition(idx()); fields.put(name.toLowerCase(), writer); } return writer; } - <#if mode == "Repeated"> - public void start() { - // update the repeated vector to state that there is current+1 objects. - final RepeatedMapHolder h = new RepeatedMapHolder(); - final RepeatedMapVector map = (RepeatedMapVector) container; - final RepeatedMapVector.Mutator mutator = map.getMutator(); - - // Make sure that the current vector can support the end position of this list. - if(container.getValueCapacity() <= idx()) { - mutator.setValueCount(idx()+1); - } - - map.getAccessor().get(idx(), h); - if (h.start >= h.end) { - container.getMutator().startNewValue(idx()); - } - currentChildIndex = container.getMutator().add(idx()); - for(final FieldWriter w : fields.values()) { - w.setPosition(currentChildIndex); - } - } - - - public void end() { - // noop - } - <#else> - public void setValueCount(int count) { container.getMutator().setValueCount(count); } @@ -165,14 +140,16 @@ public void setPosition(int index) { @Override public void start() { + <#if mode == "Single"> + <#else> + container.getMutator().setIndexDefined(idx()); + } @Override public void end() { } - - <#list vv.types as type><#list type.minor as minor> <#assign lowerName = minor.class?uncap_first /> <#if lowerName == "int" ><#assign lowerName = "integer" /> @@ -204,7 +181,7 @@ public void end() { if (currentVector == null || currentVector != vector) { vector.allocateNewSafe(); } - writer.setPosition(${index}); + writer.setPosition(idx()); fields.put(name.toLowerCase(), writer); } return writer; diff --git a/java/vector/src/main/codegen/templates/UnionListWriter.java b/java/vector/src/main/codegen/templates/UnionListWriter.java index 49d57e716bc8a..d502803d71616 100644 --- a/java/vector/src/main/codegen/templates/UnionListWriter.java +++ b/java/vector/src/main/codegen/templates/UnionListWriter.java @@ -160,11 +160,13 @@ public void start() { vector.getMutator().setNotNull(idx()); offsets.getMutator().setSafe(idx() + 1, nextOffset); writer.setPosition(nextOffset); + writer.start(); } @Override public void end() { // if (inMap) { + writer.end(); inMap = false; final int nextOffset = offsets.getAccessor().get(idx() + 1); offsets.getMutator().setSafe(idx() + 1, nextOffset + 1); diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index 72125fa50fb82..3014bbba9d52d 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -72,7 +72,7 @@ public class UnionVector implements FieldVector { MapVector internalMap; UInt1Vector typeVector; - private MapVector mapVector; + private NullableMapVector mapVector; private ListVector listVector; private FieldReader reader; @@ -127,10 +127,10 @@ public List getFieldInnerVectors() { throw new UnsupportedOperationException(); } - public MapVector getMap() { + public NullableMapVector getMap() { if (mapVector == null) { int vectorCount = internalMap.size(); - mapVector = internalMap.addOrGet("map", MinorType.MAP, MapVector.class); + mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class); if (internalMap.size() > vectorCount) { mapVector.allocateNew(); if (callBack != null) { diff --git a/java/vector/src/main/codegen/templates/UnionWriter.java b/java/vector/src/main/codegen/templates/UnionWriter.java index 1137e2cb0207a..460ec1c0d9586 100644 --- a/java/vector/src/main/codegen/templates/UnionWriter.java +++ b/java/vector/src/main/codegen/templates/UnionWriter.java @@ -74,7 +74,7 @@ public void endList() { private MapWriter getMapWriter() { if (mapWriter == null) { - mapWriter = new SingleMapWriter(data.getMap()); + mapWriter = new NullableMapWriter(data.getMap()); mapWriter.setPosition(idx()); writers.add(mapWriter); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/NullableVector.java b/java/vector/src/main/java/org/apache/arrow/vector/NullableVector.java index 00c33fc2d6e6c..0212b3c0d7b95 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/NullableVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/NullableVector.java @@ -17,7 +17,7 @@ */ package org.apache.arrow.vector; -public interface NullableVector extends ValueVector{ +public interface NullableVector extends ValueVector { ValueVector getValuesVector(); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java index e4d37bf47d114..3375a7d5c311b 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorUnloader.java @@ -68,7 +68,9 @@ private void appendNodes(FieldVector vector, List nodes, List fieldBuffers = vector.getFieldBuffers(); List expectedBuffers = vector.getField().getTypeLayout().getVectorTypes(); if (fieldBuffers.size() != expectedBuffers.size()) { - throw new IllegalArgumentException("wrong number of buffers for field " + vector.getField() + ". found: " + fieldBuffers); + throw new IllegalArgumentException(String.format( + "wrong number of buffers for field %s in vector %s. found: %s", + vector.getField(), vector.getClass().getSimpleName(), fieldBuffers)); } buffers.addAll(fieldBuffers); for (FieldVector child : vector.getChildrenFromFields()) { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java index e3696588e6006..1b8483a3d41be 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java @@ -18,9 +18,7 @@ package org.apache.arrow.vector.complex; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -28,15 +26,12 @@ import javax.annotation.Nullable; import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.vector.BaseDataValueVector; import org.apache.arrow.vector.BaseValueVector; -import org.apache.arrow.vector.BufferBacked; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.holders.ComplexHolder; -import org.apache.arrow.vector.schema.ArrowFieldNode; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.Types.MinorType; import org.apache.arrow.vector.types.pojo.ArrowType.Tuple; @@ -49,26 +44,20 @@ import com.google.common.collect.Ordering; import com.google.common.primitives.Ints; -import io.netty.buffer.ArrowBuf; - -public class MapVector extends AbstractMapVector implements FieldVector { +public class MapVector extends AbstractMapVector { //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class); - private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this); + private final SingleMapReaderImpl reader = new SingleMapReaderImpl(this); private final Accessor accessor = new Accessor(); private final Mutator mutator = new Mutator(); int valueCount; - // TODO: validity vector - private final List innerVectors = Collections.unmodifiableList(Arrays.asList()); - - public MapVector(String name, BufferAllocator allocator, CallBack callBack){ + public MapVector(String name, BufferAllocator allocator, CallBack callBack) { super(name, allocator, callBack); } @Override public FieldReader getReader() { - //return new SingleMapReaderImpl(MapVector.this); return reader; } @@ -124,18 +113,9 @@ public int getBufferSizeFor(final int valueCount) { return (int) bufferSize; } - @Override - public ArrowBuf[] getBuffers(boolean clear) { - int expectedSize = getBufferSize(); - int actualSize = super.getBufferSize(); - - Preconditions.checkArgument(expectedSize == actualSize, expectedSize + " != " + actualSize); - return super.getBuffers(clear); - } - @Override public TransferPair getTransferPair(BufferAllocator allocator) { - return new MapTransferPair(this, name, allocator); + return new MapTransferPair(this, new MapVector(name, allocator, callBack), false); } @Override @@ -145,7 +125,7 @@ public TransferPair makeTransferPair(ValueVector to) { @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator) { - return new MapTransferPair(this, ref, allocator); + return new MapTransferPair(this, new MapVector(ref, allocator, callBack), false); } protected static class MapTransferPair implements TransferPair{ @@ -153,10 +133,6 @@ protected static class MapTransferPair implements TransferPair{ private final MapVector from; private final MapVector to; - public MapTransferPair(MapVector from, String name, BufferAllocator allocator) { - this(from, new MapVector(name, allocator, from.callBack), false); - } - public MapTransferPair(MapVector from, MapVector to) { this(from, to, true); } @@ -335,7 +311,6 @@ public void close() { super.close(); } - @Override public void initializeChildrenFromFields(List children) { for (Field field : children) { MinorType minorType = Types.getMinorTypeForArrowType(field.getType()); @@ -344,25 +319,9 @@ public void initializeChildrenFromFields(List children) { } } - @Override + public List getChildrenFromFields() { return getChildren(); } - @Override - public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers) { - BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers); - // TODO: something with fieldNode? - } - - @Override - public List getFieldBuffers() { - return BaseDataValueVector.unload(getFieldInnerVectors()); - } - - @Override - public List getFieldInnerVectors() { - return innerVectors; - } - } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java new file mode 100644 index 0000000000000..6b257c095d28e --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/NullableMapVector.java @@ -0,0 +1,260 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.vector.complex; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BaseDataValueVector; +import org.apache.arrow.vector.BufferBacked; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.NullableVectorDefinitionSetter; +import org.apache.arrow.vector.UInt1Vector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.complex.impl.NullableMapReaderImpl; +import org.apache.arrow.vector.complex.reader.FieldReader; +import org.apache.arrow.vector.holders.ComplexHolder; +import org.apache.arrow.vector.schema.ArrowFieldNode; +import org.apache.arrow.vector.util.CallBack; +import org.apache.arrow.vector.util.TransferPair; + +import com.google.common.collect.ObjectArrays; + +import io.netty.buffer.ArrowBuf; + +public class NullableMapVector extends MapVector implements FieldVector { + + private final NullableMapReaderImpl reader = new NullableMapReaderImpl(this); + + protected final UInt1Vector bits; + + private final List innerVectors; + + private final Accessor accessor; + private final Mutator mutator; + + public NullableMapVector(String name, BufferAllocator allocator, CallBack callBack) { + super(name, checkNotNull(allocator), callBack); + this.bits = new UInt1Vector("$bits$", allocator); + this.innerVectors = Collections.unmodifiableList(Arrays.asList(bits)); + this.accessor = new Accessor(); + this.mutator = new Mutator(); + } + + @Override + public void loadFieldBuffers(ArrowFieldNode fieldNode, List ownBuffers) { + BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers); + this.valueCount = fieldNode.getLength(); + } + + @Override + public List getFieldBuffers() { + return BaseDataValueVector.unload(getFieldInnerVectors()); + } + + @Override + public List getFieldInnerVectors() { + return innerVectors; + } + + @Override + public FieldReader getReader() { + return reader; + } + + @Override + public TransferPair getTransferPair(BufferAllocator allocator) { + return new NullableMapTransferPair(this, new NullableMapVector(name, allocator, callBack), false); + } + + @Override + public TransferPair makeTransferPair(ValueVector to) { + return new NullableMapTransferPair(this, (NullableMapVector) to, true); + } + + @Override + public TransferPair getTransferPair(String ref, BufferAllocator allocator) { + return new NullableMapTransferPair(this, new NullableMapVector(ref, allocator, callBack), false); + } + + protected class NullableMapTransferPair extends MapTransferPair { + + private NullableMapVector target; + + protected NullableMapTransferPair(NullableMapVector from, NullableMapVector to, boolean allocate) { + super(from, to, allocate); + this.target = to; + } + + @Override + public void transfer() { + bits.transferTo(target.bits); + super.transfer(); + } + + @Override + public void copyValueSafe(int fromIndex, int toIndex) { + target.bits.copyFromSafe(fromIndex, toIndex, bits); + super.copyValueSafe(fromIndex, toIndex); + } + + @Override + public void splitAndTransfer(int startIndex, int length) { + bits.splitAndTransferTo(startIndex, length, target.bits); + super.splitAndTransfer(startIndex, length); + } + } + + @Override + public int getValueCapacity() { + return Math.min(bits.getValueCapacity(), super.getValueCapacity()); + } + + @Override + public ArrowBuf[] getBuffers(boolean clear) { + return ObjectArrays.concat(bits.getBuffers(clear), super.getBuffers(clear), ArrowBuf.class); + } + + @Override + public void close() { + bits.close(); + super.close(); + } + + @Override + public void clear() { + bits.clear(); + super.clear(); + } + + + @Override + public int getBufferSize(){ + return super.getBufferSize() + bits.getBufferSize(); + } + + @Override + public int getBufferSizeFor(final int valueCount) { + if (valueCount == 0) { + return 0; + } + return super.getBufferSizeFor(valueCount) + + bits.getBufferSizeFor(valueCount); + } + + @Override + public void setInitialCapacity(int numRecords) { + bits.setInitialCapacity(numRecords); + super.setInitialCapacity(numRecords); + } + + @Override + public boolean allocateNewSafe() { + /* Boolean to keep track if all the memory allocations were successful + * Used in the case of composite vectors when we need to allocate multiple + * buffers for multiple vectors. If one of the allocations failed we need to + * clear all the memory that we allocated + */ + boolean success = false; + try { + success = super.allocateNewSafe() && bits.allocateNewSafe(); + } finally { + if (!success) { + clear(); + } + } + bits.zeroVector(); + return success; + } + public final class Accessor extends MapVector.Accessor { + final UInt1Vector.Accessor bAccessor = bits.getAccessor(); + + @Override + public Object getObject(int index) { + if (isNull(index)) { + return null; + } else { + return super.getObject(index); + } + } + + @Override + public void get(int index, ComplexHolder holder) { + holder.isSet = isSet(index); + super.get(index, holder); + } + + @Override + public boolean isNull(int index) { + return isSet(index) == 0; + } + + public int isSet(int index){ + return bAccessor.get(index); + } + + } + + public final class Mutator extends MapVector.Mutator implements NullableVectorDefinitionSetter { + + private Mutator(){ + } + + @Override + public void setIndexDefined(int index){ + bits.getMutator().setSafe(index, 1); + } + + public void setNull(int index){ + bits.getMutator().setSafe(index, 0); + } + + @Override + public void setValueCount(int valueCount) { + assert valueCount >= 0; + super.setValueCount(valueCount); + bits.getMutator().setValueCount(valueCount); + } + + @Override + public void generateTestData(int valueCount){ + super.generateTestData(valueCount); + bits.getMutator().generateTestDataAlt(valueCount); + } + + @Override + public void reset(){ + bits.getMutator().setValueCount(0); + } + + } + + @Override + public Accessor getAccessor() { + return accessor; + } + + @Override + public Mutator getMutator() { + return mutator; + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java index 259a954233c06..e7c3c8c7e4b42 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/AbstractBaseReader.java @@ -19,15 +19,10 @@ import java.util.Iterator; -import com.google.flatbuffers.FlatBufferBuilder; -import org.apache.arrow.flatbuf.Type; -import org.apache.arrow.flatbuf.Union; -import org.apache.arrow.flatbuf.UnionMode; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.BaseWriter.ListWriter; import org.apache.arrow.vector.complex.writer.FieldWriter; import org.apache.arrow.vector.holders.UnionHolder; -import org.apache.arrow.vector.types.pojo.Field; abstract class AbstractBaseReader implements FieldReader{ @@ -44,7 +39,7 @@ public void setPosition(int index){ this.index = index; } - int idx(){ + protected int idx(){ return index; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java index 89bfefc8f19e3..761b1b43c08aa 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/ComplexWriterImpl.java @@ -19,6 +19,7 @@ import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.complex.StateTool; import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter; import org.apache.arrow.vector.types.Types.MinorType; @@ -29,7 +30,7 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWriter { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexWriterImpl.class); - private SingleMapWriter mapRoot; + private NullableMapWriter mapRoot; private UnionListWriter listRoot; private final MapVector container; @@ -121,8 +122,8 @@ public MapWriter directMap(){ switch(mode){ case INIT: - MapVector map = (MapVector) container; - mapRoot = new SingleMapWriter(map); + NullableMapVector map = (NullableMapVector) container; + mapRoot = new NullableMapWriter(map); mapRoot.setPosition(idx()); mode = Mode.MAP; break; @@ -142,8 +143,8 @@ public MapWriter rootAsMap() { switch(mode){ case INIT: - MapVector map = container.addOrGet(name, MinorType.MAP, MapVector.class); - mapRoot = new SingleMapWriter(map); + NullableMapVector map = container.addOrGet(name, MinorType.MAP, NullableMapVector.class); + mapRoot = new NullableMapWriter(map); mapRoot.setPosition(idx()); mode = Mode.MAP; break; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/NullableMapReaderImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/NullableMapReaderImpl.java new file mode 100644 index 0000000000000..18b35c194a184 --- /dev/null +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/NullableMapReaderImpl.java @@ -0,0 +1,45 @@ +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.arrow.vector.complex.impl; + +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.NullableMapVector; +import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; + +public class NullableMapReaderImpl extends SingleMapReaderImpl { + + private NullableMapVector nullableMapVector; + + public NullableMapReaderImpl(MapVector vector) { + super((NullableMapVector)vector); + this.nullableMapVector = (NullableMapVector)vector; + } + + @Override + public void copyAsValue(MapWriter writer){ + NullableMapWriter impl = (NullableMapWriter) writer; + impl.container.copyFromSafe(idx(), impl.idx(), nullableMapVector); + } + + @Override + public void copyAsField(String name, MapWriter writer){ + NullableMapWriter impl = (NullableMapWriter) writer.map(name); + impl.container.copyFromSafe(idx(), impl.idx(), nullableMapVector); + } +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java index 1c43240901c4f..ae17b4bbb10dd 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/impl/SingleMapReaderImpl.java @@ -1,5 +1,3 @@ - - /******************************************************************************* * Licensed to the Apache Software Foundation (ASF) under one @@ -27,9 +25,9 @@ import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter; +import org.apache.arrow.vector.types.Types.MinorType; import com.google.common.collect.Maps; -import org.apache.arrow.vector.types.Types.MinorType; @SuppressWarnings("unused") public class SingleMapReaderImpl extends AbstractFieldReader{ diff --git a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java b/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java index 15cd49865bdce..9f1efd056cb08 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/schema/TypeLayout.java @@ -90,8 +90,7 @@ public static TypeLayout getTypeLayout(final ArrowType arrowType) { @Override public TypeLayout visit(Tuple type) { List vectors = asList( - // TODO: add validity vector in Map -// validityVector() + validityVector() ); return new TypeLayout(vectors); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java index 4d0d9ee114ad8..5eef8a008a923 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/types/Types.java @@ -47,7 +47,7 @@ import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.ZeroVector; import org.apache.arrow.vector.complex.ListVector; -import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.complex.impl.BigIntWriterImpl; import org.apache.arrow.vector.complex.impl.BitWriterImpl; @@ -58,7 +58,7 @@ import org.apache.arrow.vector.complex.impl.IntWriterImpl; import org.apache.arrow.vector.complex.impl.IntervalDayWriterImpl; import org.apache.arrow.vector.complex.impl.IntervalYearWriterImpl; -import org.apache.arrow.vector.complex.impl.SingleMapWriter; +import org.apache.arrow.vector.complex.impl.NullableMapWriter; import org.apache.arrow.vector.complex.impl.SmallIntWriterImpl; import org.apache.arrow.vector.complex.impl.TimeStampWriterImpl; import org.apache.arrow.vector.complex.impl.TimeWriterImpl; @@ -139,12 +139,12 @@ public Field getField() { @Override public FieldVector getNewVector(String name, BufferAllocator allocator, CallBack callBack, int... precisionScale) { - return new MapVector(name, allocator, callBack); + return new NullableMapVector(name, allocator, callBack); } @Override public FieldWriter getNewFieldWriter(ValueVector vector) { - return new SingleMapWriter((MapVector) vector); + return new NullableMapWriter((NullableMapVector) vector); } }, // an empty map column. Useful for conceptual setup. Children listed within here diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java index 85bb2cfc99f81..7dcb8977c0d7f 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestVectorUnloadLoad.java @@ -22,6 +22,7 @@ import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl; import org.apache.arrow.vector.complex.reader.BaseReader.MapReader; @@ -60,14 +61,14 @@ public void test() throws IOException { } writer.setValueCount(count); - VectorUnloader vectorUnloader = new VectorUnloader((MapVector)parent.getChild("root")); + VectorUnloader vectorUnloader = new VectorUnloader(parent.getChild("root")); schema = vectorUnloader.getSchema(); try ( ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch(); BufferAllocator finalVectorsAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); MapVector newParent = new MapVector("parent", finalVectorsAllocator, null)) { - MapVector root = newParent.addOrGet("root", MinorType.MAP, MapVector.class); + FieldVector root = newParent.addOrGet("root", MinorType.MAP, NullableMapVector.class); VectorLoader vectorLoader = new VectorLoader(schema, root); vectorLoader.load(recordBatch); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java index 24f00f14df001..689c96fda9202 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/complex/impl/TestPromotableWriter.java @@ -25,8 +25,8 @@ import org.apache.arrow.vector.DirtyRootAllocator; import org.apache.arrow.vector.complex.AbstractMapVector; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.complex.UnionVector; -import org.apache.arrow.vector.holders.UInt4Holder; import org.apache.arrow.vector.types.Types.MinorType; import org.junit.After; import org.junit.Before; @@ -51,7 +51,7 @@ public void terminate() throws Exception { public void testPromoteToUnion() throws Exception { try (final AbstractMapVector container = new MapVector(EMPTY_SCHEMA_PATH, allocator, null); - final MapVector v = container.addOrGet("test", MinorType.MAP, MapVector.class); + final NullableMapVector v = container.addOrGet("test", MinorType.MAP, NullableMapVector.class); final PromotableWriter writer = new PromotableWriter(v, container)) { container.allocateNew(); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java index bc17a2b2835c2..fa710dae5eee8 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/complex/writer/TestComplexWriter.java @@ -17,7 +17,6 @@ */ package org.apache.arrow.vector.complex.writer; -import io.netty.buffer.ArrowBuf; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.complex.ListVector; @@ -41,6 +40,8 @@ import org.junit.Assert; import org.junit.Test; +import io.netty.buffer.ArrowBuf; + public class TestComplexWriter { static final BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE); @@ -71,6 +72,36 @@ public void simpleNestedTypes() { parent.close(); } + @Test + public void nullableMap() { + MapVector parent = new MapVector("parent", allocator, null); + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + MapWriter mapWriter = rootWriter.map("map"); + BigIntWriter nested = mapWriter.bigInt("nested"); + for (int i = 0; i < COUNT; i++) { + if (i % 2 == 0) { + mapWriter.setPosition(i); + mapWriter.start(); + nested.writeBigInt(i); + mapWriter.end(); + } + } + writer.setValueCount(COUNT); + MapReader rootReader = new SingleMapReaderImpl(parent).reader("root"); + for (int i = 0; i < COUNT; i++) { + rootReader.setPosition(i); + if (i % 2 == 0) { + Assert.assertNotNull(rootReader.reader("map").readObject()); + Assert.assertEquals(i, rootReader.reader("map").reader("nested").readLong().longValue()); + } else { + Assert.assertNull(rootReader.reader("map").readObject()); + } + } + + parent.close(); + } + @Test public void listScalarType() { ListVector listVector = new ListVector("list", allocator, null); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java index 11de0a2ef00a0..ad301689cd1e2 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java @@ -31,6 +31,7 @@ import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.NullableMapVector; import org.apache.arrow.vector.complex.impl.ComplexWriterImpl; import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl; import org.apache.arrow.vector.complex.reader.BaseReader.MapReader; @@ -47,10 +48,13 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.netty.buffer.ArrowBuf; public class TestArrowFile { + private static final Logger LOGGER = LoggerFactory.getLogger(TestArrowFile.class); private static final int COUNT = 10; private BufferAllocator allocator; @@ -72,7 +76,7 @@ public void testWrite() throws IOException { BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); MapVector parent = new MapVector("parent", vectorAllocator, null)) { writeData(count, parent); - write((MapVector)parent.getChild("root"), file); + write(parent.getChild("root"), file); } } @@ -82,10 +86,10 @@ public void testWriteComplex() throws IOException { int count = COUNT; try ( BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null)) { + NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null)) { writeComplexData(count, parent); validateComplexContent(count, parent); - write((MapVector)parent.getChild("root"), file); + write(parent.getChild("root"), file); } } @@ -147,7 +151,7 @@ public void testWriteRead() throws IOException { BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); MapVector parent = new MapVector("parent", originalVectorAllocator, null)) { writeData(count, parent); - write((MapVector)parent.getChild("root"), file); + write(parent.getChild("root"), file); } // read @@ -160,11 +164,11 @@ public void testWriteRead() throws IOException { ) { ArrowFooter footer = arrowReader.readFooter(); Schema schema = footer.getSchema(); - System.out.println("reading schema: " + schema); + LOGGER.debug("reading schema: " + schema); // initialize vectors - MapVector root = parent.addOrGet("root", MinorType.MAP, MapVector.class); + NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class); VectorLoader vectorLoader = new VectorLoader(schema, root); @@ -204,7 +208,7 @@ public void testWriteReadComplex() throws IOException { BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); MapVector parent = new MapVector("parent", originalVectorAllocator, null)) { writeComplexData(count, parent); - write((MapVector)parent.getChild("root"), file); + write(parent.getChild("root"), file); } // read @@ -213,16 +217,15 @@ public void testWriteReadComplex() throws IOException { FileInputStream fileInputStream = new FileInputStream(file); ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator); BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE); - MapVector parent = new MapVector("parent", vectorAllocator, null) + NullableMapVector parent = new NullableMapVector("parent", vectorAllocator, null) ) { ArrowFooter footer = arrowReader.readFooter(); Schema schema = footer.getSchema(); - System.out.println("reading schema: " + schema); + LOGGER.debug("reading schema: " + schema); // initialize vectors - MapVector root = parent.addOrGet("root", MinorType.MAP, MapVector.class); - + NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class); VectorLoader vectorLoader = new VectorLoader(schema, root); List recordBatches = footer.getRecordBatches(); @@ -237,16 +240,16 @@ public void testWriteReadComplex() throws IOException { public void printVectors(List vectors) { for (FieldVector vector : vectors) { - System.out.println(vector.getField().getName()); + LOGGER.debug(vector.getField().getName()); Accessor accessor = vector.getAccessor(); int valueCount = accessor.getValueCount(); for (int i = 0; i < valueCount; i++) { - System.out.println(accessor.getObject(i)); + LOGGER.debug(String.valueOf(accessor.getObject(i))); } } } - private void validateComplexContent(int count, MapVector parent) { + private void validateComplexContent(int count, NullableMapVector parent) { printVectors(parent.getChildrenFromFields()); MapReader rootReader = new SingleMapReaderImpl(parent).reader("root"); @@ -259,10 +262,10 @@ private void validateComplexContent(int count, MapVector parent) { } } - private void write(MapVector parent, File file) throws FileNotFoundException, IOException { + private void write(FieldVector parent, File file) throws FileNotFoundException, IOException { VectorUnloader vectorUnloader = new VectorUnloader(parent); Schema schema = vectorUnloader.getSchema(); - System.out.println("writing schema: " + schema); + LOGGER.debug("writing schema: " + schema); try ( FileOutputStream fileOutputStream = new FileOutputStream(file); ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema); @@ -308,8 +311,8 @@ public void testWriteReadMultipleRBs() throws IOException { ) { ArrowFooter footer = arrowReader.readFooter(); Schema schema = footer.getSchema(); - System.out.println("reading schema: " + schema); - MapVector root = parent.addOrGet("root", MinorType.MAP, MapVector.class); + LOGGER.debug("reading schema: " + schema); + NullableMapVector root = parent.addOrGet("root", MinorType.MAP, NullableMapVector.class); VectorLoader vectorLoader = new VectorLoader(schema, root); List recordBatches = footer.getRecordBatches(); Assert.assertEquals(2, recordBatches.size()); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java b/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java index e557cc84f3bae..61327f1970e83 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/pojo/TestConvert.java @@ -22,8 +22,6 @@ import static org.junit.Assert.assertEquals; import org.apache.arrow.flatbuf.UnionMode; -import static org.junit.Assert.assertEquals; - import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint; import org.apache.arrow.vector.types.pojo.ArrowType.Int; import org.apache.arrow.vector.types.pojo.ArrowType.List;