Slippy map writing support. #572

Merged: 3 commits, Nov 29, 2021
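This PR adds the ability to write a DataFrame of tiles out as a slippy map, i.e. a `{z}/{x}/{y}.png` tile pyramid: a new `SlippyDataSource` registration, a `DataFrameSlippyExport` object that reprojects to WebMercator and pyramids the tiles with GeoTrellis, and a Leaflet-based `slippy.html` viewer template. A hedged end-to-end sketch of the new API (the destination path is illustrative; `df` is assumed to carry tile plus CRS/extent columns, or a `ProjectedRaster` column):

```scala
import java.net.URI
import org.apache.spark.sql.DataFrame
import org.locationtech.rasterframes.datasource.slippy.DataFrameSlippyExport
import org.locationtech.rasterframes.datasource.slippy.RenderingModes.Uniform

def exportMap(df: DataFrame): Unit = {
  val dest = new URI("file:///tmp/slippy-demo")
  // A default rendering profile is chosen from the tile column count and mode.
  val result = DataFrameSlippyExport.writeSlippyTiles(df, dest, Uniform)
  // Writes index.html (the Leaflet viewer) alongside the tile pyramid.
  result.writeHtml(df.sparkSession)
}
```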
2 changes: 1 addition & 1 deletion build.sbt
@@ -176,7 +176,7 @@ lazy val docs = project
Compile / paradoxMaterialTheme ~= { _
.withRepository(uri("https://github.com/locationtech/rasterframes"))
.withCustomStylesheet("assets/custom.css")
.withCopyright("""&copy; 2017-2019 <a href="https://astraea.earth">Astraea</a>, Inc. All rights reserved.""")
.withCopyright("""&copy; 2017-2021 <a href="https://astraea.earth">Astraea</a>, Inc. All rights reserved.""")
.withLogo("assets/images/RF-R.svg")
.withFavicon("assets/images/RasterFrames_32x32.ico")
.withColor("blue-grey", "light-blue")
ProjectedLayerMetadataAggregate.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Column, Row, TypedColumn}

class ProjectedLayerMetadataAggregate(destCRS: CRS, destDims: Dimensions[Int]) extends UserDefinedAggregateFunction {

import ProjectedLayerMetadataAggregate._

def inputSchema: StructType = InputRecord.inputRecordEncoder.schema
@@ -47,10 +48,10 @@ class ProjectedLayerMetadataAggregate(destCRS: CRS, destDims: Dimensions[Int]) e
def initialize(buffer: MutableAggregationBuffer): Unit = ()

def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
-    if(!input.isNullAt(0)) {
+    if (!input.isNullAt(0)) {
val in = input.as[InputRecord]

-      if(buffer.isNullAt(0)) {
+      if (buffer.isNullAt(0)) {
in.toBufferRecord(destCRS).write(buffer)
} else {
val br = buffer.as[BufferRecord]
@@ -71,16 +72,15 @@ class ProjectedLayerMetadataAggregate(destCRS: CRS, destDims: Dimensions[Int]) e
case _ => ()
}

-  def evaluate(buffer: Row): Any = {
-    val buf = buffer.as[BufferRecord]
-    if (buf.isEmpty) throw new IllegalArgumentException("Can not collect metadata from empty data frame.")
-
-    val re = RasterExtent(buf.extent, buf.cellSize)
-    val layout = LayoutDefinition(re, destDims.cols, destDims.rows)
-
-    val kb = KeyBounds(layout.mapTransform(buf.extent))
-    TileLayerMetadata(buf.cellType, layout, buf.extent, destCRS, kb).toRow
-  }
+  def evaluate(buffer: Row): Any =
+    Option(buffer).map(_.as[BufferRecord]).filter(!_.isEmpty).map(buf => {
+      val re = RasterExtent(buf.extent, buf.cellSize)
+      val layout = LayoutDefinition(re, destDims.cols, destDims.rows)
+
+      val kb = KeyBounds(layout.mapTransform(buf.extent))
+      TileLayerMetadata(buf.cellType, layout, buf.extent, destCRS, kb).toRow
+    }).getOrElse(throw new IllegalArgumentException("Can not collect metadata from empty data frame."))
}

object ProjectedLayerMetadataAggregate {
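Note on the `evaluate` rewrite above: the previous version dereferenced the aggregation buffer unconditionally, so a null buffer would surface as a `NullPointerException` rather than the intended error. Threading it through `Option` folds the null-buffer and empty-aggregate cases into the same `IllegalArgumentException`. A minimal standalone sketch of the guard pattern (`Record` and `process` are stand-ins, not names from this PR):

```scala
case class Record(isEmpty: Boolean)

def process(r: Record): String = "metadata row"

def evaluate(buffer: Record): String =
  Option(buffer)        // null buffer  -> None
    .filter(!_.isEmpty) // empty result -> None
    .map(process)       // happy path
    .getOrElse(throw new IllegalArgumentException(
      "Can not collect metadata from empty data frame."))
```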
@@ -5,3 +5,4 @@ org.locationtech.rasterframes.datasource.raster.RasterSourceDataSource
org.locationtech.rasterframes.datasource.geojson.GeoJsonDataSource
org.locationtech.rasterframes.datasource.stac.api.StacApiDataSource
org.locationtech.rasterframes.datasource.tiles.TilesDataSource
+org.locationtech.rasterframes.datasource.slippy.SlippyDataSource
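Adding `SlippyDataSource` to this service-loader file is what makes the format discoverable through Spark's `DataSourceRegister` mechanism. Assuming it exposes a `slippy` short name (the implementation is not part of this diff), writer-side usage would look roughly like:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical sketch: the "slippy" format name and destination are assumptions;
// only the service registration is shown in this diff.
def save(df: DataFrame): Unit =
  df.write
    .format("slippy")                // resolved via the DataSourceRegister entry above
    .save("file:///tmp/slippy-demo") // Hadoop-compatible destination
```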
77 changes: 77 additions & 0 deletions datasource/src/main/resources/slippy.html
@@ -0,0 +1,77 @@
<!DOCTYPE html>
<!--
~ This software is licensed under the Apache 2 license, quoted below.
~
~ Copyright 2021 Astraea, Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ [http://www.apache.org/licenses/LICENSE-2.0]
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
~
~
-->

<html lang="en">
<head>
<title>RasterFrames Rendering</title>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<meta property="eai:center" content="(${viewLat},${viewLon})"/>
<meta property="eai:maxZoom" content="${maxNativeZoom}"/>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.3.1/dist/leaflet.css" integrity="sha512-Rksm5RenBEKSKFjgI3a41vrjkw4EVPlJ3+OiI65vTjIdo9brlAacEuKOiQ5OFh7cOI1bkDwLqdLw3Zg0cRJAAQ==" crossorigin=""/>
<script src="https://unpkg.com/leaflet@1.3.1/dist/leaflet.js" integrity="sha512-/Nsx9X4HebavoBvEBuyp3I7od5tA0UzAxs+j83KgC8PU0kgB4XiK4Lfe4y4cgBtaRJQEIFCW+oC506aPT2L1zw==" crossorigin=""></script>
<link rel="stylesheet" href="https://unpkg.com/leaflet-control-geocoder/dist/Control.Geocoder.css" />
<script src="https://unpkg.com/leaflet-control-geocoder/dist/Control.Geocoder.js"></script>
<style>
#mapid {
position: absolute;
top: 10px;
bottom: 10px;
left: 10px;
right: 10px;
}
</style>
</head>
<body>

<div id="mapid"></div>

<script>

var map = L.map('mapid')
.setView([${viewLat}, ${viewLon}], ${maxNativeZoom});

L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {attribution: '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors'}).addTo(map);

L.tileLayer(
'{z}/{x}/{y}.png', {
maxZoom: 18,
maxNativeZoom: ${maxNativeZoom}
}
).addTo(map);

L.control.scale().addTo(map);

L.Control.geocoder().addTo(map);

var popup = L.popup();

function showPos(e) {
popup
.setLatLng(e.latlng)
.setContent(e.latlng.toString())
.openOn(map);
}

map.on('click', showPos);
</script>
</body>
</html>
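The `${viewLat}`, `${viewLon}`, and `${maxNativeZoom}` tokens in this template are not Leaflet syntax; they are placeholders that `SlippyResult.writeHtml` (later in this diff) fills in using Commons Text's `StringSubstitutor`. A minimal sketch of that substitution step, with sample values:

```scala
import java.util.{HashMap => JMap}
import org.apache.commons.text.StringSubstitutor

// Sample values; the real ones come from the reprojected layer center and max zoom.
val values = new JMap[String, String]()
values.put("viewLat", "44.05")
values.put("viewLon", "-103.86")
values.put("maxNativeZoom", "8")

val subst = new StringSubstitutor(values)
subst.replace("setView([${viewLat}, ${viewLon}], ${maxNativeZoom})")
// => "setView([44.05, -103.86], 8)"
```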
datasource/package.scala
@@ -24,6 +24,7 @@ package org.locationtech.rasterframes
import cats.syntax.option._
import io.circe.Json
import io.circe.parser
+import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import sttp.model.Uri

@@ -72,4 +73,48 @@ package object datasource {
def jsonParam(key: String, parameters: CaseInsensitiveStringMap): Option[Json] =
if(parameters.containsKey(key)) parser.parse(parameters.get(key)).toOption
else None


/**
* Convenience grouping for transient columns defining spatial context.
*/
private[rasterframes]
case class SpatialComponents(crsColumn: Column,
extentColumn: Column,
dimensionColumn: Column,
cellTypeColumn: Column)

private[rasterframes]
object SpatialComponents {
def apply(tileColumn: Column, crsColumn: Column, extentColumn: Column): SpatialComponents = {
val dim = rf_dimensions(tileColumn) as "dims"
val ct = rf_cell_type(tileColumn) as "cellType"
SpatialComponents(crsColumn, extentColumn, dim, ct)
}
def apply(prColumn : Column): SpatialComponents = {
SpatialComponents(
rf_crs(prColumn) as "crs",
rf_extent(prColumn) as "extent",
rf_dimensions(prColumn) as "dims",
rf_cell_type(prColumn) as "cellType"
)
}
}

/**
* If the given DataFrame has a tile column along with CRS and extent columns, build the
* spatial context from those. Otherwise, look for a `ProjectedRaster` column and derive
* the CRS, extent, dimensions, and cell type columns from it.
*
* @param d DataFrame to process.
* @return The resolved spatial context columns, or `None` if neither form is present.
*/
private[rasterframes]
def projectSpatialComponents(d: DataFrame): Option[SpatialComponents] =
d.tileColumns.headOption.zip(d.crsColumns.headOption.zip(d.extentColumns.headOption)).headOption
.map { case (tile, (crs, extent)) => SpatialComponents(tile, crs, extent) }
.orElse(
d.projRasterColumns.headOption
.map(pr => SpatialComponents(pr))
)
}
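`projectSpatialComponents` resolves the spatial context in order: explicit tile + CRS + extent columns first, then a `ProjectedRaster` column as a fallback, and `None` when neither is present. A hedged sketch of the two paths (the reader call and file path are illustrative, and the function itself is package-private, so this only runs from within `org.locationtech.rasterframes`):

```scala
import org.apache.spark.sql.SparkSession
import org.locationtech.rasterframes._
import org.locationtech.rasterframes.datasource._
import org.locationtech.rasterframes.datasource.raster._

val spark = SparkSession.builder.master("local[*]").getOrCreate().withRasterFrames
import spark.implicits._

// The raster reader yields a `proj_raster` struct column, so resolution
// falls through to the ProjectedRaster branch:
val rf = spark.read.raster.load("file:///data/scene.tif")
val viaProjRaster = projectSpatialComponents(rf)

// With the struct exploded into separate columns, the tile/crs/extent branch applies:
val exploded = rf.select(
  rf_tile($"proj_raster") as "tile",
  rf_crs($"proj_raster") as "crs",
  rf_extent($"proj_raster") as "extent")
val viaColumns = projectSpatialComponents(exploded)
```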
DataFrameSlippyExport.scala
@@ -0,0 +1,170 @@
/*
* Copyright (c) 2020 Astraea, Inc. All rights reserved.
*/

package org.locationtech.rasterframes.datasource.slippy

import geotrellis.layer.{SpatialKey, TileLayerMetadata, ZoomedLayoutScheme}
import geotrellis.proj4.{LatLng, WebMercator}
import geotrellis.raster._
import geotrellis.raster.render.ColorRamp
import geotrellis.raster.resample.Bilinear
import geotrellis.spark._
import geotrellis.spark.pyramid.Pyramid
import geotrellis.spark.store.slippy.HadoopSlippyTileWriter
import geotrellis.vector.reproject.Implicits._
import org.apache.commons.text.StringSubstitutor
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.locationtech.rasterframes.encoders.StandardEncoders
import org.locationtech.rasterframes.expressions.aggregates.ProjectedLayerMetadataAggregate
import org.locationtech.rasterframes.util.withResource
import org.locationtech.rasterframes.{rf_agg_approx_histogram, _}
import org.locationtech.rasterframes.datasource._

import java.io.PrintStream
import java.net.URI
import java.nio.file.Paths
import scala.io.Source
import RenderingProfiles._
import org.locationtech.rasterframes.datasource.slippy.RenderingModes.{RenderingMode, Uniform}

object DataFrameSlippyExport extends StandardEncoders {
val destCRS = WebMercator

/**
* Export tiles as a slippy map.
* NB: Any temporal components are silently ignored.
*
* @param df DataFrame of tiles to export.
* @param dest URI for a Hadoop-supported storage endpoint (e.g. 'file://', 'hdfs://', etc.).
* @param profile Rendering profile
*/
def writeSlippyTiles(df: DataFrame, dest: URI, profile: Profile): SlippyResult = {

val spark = df.sparkSession
implicit val sc = spark.sparkContext

val outputPath: String = dest.toASCIIString

require(
df.tileColumns.length >= profile.expectedBands, // TODO: Do we want to allow this greater than case? Warn the user?
s"Selected rendering mode '${profile}' expected ${profile.expectedBands} bands.")

// Select the tile columns given by the user, plus the CRS and extent columns, which serve as a fallback when the first column is not a ProjectedRasterTile.
val SpatialComponents(crs, extent, dims, cellType) = projectSpatialComponents(df)
.getOrElse(
throw new IllegalArgumentException("Provided dataframe did not have an Extent and/or CRS"))

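// Compute the destination layer metadata (cell type, layout, extent, key bounds) in a single aggregation pass over the DataFrame.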
val tlm: TileLayerMetadata[SpatialKey] =
df.select(
ProjectedLayerMetadataAggregate(
destCRS,
extent,
crs,
cellType,
dims
)
)
.first()

val rfLayer = df
.toLayer(tlm)
// TODO: this should be fixed in RasterFrames
.na
.drop()
.persist()
.asInstanceOf[RasterFrameLayer]

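// Convert the layer to a GeoTrellis MultibandTileLayerRDD; only purely spatial (non-temporal) layers are supported here.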
val inputRDD: MultibandTileLayerRDD[SpatialKey] =
rfLayer.toMultibandTileLayerRDD match {
case Left(spatial) => spatial
case Right(_) =>
throw new NotImplementedError(
"Dataframes with multiple temporal values are not yet supported.")
}

val tileColumns = rfLayer.tileColumns

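// Resolve 'uniform' profiles by computing layer-wide statistics up front, so color scaling is consistent across the whole map rather than varying per tile.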
val rp = profile match {
case up: UniformColorRampProfile =>
val hist = rfLayer
.select(rf_agg_approx_histogram(tileColumns.head))
.first()
up.toResolvedProfile(hist)
case up: UniformRGBColorProfile =>
require(tileColumns.length >= 3)
val stats = rfLayer
.select(
rf_agg_stats(tileColumns(0)),
rf_agg_stats(tileColumns(1)),
rf_agg_stats(tileColumns(2)))
.first()
up.toResolvedProfile(stats._1, stats._2, stats._3)
case o => o
}

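// Slippy maps use the power-of-two WebMercator pyramid with 256x256 pixel tiles.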
val layoutScheme = ZoomedLayoutScheme(WebMercator, tileSize = 256)

val (zoom, reprojected) = inputRDD.reproject(WebMercator, layoutScheme, Bilinear)
val renderer = (_: SpatialKey, tile: MultibandTile) => rp.render(tile).bytes
val writer = new HadoopSlippyTileWriter[MultibandTile](outputPath, "png")(renderer)

// Pyramiding up the zoom levels, write our tiles out to the local file system.
Pyramid.upLevels(reprojected, layoutScheme, zoom, Bilinear) { (rdd, z) =>
writer.write(z, rdd)
}

rfLayer.unpersist()

val center = reprojected.metadata.extent.center
.reproject(WebMercator, LatLng)

SlippyResult(dest, center.getY, center.getX, zoom)
}

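/** Variant that picks a default rendering profile from the tile column count and the requested rendering mode. */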
def writeSlippyTiles(df: DataFrame, dest: URI, renderingMode: RenderingMode): SlippyResult = {

val profile = (df.tileColumns.length, renderingMode) match {
case (1, Uniform) => UniformColorRampProfile(greyscale)
case (_, Uniform) => UniformRGBColorProfile()
case (1, _) => ColorRampProfile(greyscale)
case _ => RGBColorProfile()
}
writeSlippyTiles(df, dest, profile)
}

def writeSlippyTiles(df: DataFrame, dest: URI, colorRamp: ColorRamp, renderingMode: RenderingMode): SlippyResult = {
val profile = renderingMode match {
case Uniform => UniformColorRampProfile(colorRamp)
case _ => ColorRampProfile(colorRamp)
}
writeSlippyTiles(df, dest, profile)
}

case class SlippyResult(dest: URI, centerLat: Double, centerLon: Double, maxZoom: Int) {
// for python interop
def outputUrl(): String = dest.toASCIIString

def writeHtml(spark: SparkSession): Unit = {
import java.util.{HashMap => JMap}

val subst = new StringSubstitutor(new JMap[String, String]() {
put("maxNativeZoom", maxZoom.toString)
put("id", Paths.get(dest.getPath).getFileName.toString)
put("viewLat", centerLat.toString)
put("viewLon", centerLon.toString)
})

val rawLines = Source.fromInputStream(getClass.getResourceAsStream("/slippy.html")).getLines()

val fs = FileSystem.get(dest, spark.sparkContext.hadoopConfiguration)

withResource(fs.create(new Path(new Path(dest), "index.html"), true)) { hout =>
val out = new PrintStream(hout, true, "UTF-8")
for (line <- rawLines) {
out.println(subst.replace(line))
}
}
}
}
}