Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.locationtech.geomesa.fs.storage.core.schemes.XZ2Scheme$XZ2Function
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,15 @@ object StorageMetadata {
factory.createPoint(new Coordinate(x, y))
}

/**
* Creates a z2 prefix based on a partition key
*
* @param value z2 partition value
* @param bits number of bits used in the z2 partition scheme
* @return
*/
def encodePartition(value: String, bits: Int): String = hexFormat.toHexDigits(value.toLong, bits / 4)

/**
* Calculate encoded ranges
*
Expand Down Expand Up @@ -226,11 +235,11 @@ object StorageMetadata {
if (env.isNull) {
throw new NullPointerException("Geometry has a null envelope")
}
hexFormat.toHexDigits(sfc.index(env.getMinX, env.getMinY, env.getMaxX, env.getMaxY))
toHex(sfc.index(env.getMinX, env.getMinY, env.getMaxX, env.getMaxY))
}

override def decode(value: String): Geometry = {
val (xmin, ymin, xmax, ymax) = sfc.invert(HexFormat.fromHexDigitsToLong(value))
val (xmin, ymin, xmax, ymax) = sfc.invert(fromHex(value))
val ring = Array(
new Coordinate(xmin, ymin),
new Coordinate(xmin, ymax),
Expand All @@ -248,6 +257,11 @@ object StorageMetadata {
* @param maxRanges a rough upper limit on the number of ranges to generate
*/
def ranges(queries: Seq[(Double, Double, Double, Double)], maxRanges: Option[Int] = None): Seq[(String, String)] =
sfc.ranges(queries, maxRanges).map(r => hexFormat.toHexDigits(r.lower) -> hexFormat.toHexDigits(r.upper))
sfc.ranges(queries, maxRanges).map(r => toHex(r.lower) -> toHex(r.upper))

// our z values use 25 bits, so we only need 7 digits to hex encode the full value
// we bit-shift by 3 to move dead bits to the end for better prefix matching
private def toHex(z: Long): String = hexFormat.toHexDigits(z << 3, 7)
private def fromHex(hex: String): Long = HexFormat.fromHexDigitsToLong(hex) >>> 3
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,12 @@ package org.locationtech.geomesa.fs.storage.core
package schemes

import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.filter.FilterHelper
import org.locationtech.geomesa.fs.storage.core.{PartitionScheme, PartitionSchemeFactory}
import org.locationtech.geomesa.utils.geotools.GeometryUtils
import org.locationtech.geomesa.zorder.sfcurve.IndexRange
import org.locationtech.jts.geom.Geometry

import java.util.regex.Pattern
import scala.reflect.ClassTag

abstract class SpatialScheme(id: String, attribute: String, bits: Int) extends PartitionScheme {

require(bits % 2 == 0, "Resolution must be an even number")

protected val format = s"%0${digits(bits)}d"

override val name: String = s"$id:attribute=$attribute:bits=$bits"

protected def digits(bits: Int): Int

protected def generateRanges(xy: Seq[(Double, Double, Double, Double)]): Seq[IndexRange]

override def getRangesForFilter(filter: Filter): Option[Seq[PartitionRange]] = {
getGeoms(filter).map { ranges =>
val builder = new RangeBuilder()
ranges.foreach { range =>
val lower = format.format(range.lower)
val upper = format.format(range.upper + 1)
builder += PartitionRange(name, lower, upper)
}
builder.result()
}
}

override def getPartitionsForFilter(filter: Filter): Option[Seq[PartitionKey]] = {
getGeoms(filter).orElse(Some(generateRanges(Seq((-180, -90, 180, 90))))).map { ranges =>
ranges.flatMap { range =>
val lower = range.lower
val steps = 1 + (range.upper - lower).toInt
Seq.tabulate(steps)(i => PartitionKey(name, format.format(lower + i)))
}
}
}

private def getGeoms(filter: Filter): Option[Seq[IndexRange]] = {
val geometries = FilterHelper.extractGeometries(filter, attribute, intersect = true)
if (geometries.isEmpty) {
None
} else if (geometries.disjoint) {
Some(Seq.empty)
} else {
Some(generateRanges(geometries.values.map(GeometryUtils.bounds)))
}
}
}

object SpatialScheme {

import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType
Expand Down Expand Up @@ -97,6 +47,6 @@ object SpatialScheme {
}
}

def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): SpatialScheme
def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): PartitionScheme
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,163 @@ package schemes

import org.geotools.api.feature.simple.SimpleFeature
import org.geotools.api.filter.Filter
import org.geotools.api.filter.expression.{Expression, ExpressionVisitor}
import org.geotools.filter.FunctionExpressionImpl
import org.geotools.filter.capability.FunctionNameImpl
import org.geotools.filter.capability.FunctionNameImpl.parameter
import org.locationtech.geomesa.curve.XZ2SFC
import org.locationtech.geomesa.filter.FilterHelper
import org.locationtech.geomesa.fs.storage.core.StorageMetadata.XZ2Encoder
import org.locationtech.geomesa.fs.storage.core.schemes.SpatialScheme.SpatialPartitionSchemeFactory
import org.locationtech.geomesa.zorder.sfcurve.IndexRange
import org.locationtech.geomesa.fs.storage.core.schemes.XZ2Scheme.incrementHex
import org.locationtech.geomesa.utils.geotools.GeometryUtils
import org.locationtech.jts.geom.Geometry

case class XZ2Scheme(attribute: String, index: Int, bits: Int) extends SpatialScheme(XZ2Scheme.name, attribute, bits) {
import java.util.HexFormat
import scala.annotation.tailrec

private val xz2 = XZ2SFC((bits / 2).asInstanceOf[Short])
/**
* XZ2 spatial scheme.
*
* This scheme uses a fixed high-resolution curve for indexing, then truncates the value to create partition groups,
* in order to align with the iceberg 'truncate' transforms.
*
* Index values are (max) 7-digit hex-encoded longs, as we're only using 25 bits for the high-resolution curve,
* we can fit it in 7 digits.
*
* @param attribute name of the attribute being partitioned
* @param index index in the sft of the attribute being partitioned
* @param bits number of bits of resolution used for partitioning
*/
case class XZ2Scheme(attribute: String, index: Int, bits: Int) extends PartitionScheme {

import FilterHelper.ff
import XZ2Scheme.FullHexDigits

require(bits % 4 == 0, s"Bit precision must be a multiple of 4, but received $bits")

private val xz2 = XZ2SFC
private val hexFormat = HexFormat.of()

// partition level derived from bits parameter
// each level adds 2 bits (4 quadrants)
private val partitionLevel = (bits / 2).toShort
// number of hex digits used to represent our z value - bits = (xz2.g - partitionLevel) * 2, then divide by 4 to get hex
private val digits = FullHexDigits - ((xz2.g - partitionLevel) / 2)

lazy private val wholeWorldRanges = Some(generateRanges(Seq((-180, -90, 180, 90))))

override val name: String = s"${XZ2Scheme.name}:attribute=$attribute:bits=$bits"

override def getPartition(feature: SimpleFeature): PartitionKey = {
val geometry = feature.getAttribute(index).asInstanceOf[Geometry]
val envelope = geometry.getEnvelopeInternal
PartitionKey(name, format.format(xz2.index(envelope.getMinX, envelope.getMinY, envelope.getMaxX, envelope.getMaxY)))
val envelope = feature.getAttribute(index).asInstanceOf[Geometry].getEnvelopeInternal
val zValue = xz2.index(envelope.getMinX, envelope.getMinY, envelope.getMaxX, envelope.getMaxY)
PartitionKey(name, truncateToPartition(zValue))
}

override def getCoveringFilter(partition: PartitionKey): Filter = {
// TODO maybe we can improve this with *some* kind of bbox?
val zPrefix = partition.value
val lower = zPrefix.padTo(FullHexDigits, '0')
val upper = zPrefix.padTo(FullHexDigits, 'f')
ff.between(ff.function(XZ2Scheme.FunctionName.getName), ff.literal(lower), ff.literal(upper))
}

// TODO https://geomesa.atlassian.net/browse/GEOMESA-2967
override def getCoveringFilter(partition: PartitionKey): Filter =
throw new UnsupportedOperationException("https://geomesa.atlassian.net/browse/GEOMESA-2967")
override def getRangesForFilter(filter: Filter): Option[Seq[PartitionRange]] = {
val geometries = FilterHelper.extractGeometries(filter, attribute, intersect = true)
if (geometries.isEmpty) {
None
} else if (geometries.disjoint) {
Some(Seq.empty)
} else {
Some(generateRanges(geometries.values.map(GeometryUtils.bounds)))
}
}

// the max XZ2 value is (4^((bits / 2) + 1) - 1) / 3
// this calculates the number of digits in that value
override protected def digits(bits: Int): Int = math.ceil(((bits / 2) + 1) * math.log10(4) - math.log10(3)).toInt
override def getPartitionsForFilter(filter: Filter): Option[Seq[PartitionKey]] = {
getRangesForFilter(filter).orElse(wholeWorldRanges).map { ranges =>
ranges.flatMap { range =>
Iterator.iterate(range.lower)(incrementHex).takeWhile(_ < range.upper).map(PartitionKey(name, _))
}
}
}

override protected def generateRanges(xy: Seq[(Double, Double, Double, Double)]): Seq[IndexRange] = xz2.ranges(xy)
private def generateRanges(xy: Seq[(Double, Double, Double, Double)]): Seq[PartitionRange] = {
val builder = new RangeBuilder()
xz2.ranges(xy).foreach { range =>
val lower = truncateToPartition(range.lower)
// index ranges are inclusive, but partition ranges are exclusive
val upper = incrementHex(truncateToPartition(range.upper))
builder += PartitionRange(name, lower, upper)
}
builder.result()
}

// truncates a full-resolution index to a partition group ID
private def truncateToPartition(fullIndex: Long): String = hexFormat.toHexDigits(fullIndex << 3, FullHexDigits).take(digits)
}

object XZ2Scheme extends SpatialPartitionSchemeFactory[Geometry]("xz2") {
override def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): SpatialScheme =

// number of digits required to index a "full" (g == 12) xz value, which is 25 bits
private val FullHexDigits = 7

val FunctionName = new FunctionNameImpl("xz2", classOf[String], parameter("geom", classOf[String], 0, 1))

override def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): PartitionScheme =
XZ2Scheme(geom, geomIndex, bits)

private def incrementHex(hex: String): String = incrementHex(hex, hex.length - 1)

@tailrec
private def incrementHex(hex: String, pos: Int): String = {
val c = hex.charAt(pos)
if (c != 'f') {
hex.substring(0, pos) + (c + 1).toChar + hex.substring(pos + 1)
} else if (pos == 0) {
hex + '0' // note: this isn't actually incrementing the value but should sort after all the valid hex values
} else {
incrementHex(hex.substring(0, pos) + '0' + hex.substring(pos + 1), pos - 1)
}
}

/**
* Function to calculate an XZ2 hex-encoded value
*/
class XZ2Function extends FunctionExpressionImpl(FunctionName) {

private var expression: Expression = _

override def setParameters(params: java.util.List[Expression]): Unit = {
super.setParameters(params)
if (params.isEmpty) {
expression = GetDefaultGeometry
} else {
expression = getExpression(0)
}
}

override def evaluate(o: AnyRef): AnyRef = {
if (o == null) {
return null
}
val value = expression.evaluate(o, classOf[Geometry])
if (value == null) {
return null
}
XZ2Encoder.encode(value)
}
}

private object GetDefaultGeometry extends Expression {

override def evaluate(obj: Any): Geometry = obj match {
case sf: SimpleFeature => sf.getDefaultGeometry.asInstanceOf[Geometry]
case _ => null
}

override def evaluate[T](obj: Any, context: Class[T]): T = evaluate(obj).asInstanceOf[T] // only called by our code, above

override def accept(visitor: ExpressionVisitor, extraData: Any): AnyRef = throw new UnsupportedOperationException()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,56 @@ import org.geotools.api.feature.simple.SimpleFeature
import org.geotools.api.filter.Filter
import org.geotools.geometry.jts.ReferencedEnvelope
import org.locationtech.geomesa.curve.Z2SFC
import org.locationtech.geomesa.filter.FilterHelper
import org.locationtech.geomesa.fs.storage.core.schemes.SpatialScheme.SpatialPartitionSchemeFactory
import org.locationtech.geomesa.utils.geotools.GeometryUtils
import org.locationtech.geomesa.zorder.sfcurve.IndexRange
import org.locationtech.jts.geom.Point

case class Z2Scheme(attribute: String, index: Int, bits: Int) extends SpatialScheme(Z2Scheme.name, attribute, bits) {
case class Z2Scheme(attribute: String, index: Int, bits: Int) extends PartitionScheme {

import org.locationtech.geomesa.filter.{andFilters, ff}
import org.locationtech.geomesa.utils.geotools.CRS_EPSG_4326

require(bits % 2 == 0, s"Bit precision must be an even number, but received $bits")

private val xyBits = bits / 2
private val z2 = new Z2SFC(xyBits)
private val xRadius = (360d / math.pow(2, xyBits)) / 2
private val yRadius = (180d / math.pow(2, xyBits)) / 2
private val format = s"%0${digits(bits)}d"

lazy private val wholeWorldRanges = Some(z2.ranges(Seq((-180, -90, 180, 90))))

override val name: String = s"${Z2Scheme.name}:attribute=$attribute:bits=$bits"

override def getPartition(feature: SimpleFeature): PartitionKey = {
val pt = feature.getAttribute(index).asInstanceOf[Point]
PartitionKey(name, format.format(z2.index(pt.getX, pt.getY)))
}

override def getRangesForFilter(filter: Filter): Option[Seq[PartitionRange]] = {
getIndexRanges(filter).map { ranges =>
val builder = new RangeBuilder()
ranges.foreach { range =>
val lower = format.format(range.lower)
val upper = format.format(range.upper + 1)
builder += PartitionRange(name, lower, upper)
}
builder.result()
}
}

override def getPartitionsForFilter(filter: Filter): Option[Seq[PartitionKey]] = {
getIndexRanges(filter).orElse(wholeWorldRanges).map { ranges =>
ranges.flatMap { range =>
val lower = range.lower
val steps = 1 + (range.upper - lower).toInt
Seq.tabulate(steps)(i => PartitionKey(name, format.format(lower + i)))
}
}
}

override def getCoveringFilter(partition: PartitionKey): Filter = {
val (x, y) = z2.invert(partition.value.toLong)
val (xmin, xmax) = (x - xRadius, x + xRadius)
Expand All @@ -47,12 +78,22 @@ case class Z2Scheme(attribute: String, index: Int, bits: Int) extends SpatialSch
andFilters(Seq(bbox) ++ xExclusive ++ yExclusive)
}

override protected def digits(bits: Int): Int = math.ceil(bits * math.log10(2)).toInt
private def getIndexRanges(filter: Filter): Option[Seq[IndexRange]] = {
val geometries = FilterHelper.extractGeometries(filter, attribute, intersect = true)
if (geometries.isEmpty) {
None
} else if (geometries.disjoint) {
Some(Seq.empty)
} else {
Some(z2.ranges(geometries.values.map(GeometryUtils.bounds)))
}
}

override protected def generateRanges(xy: Seq[(Double, Double, Double, Double)]): Seq[IndexRange] = z2.ranges(xy)
// number of digits required to print our partition values
private def digits(bits: Int): Int = math.ceil(bits * math.log10(2)).toInt
}

object Z2Scheme extends SpatialPartitionSchemeFactory[Point]("z2") {
override def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): SpatialScheme =
override def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): PartitionScheme =
Z2Scheme(geom, geomIndex, bits)
}
Loading
Loading