Added so that you can choose which notation you want to represent the … #5

Merged
2 changes: 1 addition & 1 deletion data-quality-profiler/build.sbt
@@ -1,5 +1,5 @@
ThisBuild / organization := "io.github.6point6"
ThisBuild / version := "1.1.2"
ThisBuild / version := "1.2.0"

name := "data-quality-profiler-and-rules-engine"

@@ -10,6 +10,8 @@ import FlattenedRecords.dslJson
import uk.gov.ipt.das.dataprofiler.spark.Implicits._
import uk.gov.ipt.das.dataprofiler.identifier.AdditionalIdentifiers
import uk.gov.ipt.das.dataprofiler.profiler.input.record.keypreprocessor.{KeyPreProcessor, PassthroughKeyProcessor}
import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation
import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation.SquareBracketsNotation
import uk.gov.ipt.das.dataprofiler.profiler.input.records.ProfilableRecords
import uk.gov.ipt.das.dataprofiler.profiler.rule.ArrayQueryPath

@@ -81,10 +83,10 @@ object FlattenedRecords {
.`with`(new ConfigureScala))

def apply(profilableRecords: ProfilableRecords,
keyPreProcessor: KeyPreProcessor = PassthroughKeyProcessor()): FlattenedRecords =
keyPreProcessor: KeyPreProcessor = PassthroughKeyProcessor(), notation: Notation = SquareBracketsNotation): FlattenedRecords =
FlattenedRecords(
profilableRecords.records.map { profilableRecord =>
FlattenedProfilableRecord(profilableRecord, RecordFlattener(keyPreProcessor))
FlattenedProfilableRecord(profilableRecord, RecordFlattener(keyPreProcessor, notation))
}
)

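With this change the notation becomes a caller choice on FlattenedRecords, with SquareBracketsNotation as the default so existing call sites behave as before. A minimal sketch of the new call, assuming `records` is an already-built ProfilableRecords value (for example from JsonInputReader):

    import uk.gov.ipt.das.dataprofiler.profiler.input.record.FlattenedRecords
    import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation

    // Default behaviour (unchanged): array indices rendered with square brackets, e.g. "somearray[0]"
    val bracketed = FlattenedRecords(records)

    // Opt in to dot notation: array indices rendered as "somearray.0"
    val dotted = FlattenedRecords(records, notation = Notation.DotNotation)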
@@ -1,40 +1,48 @@
package uk.gov.ipt.das.dataprofiler.profiler.input.record

import uk.gov.ipt.das.dataprofiler.profiler.input.record.keypreprocessor.KeyPreProcessor
import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation
import uk.gov.ipt.das.dataprofiler.value.{ARRAY, BOOLEAN, DOUBLE, FLOAT, INT, LONG, NULL, RECORD, RecordValue, STRING}

case class RecordFlattener private (keyPreProcessor: KeyPreProcessor) {
case class RecordFlattener private (keyPreProcessor: KeyPreProcessor, notation: Notation) {

def flatten(record: ProfilableRecord): FlattenedProfilableRecord =
FlattenedProfilableRecord(
id = record.getId.getOrElse("_UNKNOWN"),
flatValues = parseBranch(flatPath = "", fullyQualifiedPath = "", record = record),
flatValues = parseBranch(flatPath = "", fullyQualifiedPath = "", record = record, notation),
additionalIdentifiers = record.additionalIdentifiers
)

private def parseValue(flatPath: String, fullyQualifiedPath: String, v: RecordValue): Seq[FlatValue] =
private def parseValue(flatPath: String, fullyQualifiedPath: String, v: RecordValue, notation: Notation): Seq[FlatValue] =
v.valueType match {
case NULL | STRING | BOOLEAN | INT | LONG | FLOAT | DOUBLE => List(FlatValue(flatPath, fullyQualifiedPath, v))
case ARRAY => v.asArray.zipWithIndex.flatMap { case (arrV: RecordValue, index: Int) =>
parseValue(s"$flatPath[]", s"$fullyQualifiedPath[$index]", arrV)
parseValue(s"$flatPath[]", createFullyQualifiedPath(fullyQualifiedPath, index, notation), arrV, notation)
}
case RECORD => parseBranch(flatPath, fullyQualifiedPath, v.asRecord)
case RECORD => parseBranch(flatPath, fullyQualifiedPath, v.asRecord, notation)
}

private def createFullyQualifiedPath(fullyQualifiedPath: String, index: Int, notation: Notation): String = {
notation match {
case Notation.DotNotation => s"$fullyQualifiedPath${notation.leftNotation}$index${notation.rightNotation}"
case Notation.SquareBracketsNotation => s"$fullyQualifiedPath${notation.leftNotation}$index${notation.rightNotation}"
}
}

private def stringFilter(key: String): String =
keyPreProcessor.keyPreProcessor(key)

private def parseBranch(flatPath: String, fullyQualifiedPath: String, record: ProfilableRecord): Seq[FlatValue] = {
private def parseBranch(flatPath: String, fullyQualifiedPath: String, record: ProfilableRecord, notation: Notation): Seq[FlatValue] = {
def genPath(path: String, key: String): String =
if (path == "") stringFilter(key) else s"$path.${stringFilter(key)}"

record.getEntries.flatMap { case (key: String, value: RecordValue) =>
parseValue(genPath(flatPath, key), genPath(fullyQualifiedPath, key), value)
parseValue(genPath(flatPath, key), genPath(fullyQualifiedPath, key), value, notation)
}
}

}
object RecordFlattener {
def apply(keyPreProcessor: KeyPreProcessor): RecordFlattener =
new RecordFlattener(keyPreProcessor)
def apply(keyPreProcessor: KeyPreProcessor, notation: Notation): RecordFlattener =
new RecordFlattener(keyPreProcessor, notation)
}
@@ -0,0 +1,8 @@
package uk.gov.ipt.das.dataprofiler.profiler.input.record.notation

sealed abstract class Notation(val leftNotation: String, val rightNotation: String) extends Serializable

object Notation {
final case object DotNotation extends Notation(".", "")
final case object SquareBracketsNotation extends Notation("[", "]")
}
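Each notation carries only the two strings that createFullyQualifiedPath in RecordFlattener interpolates around the array index, so the path shape is determined entirely by these values. An illustrative sketch of the resulting fully-qualified paths, using "somearray" and index 1 purely as example inputs:

    val path  = "somearray"
    val index = 1

    // DotNotation: leftNotation = ".", rightNotation = ""
    val dotted = s"$path${Notation.DotNotation.leftNotation}$index${Notation.DotNotation.rightNotation}"
    // => "somearray.1"

    // SquareBracketsNotation: leftNotation = "[", rightNotation = "]"
    val bracketed = s"$path${Notation.SquareBracketsNotation.leftNotation}$index${Notation.SquareBracketsNotation.rightNotation}"
    // => "somearray[1]"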
@@ -4,8 +4,10 @@ import org.scalatest.funspec.AnyFunSpec
import uk.gov.ipt.das.dataprofiler.profiler.input.reader.json.JsonInputReader.fromJsonStrings
import uk.gov.ipt.das.dataprofiler.identifier.IdentifierSource
import uk.gov.ipt.das.dataprofiler.profiler.ProfilerConfiguration
import uk.gov.ipt.das.dataprofiler.profiler.input.reader.json.IdentifierPaths
import uk.gov.ipt.das.dataprofiler.profiler.input.record.FlattenedRecords
import uk.gov.ipt.das.dataprofiler.profiler.input.reader.json.{IdentifierPaths, JsonInputReader}
import uk.gov.ipt.das.dataprofiler.profiler.input.record.keypreprocessor.PassthroughKeyProcessor
import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation
import uk.gov.ipt.das.dataprofiler.profiler.input.record.{FlattenedProfilableRecord, FlattenedRecords, RecordFlattener}
import uk.gov.ipt.das.dataprofiler.profiler.rule.{ArrayQueryPath, FieldBasedMask, RecordSets}
import uk.gov.ipt.das.dataprofiler.profiler.rule.mask.OriginalValuePassthrough
import uk.gov.ipt.das.dataprofiler.wrapper.SparkSessionTestWrapper
@@ -118,4 +120,37 @@ class JsonLookupArrayProfilingTest extends AnyFunSpec with SparkSessionTestWrapp
assert(df.select("originalValue").tail(1).last.getString(0) === "guux")
assert(df.select("recordId").tail(1).last.getString(0) === "RECORD0")
}
it("should make flattened profileable records with dot notation on array") {
val recordStr =
s"""{
| "id": "RECORD0",
| "identifier": "information",
| "somearray": [
| {"name": "EDWARD"},
| {
| "matchme": "matches",
| "value": "foo",
| "otherVal": "find"
| },
| {
| "matchme": "DOESNTmatch",
| "value": "bar",
| "otherVal": "dontFind"
| },
| {
| "matchme": "DOESNTmatch",
| "value": "guux",
| "otherVal": "dontFind"
| }
| ],
| "valueToProfile": "THOMAS"
|}
|""".stripMargin

val result = FlattenedProfilableRecord(
JsonInputReader.parseString(recordStr),
RecordFlattener(keyPreProcessor = PassthroughKeyProcessor(), notation = Notation.SquareBracketsNotation))

println(result)
}
}
@@ -2,6 +2,7 @@ package uk.gov.ipt.das.dataprofiler.profiler

import org.scalatest.funspec.AnyFunSpec
import uk.gov.ipt.das.dataprofiler.profiler.input.reader.json.JsonInputReader.fromJsonStrings
import uk.gov.ipt.das.dataprofiler.profiler.input.record.notation.Notation
import uk.gov.ipt.das.dataprofiler.profiler.input.record.{FlatValue, FlattenedRecords}
import uk.gov.ipt.das.dataprofiler.value.{BooleanValue, DoubleValue, LongValue, StringValue}
import uk.gov.ipt.das.dataprofiler.wrapper.SparkSessionTestWrapper
@@ -165,4 +166,72 @@ class RecordFlattenerTest extends AnyFunSpec with SparkSessionTestWrapper {

}

it("should flatten the records in dot notation when specified in the flattener") {
val jsonRecord =
"""{
| "id": "SOME_ID",
| "aString": "some string value",
| "aBoolean": true,
| "aNumber": 1,
| "aDouble": 1.1111,
| "aRecord": {
| "subStr": "someString",
| "subRecord": {
| "strValue": "string!",
| "strValueAnother": "string two",
| "subArr": [
| 100,
| 10,
| 1
| ]
| }
| },
| "aArray": [
| "arr0",
| "arr1",
| "arr2"
| ],
| "aArrayOfRecords": [
| {"str": "hello0"},
| {"str": "hello1"},
| {"str": "hello2"}
| ]
|}
|""".stripMargin


val record = fromJsonStrings(spark, Seq(jsonRecord), idPath = Option("id"))

val flattened = FlattenedRecords(record, notation = Notation.DotNotation)

val flattenedCollected = flattened.records.collect()

assert(flattenedCollected.length == 1)

val flattenedRecord = flattenedCollected.head
val flatValues = flattenedRecord.flatValues.toList

println(flatValues)

assert(flatValues(0) === FlatValue("id", "id", StringValue("SOME_ID")))
assert(flatValues(1) === FlatValue("aString", "aString", StringValue("some string value")))
assert(flatValues(2) === FlatValue("aBoolean", "aBoolean", BooleanValue(true)))
assert(flatValues(3) === FlatValue("aNumber", "aNumber", LongValue(1)))
assert(flatValues(4) === FlatValue("aDouble", "aDouble", DoubleValue(1.1111)))
assert(flatValues(5) === FlatValue("aRecord.subStr", "aRecord.subStr", StringValue("someString")))
assert(flatValues(6) === FlatValue("aRecord.subRecord.strValue", "aRecord.subRecord.strValue", StringValue("string!")))
assert(flatValues(7) === FlatValue("aRecord.subRecord.strValueAnother", "aRecord.subRecord.strValueAnother", StringValue("string two")))
assert(flatValues(8) === FlatValue("aRecord.subRecord.subArr[]", "aRecord.subRecord.subArr.0", LongValue(100))) // without indices now
assert(flatValues(9) === FlatValue("aRecord.subRecord.subArr[]", "aRecord.subRecord.subArr.1", LongValue(10))) // without indices now
assert(flatValues(10) === FlatValue("aRecord.subRecord.subArr[]", "aRecord.subRecord.subArr.2", LongValue(1))) // without indices now
assert(flatValues(11) === FlatValue("aArray[]", "aArray.0", StringValue("arr0"))) // without indices now
assert(flatValues(12) === FlatValue("aArray[]", "aArray.1", StringValue("arr1"))) // without indices now
assert(flatValues(13) === FlatValue("aArray[]", "aArray.2", StringValue("arr2"))) // without indices now
assert(flatValues(14) === FlatValue("aArrayOfRecords[].str", "aArrayOfRecords.0.str", StringValue("hello0"))) // without indices now
assert(flatValues(15) === FlatValue("aArrayOfRecords[].str", "aArrayOfRecords.1.str", StringValue("hello1"))) // without indices now
assert(flatValues(16) === FlatValue("aArrayOfRecords[].str", "aArrayOfRecords.2.str", StringValue("hello2"))) // without indices now
}



}
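Note that the notation only changes the fully-qualified path (the second FlatValue argument); the flat path keeps its index-free "[]" suffix under both notations, as it did before this change. For the same array element the two notations would therefore be expected to produce, roughly (a sketch mirroring the assertions above):

    // DotNotation (as asserted above):      FlatValue("aArray[]", "aArray.0",  StringValue("arr0"))
    // SquareBracketsNotation (the default):  FlatValue("aArray[]", "aArray[0]", StringValue("arr0"))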