diff --git a/features/build.gradle b/features/build.gradle
index a0f0bf961f..645d5b39c3 100644
--- a/features/build.gradle
+++ b/features/build.gradle
@@ -1,5 +1,6 @@
 dependencies {
     compile project(':utils')
+    testCompile project(':testkit')
 
     // Scala graph
     compile "org.scala-graph:graph-core_$scalaVersion:$scalaGraphVersion"
diff --git a/features/src/test/scala/com/salesforce/op/features/FeatureBuilderTest.scala b/features/src/test/scala/com/salesforce/op/features/FeatureBuilderTest.scala
index 756f19d7e7..961c197c9e 100644
--- a/features/src/test/scala/com/salesforce/op/features/FeatureBuilderTest.scala
+++ b/features/src/test/scala/com/salesforce/op/features/FeatureBuilderTest.scala
@@ -35,19 +35,18 @@ import java.util
 
 import com.salesforce.op.aggregators._
 import com.salesforce.op.features.types._
 import com.salesforce.op.stages.FeatureGeneratorStage
-import com.salesforce.op.test.{Passenger, TestSparkContext}
-import com.twitter.algebird.MonoidAggregator
+import com.salesforce.op.test.{FeatureAsserts, Passenger, TestSparkContext}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.joda.time.Duration
 import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
-import org.scalatest.{FlatSpec, Matchers}
 
-import scala.reflect.runtime.universe._
+case class FeatureBuilderContainerTest(s: String, l: Long, d: Double)
 
 @RunWith(classOf[JUnitRunner])
-class FeatureBuilderTest extends FlatSpec with TestSparkContext {
+class FeatureBuilderTest extends FlatSpec with TestSparkContext with FeatureAsserts {
 
   private val name = "feature"
   private val passenger = Passenger.newBuilder()
@@ -180,51 +179,3 @@ class FeatureBuilderTest extends FlatSpec with TestSparkContext {
   }
 
 }
-
-/**
- * Assert feature instance on a given input/output
- */
-object assertFeature extends Matchers {
-
-  /**
-   * Assert feature instance on a given input/output
-   *
-   * @param f feature to assert
-   * @param in input value
-   * @param out expected output value
-   * @param name expected name
-   * @param isResponse is expected to be a response
-   * @param aggregator expected aggregator
-   * @param aggregateWindow expected aggregate window
-   * @param tti expected input typetag
-   * @param wtt expected output typetag
-   * @tparam I input type
-   * @tparam O output feature type
-   */
-  def apply[I, O <: FeatureType](f: FeatureLike[O])(
-    in: I, out: O, name: String, isResponse: Boolean = false,
-    aggregator: WeakTypeTag[O] => MonoidAggregator[Event[O], _, O] =
-    (wtt: WeakTypeTag[O]) => MonoidAggregatorDefaults.aggregatorOf[O](wtt),
-    aggregateWindow: Option[Duration] = None
-  )(implicit tti: WeakTypeTag[I], wtt: WeakTypeTag[O]): Unit = {
-    f.name shouldBe name
-    f.isResponse shouldBe isResponse
-    f.parents shouldBe Nil
-    f.uid.startsWith(wtt.tpe.dealias.toString.split("\\.").last) shouldBe true
-    f.wtt.tpe =:= wtt.tpe shouldBe true
-    f.isRaw shouldBe true
-    f.typeName shouldBe wtt.tpe.typeSymbol.fullName
-
-    f.originStage shouldBe a[FeatureGeneratorStage[_, _ <: FeatureType]]
-    val fg = f.originStage.asInstanceOf[FeatureGeneratorStage[I, O]]
-    fg.tti shouldBe tti
-    fg.aggregator shouldBe aggregator(wtt)
-    fg.extractFn(in) shouldBe out
-    fg.extractSource.nonEmpty shouldBe true // TODO we should eval the code here: eval(fg.extractSource)(in)
-    fg.getOutputFeatureName shouldBe name
-    fg.outputIsResponse shouldBe isResponse
-    fg.aggregateWindow shouldBe aggregateWindow
-    fg.uid.startsWith(classOf[FeatureGeneratorStage[I, O]].getSimpleName) shouldBe true
-  }
-
-}
diff --git a/testkit/src/main/scala/com/salesforce/op/test/FeatureAsserts.scala b/testkit/src/main/scala/com/salesforce/op/test/FeatureAsserts.scala
new file mode 100644
index 0000000000..833f4b3f78
--- /dev/null
+++ b/testkit/src/main/scala/com/salesforce/op/test/FeatureAsserts.scala
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2017, Salesforce.com, Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice, this
+ *   list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright notice,
+ *   this list of conditions and the following disclaimer in the documentation
+ *   and/or other materials provided with the distribution.
+ *
+ * * Neither the name of the copyright holder nor the names of its
+ *   contributors may be used to endorse or promote products derived from
+ *   this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+ * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+ * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.salesforce.op.test
+
+import com.salesforce.op.aggregators.{Event, MonoidAggregatorDefaults}
+import com.salesforce.op.features.FeatureLike
+import com.salesforce.op.features.types.FeatureType
+import com.salesforce.op.stages.FeatureGeneratorStage
+import com.twitter.algebird.MonoidAggregator
+import org.joda.time.Duration
+import org.scalatest.Matchers
+
+import scala.reflect.runtime.universe.WeakTypeTag
+
+/**
+ * Asserts for Feature instances on a given input/output
+ */
+trait FeatureAsserts extends Matchers {
+
+  /**
+   * Assert Feature instance on a given input/output
+   *
+   * @param f feature to assert
+   * @param in input value
+   * @param out expected output value
+   * @param name expected name
+   * @param isResponse is expected to be a response
+   * @param aggregator expected aggregator
+   * @param aggregateWindow expected aggregate window
+   * @param tti expected input typetag
+   * @param wtt expected output typetag
+   * @tparam I input type
+   * @tparam O output feature type
+   */
+  def assertFeature[I, O <: FeatureType](f: FeatureLike[O])(
+    in: I, out: O, name: String, isResponse: Boolean = false,
+    aggregator: WeakTypeTag[O] => MonoidAggregator[Event[O], _, O] =
+    (wtt: WeakTypeTag[O]) => MonoidAggregatorDefaults.aggregatorOf[O](wtt),
+    aggregateWindow: Option[Duration] = None
+  )(implicit tti: WeakTypeTag[I], wtt: WeakTypeTag[O]): Unit = {
+    f.name shouldBe name
+    f.isResponse shouldBe isResponse
+    f.parents shouldBe Nil
+    f.uid.startsWith(wtt.tpe.dealias.toString.split("\\.").last) shouldBe true
+    f.wtt.tpe =:= wtt.tpe shouldBe true
+    f.isRaw shouldBe true
+    f.typeName shouldBe wtt.tpe.typeSymbol.fullName
+
+    f.originStage shouldBe a[FeatureGeneratorStage[_, _ <: FeatureType]]
+    val fg = f.originStage.asInstanceOf[FeatureGeneratorStage[I, O]]
+    fg.tti shouldBe tti
+    fg.aggregator shouldBe aggregator(wtt)
+    fg.extractFn(in) shouldBe out
+    fg.extractSource.nonEmpty shouldBe true // TODO we should eval the code here: eval(fg.extractSource)(in)
+    fg.getOutputFeatureName shouldBe name
+    fg.outputIsResponse shouldBe isResponse
+    fg.aggregateWindow shouldBe aggregateWindow
+    fg.uid.startsWith(classOf[FeatureGeneratorStage[I, O]].getSimpleName) shouldBe true
+  }
+
+}
diff --git a/features/src/main/scala/com/salesforce/op/test/FeatureTestBase.scala b/testkit/src/main/scala/com/salesforce/op/test/FeatureTestBase.scala
similarity index 100%
rename from features/src/main/scala/com/salesforce/op/test/FeatureTestBase.scala
rename to testkit/src/main/scala/com/salesforce/op/test/FeatureTestBase.scala
diff --git a/features/src/main/scala/com/salesforce/op/test/TestFeatureBuilder.scala b/testkit/src/main/scala/com/salesforce/op/test/TestFeatureBuilder.scala
similarity index 54%
rename from features/src/main/scala/com/salesforce/op/test/TestFeatureBuilder.scala
rename to testkit/src/main/scala/com/salesforce/op/test/TestFeatureBuilder.scala
index 9a74fbcd2d..ea835bc74f 100644
--- a/features/src/main/scala/com/salesforce/op/test/TestFeatureBuilder.scala
+++ b/testkit/src/main/scala/com/salesforce/op/test/TestFeatureBuilder.scala
@@ -30,14 +30,20 @@
 
 package com.salesforce.op.test
 
-import com.salesforce.op.features.types.{FeatureType, FeatureTypeSparkConverter}
+import java.text.SimpleDateFormat
+
+import com.salesforce.op.features.types._
 import com.salesforce.op.features.{Feature, FeatureBuilder, FeatureSparkTypes}
+import com.salesforce.op.testkit.RandomList.UniformGeolocation
+import com.salesforce.op.testkit._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types.StructType
 
+import scala.collection.mutable.ArrayBuffer
 import scala.reflect.runtime.universe._
 
+
 /**
  * Test Feature Builder is a factory for creating datasets and features for tests
  */
@@ -249,15 +255,161 @@ case object TestFeatureBuilder {
       f5name = DefaultFeatureNames.f5, data)
   }
 
+  /**
+   * Build a dataset with arbitrary amount of features of specified types
+   *
+   * @param data data
+   * @param spark spark session
+   * @return dataset with arbitrary amount of features of specified types
+   */
+  def apply(data: Seq[FeatureType]*)(implicit spark: SparkSession): (DataFrame, Array[Feature[_ <: FeatureType]]) = {
+    val iterators = data.map(_.iterator).toArray
+    val rows = ArrayBuffer.empty[Row]
+    val featureValues = ArrayBuffer.empty[Array[FeatureType]]
+
+    while (iterators.forall(_.hasNext)) {
+      val vals: Array[FeatureType] = iterators.map(_.next())
+      val sparkVals = vals.map(FeatureTypeSparkConverter.toSpark)
+      rows += Row.fromSeq(sparkVals)
+      featureValues += vals
+    }
+
+    require(rows.nonEmpty && featureValues.nonEmpty, "Number of rows must be positive")
+
+    val features: Array[Feature[_ <: FeatureType]] = featureValues.head.zipWithIndex.map { case (f, i) =>
+      val wtt = FeatureType.featureTypeTag(f.getClass.getName).asInstanceOf[WeakTypeTag[FeatureType]]
+      feature[FeatureType](name = s"f${i + 1}")(wtt)
+    }.toArray
+
+    val schema = StructType(features.map(FeatureSparkTypes.toStructField(_)))
+    dataframeOfRows(schema, rows) -> features
+  }
+
+  private val InitDate = new SimpleDateFormat("dd/MM/yy").parse("18/04/19")
+
+  /**
+   * Build a dataset with random features of specified size
+   *
+   * @param numOfRows number of rows to generate (must be positive)
+   * @param spark spark session
+   * @return dataset with random features of specified size
+   */
+  // scalastyle:off parameter.number
+  def random
+  (
+    numOfRows: Int = 10
+  )(
+    vectors: => Seq[OPVector] = RandomVector.sparse(RandomReal.normal[Real](), 10).limit(numOfRows),
+    textLists: => Seq[TextList] = RandomList.ofTexts(RandomText.strings(0, 10), maxLen = 10).limit(numOfRows),
+    dateLists: => Seq[DateList] = RandomList.ofDates(
+      RandomIntegral.dates(InitDate, 1000, 1000000), maxLen = 10
+    ).limit(numOfRows),
+    dateTimeLists: => Seq[DateList] = RandomList.ofDateTimes(
+      RandomIntegral.datetimes(InitDate, 1000, 1000000), maxLen = 10
+    ).limit(numOfRows),
+    geoLocations: => Seq[Geolocation] = RandomList.ofGeolocations.limit(numOfRows),
+    base64Maps: => Seq[Base64Map] = RandomMap.of[Base64, Base64Map](RandomText.base64(5, 10), 0, 5).limit(numOfRows),
+    binaryMaps: => Seq[BinaryMap] = RandomMap.ofBinaries(0.5, 0, 5).limit(numOfRows),
+    comboBoxMaps: => Seq[ComboBoxMap] = RandomMap.of[ComboBox, ComboBoxMap](
+      RandomText.comboBoxes(List("choice1", "choice2", "choice3")), 0, 5
+    ).limit(numOfRows),
+    currencyMaps: => Seq[CurrencyMap] = RandomMap.ofReals[Currency, CurrencyMap](
+      RandomReal.poisson[Currency](5.0), 0, 5
+    ).limit(numOfRows),
+    dateMaps: => Seq[DateMap] = RandomMap.of(
+      RandomIntegral.dates(InitDate, 1000, 1000000), 0, 5
+    ).limit(numOfRows),
+    dateTimeMaps: => Seq[DateTimeMap] = RandomMap.of(
+      RandomIntegral.datetimes(InitDate, 1000, 1000000), 0, 5
+    ).limit(numOfRows),
+    emailMaps: => Seq[EmailMap] = RandomMap.of(
+      RandomText.emailsOn(RandomStream.of(List("example.com", "test.com"))), 0, 5
+    ).limit(numOfRows),
+    idMaps: => Seq[IDMap] = RandomMap.of[ID, IDMap](RandomText.ids, 0, 5).limit(numOfRows),
+    integralMaps: => Seq[IntegralMap] = RandomMap.of(RandomIntegral.integrals, 0, 5).limit(numOfRows),
+    multiPickListMaps: => Seq[MultiPickListMap] = RandomMap.ofMultiPickLists(
+      RandomMultiPickList.of(RandomText.countries, maxLen = 5), 0, 5
+    ).limit(numOfRows),
+    percentMaps: => Seq[PercentMap] = RandomMap.ofReals[Percent, PercentMap](
+      RandomReal.normal[Percent](50, 5), 0, 5
+    ).limit(numOfRows),
+    phoneMaps: => Seq[PhoneMap] = RandomMap.of[Phone, PhoneMap](RandomText.phones, 0, 5).limit(numOfRows),
+    pickListMaps: => Seq[PickListMap] = RandomMap.of[PickList, PickListMap](
+      RandomText.pickLists(List("pick1", "pick2", "pick3")), 0, 5
+    ).limit(numOfRows),
+    realMaps: => Seq[RealMap] = RandomMap.ofReals[Real, RealMap](RandomReal.normal[Real](), 0, 5).limit(numOfRows),
+    textAreaMaps: => Seq[TextAreaMap] = RandomMap.of[TextArea, TextAreaMap](
+      RandomText.textAreas(0, 50), 0, 5
+    ).limit(numOfRows),
+    textMaps: => Seq[TextMap] = RandomMap.of[Text, TextMap](RandomText.strings(0, 10), 0, 5).limit(numOfRows),
+    urlMaps: => Seq[URLMap] = RandomMap.of[URL, URLMap](RandomText.urls, 0, 5).limit(numOfRows),
+    countryMaps: => Seq[CountryMap] = RandomMap.of[Country, CountryMap](RandomText.countries, 0, 5).limit(numOfRows),
+    stateMaps: => Seq[StateMap] = RandomMap.of[State, StateMap](RandomText.states, 0, 5).limit(numOfRows),
+    cityMaps: => Seq[CityMap] = RandomMap.of[City, CityMap](RandomText.cities, 0, 5).limit(numOfRows),
+    postalCodeMaps: => Seq[PostalCodeMap] = RandomMap.of[PostalCode, PostalCodeMap](
+      RandomText.postalCodes, 0, 5
+    ).limit(numOfRows),
+    streetMaps: => Seq[StreetMap] = RandomMap.of[Street, StreetMap](RandomText.streets, 0, 5).limit(numOfRows),
+    geoLocationMaps: => Seq[GeolocationMap] = RandomMap.ofGeolocations[UniformGeolocation](
+      RandomList.ofGeolocations, 0, 5
+    ).limit(numOfRows),
+    binaries: => Seq[Binary] = RandomBinary(0.5).limit(numOfRows),
+    currencies: => Seq[Currency] = RandomReal.poisson[Currency](5.0).limit(numOfRows),
+    dates: => Seq[Date] = RandomIntegral.dates(InitDate, 1000, 1000000).limit(numOfRows),
+    dateTimes: => Seq[DateTime] = RandomIntegral.datetimes(InitDate, 1000, 1000000).limit(numOfRows),
+    integrals: => Seq[Integral] = RandomIntegral.integrals.limit(numOfRows),
+    percents: => Seq[Percent] = RandomReal.normal[Percent](50, 5).limit(numOfRows),
+    reals: => Seq[Real] = RandomReal.normal[Real]().limit(numOfRows),
+    realNNs: => Seq[RealNN] = RandomReal.normal[RealNN]().limit(numOfRows),
+    multiPickLists: => Seq[MultiPickList] = RandomMultiPickList.of(RandomText.countries, maxLen = 5).limit(numOfRows),
+    base64s: => Seq[Base64] = RandomText.base64(5, 10).limit(numOfRows),
+    comboBoxes: => Seq[ComboBox] = RandomText.comboBoxes(List("choice1", "choice2", "choice3")).limit(numOfRows),
+    emails: => Seq[Email] = RandomText.emailsOn(RandomStream.of(List("example.com", "test.com"))).limit(numOfRows),
+    ids: => Seq[ID] = RandomText.ids.limit(numOfRows),
+    phones: => Seq[Phone] = RandomText.phones.limit(numOfRows),
+    pickLists: => Seq[PickList] = RandomText.pickLists(List("pick1", "pick2", "pick3")).limit(numOfRows),
+    texts: => Seq[Text] = RandomText.base64(5, 10).limit(numOfRows),
+    textAreas: => Seq[TextArea] = RandomText.textAreas(0, 50).limit(numOfRows),
+    urls: => Seq[URL] = RandomText.urls.limit(numOfRows),
+    countries: => Seq[Country] = RandomText.countries.limit(numOfRows),
+    states: => Seq[State] = RandomText.states.limit(numOfRows),
+    cities: => Seq[City] = RandomText.cities.limit(numOfRows),
+    postalCodes: => Seq[PostalCode] = RandomText.postalCodes.limit(numOfRows),
+    streets: => Seq[Street] = RandomText.streets.limit(numOfRows)
+  )(implicit spark: SparkSession): (DataFrame, Array[Feature[_ <: FeatureType]]) = {
+
+    require(numOfRows > 0, "Number of rows must be positive")
+
+    val data: Array[Seq[FeatureType]] = Array(
+      vectors, textLists, dateLists, dateTimeLists, geoLocations,
+      base64Maps, binaryMaps, comboBoxMaps, currencyMaps, dateMaps,
+      dateTimeMaps, emailMaps, idMaps, integralMaps, multiPickListMaps,
+      percentMaps, phoneMaps, pickListMaps, realMaps, textAreaMaps,
+      textMaps, urlMaps, countryMaps, stateMaps, cityMaps,
+      postalCodeMaps, streetMaps, geoLocationMaps, binaries, currencies,
+      dates, dateTimes, integrals, percents, reals, realNNs,
+      multiPickLists, base64s, comboBoxes, emails, ids, phones,
+      pickLists, texts, textAreas, urls, countries, states,
+      cities, postalCodes, streets)
+
+    this.apply(data: _*)(spark)
+  }
+  // scalastyle:on
+
   private def dataframe[T <: Product](schema: StructType, data: Seq[T])(implicit spark: SparkSession): DataFrame = {
+    val rows = data.map(p => Row.fromSeq(
+      p.productIterator.toSeq.map { case f: FeatureType => FeatureTypeSparkConverter.toSpark(f) }
+    ))
+    dataframeOfRows(schema, rows)
+  }
+
+  private def dataframeOfRows(schema: StructType, data: Seq[Row])(implicit spark: SparkSession): DataFrame = {
     import spark.implicits._
     implicit val rowEncoder = RowEncoder(schema)
-
-    data.map(p => Row.fromSeq(
-      p.productIterator.toSeq.map { case f: FeatureType => FeatureTypeSparkConverter.toSpark(f) }
-    )).toDF()
+    data.toDF()
   }
 
   private def feature[T <: FeatureType](name: String)(implicit tt: WeakTypeTag[T]) =
     FeatureBuilder.fromRow[T](name)(tt).asPredictor
+
 }
diff --git a/testkit/src/main/scala/com/salesforce/op/testkit/RandomData.scala b/testkit/src/main/scala/com/salesforce/op/testkit/RandomData.scala
index 71ead66d7a..931b8a5aaa 100644
--- a/testkit/src/main/scala/com/salesforce/op/testkit/RandomData.scala
+++ b/testkit/src/main/scala/com/salesforce/op/testkit/RandomData.scala
@@ -30,7 +30,7 @@
 
 package com.salesforce.op.testkit
 
-import com.salesforce.op.features.types.{FeatureType}
+import com.salesforce.op.features.types.FeatureType
 
 import scala.language.postfixOps
 import scala.util.Random
diff --git a/testkit/src/main/scala/com/salesforce/op/testkit/RandomReal.scala b/testkit/src/main/scala/com/salesforce/op/testkit/RandomReal.scala
index 78e335b8a1..abd598cb99 100644
--- a/testkit/src/main/scala/com/salesforce/op/testkit/RandomReal.scala
+++ b/testkit/src/main/scala/com/salesforce/op/testkit/RandomReal.scala
@@ -133,7 +133,6 @@ object RandomReal {
   ): RandomReal[DataType] = RandomReal[DataType](new GammaGenerator(shape, scale))
 
 
-
   /**
    * Generator of real-number feature types with log-normal distribution
    *
@@ -155,8 +154,10 @@
    * @tparam DataType the type of data
    * @return a generator of reals
    */
-  def weibull[DataType <: Real : WeakTypeTag](alpha: Double = 1.0, beta: Double = 5.0):
-  RandomReal[DataType] = RandomReal[DataType](new WeibullGenerator(alpha, beta))
+  def weibull[DataType <: Real : WeakTypeTag](
+    alpha: Double = 1.0, beta: Double = 5.0
+  ): RandomReal[DataType] =
+    RandomReal[DataType](new WeibullGenerator(alpha, beta))
 
   class UniformDistribution(min: Double, max: Double) extends RandomDataGenerator[Double] {
     private val source = new UniformGenerator
diff --git a/testkit/src/main/scala/com/salesforce/op/testkit/RandomText.scala b/testkit/src/main/scala/com/salesforce/op/testkit/RandomText.scala
index 2ebf6339f3..755b3da33d 100644
--- a/testkit/src/main/scala/com/salesforce/op/testkit/RandomText.scala
+++ b/testkit/src/main/scala/com/salesforce/op/testkit/RandomText.scala
@@ -121,8 +121,8 @@ object RandomText {
    * Produces emails in the specified collection of random domains
    * domains can be provided with probabilities, e.g.
    * {{{
-   *   emails(RandomStream of List("gitmo.mil", "kremlin.ru"))
-   *   emails(RandomStream of List("gitmo.mil", "kremlin.ru") distributedAs List(0.9, 1.0))
+   *   emailsOn(RandomStream of List("gitmo.mil", "kremlin.ru"))
+   *   emailsOn(RandomStream of List("gitmo.mil", "kremlin.ru") distributedAs List(0.9, 1.0))
    * }}}
    *
    * @param domains producer of random email domains
diff --git a/testkit/src/main/scala/com/salesforce/op/testkit/RandomVector.scala b/testkit/src/main/scala/com/salesforce/op/testkit/RandomVector.scala
index ed1ed22c95..91f8593582 100644
--- a/testkit/src/main/scala/com/salesforce/op/testkit/RandomVector.scala
+++ b/testkit/src/main/scala/com/salesforce/op/testkit/RandomVector.scala
@@ -33,8 +33,8 @@ package com.salesforce.op.testkit
 import language.postfixOps
 import com.salesforce.op.features.types._
 import org.apache.spark.ml.linalg._
+import com.salesforce.op.utils.spark.RichVector._
 
-import scala.reflect.runtime.universe.WeakTypeTag
 import scala.util.Random
 import com.github.fommil.netlib.LAPACK.{getInstance => lapack}
 import org.netlib.util.intW
@@ -49,70 +49,9 @@ case class RandomVector(values: RandomStream[Vector])
 
 object RandomVector {
 
-  private def asDense(size: Int, source: Iterator[Double]) =
-    new DenseVector(source take size toArray)
-
-  private def asSparse(size: Int, source: Iterator[Option[Double]]): SparseVector = {
-    val indexed = source.take(size).zipWithIndex collect { case (Some(x), i) => i -> x } toArray
-
-    new SparseVector(size, indexed map (_._1), indexed map (_._2))
-  }
-
-  /**
-   * Builds a sparse vector out of a given partial function; a Map is also a partial function
-   * @param size vector size
-   * @param source function that produces (or not) a value at a given index
-   * @return
-   */
-  private def asSparse(size: Int, source: PartialFunction[Int, Double]): SparseVector = {
-    asSparse(size, 0 to size map source.lift iterator)
-  }
-
   type RandomReals = RandomData[Real]
   type RandomVectors = RandomStream[Vector]
 
-  /**
-   * Generating dense vectors of given length, from a real value generator.
-   * There is a big reason why RandomDataGenerator is not involved here. A bug in spark.
-   * @param length vector length
-   * @param valueGenerator generator of individual values
-   * @return a stream of dense vectors
-   */
-  private def denseVectors(length: Int, valueGenerator: RandomReals): RandomVectors = {
-    val iteratorOfOptions = valueGenerator.streamOfValues
-    val iteratorOfDoubles: Iterator[Double] = iteratorOfOptions collect { case Some(x) => x }
-
-    new RandomVectors(_ => asDense(length, iteratorOfDoubles)) {
-      override def reset(seed: Long): Unit = {
-        valueGenerator.reset(seed)
-      }
-    }
-  }
-
-  private def denseVectorsFromStream(length: Int, values: RandomStream[Double]): RandomVectors = {
-    new RandomVectors(rng => asDense(length, values(rng)))
-  }
-
-  /**
-   * Generating dense vectors of given length, from a real value generator.
-   * There is a big reason why RandomDataGenerator is not involved here. A bug in spark.
-   * @param length vector length
-   * @param valueGenerator generator of individual values
-   * @return
-   */
-  private def sparseVectors(length: Int, valueGenerator: RandomReals): RandomVectors = {
-    val iteratorOfReals: Iterator[Real] = valueGenerator
-    val iteratorOfValues: Iterator[Option[Double]] = iteratorOfReals map (_.value)
-
-    new RandomVectors(_ => asSparse(length, iteratorOfValues)) {
-      override def reset(seed: Long): Unit = { valueGenerator.reset(seed) }
-    }
-  }
-
-  private def sparseVectors(length: Int, values: RandomStream[Option[Double]]): RandomVectors = {
-    new RandomVectors(rng => asSparse(length, values(rng)))
-  }
-
   /**
    * Produces random dense vectors with a given distribution
    *
@@ -135,46 +74,9 @@ object RandomVector {
     RandomVector(sparseVectors(length, dataSource))
   }
 
-  private def add(xs: Array[Double], ys: Array[Double]): Array[Double] = {
-    require(xs.length == ys.length,
-      s"Expected lengths to be the same, got ${xs.length} and ${ys.length}")
-    val zs = new Array[Double](xs.length)
-
-    for { i <- xs.indices } zs(i) = xs(i) + ys(i)
-    zs
-  }
-
-  private def add(xs: Vector, ys: Vector): Vector = {
-    new DenseVector(add(xs.toDense.values, ys.toDense.values))
-  }
-
-  private def cholesky(m: Matrix): Matrix = {
-    val n = m.numCols
-    val resultCode = new intW(0)
-    val a = m.toArray
-    for {
-      i <- 0 until n
-      j <- i + 1 until n
-    } {
-      a(i + j * n) = 0.0
-    }
-
-    lapack.dpotrf("L", n, a, n, resultCode)
-
-    resultCode.`val` match {
-      case code if code < 0 =>
-        throw new IllegalStateException(s"LAPACK returned $code; arg ${-code} is illegal")
-      case code if code > 0 =>
-        throw new IllegalArgumentException (
-          s"LAPACKreturned $code because matrix is not positive definite.")
-      case _ => // do nothing
-    }
-
-    new DenseMatrix(n, n, a)
-  }
-
   /**
    * Produces normally distributed random vectors with a given mean and covariance matrix.
+   *
    * @param mean the mean value of generated vectors
    * @param covMatrix the covariance matrix of generate vectors
    * @return a RandomVector generator that produces vectors satisfying the given conditions
@@ -186,7 +88,7 @@ object RandomVector {
       s"Expected mean vector size ${covMatrix.numRows}, got ${mean.size}")
     val transform = cholesky(covMatrix)
     val source = denseVectors(mean.size, RandomReal.normal())
-    val vectors = source map (v => add(mean, transform.multiply(v)))
+    val vectors = source map (v => mean + transform.multiply(v))
 
     new RandomVector(vectors)
   }
@@ -214,10 +116,80 @@
     val size = probabilitiesOfOne.size
     val streams = probabilitiesOfOne map RandomStream.ofOnesAndZeros
     def values(rng: Random) = streams.map(_.apply(rng).next)
-
     def vectors(rng: Random) = asDense(size, values(rng).iterator)
-
-
     RandomVector(new RandomVectors(vectors))
   }
+
+  private def cholesky(m: Matrix): Matrix = {
+    val n = m.numCols
+    val resultCode = new intW(0)
+    val a = m.toArray
+    for {
+      i <- 0 until n
+      j <- i + 1 until n
+    } {
+      a(i + j * n) = 0.0
+    }
+
+    lapack.dpotrf("L", n, a, n, resultCode)
+
+    resultCode.`val` match {
+      case code if code < 0 =>
+        throw new IllegalStateException(s"LAPACK returned $code; arg ${-code} is illegal")
+      case code if code > 0 =>
+        throw new IllegalArgumentException (s"LAPACK returned $code because matrix is not positive definite.")
+      case _ => // do nothing
+    }
+
+    new DenseMatrix(n, n, a)
+  }
+
+  private def asDense(size: Int, source: Iterator[Double]) =
+    new DenseVector(source take size toArray)
+
+  private def asSparse(size: Int, source: Iterator[Option[Double]]): SparseVector = {
+    val indexed = source.take(size).zipWithIndex collect { case (Some(x), i) => i -> x } toArray
+
+    new SparseVector(size, indexed map (_._1), indexed map (_._2))
+  }
+
+  /**
+   * Generating dense vectors of given length, from a real value generator.
+   * There is a big reason why RandomDataGenerator is not involved here. A bug in spark.
+   * @param length vector length
+   * @param valueGenerator generator of individual values
+   * @return a stream of dense vectors
+   */
+  private def denseVectors(length: Int, valueGenerator: RandomReals): RandomVectors = {
+    val iteratorOfOptions = valueGenerator.streamOfValues
+    val iteratorOfDoubles: Iterator[Double] = iteratorOfOptions collect { case Some(x) => x }
+
+    new RandomVectors(_ => asDense(length, iteratorOfDoubles)) {
+      override def reset(seed: Long): Unit = {
+        valueGenerator.reset(seed)
+      }
+    }
+  }
+
+  private def denseVectorsFromStream(length: Int, values: RandomStream[Double]): RandomVectors = {
+    new RandomVectors(rng => asDense(length, values(rng)))
+  }
+
+  /**
+   * Generating dense vectors of given length, from a real value generator.
+   * There is a big reason why RandomDataGenerator is not involved here. A bug in spark.
+   * @param length vector length
+   * @param valueGenerator generator of individual values
+   * @return
+   */
+  private def sparseVectors(length: Int, valueGenerator: RandomReals): RandomVectors = {
+    val iteratorOfReals: Iterator[Real] = valueGenerator
+    val iteratorOfValues: Iterator[Option[Double]] = iteratorOfReals map (_.value)
+
+    new RandomVectors(_ => asSparse(length, iteratorOfValues)) {
+      override def reset(seed: Long): Unit = { valueGenerator.reset(seed) }
+    }
+  }
+
 }
diff --git a/features/src/test/scala/com/salesforce/op/features/TestFeatureBuilderTest.scala b/testkit/src/test/scala/com/salesforce/op/test/TestFeatureBuilderTest.scala
similarity index 72%
rename from features/src/test/scala/com/salesforce/op/features/TestFeatureBuilderTest.scala
rename to testkit/src/test/scala/com/salesforce/op/test/TestFeatureBuilderTest.scala
index b8b0fcb88a..d2904f7e7c 100644
--- a/features/src/test/scala/com/salesforce/op/features/TestFeatureBuilderTest.scala
+++ b/testkit/src/test/scala/com/salesforce/op/test/TestFeatureBuilderTest.scala
@@ -28,11 +28,10 @@
  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-package com.salesforce.op.features
-
+package com.salesforce.op.test
 
+import com.salesforce.op.features.{FeatureLike, FeatureSparkTypes}
 import com.salesforce.op.features.types._
-import com.salesforce.op.test.{TestFeatureBuilder, TestSparkContext}
 import com.salesforce.op.utils.spark.RichRow._
 import org.apache.spark.sql.DataFrame
 import org.junit.runner.RunWith
@@ -40,11 +39,8 @@ import org.scalatest.FlatSpec
 import org.scalatest.junit.JUnitRunner
 
-case class FeatureBuilderContainerTest(s: String, l: Long, d: Double)
-
-
 @RunWith(classOf[JUnitRunner])
-class TestFeatureBuilderTest extends FlatSpec with TestSparkContext {
+class TestFeatureBuilderTest extends FlatSpec with TestSparkContext with FeatureAsserts {
 
   Spec(TestFeatureBuilder.getClass) should "create a dataset with one feature" in {
     val res@(ds, f1) = TestFeatureBuilder[Real](Seq(Real(1), Real(2L), Real(3.1f), Real(4.5)))
 
@@ -126,6 +122,44 @@ class TestFeatureBuilderTest extends FlatSpec with TestSparkContext {
     assertResults(ds, res, expected = Seq(("one", 1, 1.0, -1, List("1", "2")), ("two", 2, 2.3, 1, List("3", "4"))))
   }
 
+  it should "create a dataset with arbitrary amount of features" in {
+    val (ds, features) = TestFeatureBuilder(
+      Seq(Real(0.0)), Seq(Text("a")), Seq(Integral(5L)), Seq(Real(1.0)), Seq(Text("b")),
+      Seq(MultiPickList(Set("3", "4"))), Seq(Real(-3.0))
+    )
+    features.length shouldBe 7
+    ds.count() shouldBe 1
+    ds.schema.fields.map(f => f.name -> f.dataType) should contain theSameElementsInOrderAs
+      features.map(f => f.name -> FeatureSparkTypes.sparkTypeOf(f.wtt))
+
+    assertFeature(features(0).asInstanceOf[FeatureLike[Real]])(name = "f1", in = ds.head(), out = Real(0.0))
+    assertFeature(features(1).asInstanceOf[FeatureLike[Text]])(name = "f2", in = ds.head(), out = Text("a"))
+    assertFeature(features(2).asInstanceOf[FeatureLike[Integral]])(name = "f3", in = ds.head(), out = Integral(5L))
+    assertFeature(features(3).asInstanceOf[FeatureLike[Real]])(name = "f4", in = ds.head(), out = Real(1.0))
+    assertFeature(features(4).asInstanceOf[FeatureLike[Text]])(name = "f5", in = ds.head(), out = Text("b"))
+    assertFeature(features(5).asInstanceOf[FeatureLike[MultiPickList]])(
+      name = "f6", in = ds.head(), out = MultiPickList(Set("3", "4")))
+    assertFeature(features(6).asInstanceOf[FeatureLike[Real]])(name = "f7", in = ds.head(), out = Real(-3.0))
+  }
+
+  it should "create a dataset with all random features" in {
+    val numOfRows = 15
+    val (ds, features) = TestFeatureBuilder.random(numOfRows = numOfRows)()
+    features.length shouldBe 51
+
+    ds.schema.fields.map(f => f.name -> f.dataType) should contain theSameElementsInOrderAs
+      features.map(f => f.name -> FeatureSparkTypes.sparkTypeOf(f.wtt))
+
+    ds.count() shouldBe numOfRows
+  }
+
+  it should "error creating a dataset with invalid number of rows" in {
+    the[IllegalArgumentException] thrownBy TestFeatureBuilder.random(numOfRows = 0)()
+    the[IllegalArgumentException] thrownBy TestFeatureBuilder.random(numOfRows = -1)()
+    the[IllegalArgumentException] thrownBy TestFeatureBuilder(Seq.empty[Real],
+      Seq.empty[Real], Seq.empty[Real], Seq.empty[Real], Seq.empty[Real], Seq.empty[Real])
+  }
+
   private def assertResults(ds: DataFrame, res: Product, expected: Traversable[Any]): Unit = {
     val features = res.productIterator.collect { case f: FeatureLike[_] => f }.toArray
 
@@ -133,7 +167,7 @@ class TestFeatureBuilderTest extends FlatSpec with TestSparkContext {
       features.map(f => f.name -> FeatureSparkTypes.sparkTypeOf(f.wtt))
 
     ds.collect().map(row => features.map(f => row.getAny(f.name))) should contain theSameElementsInOrderAs
-      expected.map{ case v: Product => v; case v => Tuple1(v) }.map(_.productIterator.toArray)
+      expected.map { case v: Product => v; case v => Tuple1(v) }.map(_.productIterator.toArray)
   }
 }