diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/obs/DataStreamHandler.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/obs/DataStreamHandler.java index 278ad178d..f945575b5 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/obs/DataStreamHandler.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/obs/DataStreamHandler.java @@ -141,6 +141,16 @@ protected void buildFilter(final ResourceRef parent, final Map { builder.withObservedProperties(obsProps); } + + // system param + var sysIDs = parseResourceIdsOrUids("system", queryParams, idEncoders.getSystemIdEncoder()); + if (sysIDs != null && !sysIDs.isEmpty()) + { + if (sysIDs.isUids()) + builder.withSystems().withUniqueIDs(sysIDs.getUids()).includeMembers(true).done(); + else + builder.withSystems().withInternalIDs(sysIDs.getBigIds()).includeMembers(true).done(); + } } diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/obs/ObsHandler.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/obs/ObsHandler.java index 7081ff975..3a0146323 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/obs/ObsHandler.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/obs/ObsHandler.java @@ -16,10 +16,13 @@ import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentLinkedQueue; @@ -27,6 +30,10 @@ import java.util.concurrent.Flow.Subscription; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; + +import net.opengis.swe.v20.DataComponent; +import net.opengis.swe.v20.DataType; +import net.opengis.swe.v20.ScalarComponent; import org.sensorhub.api.common.BigId; import org.sensorhub.api.data.IDataStreamInfo; import org.sensorhub.api.data.IObsData; @@ -53,6 +60,7 @@ import org.sensorhub.impl.system.DataStreamTransactionHandler; import org.sensorhub.impl.system.SystemDatabaseTransactionHandler; import org.sensorhub.utils.CallbackException; +import org.vast.swe.SWEDataTypeUtils; import org.vast.util.Asserts; import com.google.common.base.Objects; import net.opengis.swe.v20.BinaryEncoding; @@ -68,6 +76,7 @@ public class ObsHandler extends BaseResourceHandler customFormats; + final SWEDataTypeUtils dataTypeUtils; public static class ObsHandlerContextData @@ -88,6 +97,7 @@ public ObsHandler(HandlerContext ctx, ScheduledExecutorService threadPool, Resou this.transactionHandler = new SystemDatabaseTransactionHandler(eventBus, ctx.getWriteDb()); this.threadPool = threadPool; this.customFormats = Asserts.checkNotNull(customFormats); + this.dataTypeUtils = new SWEDataTypeUtils(); } @@ -535,7 +545,44 @@ protected ObsFilter getFilter(ResourceRef parent, Map queryPar .withRoi(geom) .build()); } - + + // simple result value filter on top-level components + if (queryParams.get("filter") != null) + { + List> valueFilterPredicates = new ArrayList<>(); + + // check if using datastream parent resource or datastream query param + if (parent.internalID != null) + { + IDataStreamInfo dsInfo = dataStore.getDataStreams().get(new DataStreamKey(parent.internalID)); + if (dsInfo != null) + { + var filter = parseValueFilter(queryParams, dsInfo, parent.internalID); + if (filter != null) + valueFilterPredicates.add(filter); + } + } + else if (dsIDs != null && !dsIDs.isEmpty()) + { + for (BigId dsID : dsIDs.getBigIds()) + { + IDataStreamInfo dsInfo = dataStore.getDataStreams().get(new DataStreamKey(dsID)); + if (dsInfo == null) continue; + var filter = parseValueFilter(queryParams, dsInfo, dsID); + if (filter != null) + valueFilterPredicates.add(filter); + } + } + + if (!valueFilterPredicates.isEmpty()) + { + valueFilterPredicates + .stream() + .reduce(Predicate::or) + .ifPresent(builder::withValuePredicate); + } + } + // limit // need to limit to offset+limit+1 since we rescan from the beginning for now if (limit != Long.MAX_VALUE) @@ -545,6 +592,97 @@ protected ObsFilter getFilter(ResourceRef parent, Map queryPar } + Predicate parseValueFilter(Map queryParams, IDataStreamInfo dsInfo, BigId dsID) + { + List> allPredicates = new ArrayList<>(); + + var valueFilters = parseMultiValuesArg("filter", queryParams); + + if (valueFilters == null || valueFilters.isEmpty()) + return null; + + for (var valueFilter : valueFilters) + { + var valueFilterPredicate = parseSingleValueFilter(valueFilter, dsInfo, dsID); + if (valueFilterPredicate != null) + allPredicates.add(valueFilterPredicate); + } + + return allPredicates.stream().reduce(Predicate::or).orElse(null); + } + + Predicate parseSingleValueFilter(String filter, IDataStreamInfo dsInfo, BigId dsID) { + String[] parts = filter.split("=", 2); + if (parts.length != 2) + throw new IllegalArgumentException("Invalid value filter: " + filter); + + String field = parts[0]; + String value = parts[1]; + + DataComponent resultStruct = dsInfo.getRecordStructure(); + + int index; + + try { + index = resultStruct.getComponentIndex(field); + } catch (Exception ignored) { + return null; + } + + if (index == -1) + throw new IllegalArgumentException("Could not find field " + field + " using value filter"); + + DataComponent component = resultStruct.getComponent(index); + + if (!(component instanceof ScalarComponent)) + throw new IllegalArgumentException("Unable to apply filter to non-scalar data: " + filter); + + DataType dataType = ((ScalarComponent) component).getDataType(); + + Predicate predicate; + + switch (dataType) + { + case BOOLEAN: + boolean boolValue = Boolean.parseBoolean(value); + predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getBooleanValue(index) == boolValue; + break; + case BYTE: + byte byteValue = Byte.parseByte(value); + predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getByteValue(index) == byteValue; + break; + case SHORT, UBYTE: + short shortValue = Short.parseShort(value); + predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getShortValue(index) == shortValue; + break; + case INT, USHORT: + int intValue = Integer.parseInt(value); + predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getIntValue(index) == intValue; + break; + case LONG, UINT, ULONG: + long longValue = Long.parseLong(value); + predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getLongValue(index) == longValue; + break; + case FLOAT: + float floatValue = Float.parseFloat(value); + predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getFloatValue(index) == floatValue; + break; + case DOUBLE: + double doubleValue = dataTypeUtils.parseDoubleOrInfOrIsoTime(value); + predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getDoubleValue(index) == doubleValue; + break; + case UTF_STRING, ASCII_STRING: + predicate = (obsData) -> obsData.getResult().getAtomCount() > index && java.util.Objects.equals(obsData.getResult().getStringValue(index), value); + break; + default: + throw new IllegalArgumentException("Unsupported datatype " + dataType); + } + + // prevents value predicates on wrong datastreams + return predicate.and((obsData) -> obsData.getDataStreamID().equals(dsID)); + } + + @Override protected BigId addEntry(RequestContext ctx, IObsData res) throws DataStoreException { diff --git a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/task/CommandStreamHandler.java b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/task/CommandStreamHandler.java index cdc609632..5d7ed8bd2 100644 --- a/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/task/CommandStreamHandler.java +++ b/sensorhub-service-consys/src/main/java/org/sensorhub/impl/service/consys/task/CommandStreamHandler.java @@ -126,6 +126,16 @@ protected void buildFilter(final ResourceRef parent, final Map .withInternalIDs(foiIDs) .done(); }*/ + + // system param + var sysIDs = parseResourceIdsOrUids("system", queryParams, idEncoders.getSystemIdEncoder()); + if (sysIDs != null && !sysIDs.isEmpty()) + { + if (sysIDs.isUids()) + builder.withSystems().withUniqueIDs(sysIDs.getUids()).includeMembers(true).done(); + else + builder.withSystems().withInternalIDs(sysIDs.getBigIds()).includeMembers(true).done(); + } }