From 456914dbb8d7320e8fa529e8cf9a4b51ef1caf88 Mon Sep 17 00:00:00 2001 From: "Jason T. Brown" Date: Thu, 21 Nov 2019 13:49:56 -0500 Subject: [PATCH 1/8] Add DataFrame tileStats extension Initial implementation with quantiles Signed-off-by: Jason T. Brown --- .../extensions/DataFrameMethods.scala | 3 + .../extensions/RasterFrameStatFunctions.scala | 76 +++++++++++++++++++ .../rasterframes/stats/package.scala | 59 ++++++++++++++ .../rasterframes/RasterFramesStatsSpec.scala | 66 ++++++++++++++++ 4 files changed, 204 insertions(+) create mode 100644 core/src/main/scala/org/locationtech/rasterframes/extensions/RasterFrameStatFunctions.scala create mode 100644 core/src/main/scala/org/locationtech/rasterframes/stats/package.scala create mode 100644 core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala diff --git a/core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala b/core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala index 9a57b9dd8..32d8e27da 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala @@ -155,6 +155,9 @@ trait DataFrameMethods[DF <: DataFrame] extends MethodExtensions[DF] with Metada def withPrefixedColumnNames(prefix: String): DF = self.columns.foldLeft(self)((df, c) ⇒ df.withColumnRenamed(c, s"$prefix$c").asInstanceOf[DF]) + /** */ + def tileStat(): RasterFrameStatFunctions = new RasterFrameStatFunctions(self) + /** * Performs a jeft join on the dataframe `right` to this one, reprojecting and merging tiles as necessary. * The operation is logically a "left outer" join, with the left side also determining the target CRS and extents. diff --git a/core/src/main/scala/org/locationtech/rasterframes/extensions/RasterFrameStatFunctions.scala b/core/src/main/scala/org/locationtech/rasterframes/extensions/RasterFrameStatFunctions.scala new file mode 100644 index 000000000..e0157eb0e --- /dev/null +++ b/core/src/main/scala/org/locationtech/rasterframes/extensions/RasterFrameStatFunctions.scala @@ -0,0 +1,76 @@ +package org.locationtech.rasterframes.extensions + +import org.locationtech.rasterframes.stats._ +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.col + +final class RasterFrameStatFunctions private[rasterframes](df: DataFrame) { + + /** + * Calculates the approximate quantiles of a numerical column of a DataFrame. + * + * The result of this algorithm has the following deterministic bound: + * If the DataFrame has N elements and if we request the quantile at probability `p` up to error + * `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank + * of `x` is close to (p * N). + * More precisely, + * + * {{{ + * floor((p - err) * N) <= rank(x) <= ceil((p + err) * N) + * }}} + * + * This method implements a variation of the Greenwald-Khanna algorithm (with some speed + * optimizations). + * The algorithm was first present in + * Space-efficient Online Computation of Quantile Summaries by Greenwald and Khanna. + * + * @param col the name of the numerical column + * @param probabilities a list of quantile probabilities + * Each number must belong to [0, 1]. + * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. + * @param relativeError The relative target precision to achieve (greater than or equal to 0). + * If set to zero, the exact quantiles are computed, which could be very expensive. + * Note that values greater than 1 are accepted but give the same result as 1. + * @return the approximate quantiles at the given probabilities + * + * @note null and NaN values will be removed from the numerical column before calculation. If + * the dataframe is empty or the column only contains null or NaN, an empty array is returned. + * + * @since 2.0.0 + */ + def approxTileQuantile( + col: String, + probabilities: Array[Double], + relativeError: Double): Array[Double] = { + approxTileQuantile(Array(col), probabilities, relativeError).head + } + + /** + * Calculates the approximate quantiles of numerical columns of a DataFrame. + * @see `approxQuantile(col:Str* approxQuantile)` for detailed description. + * + * @param cols the names of the numerical columns + * @param probabilities a list of quantile probabilities + * Each number must belong to [0, 1]. + * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. + * @param relativeError The relative target precision to achieve (greater than or equal to 0). + * If set to zero, the exact quantiles are computed, which could be very expensive. + * Note that values greater than 1 are accepted but give the same result as 1. + * @return the approximate quantiles at the given probabilities of each column + * + * @note null and NaN values will be ignored in numerical columns before calculation. For + * columns only containing null or NaN values, an empty array is returned. + * + */ + def approxTileQuantile( + cols: Array[String], + probabilities: Array[Double], + relativeError: Double): Array[Array[Double]] = { + multipleApproxQuantiles( + df.select(cols.map(col): _*), + cols, + probabilities, + relativeError).map(_.toArray).toArray + } + +} diff --git a/core/src/main/scala/org/locationtech/rasterframes/stats/package.scala b/core/src/main/scala/org/locationtech/rasterframes/stats/package.scala new file mode 100644 index 000000000..02451b235 --- /dev/null +++ b/core/src/main/scala/org/locationtech/rasterframes/stats/package.scala @@ -0,0 +1,59 @@ +package org.locationtech.rasterframes + +import geotrellis.raster.Tile +import org.locationtech.rasterframes.TileType +import org.locationtech.rasterframes.expressions.DynamicExtractors._ +import org.apache.spark.sql.{Column, DataFrame, Row} +import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.util.QuantileSummaries +import org.apache.spark.sql.types.{DoubleType, NumericType} +import org.locationtech.rasterframes.expressions.accessors.ExtractTile + + +package object stats { + + def multipleApproxQuantiles(df: DataFrame, + cols: Seq[String], + probabilities: Seq[Double], + relativeError: Double): Seq[Seq[Double]] = { + require(relativeError >= 0, + s"Relative Error must be non-negative but got $relativeError") + + val columns: Seq[Column] = cols.map { colName => + val field = df.schema(colName) + + require(tileExtractor.isDefinedAt(field.dataType), + s"Quantile calculation for column $colName with data type ${field.dataType}" + + " is not supported; it must be Tile-like.") + ExtractTile(new Column(colName)) + } + + val emptySummaries = Array.fill(cols.size)( + new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError)) + + def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = { + var i = 0 + while (i < summaries.length) { + if (!row.isNullAt(i)) { + val t: Tile = row.getAs[Tile](i) + // now insert all the tile values into the summary for this column + t.foreachDouble(v ⇒ + if (!v.isNaN) summaries(i) = summaries(i).insert(v) + ) + } + i += 1 // next column + } + summaries + } + + def merge( + sum1: Array[QuantileSummaries], + sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = { + sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) } + } + val summaries = df.select(columns: _*).rdd.treeAggregate(emptySummaries)(apply, merge) + + summaries.map { summary => probabilities.flatMap(summary.query) } + } + +} diff --git a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala new file mode 100644 index 000000000..b500bc2af --- /dev/null +++ b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala @@ -0,0 +1,66 @@ +/* + * This software is licensed under the Apache 2 license, quoted below. + * + * Copyright 2018 Astraea, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * [http://www.apache.org/licenses/LICENSE-2.0] + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.locationtech.rasterframes + +import org.apache.spark.sql.functions.col + +import org.locationtech.rasterframes._ +import org.locationtech.rasterframes.RasterFunctions + +class RasterFramesStatsSpec extends TestEnvironment with TestData { + + describe("DataFrame.tileStats extension methods") { + + val df = TestData.sampleGeoTiff.toDF() + .withColumn("tilePlus2", rf_local_add(col("tile"), 2)) + + it("should compute approx percentiles for a single tile col"){ + + val result = df.tileStat().approxTileQuantile( + "tile", + Array(0.10, 0.50, 0.90), + 0.00001 + ) + + result.length should be (3) + + // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles + result should contain inOrderOnly (7963.0, 10068.0, 12160.0) + } + + it("should compute approx percentiles for many tile cols"){ + val result = df.tileStat().approxTileQuantile( + Array("tile", "tilePlus2"), + Array(0.25, 0.75), + 0.00001 + ) + result.length should be (2) + // nested inside is another array of length 2 for each p + result.foreach{c ⇒ c.length should be (2)} + + result.head should contain inOrderOnly (8701, 11261) + result.tail.head should contain inOrderOnly (8703, 11263) + } + + } + +} From c1f10aaa6241430379dba202b6b5225d976b5609 Mon Sep 17 00:00:00 2001 From: "Simeon H.K. Fitch" Date: Mon, 25 Nov 2019 10:20:42 -0500 Subject: [PATCH 2/8] Implemented three alternate approaches for discussion, from using built-in functions all the way to a custom aggregate expression. --- .../rasterframes/RasterFunctions.scala | 34 +++++- .../encoders/StandardSerializers.scala | 20 +++- .../ApproxCellQuantilesAggregate.scala | 88 ++++++++++++++ .../aggregates/HistogramAggregate.scala | 5 +- .../rasterframes/stats/CellHistogram.scala | 2 +- .../rasterframes/stats/package.scala | 7 +- .../rasterframes/RasterFramesStatsSpec.scala | 113 ++++++++++++++---- 7 files changed, 232 insertions(+), 37 deletions(-) create mode 100644 core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala diff --git a/core/src/main/scala/org/locationtech/rasterframes/RasterFunctions.scala b/core/src/main/scala/org/locationtech/rasterframes/RasterFunctions.scala index 574502ab4..68e2da3b1 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/RasterFunctions.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/RasterFunctions.scala @@ -171,9 +171,15 @@ trait RasterFunctions { /** Assign a `NoData` value to the tile column. */ def rf_with_no_data(col: Column, nodata: Column): Column = SetNoDataValue(col, nodata) - /** Compute the full column aggregate floating point histogram. */ + /** Compute the approximate aggregate floating point histogram using a streaming algorithm, with the default of 80 buckets. */ def rf_agg_approx_histogram(col: Column): TypedColumn[Any, CellHistogram] = HistogramAggregate(col) + /** Compute the approximate aggregate floating point histogram using a streaming algorithm, with the given number of buckets. */ + def rf_agg_approx_histogram(col: Column, numBuckets: Int): TypedColumn[Any, CellHistogram] = { + require(numBuckets > 0, "Must provide a positive number of buckets") + HistogramAggregate(col, numBuckets) + } + /** Compute the full column aggregate floating point statistics. */ def rf_agg_stats(col: Column): TypedColumn[Any, CellStatistics] = CellStatsAggregate(col) @@ -186,6 +192,23 @@ trait RasterFunctions { /** Computes the number of NoData cells in a column. */ def rf_agg_no_data_cells(col: Column): TypedColumn[Any, Long] = CellCountAggregate.NoDataCells(col) + /** + * Calculates the approximate quantiles of a tile column of a DataFrame. + * @param tile tile column to extract cells from. + * @param probabilities a list of quantile probabilities + * Each number must belong to [0, 1]. + * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. + * @param relativeError The relative target precision to achieve (greater than or equal to 0). + * @return the approximate quantiles at the given probabilities of each column + */ + def rf_agg_approx_quantiles( + tile: Column, + probabilities: Seq[Double], + relativeError: Double = 0.00001): TypedColumn[Any, Seq[Double]] = { + require(probabilities.nonEmpty, "at least one quantile probability is required") + ApproxCellQuantilesAggregate(tile, probabilities, relativeError) + } + /** Compute the Tile-wise mean */ def rf_tile_mean(col: Column): TypedColumn[Any, Double] = TileMean(col) @@ -546,14 +569,17 @@ trait RasterFunctions { /** Return the incoming tile untouched. */ def rf_identity(tileCol: Column): Column = Identity(tileCol) - /** Create a row for each cell in Tile. */ + /** Create a row for each cell in Tile. + * The output will include the columns `column_index`, `row_index` indicating where in the tile the cell originated. */ def rf_explode_tiles(cols: Column*): Column = rf_explode_tiles_sample(1.0, None, cols: _*) - /** Create a row for each cell in Tile with random sampling and optional seed. */ + /** Create a row for each cell in Tile with random sampling and optional seed. + * The output will include the columns `column_index`, `row_index` indicating where in the tile the cell originated. */ def rf_explode_tiles_sample(sampleFraction: Double, seed: Option[Long], cols: Column*): Column = ExplodeTiles(sampleFraction, seed, cols) - /** Create a row for each cell in Tile with random sampling (no seed). */ + /** Create a row for each cell in Tile with random sampling (no seed). + * The output will include the columns `column_index`, `row_index` indicating where in the tile the cell originated. */ def rf_explode_tiles_sample(sampleFraction: Double, cols: Column*): Column = ExplodeTiles(sampleFraction, None, cols) } diff --git a/core/src/main/scala/org/locationtech/rasterframes/encoders/StandardSerializers.scala b/core/src/main/scala/org/locationtech/rasterframes/encoders/StandardSerializers.scala index 1983f8bb9..bcb7f856a 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/encoders/StandardSerializers.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/encoders/StandardSerializers.scala @@ -21,17 +21,21 @@ package org.locationtech.rasterframes.encoders +import java.nio.ByteBuffer + import com.github.blemale.scaffeine.Scaffeine import geotrellis.proj4.CRS import geotrellis.raster._ import geotrellis.spark._ import geotrellis.spark.tiling.LayoutDefinition import geotrellis.vector._ +import org.apache.spark.sql.catalyst.util.QuantileSummaries import org.apache.spark.sql.types._ import org.locationtech.jts.geom.Envelope import org.locationtech.rasterframes.TileType import org.locationtech.rasterframes.encoders.CatalystSerializer.{CatalystIO, _} import org.locationtech.rasterframes.model.LazyCRS +import org.locationtech.rasterframes.util.KryoSupport /** Collection of CatalystSerializers for third-party types. */ trait StandardSerializers { @@ -294,9 +298,23 @@ trait StandardSerializers { implicit val spatialKeyTLMSerializer = tileLayerMetadataSerializer[SpatialKey] implicit val spaceTimeKeyTLMSerializer = tileLayerMetadataSerializer[SpaceTimeKey] + implicit val quantileSerializer: CatalystSerializer[QuantileSummaries] = new CatalystSerializer[QuantileSummaries] { + override val schema: StructType = StructType(Seq( + StructField("quantile_serializer_kryo", BinaryType, false) + )) + + override protected def to[R](t: QuantileSummaries, io: CatalystSerializer.CatalystIO[R]): R = { + val buf = KryoSupport.serialize(t) + io.create(buf.array()) + } + + override protected def from[R](t: R, io: CatalystSerializer.CatalystIO[R]): QuantileSummaries = { + KryoSupport.deserialize[QuantileSummaries](ByteBuffer.wrap(io.getByteArray(t, 0))) + } + } } -object StandardSerializers { +object StandardSerializers extends StandardSerializers { private val s2ctCache = Scaffeine().build[String, CellType]( (s: String) => CellType.fromName(s) ) diff --git a/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala b/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala new file mode 100644 index 000000000..dcdf1a8a0 --- /dev/null +++ b/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala @@ -0,0 +1,88 @@ +/* + * This software is licensed under the Apache 2 license, quoted below. + * + * Copyright 2019 Astraea, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * [http://www.apache.org/licenses/LICENSE-2.0] + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.locationtech.rasterframes.expressions.aggregates + +import geotrellis.raster.{Tile, isNoData} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.util.QuantileSummaries +import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} +import org.apache.spark.sql.{Column, Encoder, Row, TypedColumn, types} +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.locationtech.rasterframes.TileType +import org.locationtech.rasterframes.encoders.CatalystSerializer._ +import org.locationtech.rasterframes.expressions.accessors.ExtractTile + + +case class ApproxCellQuantilesAggregate(probabilities: Seq[Double], relativeError: Double) extends UserDefinedAggregateFunction { + import org.locationtech.rasterframes.encoders.StandardSerializers.quantileSerializer + + override def inputSchema: StructType = StructType(Seq( + StructField("value", TileType, true) + )) + + override def bufferSchema: StructType = StructType(Seq( + StructField("buffer", schemaOf[QuantileSummaries], false) + )) + + override def dataType: types.DataType = DataTypes.createArrayType(DataTypes.DoubleType) + + override def deterministic: Boolean = true + + override def initialize(buffer: MutableAggregationBuffer): Unit = + buffer.update(0, new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError).toRow) + + override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { + val qs = buffer.getStruct(0).to[QuantileSummaries] + if (!input.isNullAt(0)) { + val tile = input.getAs[Tile](0) + var result = qs + tile.foreachDouble(d => if (!isNoData(d)) result = result.insert(d)) + buffer.update(0, result.toRow) + } + else buffer + } + + override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { + val left = buffer1.getStruct(0).to[QuantileSummaries] + val right = buffer2.getStruct(0).to[QuantileSummaries] + val merged = left.compress().merge(right.compress()) + buffer1.update(0, merged.toRow) + } + + override def evaluate(buffer: Row): Seq[Double] = { + val summaries = buffer.getStruct(0).to[QuantileSummaries] + probabilities.flatMap(summaries.query) + } +} + +object ApproxCellQuantilesAggregate { + private implicit def doubleSeqEncoder: Encoder[Seq[Double]] = ExpressionEncoder() + + def apply( + tile: Column, + probabilities: Seq[Double], + relativeError: Double = 0.00001): TypedColumn[Any, Seq[Double]] = { + new ApproxCellQuantilesAggregate(probabilities, relativeError)(ExtractTile(tile)) + .as(s"rf_agg_approx_quantiles") + .as[Seq[Double]] + } +} \ No newline at end of file diff --git a/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/HistogramAggregate.scala b/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/HistogramAggregate.scala index 5f7483b0c..aa5e89630 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/HistogramAggregate.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/HistogramAggregate.scala @@ -98,7 +98,10 @@ object HistogramAggregate { import org.locationtech.rasterframes.encoders.StandardEncoders.cellHistEncoder def apply(col: Column): TypedColumn[Any, CellHistogram] = - new HistogramAggregate()(ExtractTile(col)) + apply(col, StreamingHistogram.DEFAULT_NUM_BUCKETS) + + def apply(col: Column, numBuckets: Int): TypedColumn[Any, CellHistogram] = + new HistogramAggregate(numBuckets)(ExtractTile(col)) .as(s"rf_agg_approx_histogram($col)") .as[CellHistogram] diff --git a/core/src/main/scala/org/locationtech/rasterframes/stats/CellHistogram.scala b/core/src/main/scala/org/locationtech/rasterframes/stats/CellHistogram.scala index be3d547a3..095423755 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/stats/CellHistogram.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/stats/CellHistogram.scala @@ -89,7 +89,7 @@ case class CellHistogram(bins: Seq[CellHistogram.Bin]) { // derived from locationtech/geotrellis/.../StreamingHistogram.scala - private def percentileBreaks(qs: Seq[Double]): Seq[Double] = { + def percentileBreaks(qs: Seq[Double]): Seq[Double] = { if(bins.size == 1) { qs.map(z => bins.head.value) } else { diff --git a/core/src/main/scala/org/locationtech/rasterframes/stats/package.scala b/core/src/main/scala/org/locationtech/rasterframes/stats/package.scala index 02451b235..2ad1417b8 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/stats/package.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/stats/package.scala @@ -1,12 +1,9 @@ package org.locationtech.rasterframes import geotrellis.raster.Tile -import org.locationtech.rasterframes.TileType -import org.locationtech.rasterframes.expressions.DynamicExtractors._ -import org.apache.spark.sql.{Column, DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.util.QuantileSummaries -import org.apache.spark.sql.types.{DoubleType, NumericType} +import org.apache.spark.sql.{Column, DataFrame, Row} +import org.locationtech.rasterframes.expressions.DynamicExtractors._ import org.locationtech.rasterframes.expressions.accessors.ExtractTile diff --git a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala index b500bc2af..530388cce 100644 --- a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala +++ b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala @@ -21,46 +21,109 @@ package org.locationtech.rasterframes -import org.apache.spark.sql.functions.col - -import org.locationtech.rasterframes._ -import org.locationtech.rasterframes.RasterFunctions +import org.apache.spark.sql.functions.{col, explode} class RasterFramesStatsSpec extends TestEnvironment with TestData { - describe("DataFrame.tileStats extension methods") { + import spark.implicits._ - val df = TestData.sampleGeoTiff.toDF() - .withColumn("tilePlus2", rf_local_add(col("tile"), 2)) + val df = TestData.sampleGeoTiff + .toDF() + .withColumn("tilePlus2", rf_local_add(col("tile"), 2)) - it("should compute approx percentiles for a single tile col"){ + describe("DataFrame.tileStats extension methods") { + it("should compute approx percentiles for a single tile col") { - val result = df.tileStat().approxTileQuantile( - "tile", - Array(0.10, 0.50, 0.90), - 0.00001 - ) + val result = df + .tileStat() + .approxTileQuantile( + "tile", + Array(0.10, 0.50, 0.90), + 0.00001 + ) - result.length should be (3) + result.length should be(3) // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles - result should contain inOrderOnly (7963.0, 10068.0, 12160.0) + result should contain inOrderOnly(7963.0, 10068.0, 12160.0) } - it("should compute approx percentiles for many tile cols"){ - val result = df.tileStat().approxTileQuantile( - Array("tile", "tilePlus2"), - Array(0.25, 0.75), - 0.00001 - ) - result.length should be (2) + it("should compute approx percentiles for many tile cols") { + val result = df + .tileStat() + .approxTileQuantile( + Array("tile", "tilePlus2"), + Array(0.25, 0.75), + 0.00001 + ) + result.length should be(2) // nested inside is another array of length 2 for each p - result.foreach{c ⇒ c.length should be (2)} + result.foreach { c => + c.length should be(2) + } + + result.head should contain inOrderOnly(8701, 11261) + result.tail.head should contain inOrderOnly(8703, 11263) + } + } + + describe("Tile quantiles through built-in functions") { + + it("should compute approx percentiles for a single tile col") { + // Use "explode" + val result = df + .select(rf_explode_tiles($"tile")) + .stat + .approxQuantile("tile", Array(0.10, 0.50, 0.90), 0.00001) + + result.length should be(3) + + // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles + result should contain inOrderOnly(7963.0, 10068.0, 12160.0) + + // Use "to_array" and built-in explode + val result2 = df + .select(explode(rf_tile_to_array_double($"tile")) as "tile") + .stat + .approxQuantile("tile", Array(0.10, 0.50, 0.90), 0.00001) + + result2.length should be(3) + + // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles + result2 should contain inOrderOnly(7963.0, 10068.0, 12160.0) - result.head should contain inOrderOnly (8701, 11261) - result.tail.head should contain inOrderOnly (8703, 11263) } + } + + describe("Tile quantiles through existing RF functions") { + it("should compute approx percentiles for a single tile col") { + // As the number of buckets goes up, the closer we get to the "right" answer. + val result = df + .select(rf_agg_approx_histogram($"tile", 500)) + .map(h => h.percentileBreaks(Seq(0.1, 0.5, 0.9))) + .first() + + result.length should be(3) + println(result) + // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles + // This will fail as the histogram algorithm approximates things differently (probably not as well) + // Result: List(7936.887798369705, 10034.706053861182, 12140.206924858878) + // result should contain inOrderOnly(7963.0, 10068.0, 12160.0) + } } + describe("Tile quantiles through custom aggregate") { + it("should compute approx percentiles for a single tile col") { + val result = df + .select(rf_agg_approx_quantiles($"tile", Seq(0.1, 0.5, 0.9))) + .first() + + result.length should be(3) + + // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles + result should contain inOrderOnly(7963.0, 10068.0, 12160.0) + } + } } + From feaf097a1fa2e79665188183add41578cb95137e Mon Sep 17 00:00:00 2001 From: "Jason T. Brown" Date: Tue, 17 Dec 2019 15:07:17 -0500 Subject: [PATCH 3/8] Revert "Add DataFrame tileStats extension" This reverts commit 456914dbb8d7320e8fa529e8cf9a4b51ef1caf88. --- .../extensions/DataFrameMethods.scala | 3 - .../extensions/RasterFrameStatFunctions.scala | 76 ----------- .../rasterframes/stats/package.scala | 56 -------- .../rasterframes/RasterFramesStatsSpec.scala | 129 ------------------ 4 files changed, 264 deletions(-) delete mode 100644 core/src/main/scala/org/locationtech/rasterframes/extensions/RasterFrameStatFunctions.scala delete mode 100644 core/src/main/scala/org/locationtech/rasterframes/stats/package.scala delete mode 100644 core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala diff --git a/core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala b/core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala index 32d8e27da..9a57b9dd8 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/extensions/DataFrameMethods.scala @@ -155,9 +155,6 @@ trait DataFrameMethods[DF <: DataFrame] extends MethodExtensions[DF] with Metada def withPrefixedColumnNames(prefix: String): DF = self.columns.foldLeft(self)((df, c) ⇒ df.withColumnRenamed(c, s"$prefix$c").asInstanceOf[DF]) - /** */ - def tileStat(): RasterFrameStatFunctions = new RasterFrameStatFunctions(self) - /** * Performs a jeft join on the dataframe `right` to this one, reprojecting and merging tiles as necessary. * The operation is logically a "left outer" join, with the left side also determining the target CRS and extents. diff --git a/core/src/main/scala/org/locationtech/rasterframes/extensions/RasterFrameStatFunctions.scala b/core/src/main/scala/org/locationtech/rasterframes/extensions/RasterFrameStatFunctions.scala deleted file mode 100644 index e0157eb0e..000000000 --- a/core/src/main/scala/org/locationtech/rasterframes/extensions/RasterFrameStatFunctions.scala +++ /dev/null @@ -1,76 +0,0 @@ -package org.locationtech.rasterframes.extensions - -import org.locationtech.rasterframes.stats._ -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.functions.col - -final class RasterFrameStatFunctions private[rasterframes](df: DataFrame) { - - /** - * Calculates the approximate quantiles of a numerical column of a DataFrame. - * - * The result of this algorithm has the following deterministic bound: - * If the DataFrame has N elements and if we request the quantile at probability `p` up to error - * `err`, then the algorithm will return a sample `x` from the DataFrame so that the *exact* rank - * of `x` is close to (p * N). - * More precisely, - * - * {{{ - * floor((p - err) * N) <= rank(x) <= ceil((p + err) * N) - * }}} - * - * This method implements a variation of the Greenwald-Khanna algorithm (with some speed - * optimizations). - * The algorithm was first present in - * Space-efficient Online Computation of Quantile Summaries by Greenwald and Khanna. - * - * @param col the name of the numerical column - * @param probabilities a list of quantile probabilities - * Each number must belong to [0, 1]. - * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. - * @param relativeError The relative target precision to achieve (greater than or equal to 0). - * If set to zero, the exact quantiles are computed, which could be very expensive. - * Note that values greater than 1 are accepted but give the same result as 1. - * @return the approximate quantiles at the given probabilities - * - * @note null and NaN values will be removed from the numerical column before calculation. If - * the dataframe is empty or the column only contains null or NaN, an empty array is returned. - * - * @since 2.0.0 - */ - def approxTileQuantile( - col: String, - probabilities: Array[Double], - relativeError: Double): Array[Double] = { - approxTileQuantile(Array(col), probabilities, relativeError).head - } - - /** - * Calculates the approximate quantiles of numerical columns of a DataFrame. - * @see `approxQuantile(col:Str* approxQuantile)` for detailed description. - * - * @param cols the names of the numerical columns - * @param probabilities a list of quantile probabilities - * Each number must belong to [0, 1]. - * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. - * @param relativeError The relative target precision to achieve (greater than or equal to 0). - * If set to zero, the exact quantiles are computed, which could be very expensive. - * Note that values greater than 1 are accepted but give the same result as 1. - * @return the approximate quantiles at the given probabilities of each column - * - * @note null and NaN values will be ignored in numerical columns before calculation. For - * columns only containing null or NaN values, an empty array is returned. - * - */ - def approxTileQuantile( - cols: Array[String], - probabilities: Array[Double], - relativeError: Double): Array[Array[Double]] = { - multipleApproxQuantiles( - df.select(cols.map(col): _*), - cols, - probabilities, - relativeError).map(_.toArray).toArray - } - -} diff --git a/core/src/main/scala/org/locationtech/rasterframes/stats/package.scala b/core/src/main/scala/org/locationtech/rasterframes/stats/package.scala deleted file mode 100644 index 2ad1417b8..000000000 --- a/core/src/main/scala/org/locationtech/rasterframes/stats/package.scala +++ /dev/null @@ -1,56 +0,0 @@ -package org.locationtech.rasterframes - -import geotrellis.raster.Tile -import org.apache.spark.sql.catalyst.util.QuantileSummaries -import org.apache.spark.sql.{Column, DataFrame, Row} -import org.locationtech.rasterframes.expressions.DynamicExtractors._ -import org.locationtech.rasterframes.expressions.accessors.ExtractTile - - -package object stats { - - def multipleApproxQuantiles(df: DataFrame, - cols: Seq[String], - probabilities: Seq[Double], - relativeError: Double): Seq[Seq[Double]] = { - require(relativeError >= 0, - s"Relative Error must be non-negative but got $relativeError") - - val columns: Seq[Column] = cols.map { colName => - val field = df.schema(colName) - - require(tileExtractor.isDefinedAt(field.dataType), - s"Quantile calculation for column $colName with data type ${field.dataType}" + - " is not supported; it must be Tile-like.") - ExtractTile(new Column(colName)) - } - - val emptySummaries = Array.fill(cols.size)( - new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, relativeError)) - - def apply(summaries: Array[QuantileSummaries], row: Row): Array[QuantileSummaries] = { - var i = 0 - while (i < summaries.length) { - if (!row.isNullAt(i)) { - val t: Tile = row.getAs[Tile](i) - // now insert all the tile values into the summary for this column - t.foreachDouble(v ⇒ - if (!v.isNaN) summaries(i) = summaries(i).insert(v) - ) - } - i += 1 // next column - } - summaries - } - - def merge( - sum1: Array[QuantileSummaries], - sum2: Array[QuantileSummaries]): Array[QuantileSummaries] = { - sum1.zip(sum2).map { case (s1, s2) => s1.compress().merge(s2.compress()) } - } - val summaries = df.select(columns: _*).rdd.treeAggregate(emptySummaries)(apply, merge) - - summaries.map { summary => probabilities.flatMap(summary.query) } - } - -} diff --git a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala deleted file mode 100644 index 530388cce..000000000 --- a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala +++ /dev/null @@ -1,129 +0,0 @@ -/* - * This software is licensed under the Apache 2 license, quoted below. - * - * Copyright 2018 Astraea, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * [http://www.apache.org/licenses/LICENSE-2.0] - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - * - * SPDX-License-Identifier: Apache-2.0 - * - */ - -package org.locationtech.rasterframes - -import org.apache.spark.sql.functions.{col, explode} - -class RasterFramesStatsSpec extends TestEnvironment with TestData { - - import spark.implicits._ - - val df = TestData.sampleGeoTiff - .toDF() - .withColumn("tilePlus2", rf_local_add(col("tile"), 2)) - - describe("DataFrame.tileStats extension methods") { - it("should compute approx percentiles for a single tile col") { - - val result = df - .tileStat() - .approxTileQuantile( - "tile", - Array(0.10, 0.50, 0.90), - 0.00001 - ) - - result.length should be(3) - - // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles - result should contain inOrderOnly(7963.0, 10068.0, 12160.0) - } - - it("should compute approx percentiles for many tile cols") { - val result = df - .tileStat() - .approxTileQuantile( - Array("tile", "tilePlus2"), - Array(0.25, 0.75), - 0.00001 - ) - result.length should be(2) - // nested inside is another array of length 2 for each p - result.foreach { c => - c.length should be(2) - } - - result.head should contain inOrderOnly(8701, 11261) - result.tail.head should contain inOrderOnly(8703, 11263) - } - } - - describe("Tile quantiles through built-in functions") { - - it("should compute approx percentiles for a single tile col") { - // Use "explode" - val result = df - .select(rf_explode_tiles($"tile")) - .stat - .approxQuantile("tile", Array(0.10, 0.50, 0.90), 0.00001) - - result.length should be(3) - - // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles - result should contain inOrderOnly(7963.0, 10068.0, 12160.0) - - // Use "to_array" and built-in explode - val result2 = df - .select(explode(rf_tile_to_array_double($"tile")) as "tile") - .stat - .approxQuantile("tile", Array(0.10, 0.50, 0.90), 0.00001) - - result2.length should be(3) - - // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles - result2 should contain inOrderOnly(7963.0, 10068.0, 12160.0) - - } - } - - describe("Tile quantiles through existing RF functions") { - it("should compute approx percentiles for a single tile col") { - - // As the number of buckets goes up, the closer we get to the "right" answer. - val result = df - .select(rf_agg_approx_histogram($"tile", 500)) - .map(h => h.percentileBreaks(Seq(0.1, 0.5, 0.9))) - .first() - - result.length should be(3) - println(result) - // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles - // This will fail as the histogram algorithm approximates things differently (probably not as well) - // Result: List(7936.887798369705, 10034.706053861182, 12140.206924858878) - // result should contain inOrderOnly(7963.0, 10068.0, 12160.0) - } - } - - describe("Tile quantiles through custom aggregate") { - it("should compute approx percentiles for a single tile col") { - val result = df - .select(rf_agg_approx_quantiles($"tile", Seq(0.1, 0.5, 0.9))) - .first() - - result.length should be(3) - - // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles - result should contain inOrderOnly(7963.0, 10068.0, 12160.0) - } - } -} - From 3f9006751eaefbb481ed0ab9b4108281df147ebf Mon Sep 17 00:00:00 2001 From: "Jason T. Brown" Date: Tue, 17 Dec 2019 15:09:39 -0500 Subject: [PATCH 4/8] Update tests to prefer use of rf_agg_approx_quantiles implementation --- .../rasterframes/stats/CellHistogram.scala | 2 +- .../rasterframes/RasterFramesStatsSpec.scala | 76 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala diff --git a/core/src/main/scala/org/locationtech/rasterframes/stats/CellHistogram.scala b/core/src/main/scala/org/locationtech/rasterframes/stats/CellHistogram.scala index 095423755..be3d547a3 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/stats/CellHistogram.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/stats/CellHistogram.scala @@ -89,7 +89,7 @@ case class CellHistogram(bins: Seq[CellHistogram.Bin]) { // derived from locationtech/geotrellis/.../StreamingHistogram.scala - def percentileBreaks(qs: Seq[Double]): Seq[Double] = { + private def percentileBreaks(qs: Seq[Double]): Seq[Double] = { if(bins.size == 1) { qs.map(z => bins.head.value) } else { diff --git a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala new file mode 100644 index 000000000..e17b686c4 --- /dev/null +++ b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala @@ -0,0 +1,76 @@ +/* + * This software is licensed under the Apache 2 license, quoted below. + * + * Copyright 2018 Astraea, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * [http://www.apache.org/licenses/LICENSE-2.0] + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + * + * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.locationtech.rasterframes + +import org.apache.spark.sql.functions.{col, explode} + +class RasterFramesStatsSpec extends TestEnvironment with TestData { + + import spark.implicits._ + + val df = TestData.sampleGeoTiff + .toDF() + .withColumn("tilePlus2", rf_local_add(col("tile"), 2)) + + + describe("Tile quantiles through built-in functions") { + + it("should compute approx percentiles for a single tile col") { + // Use "explode" + val result = df + .select(rf_explode_tiles($"tile")) + .stat + .approxQuantile("tile", Array(0.10, 0.50, 0.90), 0.00001) + + result.length should be(3) + + // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles + result should contain inOrderOnly(7963.0, 10068.0, 12160.0) + + // Use "to_array" and built-in explode + val result2 = df + .select(explode(rf_tile_to_array_double($"tile")) as "tile") + .stat + .approxQuantile("tile", Array(0.10, 0.50, 0.90), 0.00001) + + result2.length should be(3) + + // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles + result2 should contain inOrderOnly(7963.0, 10068.0, 12160.0) + + } + } + + describe("Tile quantiles through custom aggregate") { + it("should compute approx percentiles for a single tile col") { + val result = df + .select(rf_agg_approx_quantiles($"tile", Seq(0.1, 0.5, 0.9))) + .first() + + result.length should be(3) + + // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles + result should contain inOrderOnly(7963.0, 10068.0, 12160.0) + } + } +} + From 2c244aa831062271c305d63f3195002f34191b07 Mon Sep 17 00:00:00 2001 From: "Jason T. Brown" Date: Thu, 2 Jan 2020 15:38:11 -0500 Subject: [PATCH 5/8] Add documentation, python bindings for rf_agg_approx_quantiles Attempt to register with SQL Signed-off-by: Jason T. Brown --- .../ApproxCellQuantilesAggregate.scala | 28 +++++++++++++++++++ .../rasterframes/expressions/package.scala | 1 + .../rasterframes/RasterFramesStatsSpec.scala | 11 ++++++++ docs/src/main/paradox/reference.md | 5 ++++ docs/src/main/paradox/release-notes.md | 1 + .../python/pyrasterframes/rasterfunctions.py | 16 +++++++++++ .../main/python/tests/RasterFunctionsTests.py | 8 +++++- 7 files changed, 69 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala b/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala index dcdf1a8a0..86a52039e 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala @@ -23,7 +23,10 @@ package org.locationtech.rasterframes.expressions.aggregates import geotrellis.raster.{Tile, isNoData} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, AggregateMode, Complete} +import org.apache.spark.sql.catalyst.expressions.{ExprId, Expression, ExpressionDescription, NamedExpression} import org.apache.spark.sql.catalyst.util.QuantileSummaries +import org.apache.spark.sql.execution.aggregate.ScalaUDAF import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.{Column, Encoder, Row, TypedColumn, types} import org.apache.spark.sql.types.{DataTypes, StructField, StructType} @@ -85,4 +88,29 @@ object ApproxCellQuantilesAggregate { .as(s"rf_agg_approx_quantiles") .as[Seq[Double]] } + + /** Adapter hack to allow UserDefinedAggregateFunction to be referenced as an expression. */ + @ExpressionDescription( + usage = "_FUNC_(tile, probabilities, relativeError) - Compute aggregate cell histogram over a tile column.", + arguments = """ + Arguments: + * tile - tile column to analyze + * probabilities - array of double values in [0, 1] at which to compute quantiles + * relativeError - non-negative error tolerance""", + examples = """ + Examples: + > SELECT _FUNC_(tile, array(0.1, 0.25, 0.5, 0.75, 0.9), 0.001); + ...""" + ) + class ApproxCellQuantilesUDAF(aggregateFunction: AggregateFunction, mode: AggregateMode, isDistinct: Boolean, resultId: ExprId) + extends AggregateExpression(aggregateFunction, mode, isDistinct, resultId) { + def this(child: Expression, probabilities: Seq[Double], relativeError: Double) = + this(ScalaUDAF(Seq(ExtractTile(child)), new ApproxCellQuantilesAggregate(probabilities, relativeError)), Complete, false, NamedExpression.newExprId) + override def nodeName: String = "rf_agg_approx_quantiles" + } + + object ApproxCellQuantilesUDAF { + def apply(child: Expression, probabilities: Seq[Double], relativeError: Double): ApproxCellQuantilesUDAF = new ApproxCellQuantilesUDAF(child, probabilities, relativeError) + def apply(child: Expression, probabilities: Seq[Double]): ApproxCellQuantilesUDAF = new ApproxCellQuantilesUDAF(child, probabilities, 0.00001) + } } \ No newline at end of file diff --git a/core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala b/core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala index a2a5f749c..6abd6ab93 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala @@ -117,6 +117,7 @@ package object expressions { registry.registerExpression[CellCountAggregate.NoDataCells]("rf_agg_no_data_cells") registry.registerExpression[CellStatsAggregate.CellStatsAggregateUDAF]("rf_agg_stats") registry.registerExpression[HistogramAggregate.HistogramAggregateUDAF]("rf_agg_approx_histogram") + registry.registerExpression[ApproxCellQuantilesAggregate.ApproxCellQuantilesUDAF]("rf_agg_approx_quantiles") registry.registerExpression[LocalStatsAggregate.LocalStatsAggregateUDAF]("rf_agg_local_stats") registry.registerExpression[LocalTileOpAggregate.LocalMinUDAF]("rf_agg_local_min") registry.registerExpression[LocalTileOpAggregate.LocalMaxUDAF]("rf_agg_local_max") diff --git a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala index e17b686c4..aa58be8cd 100644 --- a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala +++ b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala @@ -71,6 +71,17 @@ class RasterFramesStatsSpec extends TestEnvironment with TestData { // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles result should contain inOrderOnly(7963.0, 10068.0, 12160.0) } + + it("should compute approx percentiles with SQL") { + val result = df.selectExpr("rf_agg_approx_quantiles(tile, array(0.1, 0.5, 0.9), 0.00001) as iles") + .first() + .getSeq[Double](0) + + result.length should be (3) + + // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles + result should contain inOrderOnly(7963.0, 10068.0, 12160.0) + } } } diff --git a/docs/src/main/paradox/reference.md b/docs/src/main/paradox/reference.md index aff57106e..e537beac2 100644 --- a/docs/src/main/paradox/reference.md +++ b/docs/src/main/paradox/reference.md @@ -628,6 +628,11 @@ Aggregates over the `tile` and returns statistical summaries of cell values: num Aggregates over all of the rows in DataFrame of `tile` and returns a count of each cell value to create a histogram with values are plotted on the x-axis and counts on the y-axis. Related is the @ref:[`rf_tile_histogram`](reference.md#rf-tile-histogram) function which operates on a single row at a time. +### rf_agg_approx_quantiles + + Array[Double] rf_agg_approx_quantiles(Tile tile, List[float] probabilities, float relative_error) + +Calculates the approximate quantiles of a tile column of a DataFrame. `probabilities` is a list of float values at which to compute the quantiles. These must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum. Returns an array of values approximately at the specified `probabilities`. ## Tile Local Aggregate Statistics diff --git a/docs/src/main/paradox/release-notes.md b/docs/src/main/paradox/release-notes.md index 0a5d84b57..4c3925e89 100644 --- a/docs/src/main/paradox/release-notes.md +++ b/docs/src/main/paradox/release-notes.md @@ -9,6 +9,7 @@ * Added `withSpatialIndex` to RasterSourceDataSource to pre-partition tiles based on tile extents mapped to a Z2 space-filling curve * Add `rf_mask_by_bit`, `rf_mask_by_bits` and `rf_local_extract_bits` to deal with bit packed quality masks. Updated the masking documentation to demonstrate the use of these functions. * Throw an `IllegalArgumentException` when attempting to apply a mask to a `Tile` whose `CellType` has no NoData defined. ([#409](https://github.com/locationtech/rasterframes/issues/384)) +* Add `rf_agg_approx-quantiles` function to compute cell quantiles across an entire column. ### 0.8.4 diff --git a/pyrasterframes/src/main/python/pyrasterframes/rasterfunctions.py b/pyrasterframes/src/main/python/pyrasterframes/rasterfunctions.py index 8dd7e7ac3..aa71518d0 100644 --- a/pyrasterframes/src/main/python/pyrasterframes/rasterfunctions.py +++ b/pyrasterframes/src/main/python/pyrasterframes/rasterfunctions.py @@ -313,6 +313,22 @@ def rf_agg_approx_histogram(tile_col): return _apply_column_function('rf_agg_approx_histogram', tile_col) +def rf_agg_approx_quantiles(tile_col, probabilities, relative_error=0.00001): + """ + Calculates the approximate quantiles of a tile column of a DataFrame. + + :param tile_col: column to extract cells from. + :param probabilities: a list of quantile probabilities. Each number must belong to [0, 1]. + For example 0 is the minimum, 0.5 is the median, 1 is the maximum. + :param relative_error: The relative target precision to achieve (greater than or equal to 0). Default is 0.00001 + :return: An array of values approximately at the specified `probabilities` + """ + + _jfn = RFContext.active().lookup('rf_agg_approx_quantiles') + _tile_col = _to_java_column(tile_col) + return Column(_jfn(_tile_col, probabilities, relative_error)) + + def rf_agg_stats(tile_col): """Compute the full column aggregate floating point statistics""" return _apply_column_function('rf_agg_stats', tile_col) diff --git a/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py b/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py index f186e6abc..49c0c7852 100644 --- a/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py +++ b/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py @@ -25,7 +25,7 @@ from pyspark.sql.functions import * import numpy as np -from numpy.testing import assert_equal +from numpy.testing import assert_equal, assert_allclose from unittest import skip from . import TestEnvironment @@ -133,6 +133,12 @@ def test_aggregations(self): self.assertEqual(row['rf_agg_no_data_cells(tile)'], 1000) self.assertEqual(row['rf_agg_stats(tile)'].data_cells, row['rf_agg_data_cells(tile)']) + def test_agg_approx_quantiles(self): + agg = self.rf.agg(rf_agg_approx_quantiles('tile', [0.1, 0.5, 0.9, 0.98])) + result = agg.first()[0] + # expected result from computing in external python process + assert_allclose(result, np.array([7412., 7638., 7671., 7675.])) + def test_sql(self): self.rf.createOrReplaceTempView("rf_test_sql") From 4589fe156ae46049596d803e966495742bc03d37 Mon Sep 17 00:00:00 2001 From: "Jason T. Brown" Date: Fri, 3 Jan 2020 15:52:49 -0500 Subject: [PATCH 6/8] Remove incorrect Expression and SQL registry Signed-off-by: Jason T. Brown --- .../ApproxCellQuantilesAggregate.scala | 28 ------------------- .../rasterframes/expressions/package.scala | 1 - .../rasterframes/RasterFramesStatsSpec.scala | 10 ------- docs/src/main/paradox/reference.md | 2 ++ 4 files changed, 2 insertions(+), 39 deletions(-) diff --git a/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala b/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala index 86a52039e..dcdf1a8a0 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala @@ -23,10 +23,7 @@ package org.locationtech.rasterframes.expressions.aggregates import geotrellis.raster.{Tile, isNoData} import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, AggregateFunction, AggregateMode, Complete} -import org.apache.spark.sql.catalyst.expressions.{ExprId, Expression, ExpressionDescription, NamedExpression} import org.apache.spark.sql.catalyst.util.QuantileSummaries -import org.apache.spark.sql.execution.aggregate.ScalaUDAF import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.{Column, Encoder, Row, TypedColumn, types} import org.apache.spark.sql.types.{DataTypes, StructField, StructType} @@ -88,29 +85,4 @@ object ApproxCellQuantilesAggregate { .as(s"rf_agg_approx_quantiles") .as[Seq[Double]] } - - /** Adapter hack to allow UserDefinedAggregateFunction to be referenced as an expression. */ - @ExpressionDescription( - usage = "_FUNC_(tile, probabilities, relativeError) - Compute aggregate cell histogram over a tile column.", - arguments = """ - Arguments: - * tile - tile column to analyze - * probabilities - array of double values in [0, 1] at which to compute quantiles - * relativeError - non-negative error tolerance""", - examples = """ - Examples: - > SELECT _FUNC_(tile, array(0.1, 0.25, 0.5, 0.75, 0.9), 0.001); - ...""" - ) - class ApproxCellQuantilesUDAF(aggregateFunction: AggregateFunction, mode: AggregateMode, isDistinct: Boolean, resultId: ExprId) - extends AggregateExpression(aggregateFunction, mode, isDistinct, resultId) { - def this(child: Expression, probabilities: Seq[Double], relativeError: Double) = - this(ScalaUDAF(Seq(ExtractTile(child)), new ApproxCellQuantilesAggregate(probabilities, relativeError)), Complete, false, NamedExpression.newExprId) - override def nodeName: String = "rf_agg_approx_quantiles" - } - - object ApproxCellQuantilesUDAF { - def apply(child: Expression, probabilities: Seq[Double], relativeError: Double): ApproxCellQuantilesUDAF = new ApproxCellQuantilesUDAF(child, probabilities, relativeError) - def apply(child: Expression, probabilities: Seq[Double]): ApproxCellQuantilesUDAF = new ApproxCellQuantilesUDAF(child, probabilities, 0.00001) - } } \ No newline at end of file diff --git a/core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala b/core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala index 6abd6ab93..a2a5f749c 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/expressions/package.scala @@ -117,7 +117,6 @@ package object expressions { registry.registerExpression[CellCountAggregate.NoDataCells]("rf_agg_no_data_cells") registry.registerExpression[CellStatsAggregate.CellStatsAggregateUDAF]("rf_agg_stats") registry.registerExpression[HistogramAggregate.HistogramAggregateUDAF]("rf_agg_approx_histogram") - registry.registerExpression[ApproxCellQuantilesAggregate.ApproxCellQuantilesUDAF]("rf_agg_approx_quantiles") registry.registerExpression[LocalStatsAggregate.LocalStatsAggregateUDAF]("rf_agg_local_stats") registry.registerExpression[LocalTileOpAggregate.LocalMinUDAF]("rf_agg_local_min") registry.registerExpression[LocalTileOpAggregate.LocalMaxUDAF]("rf_agg_local_max") diff --git a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala index aa58be8cd..11e5d9589 100644 --- a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala +++ b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala @@ -72,16 +72,6 @@ class RasterFramesStatsSpec extends TestEnvironment with TestData { result should contain inOrderOnly(7963.0, 10068.0, 12160.0) } - it("should compute approx percentiles with SQL") { - val result = df.selectExpr("rf_agg_approx_quantiles(tile, array(0.1, 0.5, 0.9), 0.00001) as iles") - .first() - .getSeq[Double](0) - - result.length should be (3) - - // computing externally with numpy we arrive at 7963, 10068, 12160 for these quantiles - result should contain inOrderOnly(7963.0, 10068.0, 12160.0) - } } } diff --git a/docs/src/main/paradox/reference.md b/docs/src/main/paradox/reference.md index e537beac2..09e8e3655 100644 --- a/docs/src/main/paradox/reference.md +++ b/docs/src/main/paradox/reference.md @@ -632,6 +632,8 @@ Aggregates over all of the rows in DataFrame of `tile` and returns a count of ea Array[Double] rf_agg_approx_quantiles(Tile tile, List[float] probabilities, float relative_error) +__Not supported in SQL.__ + Calculates the approximate quantiles of a tile column of a DataFrame. `probabilities` is a list of float values at which to compute the quantiles. These must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum. Returns an array of values approximately at the specified `probabilities`. ## Tile Local Aggregate Statistics From 539865cdf0288c9b64440c92a266f007f581aab6 Mon Sep 17 00:00:00 2001 From: "Jason T. Brown" Date: Fri, 3 Jan 2020 16:54:45 -0500 Subject: [PATCH 7/8] Update Python bindings and test Signed-off-by: Jason T. Brown --- .../src/main/python/tests/RasterFunctionsTests.py | 4 ++-- .../org/locationtech/rasterframes/py/PyRFContext.scala | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py b/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py index 49c0c7852..15a9bd016 100644 --- a/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py +++ b/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py @@ -136,8 +136,8 @@ def test_aggregations(self): def test_agg_approx_quantiles(self): agg = self.rf.agg(rf_agg_approx_quantiles('tile', [0.1, 0.5, 0.9, 0.98])) result = agg.first()[0] - # expected result from computing in external python process - assert_allclose(result, np.array([7412., 7638., 7671., 7675.])) + # expected result from computing in external python process; c.f. scala tests + assert_allclose(result, np.array([7963., 10068., 12160., 14366.])) def test_sql(self): diff --git a/pyrasterframes/src/main/scala/org/locationtech/rasterframes/py/PyRFContext.scala b/pyrasterframes/src/main/scala/org/locationtech/rasterframes/py/PyRFContext.scala index c31dccd38..91944cf8f 100644 --- a/pyrasterframes/src/main/scala/org/locationtech/rasterframes/py/PyRFContext.scala +++ b/pyrasterframes/src/main/scala/org/locationtech/rasterframes/py/PyRFContext.scala @@ -191,6 +191,13 @@ class PyRFContext(implicit sparkSession: SparkSession) extends RasterFunctions def rf_local_unequal_int(col: Column, scalar: Int): Column = rf_local_unequal[Int](col, scalar) + // other function support + /** py4j friendly version of this function */ + def rf_agg_approx_quantiles(tile: Column, probabilities: java.util.List[Double], relativeError: Double): TypedColumn[Any, Seq[Double]] = { + import scala.collection.JavaConverters._ + rf_agg_approx_quantiles(tile, probabilities.asScala, relativeError) + } + def _make_crs_literal(crsText: String): Column = { rasterframes.encoders.serialized_literal[CRS](LazyCRS(crsText)) } From d73e25540fe4ccfb0e5a2e2b5d48aff705751a18 Mon Sep 17 00:00:00 2001 From: "Jason T. Brown" Date: Mon, 13 Jan 2020 15:45:20 -0500 Subject: [PATCH 8/8] Fix mistakes in merge with develop Signed-off-by: Jason T. Brown --- .../functions/AggregateFunctions.scala | 19 ++++++++++++++++++- .../rasterframes/RasterFramesStatsSpec.scala | 1 + .../main/python/tests/RasterFunctionsTests.py | 1 + 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/locationtech/rasterframes/functions/AggregateFunctions.scala b/core/src/main/scala/org/locationtech/rasterframes/functions/AggregateFunctions.scala index e19374331..13d8e13b6 100644 --- a/core/src/main/scala/org/locationtech/rasterframes/functions/AggregateFunctions.scala +++ b/core/src/main/scala/org/locationtech/rasterframes/functions/AggregateFunctions.scala @@ -51,7 +51,7 @@ trait AggregateFunctions { /** Compute the cellwise/local count of NoData cells for all Tiles in a column. */ def rf_agg_local_no_data_cells(tile: Column): TypedColumn[Any, Tile] = LocalCountAggregate.LocalNoDataCellsUDAF(tile) - /** Compute the full column aggregate floating point histogram. */ + /** Compute the approximate aggregate floating point histogram using a streaming algorithm, with the default of 80 buckets. */ def rf_agg_approx_histogram(tile: Column): TypedColumn[Any, CellHistogram] = HistogramAggregate(tile) /** Compute the approximate aggregate floating point histogram using a streaming algorithm, with the given number of buckets. */ @@ -60,6 +60,23 @@ trait AggregateFunctions { HistogramAggregate(col, numBuckets) } + /** + * Calculates the approximate quantiles of a tile column of a DataFrame. + * @param tile tile column to extract cells from. + * @param probabilities a list of quantile probabilities + * Each number must belong to [0, 1]. + * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. + * @param relativeError The relative target precision to achieve (greater than or equal to 0). + * @return the approximate quantiles at the given probabilities of each column + */ + def rf_agg_approx_quantiles( + tile: Column, + probabilities: Seq[Double], + relativeError: Double = 0.00001): TypedColumn[Any, Seq[Double]] = { + require(probabilities.nonEmpty, "at least one quantile probability is required") + ApproxCellQuantilesAggregate(tile, probabilities, relativeError) + } + /** Compute the full column aggregate floating point statistics. */ def rf_agg_stats(tile: Column): TypedColumn[Any, CellStatistics] = CellStatsAggregate(tile) diff --git a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala index 11e5d9589..eebfe2262 100644 --- a/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala +++ b/core/src/test/scala/org/locationtech/rasterframes/RasterFramesStatsSpec.scala @@ -21,6 +21,7 @@ package org.locationtech.rasterframes +import org.locationtech.rasterframes.RasterFunctions import org.apache.spark.sql.functions.{col, explode} class RasterFramesStatsSpec extends TestEnvironment with TestData { diff --git a/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py b/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py index 5fff71f5f..a6d19fb2c 100644 --- a/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py +++ b/pyrasterframes/src/main/python/tests/RasterFunctionsTests.py @@ -38,6 +38,7 @@ class RasterFunctions(TestEnvironment): def setUp(self): + import sys if not sys.warnoptions: import warnings warnings.simplefilter("ignore")