Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,16 @@ protected void buildFilter(final ResourceRef parent, final Map<String, String[]>
{
builder.withObservedProperties(obsProps);
}

// system param
var sysIDs = parseResourceIdsOrUids("system", queryParams, idEncoders.getSystemIdEncoder());
if (sysIDs != null && !sysIDs.isEmpty())
{
if (sysIDs.isUids())
builder.withSystems().withUniqueIDs(sysIDs.getUids()).includeMembers(true).done();
else
builder.withSystems().withInternalIDs(sysIDs.getBigIds()).includeMembers(true).done();
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,24 @@

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;

import net.opengis.swe.v20.DataComponent;
import net.opengis.swe.v20.DataType;
import net.opengis.swe.v20.ScalarComponent;
import org.sensorhub.api.common.BigId;
import org.sensorhub.api.data.IDataStreamInfo;
import org.sensorhub.api.data.IObsData;
Expand All @@ -53,6 +60,7 @@
import org.sensorhub.impl.system.DataStreamTransactionHandler;
import org.sensorhub.impl.system.SystemDatabaseTransactionHandler;
import org.sensorhub.utils.CallbackException;
import org.vast.swe.SWEDataTypeUtils;
import org.vast.util.Asserts;
import com.google.common.base.Objects;
import net.opengis.swe.v20.BinaryEncoding;
Expand All @@ -68,6 +76,7 @@ public class ObsHandler extends BaseResourceHandler<BigId, IObsData, ObsFilter,
final SystemDatabaseTransactionHandler transactionHandler;
final ScheduledExecutorService threadPool;
final Map<String, CustomObsFormat> customFormats;
final SWEDataTypeUtils dataTypeUtils;


public static class ObsHandlerContextData
Expand All @@ -88,6 +97,7 @@ public ObsHandler(HandlerContext ctx, ScheduledExecutorService threadPool, Resou
this.transactionHandler = new SystemDatabaseTransactionHandler(eventBus, ctx.getWriteDb());
this.threadPool = threadPool;
this.customFormats = Asserts.checkNotNull(customFormats);
this.dataTypeUtils = new SWEDataTypeUtils();
}


Expand Down Expand Up @@ -535,7 +545,44 @@ protected ObsFilter getFilter(ResourceRef parent, Map<String, String[]> queryPar
.withRoi(geom)
.build());
}


// simple result value filter on top-level components
if (queryParams.get("filter") != null)
{
List<Predicate<IObsData>> valueFilterPredicates = new ArrayList<>();

// check if using datastream parent resource or datastream query param
if (parent.internalID != null)
{
IDataStreamInfo dsInfo = dataStore.getDataStreams().get(new DataStreamKey(parent.internalID));
if (dsInfo != null)
{
var filter = parseValueFilter(queryParams, dsInfo, parent.internalID);
if (filter != null)
valueFilterPredicates.add(filter);
}
}
else if (dsIDs != null && !dsIDs.isEmpty())
{
for (BigId dsID : dsIDs.getBigIds())
{
IDataStreamInfo dsInfo = dataStore.getDataStreams().get(new DataStreamKey(dsID));
if (dsInfo == null) continue;
var filter = parseValueFilter(queryParams, dsInfo, dsID);
if (filter != null)
valueFilterPredicates.add(filter);
}
}

if (!valueFilterPredicates.isEmpty())
{
valueFilterPredicates
.stream()
.reduce(Predicate::or)
.ifPresent(builder::withValuePredicate);
}
}

// limit
// need to limit to offset+limit+1 since we rescan from the beginning for now
if (limit != Long.MAX_VALUE)
Expand All @@ -545,6 +592,97 @@ protected ObsFilter getFilter(ResourceRef parent, Map<String, String[]> queryPar
}


Predicate<IObsData> parseValueFilter(Map<String, String[]> queryParams, IDataStreamInfo dsInfo, BigId dsID)
{
List<Predicate<IObsData>> allPredicates = new ArrayList<>();

var valueFilters = parseMultiValuesArg("filter", queryParams);

if (valueFilters == null || valueFilters.isEmpty())
return null;

for (var valueFilter : valueFilters)
{
var valueFilterPredicate = parseSingleValueFilter(valueFilter, dsInfo, dsID);
if (valueFilterPredicate != null)
allPredicates.add(valueFilterPredicate);
}

return allPredicates.stream().reduce(Predicate::or).orElse(null);
}

Predicate<IObsData> parseSingleValueFilter(String filter, IDataStreamInfo dsInfo, BigId dsID) {
String[] parts = filter.split("=", 2);
if (parts.length != 2)
throw new IllegalArgumentException("Invalid value filter: " + filter);

String field = parts[0];
String value = parts[1];

DataComponent resultStruct = dsInfo.getRecordStructure();

int index;

try {
index = resultStruct.getComponentIndex(field);
} catch (Exception ignored) {
return null;
}

if (index == -1)
throw new IllegalArgumentException("Could not find field " + field + " using value filter");

DataComponent component = resultStruct.getComponent(index);

if (!(component instanceof ScalarComponent))
throw new IllegalArgumentException("Unable to apply filter to non-scalar data: " + filter);

DataType dataType = ((ScalarComponent) component).getDataType();

Predicate<IObsData> predicate;

switch (dataType)
{
case BOOLEAN:
boolean boolValue = Boolean.parseBoolean(value);
predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getBooleanValue(index) == boolValue;
break;
case BYTE:
byte byteValue = Byte.parseByte(value);
predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getByteValue(index) == byteValue;
break;
case SHORT, UBYTE:
short shortValue = Short.parseShort(value);
predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getShortValue(index) == shortValue;
break;
case INT, USHORT:
int intValue = Integer.parseInt(value);
predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getIntValue(index) == intValue;
break;
case LONG, UINT, ULONG:
long longValue = Long.parseLong(value);
predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getLongValue(index) == longValue;
break;
case FLOAT:
float floatValue = Float.parseFloat(value);
predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getFloatValue(index) == floatValue;
break;
case DOUBLE:
double doubleValue = dataTypeUtils.parseDoubleOrInfOrIsoTime(value);
predicate = (obsData) -> obsData.getResult().getAtomCount() > index && obsData.getResult().getDoubleValue(index) == doubleValue;
break;
case UTF_STRING, ASCII_STRING:
predicate = (obsData) -> obsData.getResult().getAtomCount() > index && java.util.Objects.equals(obsData.getResult().getStringValue(index), value);
break;
default:
throw new IllegalArgumentException("Unsupported datatype " + dataType);
}

// prevents value predicates on wrong datastreams
return predicate.and((obsData) -> obsData.getDataStreamID().equals(dsID));
}


@Override
protected BigId addEntry(RequestContext ctx, IObsData res) throws DataStoreException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ protected void buildFilter(final ResourceRef parent, final Map<String, String[]>
.withInternalIDs(foiIDs)
.done();
}*/

// system param
var sysIDs = parseResourceIdsOrUids("system", queryParams, idEncoders.getSystemIdEncoder());
if (sysIDs != null && !sysIDs.isEmpty())
{
if (sysIDs.isUids())
builder.withSystems().withUniqueIDs(sysIDs.getUids()).includeMembers(true).done();
else
builder.withSystems().withInternalIDs(sysIDs.getBigIds()).includeMembers(true).done();
}
}


Expand Down
Loading