diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/resources/META-INF/services/org.geotools.api.filter.expression.Function b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/resources/META-INF/services/org.geotools.api.filter.expression.Function new file mode 100644 index 00000000000..5b67a9225c6 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/resources/META-INF/services/org.geotools.api.filter.expression.Function @@ -0,0 +1 @@ +org.locationtech.geomesa.fs.storage.core.schemes.XZ2Scheme$XZ2Function diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/StorageMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/StorageMetadata.scala index b2e8771a157..b2dababf41a 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/StorageMetadata.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/StorageMetadata.scala @@ -191,6 +191,15 @@ object StorageMetadata { factory.createPoint(new Coordinate(x, y)) } + /** + * Creates a z2 prefix based on a partition key + * + * @param value z2 partition value + * @param bits number of bits used in the z2 partition scheme + * @return + */ + def encodePartition(value: String, bits: Int): String = hexFormat.toHexDigits(value.toLong, bits / 4) + /** * Calculate encoded ranges * @@ -226,11 +235,11 @@ object StorageMetadata { if (env.isNull) { throw new NullPointerException("Geometry has a null envelope") } - hexFormat.toHexDigits(sfc.index(env.getMinX, env.getMinY, env.getMaxX, env.getMaxY)) + toHex(sfc.index(env.getMinX, env.getMinY, env.getMaxX, env.getMaxY)) } override def decode(value: String): Geometry = { - val (xmin, ymin, xmax, ymax) = sfc.invert(HexFormat.fromHexDigitsToLong(value)) + 
val (xmin, ymin, xmax, ymax) = sfc.invert(fromHex(value)) val ring = Array( new Coordinate(xmin, ymin), new Coordinate(xmin, ymax), @@ -248,6 +257,11 @@ object StorageMetadata { * @param maxRanges a rough upper limit on the number of ranges to generate */ def ranges(queries: Seq[(Double, Double, Double, Double)], maxRanges: Option[Int] = None): Seq[(String, String)] = - sfc.ranges(queries, maxRanges).map(r => hexFormat.toHexDigits(r.lower) -> hexFormat.toHexDigits(r.upper)) + sfc.ranges(queries, maxRanges).map(r => toHex(r.lower) -> toHex(r.upper)) + + // our z values use 25 bits, so we only need 7 digits to hex encode the full value + // we bit-shift by 3 to move dead bits to the end for better prefix matching + private def toHex(z: Long): String = hexFormat.toHexDigits(z << 3, 7) + private def fromHex(hex: String): Long = HexFormat.fromHexDigitsToLong(hex) >>> 3 } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/SpatialScheme.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/SpatialScheme.scala index bde88196e40..98e1277068b 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/SpatialScheme.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/SpatialScheme.scala @@ -10,62 +10,12 @@ package org.locationtech.geomesa.fs.storage.core package schemes import org.geotools.api.feature.simple.SimpleFeatureType -import org.geotools.api.filter.Filter -import org.locationtech.geomesa.filter.FilterHelper import org.locationtech.geomesa.fs.storage.core.{PartitionScheme, PartitionSchemeFactory} -import org.locationtech.geomesa.utils.geotools.GeometryUtils -import org.locationtech.geomesa.zorder.sfcurve.IndexRange import org.locationtech.jts.geom.Geometry import 
java.util.regex.Pattern import scala.reflect.ClassTag -abstract class SpatialScheme(id: String, attribute: String, bits: Int) extends PartitionScheme { - - require(bits % 2 == 0, "Resolution must be an even number") - - protected val format = s"%0${digits(bits)}d" - - override val name: String = s"$id:attribute=$attribute:bits=$bits" - - protected def digits(bits: Int): Int - - protected def generateRanges(xy: Seq[(Double, Double, Double, Double)]): Seq[IndexRange] - - override def getRangesForFilter(filter: Filter): Option[Seq[PartitionRange]] = { - getGeoms(filter).map { ranges => - val builder = new RangeBuilder() - ranges.foreach { range => - val lower = format.format(range.lower) - val upper = format.format(range.upper + 1) - builder += PartitionRange(name, lower, upper) - } - builder.result() - } - } - - override def getPartitionsForFilter(filter: Filter): Option[Seq[PartitionKey]] = { - getGeoms(filter).orElse(Some(generateRanges(Seq((-180, -90, 180, 90))))).map { ranges => - ranges.flatMap { range => - val lower = range.lower - val steps = 1 + (range.upper - lower).toInt - Seq.tabulate(steps)(i => PartitionKey(name, format.format(lower + i))) - } - } - } - - private def getGeoms(filter: Filter): Option[Seq[IndexRange]] = { - val geometries = FilterHelper.extractGeometries(filter, attribute, intersect = true) - if (geometries.isEmpty) { - None - } else if (geometries.disjoint) { - Some(Seq.empty) - } else { - Some(generateRanges(geometries.values.map(GeometryUtils.bounds))) - } - } -} - object SpatialScheme { import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType @@ -97,6 +47,6 @@ object SpatialScheme { } } - def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): SpatialScheme + def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): PartitionScheme } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/XZ2Scheme.scala 
b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/XZ2Scheme.scala index 2bb4923f75e..0e7611909cd 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/XZ2Scheme.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/XZ2Scheme.scala @@ -11,33 +11,163 @@ package schemes import org.geotools.api.feature.simple.SimpleFeature import org.geotools.api.filter.Filter +import org.geotools.api.filter.expression.{Expression, ExpressionVisitor} +import org.geotools.filter.FunctionExpressionImpl +import org.geotools.filter.capability.FunctionNameImpl +import org.geotools.filter.capability.FunctionNameImpl.parameter import org.locationtech.geomesa.curve.XZ2SFC +import org.locationtech.geomesa.filter.FilterHelper +import org.locationtech.geomesa.fs.storage.core.StorageMetadata.XZ2Encoder import org.locationtech.geomesa.fs.storage.core.schemes.SpatialScheme.SpatialPartitionSchemeFactory -import org.locationtech.geomesa.zorder.sfcurve.IndexRange +import org.locationtech.geomesa.fs.storage.core.schemes.XZ2Scheme.incrementHex +import org.locationtech.geomesa.utils.geotools.GeometryUtils import org.locationtech.jts.geom.Geometry -case class XZ2Scheme(attribute: String, index: Int, bits: Int) extends SpatialScheme(XZ2Scheme.name, attribute, bits) { +import java.util.HexFormat +import scala.annotation.tailrec - private val xz2 = XZ2SFC((bits / 2).asInstanceOf[Short]) +/** + * XZ2 spatial scheme. + * + * This scheme uses a fixed high-resolution curve for indexing, then truncates the value to create partition groups, + * in order to align with the iceberg 'truncate' transforms. + * + * Index values are (max) 7-digit hex-encoded longs, as we're only using 25 bits for the high-resolution curve, + * we can fit it in 7 digits. 
+ * + * @param attribute name of the attribute being partitioned + * @param index index in the sft of the attribute being partitioned + * @param bits number of bits of resolution used for partitioning + */ +case class XZ2Scheme(attribute: String, index: Int, bits: Int) extends PartitionScheme { + + import FilterHelper.ff + import XZ2Scheme.FullHexDigits + + require(bits % 4 == 0, s"Bit precision must be a multiple of 4, but received $bits") + + private val xz2 = XZ2SFC + private val hexFormat = HexFormat.of() + + // partition level derived from bits parameter + // each level adds 2 bits (4 quadrants) + private val partitionLevel = (bits / 2).toShort + // number of hex digits used to represent our z value - bits = (xz2.g - partitionLevel) * 2, then divide by 4 to get hex + private val digits = FullHexDigits - ((xz2.g - partitionLevel) / 2) + + lazy private val wholeWorldRanges = Some(generateRanges(Seq((-180, -90, 180, 90)))) + + override val name: String = s"${XZ2Scheme.name}:attribute=$attribute:bits=$bits" override def getPartition(feature: SimpleFeature): PartitionKey = { - val geometry = feature.getAttribute(index).asInstanceOf[Geometry] - val envelope = geometry.getEnvelopeInternal - PartitionKey(name, format.format(xz2.index(envelope.getMinX, envelope.getMinY, envelope.getMaxX, envelope.getMaxY))) + val envelope = feature.getAttribute(index).asInstanceOf[Geometry].getEnvelopeInternal + val zValue = xz2.index(envelope.getMinX, envelope.getMinY, envelope.getMaxX, envelope.getMaxY) + PartitionKey(name, truncateToPartition(zValue)) + } + + override def getCoveringFilter(partition: PartitionKey): Filter = { + // TODO maybe we can improve this with *some* kind of bbox? 
+ val zPrefix = partition.value + val lower = zPrefix.padTo(FullHexDigits, '0') + val upper = zPrefix.padTo(FullHexDigits, 'f') + ff.between(ff.function(XZ2Scheme.FunctionName.getName), ff.literal(lower), ff.literal(upper)) } - // TODO https://geomesa.atlassian.net/browse/GEOMESA-2967 - override def getCoveringFilter(partition: PartitionKey): Filter = - throw new UnsupportedOperationException("https://geomesa.atlassian.net/browse/GEOMESA-2967") + override def getRangesForFilter(filter: Filter): Option[Seq[PartitionRange]] = { + val geometries = FilterHelper.extractGeometries(filter, attribute, intersect = true) + if (geometries.isEmpty) { + None + } else if (geometries.disjoint) { + Some(Seq.empty) + } else { + Some(generateRanges(geometries.values.map(GeometryUtils.bounds))) + } + } - // the max XZ2 value is (4^((bits / 2) + 1) - 1) / 3 - // this calculates the number of digits in that value - override protected def digits(bits: Int): Int = math.ceil(((bits / 2) + 1) * math.log10(4) - math.log10(3)).toInt + override def getPartitionsForFilter(filter: Filter): Option[Seq[PartitionKey]] = { + getRangesForFilter(filter).orElse(wholeWorldRanges).map { ranges => + ranges.flatMap { range => + Iterator.iterate(range.lower)(incrementHex).takeWhile(_ < range.upper).map(PartitionKey(name, _)) + } + } + } - override protected def generateRanges(xy: Seq[(Double, Double, Double, Double)]): Seq[IndexRange] = xz2.ranges(xy) + private def generateRanges(xy: Seq[(Double, Double, Double, Double)]): Seq[PartitionRange] = { + val builder = new RangeBuilder() + xz2.ranges(xy).foreach { range => + val lower = truncateToPartition(range.lower) + // index ranges are inclusive, but partition ranges are exclusive + val upper = incrementHex(truncateToPartition(range.upper)) + builder += PartitionRange(name, lower, upper) + } + builder.result() + } + + // truncates a full-resolution index to a partition group ID + private def truncateToPartition(fullIndex: Long): String = 
hexFormat.toHexDigits(fullIndex << 3, FullHexDigits).take(digits) } object XZ2Scheme extends SpatialPartitionSchemeFactory[Geometry]("xz2") { - override def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): SpatialScheme = + + // number of digits required to index a "full" (g == 12) xz value, which is 25 bits + private val FullHexDigits = 7 + + val FunctionName = new FunctionNameImpl("xz2", classOf[String], parameter("geom", classOf[String], 0, 1)) + + override def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): PartitionScheme = XZ2Scheme(geom, geomIndex, bits) + + private def incrementHex(hex: String): String = incrementHex(hex, hex.length - 1) + + @tailrec + private def incrementHex(hex: String, pos: Int): String = { + val c = hex.charAt(pos) + if (c != 'f') { + hex.substring(0, pos) + (if (c == '9') 'a' else (c + 1).toChar) + hex.substring(pos + 1) // '9' must roll over to 'a', not ':' + } else if (pos == 0) { + ("f" * hex.length) + '0' // note: input was all 'f' (trailing digits were zeroed while carrying) - not a true increment, but sorts after every valid hex value of this length + } else { + incrementHex(hex.substring(0, pos) + '0' + hex.substring(pos + 1), pos - 1) + } + } + + /** + * Function to calculate an XZ2 hex-encoded value + */ + class XZ2Function extends FunctionExpressionImpl(FunctionName) { + + private var expression: Expression = _ + + override def setParameters(params: java.util.List[Expression]): Unit = { + super.setParameters(params) + if (params.isEmpty) { + expression = GetDefaultGeometry + } else { + expression = getExpression(0) + } + } + + override def evaluate(o: AnyRef): AnyRef = { + if (o == null) { + return null + } + val value = expression.evaluate(o, classOf[Geometry]) + if (value == null) { + return null + } + XZ2Encoder.encode(value) + } + } + + private object GetDefaultGeometry extends Expression { + + override def evaluate(obj: Any): Geometry = obj match { + case sf: SimpleFeature => sf.getDefaultGeometry.asInstanceOf[Geometry] + case _ => null + } + + override def evaluate[T](obj: Any, context: Class[T]): T = 
evaluate(obj).asInstanceOf[T] // only called by our code, above + + override def accept(visitor: ExpressionVisitor, extraData: Any): AnyRef = throw new UnsupportedOperationException() + } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/Z2Scheme.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/Z2Scheme.scala index 6b2d522f7f0..ff1d9efa814 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/Z2Scheme.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/main/scala/org/locationtech/geomesa/fs/storage/core/schemes/Z2Scheme.scala @@ -13,25 +13,56 @@ import org.geotools.api.feature.simple.SimpleFeature import org.geotools.api.filter.Filter import org.geotools.geometry.jts.ReferencedEnvelope import org.locationtech.geomesa.curve.Z2SFC +import org.locationtech.geomesa.filter.FilterHelper import org.locationtech.geomesa.fs.storage.core.schemes.SpatialScheme.SpatialPartitionSchemeFactory +import org.locationtech.geomesa.utils.geotools.GeometryUtils import org.locationtech.geomesa.zorder.sfcurve.IndexRange import org.locationtech.jts.geom.Point -case class Z2Scheme(attribute: String, index: Int, bits: Int) extends SpatialScheme(Z2Scheme.name, attribute, bits) { +case class Z2Scheme(attribute: String, index: Int, bits: Int) extends PartitionScheme { import org.locationtech.geomesa.filter.{andFilters, ff} import org.locationtech.geomesa.utils.geotools.CRS_EPSG_4326 + require(bits % 2 == 0, s"Bit precision must be an even number, but received $bits") + private val xyBits = bits / 2 private val z2 = new Z2SFC(xyBits) private val xRadius = (360d / math.pow(2, xyBits)) / 2 private val yRadius = (180d / math.pow(2, xyBits)) / 2 + private val format = s"%0${digits(bits)}d" + + lazy private val wholeWorldRanges = 
Some(z2.ranges(Seq((-180, -90, 180, 90)))) + + override val name: String = s"${Z2Scheme.name}:attribute=$attribute:bits=$bits" override def getPartition(feature: SimpleFeature): PartitionKey = { val pt = feature.getAttribute(index).asInstanceOf[Point] PartitionKey(name, format.format(z2.index(pt.getX, pt.getY))) } + override def getRangesForFilter(filter: Filter): Option[Seq[PartitionRange]] = { + getIndexRanges(filter).map { ranges => + val builder = new RangeBuilder() + ranges.foreach { range => + val lower = format.format(range.lower) + val upper = format.format(range.upper + 1) + builder += PartitionRange(name, lower, upper) + } + builder.result() + } + } + + override def getPartitionsForFilter(filter: Filter): Option[Seq[PartitionKey]] = { + getIndexRanges(filter).orElse(wholeWorldRanges).map { ranges => + ranges.flatMap { range => + val lower = range.lower + val steps = 1 + (range.upper - lower).toInt + Seq.tabulate(steps)(i => PartitionKey(name, format.format(lower + i))) + } + } + } + override def getCoveringFilter(partition: PartitionKey): Filter = { val (x, y) = z2.invert(partition.value.toLong) val (xmin, xmax) = (x - xRadius, x + xRadius) @@ -47,12 +78,22 @@ case class Z2Scheme(attribute: String, index: Int, bits: Int) extends SpatialSch andFilters(Seq(bbox) ++ xExclusive ++ yExclusive) } - override protected def digits(bits: Int): Int = math.ceil(bits * math.log10(2)).toInt + private def getIndexRanges(filter: Filter): Option[Seq[IndexRange]] = { + val geometries = FilterHelper.extractGeometries(filter, attribute, intersect = true) + if (geometries.isEmpty) { + None + } else if (geometries.disjoint) { + Some(Seq.empty) + } else { + Some(z2.ranges(geometries.values.map(GeometryUtils.bounds))) + } + } - override protected def generateRanges(xy: Seq[(Double, Double, Double, Double)]): Seq[IndexRange] = z2.ranges(xy) + // number of digits required to print our partition values + private def digits(bits: Int): Int = math.ceil(bits * math.log10(2)).toInt } 
object Z2Scheme extends SpatialPartitionSchemeFactory[Point]("z2") { - override def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): SpatialScheme = + override def buildPartitionScheme(bits: Int, geom: String, geomIndex: Int): PartitionScheme = Z2Scheme(geom, geomIndex, bits) } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/StorageMetadataTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/StorageMetadataTest.scala index 5bf425b7b11..ed67c520028 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/StorageMetadataTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/StorageMetadataTest.scala @@ -9,7 +9,7 @@ package org.locationtech.geomesa.fs.storage.core import org.locationtech.geomesa.features.ScalaSimpleFeature -import org.locationtech.geomesa.fs.storage.core.StorageMetadata.Z2Encoder +import org.locationtech.geomesa.fs.storage.core.StorageMetadata.{XZ2Encoder, Z2Encoder} import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes import org.locationtech.geomesa.utils.text.WKTUtils import org.locationtech.jts.geom.Point @@ -21,8 +21,8 @@ class StorageMetadataTest extends SpecificationWithJUnit { private val sft = SimpleFeatureTypes.createType("test", "*geom:Point:srid=4326") - "Z2Encoder" should { - "create truncate-able z values that align with our partition scheme" in { + "ZEncoders" should { + "create truncate-able z2 values that align with our partition scheme" in { val ps = PartitionSchemeFactory.load(sft, "z2:bits=4") foreach(Seq(-67.5, -22.5, 22.5, 67.5)) { lat => foreach(Seq(-135, -45, 45, 135)) { lon => @@ -33,5 +33,16 @@ class StorageMetadataTest extends SpecificationWithJUnit { } } } + "create truncate-able xz2 values that align with our partition 
scheme" in { + val ps = PartitionSchemeFactory.load(sft, "xz2:bits=4") + foreach(Seq(-67.5, -22.5, 22.5, 67.5)) { lat => + foreach(Seq(-135, -45, 45, 135)) { lon => + val pt = WKTUtils.read(s"POINT($lon $lat)").asInstanceOf[Point] + val partition = ps.getPartition(ScalaSimpleFeature.create(sft, "", pt)).value + val xz2 = XZ2Encoder.encode(pt) + xz2 must startWith(partition) + } + } + } } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/schemes/XZ2SchemeTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/schemes/XZ2SchemeTest.scala index 09404f4903f..f201d246faf 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/schemes/XZ2SchemeTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-core/src/test/scala/org/locationtech/geomesa/fs/storage/core/schemes/XZ2SchemeTest.scala @@ -13,18 +13,21 @@ import org.locationtech.geomesa.features.ScalaSimpleFeature import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes import org.specs2.mutable.SpecificationWithJUnit +import java.util.Random + class XZ2SchemeTest extends SpecificationWithJUnit { val sft = SimpleFeatureTypes.createType("test", "*geom:Point:srid=4326") "XZ2Scheme" should { - "partition based on 10 bit curve" in { - val ps = PartitionSchemeFactory.load(sft, "xz2:bits=10") + + "partition based on 12 bit curve" in { + val ps = PartitionSchemeFactory.load(sft, "xz2:bits=12") ps must beAnInstanceOf[XZ2Scheme] - ps.asInstanceOf[XZ2Scheme].bits mustEqual 10 + ps.asInstanceOf[XZ2Scheme].bits mustEqual 12 - ps.getPartition(ScalaSimpleFeature.create(sft, "1", "POINT (10 10)")).value mustEqual "1030" - ps.getPartition(ScalaSimpleFeature.create(sft, "1", "POINT (-75 38)")).value mustEqual "0825" + ps.getPartition(ScalaSimpleFeature.create(sft, "1", "POINT (10 10)")).value mustEqual "807e" + 
ps.getPartition(ScalaSimpleFeature.create(sft, "1", "POINT (-75 38)")).value mustEqual "66f2" } "partition based on 20 bit curve" in { @@ -32,8 +35,24 @@ class XZ2SchemeTest extends SpecificationWithJUnit { ps must beAnInstanceOf[XZ2Scheme] ps.asInstanceOf[XZ2Scheme].bits mustEqual 20 - ps.getPartition(ScalaSimpleFeature.create(sft, "1", "POINT (10 10)")).value mustEqual "1052614" - ps.getPartition(ScalaSimpleFeature.create(sft, "1", "POINT (-75 38)")).value mustEqual "0843360" + ps.getPartition(ScalaSimpleFeature.create(sft, "1", "POINT (10 10)")).value mustEqual "807e0a" + ps.getPartition(ScalaSimpleFeature.create(sft, "1", "POINT (-75 38)")).value mustEqual "66f2db" + } + + "calculate covering filters" in { + val r = new Random(77) + val features = Seq.tabulate(100) { i => + ScalaSimpleFeature.create(sft, s"$i", s"POINT (${r.nextDouble(-180, 180)} ${r.nextDouble(-90, 90)})") + } + + foreach(Seq(4, 8, 16)) { bits => + val ps = PartitionSchemeFactory.load(sft, s"xz2:bits=$bits") + ps must beAnInstanceOf[XZ2Scheme] + foreach(features.groupBy(ps.getPartition)) { case (partition, group) => + val filter = ps.getCoveringFilter(partition) + features.filter(filter.evaluate) must containTheSameElementsAs(group) + } + } } } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/iceberg/IcebergMapper.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/iceberg/IcebergMapper.scala index b6e97506000..87001b0515b 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/iceberg/IcebergMapper.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/main/scala/org/locationtech/geomesa/fs/storage/parquet/iceberg/IcebergMapper.scala @@ -15,7 +15,7 @@ import org.apache.parquet.ParquetReadOptions import org.apache.parquet.hadoop.ParquetFileReader import 
org.apache.parquet.hadoop.metadata.ParquetMetadata import org.calrissian.mango.types.{LexiTypeEncoders, TypeEncoder} -import org.locationtech.geomesa.fs.storage.core.StorageMetadata.StorageFile +import org.locationtech.geomesa.fs.storage.core.StorageMetadata.{StorageFile, XZ2Encoder, Z2Encoder} import org.locationtech.geomesa.fs.storage.core.schemes.AttributeScheme.{IntegralBucketing, WidthBucketing} import org.locationtech.geomesa.fs.storage.core.schemes._ import org.locationtech.geomesa.fs.storage.core.{FileSystemStorage, Partition, PartitionScheme} @@ -28,7 +28,6 @@ import java.net.URI import java.time.LocalDate import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit -import java.util.HexFormat import java.util.concurrent.ConcurrentHashMap /** @@ -202,19 +201,15 @@ object IcebergMapper { } private case class Z2Mapper(scheme: Z2Scheme) extends SchemeMapper { - private val hexFormat = HexFormat.of() - private val width = scheme.bits / 4 override def spec(b: PartitionSpec.Builder): PartitionSpec.Builder = - b.truncate(ZValueField.z2(scheme.attribute).zValue, width) - override def toIceberg(partitionValue: String): String = hexFormat.toHexDigits(partitionValue.toLong, width) + b.truncate(ZValueField.z2(scheme.attribute).zValue, scheme.bits / 4) + override def toIceberg(partitionValue: String): String = Z2Encoder.encodePartition(partitionValue, scheme.bits) } private case class XZ2Mapper(scheme: XZ2Scheme) extends SchemeMapper { - private val hexFormat = HexFormat.of() - private val width = scheme.bits / 4 override def spec(b: PartitionSpec.Builder): PartitionSpec.Builder = - b.truncate(ZValueField.xz2(scheme.attribute).zValue, width) - override def toIceberg(partitionValue: String): String = hexFormat.toHexDigits(partitionValue.toLong, width) + b.truncate(ZValueField.xz2(scheme.attribute).zValue, scheme.bits / 4 + 1) // NOTE(review): xz2 partition values are (bits / 4) + 1 hex digits, so the truncate width must match what toIceberg returns - confirm against ZValueField + override def toIceberg(partitionValue: String): String = partitionValue } private case class HashMapper(scheme: HashScheme[_]) 
extends SchemeMapper {