Skip to content

Commit

Permalink
Encode binary
Browse files Browse the repository at this point in the history
  • Loading branch information
labianchin committed Jul 8, 2019
1 parent d5f5ba7 commit ffc31da
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 17 deletions.
20 changes: 6 additions & 14 deletions dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
Expand Down Expand Up @@ -145,6 +143,7 @@ private static class JdbcAvroWriter extends FileBasedSink.Writer<Void, String> {
super(writeOperation, MimeTypes.BINARY);
this.dynamicDestinations = dynamicDestinations;
this.jdbcAvroArgs = jdbcAvroArgs;
this.metering = JdbcAvroMetering.create();
}

public Void getDestination() {
Expand All @@ -157,15 +156,13 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception {
logger.info("jdbcavroio : Preparing write...");
connection = jdbcAvroArgs.jdbcConnectionConfiguration().createConnection();
Void destination = getDestination();
CodecFactory codec = dynamicDestinations.getCodec(destination);
Schema schema = dynamicDestinations.getSchema(destination);
dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))
.setCodec(codec)
.setCodec(dynamicDestinations.getCodec(destination))
.setSyncInterval(syncInterval);
dataFileWriter.setMeta("created_by", this.getClass().getCanonicalName());
this.countingOutputStream = new CountingOutputStream(Channels.newOutputStream(channel));
dataFileWriter.create(schema, this.countingOutputStream);
this.metering = JdbcAvroMetering.create();
logger.info("jdbcavroio : Write prepared");
}

Expand All @@ -187,6 +184,8 @@ private ResultSet executeQuery(String query) throws Exception {
statement.getFetchSize());
ResultSet resultSet = statement.executeQuery();
this.metering.exposeExecuteQueryMs(System.currentTimeMillis() - startTime);
checkArgument(resultSet != null,
"JDBC resultSet was not properly created");
return resultSet;
}

Expand All @@ -195,18 +194,11 @@ public void write(String query) throws Exception {
checkArgument(dataFileWriter != null,
"Avro DataFileWriter was not properly created");
logger.info("jdbcavroio : Starting write...");
Schema schema = dynamicDestinations.getSchema(getDestination());
try (ResultSet resultSet = executeQuery(query)) {
checkArgument(resultSet != null,
"JDBC resultSet was not properly created");
final Map<Integer, JdbcAvroRecord.SqlFunction<ResultSet, Object>>
mappings = JdbcAvroRecord.computeAllMappings(resultSet);
final int columnCount = resultSet.getMetaData().getColumnCount();
long startMs = metering.startWriteMeter();
final JdbcAvroRecordConverter converter = JdbcAvroRecordConverter.create(resultSet);
while (resultSet.next()) {
final GenericRecord genericRecord = JdbcAvroRecord.convertResultSetIntoAvroRecord(
schema, resultSet, mappings, columnCount);
this.dataFileWriter.append(genericRecord);
dataFileWriter.appendEncoded(converter.convertResultSetIntoAvroBytes());
this.metering.incrementRecordCount();
}
this.dataFileWriter.sync();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*-
* -\-\-
* DBeam Core
* --
* Copyright (C) 2016 - 2019 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/

package com.spotify.dbeam.avro;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class JdbcAvroRecordConverter {
private final JdbcAvroRecord.SqlFunction<ResultSet, Object>[] mappings;
private final int columnCount;
private final ResultSet resultSet;
private final EncoderFactory encoderFactory = EncoderFactory.get();

public JdbcAvroRecordConverter(
JdbcAvroRecord.SqlFunction<ResultSet, Object>[] mappings, int columnCount,
ResultSet resultSet) {
this.mappings = mappings;
this.columnCount = columnCount;
this.resultSet = resultSet;
}

public static JdbcAvroRecordConverter create(ResultSet resultSet)
throws SQLException {
return new JdbcAvroRecordConverter(
computeAllMappings(resultSet),
resultSet.getMetaData().getColumnCount(),
resultSet);
}

@SuppressWarnings("unchecked")
static JdbcAvroRecord.SqlFunction<ResultSet, Object>[] computeAllMappings(ResultSet resultSet)
throws SQLException {
final ResultSetMetaData meta = resultSet.getMetaData();
final int columnCount = meta.getColumnCount();

final JdbcAvroRecord.SqlFunction<ResultSet, Object>[] mappings =
new JdbcAvroRecord.SqlFunction[columnCount + 1];

for (int i = 1; i <= columnCount; i++) {
mappings[i] = JdbcAvroRecord.computeMapping(meta, i);
}
return mappings;
}

private BinaryEncoder binaryEncoder = null;

public static class MyByteArrayOutputStream extends ByteArrayOutputStream {

MyByteArrayOutputStream(int size) {
super(size);
}

// provide access to internal buffer, avoiding copy
byte[] getBufffer() {
return buf;
}
}

/**
* Read data from a single row of result set and and encode into a Avro record as byte array.
* Directly reading and encoding has the benefit of less need for copying bytes between objects.
*/
public ByteBuffer convertResultSetIntoAvroBytes()
throws SQLException, IOException {
final MyByteArrayOutputStream out = new MyByteArrayOutputStream(columnCount * 64);
binaryEncoder = encoderFactory.directBinaryEncoder(out, binaryEncoder);
for (int i = 1; i <= columnCount; i++) {
final Object value = mappings[i].apply(resultSet);
if (value == null || resultSet.wasNull()) {
binaryEncoder.writeIndex(0);
binaryEncoder.writeNull();
} else {
binaryEncoder.writeIndex(1);
if (value instanceof String) {
binaryEncoder.writeString((String) value);
} else if (value instanceof Long) {
binaryEncoder.writeLong((Long) value);
} else if (value instanceof Integer) {
binaryEncoder.writeInt((Integer) value);
} else if (value instanceof Boolean) {
binaryEncoder.writeBoolean((Boolean) value);
} else if (value instanceof ByteBuffer) {
binaryEncoder.writeBytes((ByteBuffer) value);
} else if (value instanceof Double) {
binaryEncoder.writeDouble((Double) value);
} else if (value instanceof Float) {
binaryEncoder.writeFloat((Float) value);
}
}
}
binaryEncoder.flush();
return ByteBuffer.wrap(out.getBufffer());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,9 @@ class JdbcAvroRecordTest extends FlatSpec with Matchers with BeforeAndAfterAll {

record shouldNot be (null)
record.getSchema should be (schema)
record.getSchema.getFields.size() should be (12)
record.get(0) should be (record1._1)
record.get(0).toString should be (record1._1)
record.get(1) should be (record1._2.map(x => x : java.lang.Integer).orNull)
record.get(2) should be (record1._3.toString)
record.get(2).toString should be (record1._3.toString)
record.get(3) should be (record1._4)
record.get(4) should be (record1._5)
record.get(5) should be (new java.lang.Boolean(record1._6))
Expand Down

0 comments on commit ffc31da

Please sign in to comment.