From cba00b501ec1498f330dd6c6e3cee824b7857a65 Mon Sep 17 00:00:00 2001
From: Luis Bianchin
Date: Wed, 10 Jul 2019 16:05:35 +0200
Subject: [PATCH 1/4] Update test to validate avro encoding

---
 .../spotify/dbeam/jobs/JdbcAvroJobTest.scala | 33 ++++++++++++-------
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala b/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala
index fa3d29e1..dd340ca4 100644
--- a/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala
+++ b/dbeam-core/src/test/scala/com/spotify/dbeam/jobs/JdbcAvroJobTest.scala
@@ -17,19 +17,20 @@
 
 package com.spotify.dbeam.jobs
 
+import com.spotify.dbeam.JdbcTestFixtures
+import com.spotify.dbeam.avro.JdbcAvroMetering
+import com.spotify.dbeam.options.OutputOptions
+
 import java.io.File
 import java.nio.file.{Files, Path}
 import java.util
+import java.util.stream.{Collectors, StreamSupport}
 import java.util.{Comparator, UUID}
 
-import com.spotify.dbeam.JdbcTestFixtures
-import com.spotify.dbeam.avro.JdbcAvroMetering
-import com.spotify.dbeam.options.OutputOptions
 import org.apache.avro.Schema
-import org.apache.avro.generic.GenericRecord
-import org.apache.beam.sdk.io.AvroSource
+import org.apache.avro.file.DataFileReader
+import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
 import org.apache.beam.sdk.options.PipelineOptionsFactory
-import org.apache.beam.sdk.testing.SourceTestUtils
 import org.junit.runner.RunWith
 import org.scalatest._
 import org.scalatest.junit.JUnitRunner
@@ -74,8 +75,9 @@ class JdbcAvroJobTest extends FlatSpec with Matchers with BeforeAndAfterAll {
       "--username=",
      "--passwordFile=" + passwordFile.getAbsolutePath,
      "--table=COFFEES",
-      "--output=" + dir.getAbsolutePath)
-    )
+      "--output=" + dir.getAbsolutePath,
+      "--avroCodec=deflate1"
+    ))
     val files: Array[File] = dir.listFiles()
     files.map(_.getName) should contain theSameElementsAs Seq(
       "_AVRO_SCHEMA.avsc", "_METRICS.json", "_SERVICE_METRICS.json",
@@ -83,13 +85,20 @@ class JdbcAvroJobTest extends FlatSpec with Matchers with BeforeAndAfterAll {
     files.filter(_.getName.equals("_queries"))(0).listFiles().map(_.getName) should
       contain theSameElementsAs Seq("query_0.sql")
     val schema = new Schema.Parser().parse(new File(dir, "_AVRO_SCHEMA.avsc"))
-    val source: AvroSource[GenericRecord] = AvroSource
-      .from(new File(dir, "part-00000-of-00001.avro").toString)
-      .withSchema(schema)
-    val records: util.List[GenericRecord] = SourceTestUtils.readFromSource(source, null)
+    val records: util.List[GenericRecord] =
+      readAvroRecords(new File(dir, "part-00000-of-00001.avro"), schema)
     records should have size 2
   }
 
+  private def readAvroRecords(avroFile: File, schema: Schema): util.List[GenericRecord] = {
+    val datum: GenericDatumReader[GenericRecord] = new GenericDatumReader(schema)
+    val dataFileReader = new DataFileReader(avroFile, datum)
+    val records: util.List[GenericRecord] = StreamSupport.stream(dataFileReader.spliterator(), false)
+      .collect(Collectors.toList())
+    dataFileReader.close()
+    records
+  }
+
   "JdbcAvroJob" should "have a default exit code" in {
     ExceptionHandling.exitCode(new IllegalStateException()) should be (49)
   }

From 8049604b05c8d34924599ba5d12efc21e0b95eed Mon Sep 17 00:00:00 2001
From: Luis Bianchin
Date: Wed, 10 Jul 2019 17:38:18 +0200
Subject: [PATCH 2/4] JdbcAvroRecordTest test read multiple

---
 .../spotify/dbeam/avro/JdbcAvroRecordTest.scala | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/dbeam-core/src/test/scala/com/spotify/dbeam/avro/JdbcAvroRecordTest.scala b/dbeam-core/src/test/scala/com/spotify/dbeam/avro/JdbcAvroRecordTest.scala
index 3eccfeeb..e853a5c8 100644
--- a/dbeam-core/src/test/scala/com/spotify/dbeam/avro/JdbcAvroRecordTest.scala
+++ b/dbeam-core/src/test/scala/com/spotify/dbeam/avro/JdbcAvroRecordTest.scala
@@ -40,7 +40,7 @@ class JdbcAvroRecordTest extends FlatSpec with Matchers with BeforeAndAfterAll {
   private val record1 = JdbcTestFixtures.record1
 
   override def beforeAll(): Unit = {
-    JdbcTestFixtures.createFixtures(db, Seq(JdbcTestFixtures.record1))
+    JdbcTestFixtures.createFixtures(db, Seq(JdbcTestFixtures.record1, JdbcTestFixtures.record2))
   }
 
   it should "create schema" in {
@@ -139,20 +139,24 @@ class JdbcAvroRecordTest extends FlatSpec with Matchers with BeforeAndAfterAll {
   it should "encode jdbc result set to valid avro" in {
     val rs = db.source.createConnection().createStatement().executeQuery(s"SELECT * FROM COFFEES")
     val schema = JdbcAvroSchema.createAvroSchema(rs, "dbeam_generated","connection", "doc", false)
-    rs.next()
-
     val converter = JdbcAvroRecordConverter.create(rs)
     val dataFileWriter = new DataFileWriter(new GenericDatumWriter[GenericRecord](schema))
     val outputStream = new ByteArrayOutputStream()
     dataFileWriter.create(schema, outputStream)
     // convert and write
-    dataFileWriter.appendEncoded(converter.convertResultSetIntoAvroBytes())
+    while (rs.next()) {
+      dataFileWriter.appendEncoded(converter.convertResultSetIntoAvroBytes())
+    }
     dataFileWriter.flush()
+    outputStream.close()
     // transform to generic record
     val inputStream = new SeekableByteArrayInput(outputStream.toByteArray)
     val dataFileReader = new DataFileReader[GenericRecord](inputStream,
       new GenericDatumReader[GenericRecord](schema))
-    val record: GenericRecord = dataFileReader.iterator().next()
+    val records: Seq[GenericRecord] =
+      dataFileReader.asInstanceOf[java.lang.Iterable[GenericRecord]].asScala.toSeq
+    records.size should be (2)
+    val record: GenericRecord = records.filter(r => r.get(0).toString == record1._1).head
 
     record shouldNot be (null)
     record.getSchema should be (schema)

From 5e0fceac0a53ffac7dcbcc4143875d88d7cf37ee Mon Sep 17 00:00:00 2001
From: Luis Bianchin
Date: Wed, 10 Jul 2019 17:42:31 +0200
Subject: [PATCH 3/4] Favor flush and close

---
 .../src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
index 78966b7b..0c8e34d8 100644
--- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
+++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroIO.java
@@ -201,7 +201,7 @@ public void write(String query) throws Exception {
         dataFileWriter.appendEncoded(converter.convertResultSetIntoAvroBytes());
         this.metering.incrementRecordCount();
       }
-      this.dataFileWriter.sync();
+      this.dataFileWriter.flush();
       this.metering.exposeWriteElapsedMs(System.currentTimeMillis() - startMs);
       this.metering.exposeWrittenBytes(this.countingOutputStream.getCount());
     }
@@ -214,7 +214,7 @@ protected void finishWrite() throws Exception {
         connection.close();
       }
       if (dataFileWriter != null) {
-        dataFileWriter.flush();
+        dataFileWriter.close();
       }
       logger.info("jdbcavroio : Write finished");
     }
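
A note on PATCH 3/4: in Avro's DataFileWriter, sync() only forces the end of the current data block, while flush() also flushes the underlying output stream (so the byte count read right afterwards reflects what was actually written), and close() is what finalizes the container file. Below is a minimal sketch of that flush-per-batch / close-at-the-end lifecycle against the plain Avro API; the schema, record values and output path are illustrative only and do not come from the dbeam sources.

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class DataFileWriterLifecycle {
      public static void main(String[] args) throws Exception {
        // Hypothetical one-field schema, only for illustration.
        Schema schema = SchemaBuilder.record("Coffee").fields()
            .requiredString("name")
            .endRecord();

        DataFileWriter<GenericRecord> writer =
            new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, new File("/tmp/coffees.avro"));

        GenericRecord row = new GenericData.Record(schema);
        row.put("name", "colombian");
        writer.append(row);

        // After a batch: flush() pushes buffered data to the underlying stream
        // without ending the file, so byte counters are accurate at this point.
        writer.flush();

        // When writing is finished for good: close() finalizes the container
        // file (and flushes implicitly).
        writer.close();
      }
    }
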
From 5059b23f57904d8488b5b707a8cad7e7c27c995f Mon Sep 17 00:00:00 2001
From: Luis Bianchin
Date: Wed, 10 Jul 2019 17:42:50 +0200
Subject: [PATCH 4/4] Fix avro direct encoding

Based on https://issues.apache.org/jira/browse/AVRO-1093
---
 .../java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java
index 813480a6..3a39c31d 100644
--- a/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java
+++ b/dbeam-core/src/main/java/com/spotify/dbeam/avro/JdbcAvroRecordConverter.java
@@ -114,7 +114,7 @@ public ByteBuffer convertResultSetIntoAvroBytes()
       }
     }
     binaryEncoder.flush();
-    return ByteBuffer.wrap(out.getBufffer());
+    return ByteBuffer.wrap(out.getBufffer(), 0, out.size());
   }
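
A note on PATCH 4/4: ByteBuffer.wrap(byte[]) exposes the whole backing array, but the internal array of a ByteArrayOutputStream-style buffer is usually larger than the number of bytes actually written, so wrapping it whole would hand an unwritten tail of bytes to DataFileWriter.appendEncoded() as if it were encoded data. Passing explicit bounds, wrap(buf, 0, out.size()), limits the view to the valid bytes. The self-contained sketch below shows the difference; ExposedBuffer and internalBuffer() are illustrative stand-ins for the converter's buffer, not names from the dbeam sources.

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;

    // Illustrative stand-in for the converter's output buffer: a
    // ByteArrayOutputStream subclass that exposes its internal array without copying.
    class ExposedBuffer extends ByteArrayOutputStream {
      ExposedBuffer(int initialCapacity) {
        super(initialCapacity);
      }

      byte[] internalBuffer() {
        return buf; // protected field inherited from ByteArrayOutputStream
      }
    }

    public class WrapSizeExample {
      public static void main(String[] args) {
        ExposedBuffer out = new ExposedBuffer(64);
        out.write(0x01);
        out.write(0x02);
        out.write(0x03);

        // Wraps the full 64-byte capacity: 61 unwritten bytes come along too.
        ByteBuffer wholeArray = ByteBuffer.wrap(out.internalBuffer());

        // Wraps only the bytes actually written.
        ByteBuffer writtenOnly = ByteBuffer.wrap(out.internalBuffer(), 0, out.size());

        System.out.println(wholeArray.remaining());  // 64
        System.out.println(writtenOnly.remaining()); // 3
      }
    }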