diff --git a/data-quality-profiler/build.sbt b/data-quality-profiler/build.sbt index b4b43eb..1e8fe9a 100644 --- a/data-quality-profiler/build.sbt +++ b/data-quality-profiler/build.sbt @@ -1,5 +1,5 @@ ThisBuild / organization := "io.github.6point6" -ThisBuild / version := "1.1.2" +ThisBuild / version := "1.2.0" name := "data-quality-profiler-and-rules-engine" diff --git a/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/FlattenedRecords.scala b/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/FlattenedRecords.scala index 299db9b..431af3b 100644 --- a/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/FlattenedRecords.scala +++ b/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/FlattenedRecords.scala @@ -10,6 +10,8 @@ import FlattenedRecords.dslJson import uk.gov.ipt.das.dataprofiler.spark.Implicits._ import uk.gov.ipt.das.dataprofiler.identifier.AdditionalIdentifiers import uk.gov.ipt.das.dataprofiler.profiler.input.record.keypreprocessor.{KeyPreProcessor, PassthroughKeyProcessor} +import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation +import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation.SquareBracketsNotation import uk.gov.ipt.das.dataprofiler.profiler.input.records.ProfilableRecords import uk.gov.ipt.das.dataprofiler.profiler.rule.ArrayQueryPath @@ -81,10 +83,10 @@ object FlattenedRecords { .`with`(new ConfigureScala)) def apply(profilableRecords: ProfilableRecords, - keyPreProcessor: KeyPreProcessor = PassthroughKeyProcessor()): FlattenedRecords = + keyPreProcessor: KeyPreProcessor = PassthroughKeyProcessor(), notation: Notation = SquareBracketsNotation): FlattenedRecords = FlattenedRecords( profilableRecords.records.map { profilableRecord => - FlattenedProfilableRecord(profilableRecord, RecordFlattener(keyPreProcessor)) + FlattenedProfilableRecord(profilableRecord, RecordFlattener(keyPreProcessor, notation)) } ) diff --git a/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/RecordFlattener.scala b/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/RecordFlattener.scala index b10cf44..c133d79 100644 --- a/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/RecordFlattener.scala +++ b/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/RecordFlattener.scala @@ -1,40 +1,48 @@ package uk.gov.ipt.das.dataprofiler.profiler.input.record import uk.gov.ipt.das.dataprofiler.profiler.input.record.keypreprocessor.KeyPreProcessor +import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation import uk.gov.ipt.das.dataprofiler.value.{ARRAY, BOOLEAN, DOUBLE, FLOAT, INT, LONG, NULL, RECORD, RecordValue, STRING} -case class RecordFlattener private (keyPreProcessor: KeyPreProcessor) { +case class RecordFlattener private (keyPreProcessor: KeyPreProcessor, notation: Notation) { def flatten(record: ProfilableRecord): FlattenedProfilableRecord = FlattenedProfilableRecord( id = record.getId.getOrElse("_UNKNOWN"), - flatValues = parseBranch(flatPath = "", fullyQualifiedPath = "", record = record), + flatValues = parseBranch(flatPath = "", fullyQualifiedPath = "", record = record, notation), additionalIdentifiers = record.additionalIdentifiers ) - private def parseValue(flatPath: String, fullyQualifiedPath: String, v: RecordValue): Seq[FlatValue] = + private def parseValue(flatPath: String, fullyQualifiedPath: String, v: RecordValue, notation: Notation): Seq[FlatValue] = v.valueType match { case NULL | STRING | BOOLEAN | INT | LONG | FLOAT | DOUBLE => List(FlatValue(flatPath, fullyQualifiedPath, v)) case ARRAY => v.asArray.zipWithIndex.flatMap { case (arrV: RecordValue, index: Int) => - parseValue(s"$flatPath[]", s"$fullyQualifiedPath[$index]", arrV) + parseValue(s"$flatPath[]", createFullyQualifiedPath(fullyQualifiedPath, index, notation), arrV, notation) } - case RECORD => parseBranch(flatPath, fullyQualifiedPath, v.asRecord) + case RECORD => parseBranch(flatPath, fullyQualifiedPath, v.asRecord, notation) } + private def createFullyQualifiedPath(fullyQualifiedPath: String, index: Int, notation: Notation): String = { + notation match { + case Notation.DotNotation => s"$fullyQualifiedPath${notation.leftNotation}$index${notation.rightNotation}" + case Notation.SquareBracketsNotation => s"$fullyQualifiedPath${notation.leftNotation}$index${notation.rightNotation}" + } + } + private def stringFilter(key: String): String = keyPreProcessor.keyPreProcessor(key) - private def parseBranch(flatPath: String, fullyQualifiedPath: String, record: ProfilableRecord): Seq[FlatValue] = { + private def parseBranch(flatPath: String, fullyQualifiedPath: String, record: ProfilableRecord, notation: Notation): Seq[FlatValue] = { def genPath(path: String, key: String): String = if (path == "") stringFilter(key) else s"$path.${stringFilter(key)}" record.getEntries.flatMap { case (key: String, value: RecordValue) => - parseValue(genPath(flatPath, key), genPath(fullyQualifiedPath, key), value) + parseValue(genPath(flatPath, key), genPath(fullyQualifiedPath, key), value, notation) } } } object RecordFlattener { - def apply(keyPreProcessor: KeyPreProcessor): RecordFlattener = - new RecordFlattener(keyPreProcessor) + def apply(keyPreProcessor: KeyPreProcessor, notation: Notation): RecordFlattener = + new RecordFlattener(keyPreProcessor, notation) } \ No newline at end of file diff --git a/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/notation/Notation.scala b/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/notation/Notation.scala new file mode 100644 index 0000000..94480e4 --- /dev/null +++ b/data-quality-profiler/src/main/scala/uk/gov/ipt/das/dataprofiler/profiler/input/record/notation/Notation.scala @@ -0,0 +1,8 @@ +package uk.gov.ipt.das.dataprofiler.profiler.input.record.notation + +sealed abstract class Notation(val leftNotation: String, val rightNotation: String) extends Serializable + +object Notation { + final case object DotNotation extends Notation(".", "") + final case object SquareBracketsNotation extends Notation("[", "]") +} diff --git a/data-quality-profiler/src/test/scala/uk/gov/ipt/das/dataprofiler/parser/JsonLookupArrayProfilingTest.scala b/data-quality-profiler/src/test/scala/uk/gov/ipt/das/dataprofiler/parser/JsonLookupArrayProfilingTest.scala index 28b118f..1e72ffa 100644 --- a/data-quality-profiler/src/test/scala/uk/gov/ipt/das/dataprofiler/parser/JsonLookupArrayProfilingTest.scala +++ b/data-quality-profiler/src/test/scala/uk/gov/ipt/das/dataprofiler/parser/JsonLookupArrayProfilingTest.scala @@ -4,8 +4,10 @@ import org.scalatest.funspec.AnyFunSpec import uk.gov.ipt.das.dataprofiler.profiler.input.reader.json.JsonInputReader.fromJsonStrings import uk.gov.ipt.das.dataprofiler.identifier.IdentifierSource import uk.gov.ipt.das.dataprofiler.profiler.ProfilerConfiguration -import uk.gov.ipt.das.dataprofiler.profiler.input.reader.json.IdentifierPaths -import uk.gov.ipt.das.dataprofiler.profiler.input.record.FlattenedRecords +import uk.gov.ipt.das.dataprofiler.profiler.input.reader.json.{IdentifierPaths, JsonInputReader} +import uk.gov.ipt.das.dataprofiler.profiler.input.record.keypreprocessor.PassthroughKeyProcessor +import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation +import uk.gov.ipt.das.dataprofiler.profiler.input.record.{FlattenedProfilableRecord, FlattenedRecords, RecordFlattener} import uk.gov.ipt.das.dataprofiler.profiler.rule.{ArrayQueryPath, FieldBasedMask, RecordSets} import uk.gov.ipt.das.dataprofiler.profiler.rule.mask.OriginalValuePassthrough import uk.gov.ipt.das.dataprofiler.wrapper.SparkSessionTestWrapper @@ -118,4 +120,37 @@ class JsonLookupArrayProfilingTest extends AnyFunSpec with SparkSessionTestWrapp assert(df.select("originalValue").tail(1).last.getString(0) === "guux") assert(df.select("recordId").tail(1).last.getString(0) === "RECORD0") } + it("should make flattened profileable records with dot notation on array") { + val recordStr = + s"""{ + | "id": "RECORD0", + | "identifier": "information", + | "somearray": [ + | {"name": "EDWARD"}, + | { + | "matchme": "matches", + | "value": "foo", + | "otherVal": "find" + | }, + | { + | "matchme": "DOESNTmatch", + | "value": "bar", + | "otherVal": "dontFind" + | }, + | { + | "matchme": "DOESNTmatch", + | "value": "guux", + | "otherVal": "dontFind" + | } + | ], + | "valueToProfile": "THOMAS" + |} + |""".stripMargin + + val result = FlattenedProfilableRecord( + JsonInputReader.parseString(recordStr), + RecordFlattener(keyPreProcessor = PassthroughKeyProcessor(), notation = Notation.SquareBracketsNotation)) + + println(result) + } } \ No newline at end of file diff --git a/data-quality-profiler/src/test/scala/uk/gov/ipt/das/dataprofiler/profiler/RecordFlattenerTest.scala b/data-quality-profiler/src/test/scala/uk/gov/ipt/das/dataprofiler/profiler/RecordFlattenerTest.scala index 6863358..2d8126d 100644 --- a/data-quality-profiler/src/test/scala/uk/gov/ipt/das/dataprofiler/profiler/RecordFlattenerTest.scala +++ b/data-quality-profiler/src/test/scala/uk/gov/ipt/das/dataprofiler/profiler/RecordFlattenerTest.scala @@ -2,6 +2,7 @@ package uk.gov.ipt.das.dataprofiler.profiler import org.scalatest.funspec.AnyFunSpec import uk.gov.ipt.das.dataprofiler.profiler.input.reader.json.JsonInputReader.fromJsonStrings +import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation import uk.gov.ipt.das.dataprofiler.profiler.input.record.{FlatValue, FlattenedRecords} import uk.gov.ipt.das.dataprofiler.value.{BooleanValue, DoubleValue, LongValue, StringValue} import uk.gov.ipt.das.dataprofiler.wrapper.SparkSessionTestWrapper @@ -165,4 +166,72 @@ class RecordFlattenerTest extends AnyFunSpec with SparkSessionTestWrapper { } + it("should flatten the records in dot notation when specified in the flattener") { + val jsonRecord = + """{ + | "id": "SOME_ID", + | "aString": "some string value", + | "aBoolean": true, + | "aNumber": 1, + | "aDouble": 1.1111, + | "aRecord": { + | "subStr": "someString", + | "subRecord": { + | "strValue": "string!", + | "strValueAnother": "string two", + | "subArr": [ + | 100, + | 10, + | 1 + | ] + | } + | }, + | "aArray": [ + | "arr0", + | "arr1", + | "arr2" + | ], + | "aArrayOfRecords": [ + | {"str": "hello0"}, + | {"str": "hello1"}, + | {"str": "hello2"} + | ] + |} + |""".stripMargin + + + val record = fromJsonStrings(spark, Seq(jsonRecord), idPath = Option("id")) + + val flattened = FlattenedRecords(record, notation = Notation.DotNotation) + + val flattenedCollected = flattened.records.collect() + + assert(flattenedCollected.length == 1) + + val flattenedRecord = flattenedCollected.head + val flatValues = flattenedRecord.flatValues.toList + + println(flatValues) + + assert(flatValues(0) === FlatValue("id", "id", StringValue("SOME_ID"))) + assert(flatValues(1) === FlatValue("aString", "aString", StringValue("some string value"))) + assert(flatValues(2) === FlatValue("aBoolean", "aBoolean", BooleanValue(true))) + assert(flatValues(3) === FlatValue("aNumber", "aNumber", LongValue(1))) + assert(flatValues(4) === FlatValue("aDouble", "aDouble", DoubleValue(1.1111))) + assert(flatValues(5) === FlatValue("aRecord.subStr", "aRecord.subStr", StringValue("someString"))) + assert(flatValues(6) === FlatValue("aRecord.subRecord.strValue", "aRecord.subRecord.strValue", StringValue("string!"))) + assert(flatValues(7) === FlatValue("aRecord.subRecord.strValueAnother", "aRecord.subRecord.strValueAnother", StringValue("string two"))) + assert(flatValues(8) === FlatValue("aRecord.subRecord.subArr[]", "aRecord.subRecord.subArr.0", LongValue(100))) // without indices now + assert(flatValues(9) === FlatValue("aRecord.subRecord.subArr[]", "aRecord.subRecord.subArr.1", LongValue(10))) // without indices now + assert(flatValues(10) === FlatValue("aRecord.subRecord.subArr[]", "aRecord.subRecord.subArr.2", LongValue(1))) // without indices now + assert(flatValues(11) === FlatValue("aArray[]", "aArray.0", StringValue("arr0"))) // without indices now + assert(flatValues(12) === FlatValue("aArray[]", "aArray.1", StringValue("arr1"))) // without indices now + assert(flatValues(13) === FlatValue("aArray[]", "aArray.2", StringValue("arr2"))) // without indices now + assert(flatValues(14) === FlatValue("aArrayOfRecords[].str", "aArrayOfRecords.0.str", StringValue("hello0"))) // without indices now + assert(flatValues(15) === FlatValue("aArrayOfRecords[].str", "aArrayOfRecords.1.str", StringValue("hello1"))) // without indices now + assert(flatValues(16) === FlatValue("aArrayOfRecords[].str", "aArrayOfRecords.2.str", StringValue("hello2"))) // without indices now + } + + + }