Skip to content

Commit

Permalink
ARROW-274: Add NullableMapVector to support nullable maps
Browse files Browse the repository at this point in the history
  • Loading branch information
julienledem committed Aug 30, 2016
1 parent e197b2d commit 8780f48
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 67 deletions.
5 changes: 3 additions & 2 deletions java/vector/src/main/codegen/templates/MapWriters.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<#list ["Single"] as mode>
<@pp.changeOutputFile name="/org/apache/arrow/vector/complex/impl/${mode}MapWriter.java" />
<#if mode == "Single">
<#assign containerClass = "MapVector" />
<#assign containerClass = "NullableMapVector" />
<#assign index = "idx()">
<#else>
<#assign containerClass = "RepeatedMapVector" />
Expand Down Expand Up @@ -75,7 +75,7 @@ public MapWriter map(String name) {
FieldWriter writer = fields.get(name.toLowerCase());
if(writer == null){
int vectorCount=container.size();
MapVector vector = container.addOrGet(name, MinorType.MAP, MapVector.class);
MapVector vector = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
writer = new PromotableWriter(vector, container);
if(vectorCount != container.size()) {
writer.allocate();
Expand Down Expand Up @@ -165,6 +165,7 @@ public void setPosition(int index) {
@Override
public void start() {
container.getMutator().setIndexDefined(idx());
}
@Override
Expand Down
6 changes: 3 additions & 3 deletions java/vector/src/main/codegen/templates/UnionVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class UnionVector implements FieldVector {
MapVector internalMap;
UInt1Vector typeVector;

private MapVector mapVector;
private NullableMapVector mapVector;
private ListVector listVector;

private FieldReader reader;
Expand Down Expand Up @@ -127,10 +127,10 @@ public List<BufferBacked> getFieldInnerVectors() {
throw new UnsupportedOperationException();
}

public MapVector getMap() {
public NullableMapVector getMap() {
if (mapVector == null) {
int vectorCount = internalMap.size();
mapVector = internalMap.addOrGet("map", MinorType.MAP, MapVector.class);
mapVector = internalMap.addOrGet("map", MinorType.MAP, NullableMapVector.class);
if (internalMap.size() > vectorCount) {
mapVector.allocateNew();
if (callBack != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package org.apache.arrow.vector;

public interface NullableVector extends ValueVector{
public interface NullableVector extends ValueVector {

ValueVector getValuesVector();
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<Ar
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
List<ArrowVectorType> expectedBuffers = vector.getField().getTypeLayout().getVectorTypes();
if (fieldBuffers.size() != expectedBuffers.size()) {
throw new IllegalArgumentException("wrong number of buffers for field " + vector.getField() + ". found: " + fieldBuffers);
throw new IllegalArgumentException(String.format(
"wrong number of buffers for field %s in vector %s. found: %s",
vector.getField(), vector.getClass().getSimpleName(), fieldBuffers));
}
buffers.addAll(fieldBuffers);
for (FieldVector child : vector.getChildrenFromFields()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,20 @@
package org.apache.arrow.vector.complex;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BaseDataValueVector;
import org.apache.arrow.vector.BaseValueVector;
import org.apache.arrow.vector.BufferBacked;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.impl.SingleMapReaderImpl;
import org.apache.arrow.vector.complex.reader.FieldReader;
import org.apache.arrow.vector.holders.ComplexHolder;
import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.arrow.vector.types.pojo.ArrowType.Tuple;
Expand All @@ -51,18 +46,15 @@

import io.netty.buffer.ArrowBuf;

public class MapVector extends AbstractMapVector implements FieldVector {
public class MapVector extends AbstractMapVector {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapVector.class);

private final SingleMapReaderImpl reader = new SingleMapReaderImpl(MapVector.this);
private final Accessor accessor = new Accessor();
private final Mutator mutator = new Mutator();
int valueCount;

// TODO: validity vector
private final List<BufferBacked> innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList());

public MapVector(String name, BufferAllocator allocator, CallBack callBack){
public MapVector(String name, BufferAllocator allocator, CallBack callBack) {
super(name, allocator, callBack);
}

Expand Down Expand Up @@ -335,7 +327,6 @@ public void close() {
super.close();
}

@Override
public void initializeChildrenFromFields(List<Field> children) {
for (Field field : children) {
MinorType minorType = Types.getMinorTypeForArrowType(field.getType());
Expand All @@ -344,25 +335,9 @@ public void initializeChildrenFromFields(List<Field> children) {
}
}

@Override

public List<FieldVector> getChildrenFromFields() {
return getChildren();
}

@Override
public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers);
// TODO: something with fieldNode?
}

@Override
public List<ArrowBuf> getFieldBuffers() {
return BaseDataValueVector.unload(getFieldInnerVectors());
}

@Override
public List<BufferBacked> getFieldInnerVectors() {
return innerVectors;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.vector.complex;

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BaseDataValueVector;
import org.apache.arrow.vector.BufferBacked;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullableVectorDefinitionSetter;
import org.apache.arrow.vector.UInt1Vector;
import org.apache.arrow.vector.holders.ComplexHolder;
import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.util.CallBack;

import com.google.common.collect.ObjectArrays;

import io.netty.buffer.ArrowBuf;

/**
 * A {@link MapVector} that additionally records, for every index, whether the map value at
 * that index is defined.
 *
 * <p>Definedness is stored in a {@link UInt1Vector} named {@code $bits$}: a stored value of
 * {@code 0} means the map at that index is null, any other value means it is set. That
 * vector is the only entry of {@link #getFieldInnerVectors()}, and all allocation, sizing and
 * lifecycle methods are overridden so that it is managed together with the parent's state.
 */
public class NullableMapVector extends MapVector implements FieldVector {

  /** Per-index definedness flags (0 = null, 1 = defined). */
  private final UInt1Vector bits;

  /** Unmodifiable single-element view over {@link #bits}, exposed as this field's own buffers. */
  private final List<BufferBacked> innerVectors;

  private final Accessor accessor;
  private final Mutator mutator;

  /**
   * Creates an empty nullable map vector.
   *
   * @param name name passed through to the parent {@link MapVector}
   * @param allocator allocator used for the parent and for the definedness vector; must not be
   *     {@code null}
   * @param callBack callback passed through to the parent {@link MapVector}
   * @throws NullPointerException if {@code allocator} is {@code null}
   */
  public NullableMapVector(String name, BufferAllocator allocator, CallBack callBack) {
    super(name, checkNotNull(allocator), callBack);
    this.bits = new UInt1Vector("$bits$", allocator);
    this.innerVectors = Collections.unmodifiableList(Arrays.<BufferBacked>asList(bits));
    // The inner classes capture 'bits', so they are created only after it has been assigned.
    this.accessor = new Accessor();
    this.mutator = new Mutator();
  }

  /**
   * Loads this vector's own buffers into its inner vectors and takes the value count from the
   * field node.
   *
   * @param fieldNode node whose length becomes this vector's value count
   * @param ownBuffers buffers handed to {@link BaseDataValueVector#load} together with
   *     {@link #getFieldInnerVectors()}
   */
  @Override
  public void loadFieldBuffers(ArrowFieldNode fieldNode, List<ArrowBuf> ownBuffers) {
    BaseDataValueVector.load(getFieldInnerVectors(), ownBuffers);
    this.valueCount = fieldNode.getLength();
  }

  /**
   * Returns the buffers produced by {@link BaseDataValueVector#unload} for
   * {@link #getFieldInnerVectors()}.
   */
  @Override
  public List<ArrowBuf> getFieldBuffers() {
    return BaseDataValueVector.unload(getFieldInnerVectors());
  }

  /** Returns an unmodifiable list containing only the definedness vector. */
  @Override
  public List<BufferBacked> getFieldInnerVectors() {
    return innerVectors;
  }

  /** Returns the smaller of the definedness vector's capacity and the parent's capacity. */
  @Override
  public int getValueCapacity() {
    return Math.min(bits.getValueCapacity(), super.getValueCapacity());
  }

  /**
   * Returns the definedness vector's buffers followed by the parent's buffers.
   *
   * @param clear forwarded unchanged to both underlying {@code getBuffers} calls
   */
  @Override
  public ArrowBuf[] getBuffers(boolean clear) {
    return ObjectArrays.concat(bits.getBuffers(clear), super.getBuffers(clear), ArrowBuf.class);
  }

  /** Closes the definedness vector, then the parent. */
  @Override
  public void close() {
    bits.close();
    super.close();
  }

  /** Clears the definedness vector, then the parent. */
  @Override
  public void clear() {
    bits.clear();
    super.clear();
  }

  /** Returns the parent's buffer size plus the definedness vector's buffer size. */
  @Override
  public int getBufferSize() {
    return super.getBufferSize() + bits.getBufferSize();
  }

  /**
   * Returns the combined buffer size of the parent and the definedness vector for the given
   * number of values.
   *
   * @param valueCount number of values to size for
   * @return {@code 0} when {@code valueCount} is {@code 0}, otherwise the sum of both sizes
   */
  @Override
  public int getBufferSizeFor(final int valueCount) {
    if (valueCount == 0) {
      return 0;
    }
    return super.getBufferSizeFor(valueCount)
        + bits.getBufferSizeFor(valueCount);
  }

  /**
   * Applies the initial capacity to both the definedness vector and the parent.
   *
   * @param numRecords capacity forwarded to both
   */
  @Override
  public void setInitialCapacity(int numRecords) {
    bits.setInitialCapacity(numRecords);
    super.setInitialCapacity(numRecords);
  }

  /**
   * Allocates the parent and the definedness vector.
   *
   * <p>If either allocation reports failure or throws, everything allocated so far is released
   * through {@link #clear()}. The definedness vector is zeroed only after both allocations
   * succeeded, so a failed allocation never touches the just-cleared vector.
   *
   * @return {@code true} if both allocations succeeded, {@code false} otherwise
   */
  @Override
  public boolean allocateNewSafe() {
    boolean success = false;
    try {
      // Short-circuit: the definedness vector is only allocated if the parent succeeded.
      success = super.allocateNewSafe() && bits.allocateNewSafe();
    } finally {
      if (!success) {
        clear();
      }
    }
    if (success) {
      // A freshly allocated vector starts with every index marked null.
      bits.zeroVector();
    }
    return success;
  }

  /** Accessor that consults the definedness vector before exposing map values. */
  public final class Accessor extends MapVector.Accessor {
    final UInt1Vector.Accessor bAccessor = bits.getAccessor();

    /**
     * Returns {@code null} when the index is null, otherwise the parent's object for the index.
     *
     * @param index position to read
     */
    @Override
    public Object getObject(int index) {
      if (isNull(index)) {
        return null;
      } else {
        return super.getObject(index);
      }
    }

    /**
     * Stores the definedness flag of {@code index} in {@code holder.isSet} and then lets the
     * parent accessor populate the holder.
     *
     * @param index position to read
     * @param holder holder to populate
     */
    @Override
    public void get(int index, ComplexHolder holder) {
      holder.isSet = isSet(index);
      super.get(index, holder);
    }

    /** Returns {@code true} when the definedness flag at {@code index} is {@code 0}. */
    @Override
    public boolean isNull(int index) {
      return isSet(index) == 0;
    }

    /** Returns the raw definedness flag stored at {@code index}. */
    public int isSet(int index) {
      return bAccessor.get(index);
    }

  }

  /** Mutator that keeps the definedness vector in step with the parent. */
  public final class Mutator extends MapVector.Mutator implements NullableVectorDefinitionSetter {

    private Mutator() {
    }

    /**
     * Marks the map at {@code index} as defined.
     *
     * <p>Uses the same {@code setSafe} write as {@link #setNull(int)} so that both definedness
     * writes treat an index beyond the current capacity of the definedness vector alike.
     *
     * @param index position to mark as defined
     */
    @Override
    public void setIndexDefined(int index) {
      bits.getMutator().setSafe(index, 1);
    }

    /**
     * Marks the map at {@code index} as null.
     *
     * @param index position to mark as null
     */
    public void setNull(int index) {
      bits.getMutator().setSafe(index, 0);
    }

    /**
     * Returns whether {@code outIndex} is below this vector's current value capacity.
     *
     * @param outIndex index to test
     */
    public boolean isSafe(int outIndex) {
      return outIndex < NullableMapVector.this.getValueCapacity();
    }

    /**
     * Sets the value count on the parent and on the definedness vector.
     *
     * @param valueCount new value count; asserted to be non-negative
     */
    @Override
    public void setValueCount(int valueCount) {
      assert valueCount >= 0;
      super.setValueCount(valueCount);
      bits.getMutator().setValueCount(valueCount);
    }

  }

  /** Returns the null-aware accessor of this vector. */
  @Override
  public Accessor getAccessor() {
    return accessor;
  }

  /** Returns the mutator of this vector. */
  @Override
  public Mutator getMutator() {
    return mutator;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.NullableMapVector;
import org.apache.arrow.vector.complex.StateTool;
import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
import org.apache.arrow.vector.types.Types.MinorType;
Expand Down Expand Up @@ -121,7 +122,7 @@ public MapWriter directMap(){
switch(mode){

case INIT:
MapVector map = (MapVector) container;
NullableMapVector map = (NullableMapVector) container;
mapRoot = new SingleMapWriter(map);
mapRoot.setPosition(idx());
mode = Mode.MAP;
Expand All @@ -142,7 +143,7 @@ public MapWriter rootAsMap() {
switch(mode){

case INIT:
MapVector map = container.addOrGet(name, MinorType.MAP, MapVector.class);
NullableMapVector map = container.addOrGet(name, MinorType.MAP, NullableMapVector.class);
mapRoot = new SingleMapWriter(map);
mapRoot.setPosition(idx());
mode = Mode.MAP;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ public static TypeLayout getTypeLayout(final ArrowType arrowType) {

@Override public TypeLayout visit(Tuple type) {
List<VectorLayout> vectors = asList(
// TODO: add validity vector in Map
// validityVector()
validityVector()
);
return new TypeLayout(vectors);
}
Expand Down
Loading

0 comments on commit 8780f48

Please sign in to comment.