Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,14 @@ public class TemporalFilter extends RangeFilter<Instant>
protected boolean timeRangeBeginsNow; // now = current time at the time of query evaluation
protected boolean timeRangeEndsNow; // now = current time at the time of query evaluation
protected boolean latestTime; // latest available time (can be in future)
protected boolean descendingOrder;


protected TemporalFilter()
{
this.range = Range.open(Instant.MIN, Instant.MAX);
}


public boolean isCurrentTime()
{
return timeRangeBeginsNow && timeRangeEndsNow;
Expand Down Expand Up @@ -78,7 +84,13 @@ public boolean isSingleValue()

return isCurrentTime() || isLatestTime() || super.isSingleValue();
}



public boolean descendingOrder()
{
return descendingOrder;
}


@Override
public Range<Instant> getRange()
Expand Down Expand Up @@ -208,6 +220,9 @@ else if (otherFilter.isLatestTime() && isAllTimes())
else if (otherFilter.isAllTimes() && isLatestTime())
return builder.withLatestTime();

var descendingOrder = this.descendingOrder || otherFilter.descendingOrder;
builder.descendingOrder(descendingOrder);

// otherwise compute time extent intersection
var thisTe = asTimeExtent();
var otherTe = otherFilter.asTimeExtent();
Expand Down Expand Up @@ -289,6 +304,7 @@ protected B copyFrom(F base)
instance.timeRangeBeginsNow = base.timeRangeBeginsNow;
instance.timeRangeEndsNow = base.timeRangeEndsNow;
instance.latestTime = base.latestTime;
instance.descendingOrder = base.descendingOrder;
return (B)this;
}

Expand Down Expand Up @@ -378,5 +394,16 @@ else if (!te.hasEnd())
else
return withRange(te.begin(), te.end());
}

/**
* Specify descending or ascending (default) chronological order.
* @param descending order
* @return This builder for chaining
*/
public B descendingOrder(boolean descending)
{
instance.descendingOrder = descending;
return (B)this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
package org.sensorhub.impl.database.registry;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.sensorhub.api.command.ICommandData;
import org.sensorhub.api.command.ICommandStatus;
import org.sensorhub.api.common.BigId;
import org.sensorhub.api.datastore.command.CommandFilter;
Expand Down Expand Up @@ -170,10 +173,13 @@ public Stream<Entry<BigId, ICommandStatus>> selectEntries(CommandStatusFilter fi

if (cmdStreams.isEmpty())
return Stream.empty();

Comparator<Entry<BigId, ICommandStatus>> comparator = Comparator.comparing(e -> e.getValue().getReportTime());
if (filter.getReportTime() != null && filter.getReportTime().descendingOrder())
comparator = comparator.reversed();

// stream and merge commands from all selected command streams and time periods
var mergeSortIt = new MergeSortSpliterator<Entry<BigId, ICommandStatus>>(cmdStreams,
(e1, e2) -> e1.getValue().getReportTime().compareTo(e2.getValue().getReportTime()));
var mergeSortIt = new MergeSortSpliterator<Entry<BigId, ICommandStatus>>(cmdStreams, comparator);

// stream output of merge sort iterator + apply limit
return StreamSupport.stream(mergeSortIt, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@

package org.sensorhub.impl.database.registry;

import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.sensorhub.api.command.ICommandData;
Expand Down Expand Up @@ -204,10 +201,13 @@ public Stream<Entry<BigId, ICommandData>> selectEntries(CommandFilter filter, Se

if (cmdStreams.isEmpty())
return Stream.empty();


Comparator<Entry<BigId, ICommandData>> comparator = Comparator.comparing(e -> e.getValue().getIssueTime());
if (filter.getIssueTime() != null && filter.getIssueTime().descendingOrder())
comparator = comparator.reversed();

// stream and merge commands from all selected command streams and time periods
var mergeSortIt = new MergeSortSpliterator<Entry<BigId, ICommandData>>(cmdStreams,
(e1, e2) -> e1.getValue().getIssueTime().compareTo(e2.getValue().getIssueTime()));
var mergeSortIt = new MergeSortSpliterator<Entry<BigId, ICommandData>>(cmdStreams, comparator);

// stream output of merge sort iterator + apply limit
return StreamSupport.stream(mergeSortIt, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

package org.sensorhub.impl.database.registry;

import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.sensorhub.api.command.ICommandData;
import org.sensorhub.api.common.BigId;
import org.sensorhub.api.data.IObsData;
import org.sensorhub.api.datastore.feature.FoiFilter;
Expand Down Expand Up @@ -228,10 +227,13 @@ public Stream<Entry<BigId, IObsData>> selectEntries(ObsFilter filter, Set<ObsFie

if (obsStreams.isEmpty())
return Stream.empty();

Comparator<Entry<BigId, IObsData>> comparator = Comparator.comparing(e -> e.getValue().getPhenomenonTime());
if (filter.getPhenomenonTime() != null && filter.getPhenomenonTime().descendingOrder())
comparator = comparator.reversed();

// stream and merge obs from all selected datastreams and time periods
var mergeSortIt = new MergeSortSpliterator<Entry<BigId, IObsData>>(obsStreams,
(e1, e2) -> e1.getValue().getPhenomenonTime().compareTo(e2.getValue().getPhenomenonTime()));
var mergeSortIt = new MergeSortSpliterator<Entry<BigId, IObsData>>(obsStreams, comparator);

// stream output of merge sort iterator + apply limit
return StreamSupport.stream(mergeSortIt, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public RangeCursor(MVMap<K, V> map, K from)

public RangeCursor(MVMap<K, V> map, K from, K to)
{
// TODO: Update to use reverse-order cursor in newer H2 version
super(map.cursor(from));
this.map = map;
this.to = to;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.h2.mvstore.MVBTreeMap;
Expand Down Expand Up @@ -300,10 +295,12 @@ public Stream<Entry<BigId, ICommandData>> selectEntries(CommandFilter filter, Se
return getPostFilteredResultStream(cmdStream, filter);
}));


Comparator<Entry<BigId, ICommandData>> comparator = Comparator.comparing(e -> e.getValue().getIssueTime());
if (filter.getIssueTime() != null && filter.getIssueTime().descendingOrder())
comparator = comparator.reversed();

// stream and merge commands from all selected command streams and time periods
var mergeSortIt = new MergeSortSpliterator<Entry<BigId, ICommandData>>(cmdStreams,
(e1, e2) -> e1.getValue().getIssueTime().compareTo(e2.getValue().getIssueTime()));
var mergeSortIt = new MergeSortSpliterator<Entry<BigId, ICommandData>>(cmdStreams, comparator);

// stream output of merge sort iterator + apply limit
return StreamSupport.stream(mergeSortIt, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -34,6 +29,7 @@
import org.h2.mvstore.MVBTreeMap;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.RangeCursor;
import org.sensorhub.api.command.ICommandData;
import org.sensorhub.api.common.BigId;
import org.sensorhub.api.data.IObsData;
import org.sensorhub.api.datastore.feature.IFoiStore;
Expand Down Expand Up @@ -463,10 +459,13 @@ public Stream<Entry<BigId, IObsData>> selectEntries(ObsFilter filter, Set<ObsFie
return Stream.empty();

// TODO group by result time when series with different result times are selected

Comparator<Entry<BigId, IObsData>> comparator = Comparator.comparing(e -> e.getValue().getPhenomenonTime());
if (filter.getPhenomenonTime() != null && filter.getPhenomenonTime().descendingOrder())
comparator = comparator.reversed();

// stream and merge obs from all selected datastreams and time periods
var mergeSortIt = new MergeSortSpliterator<Entry<BigId, IObsData>>(obsStreams,
(e1, e2) -> e1.getValue().getPhenomenonTime().compareTo(e2.getValue().getPhenomenonTime()));
var mergeSortIt = new MergeSortSpliterator<Entry<BigId, IObsData>>(obsStreams, comparator);

// stream output of merge sort iterator + apply limit
return StreamSupport.stream(mergeSortIt, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,31 +275,37 @@ protected IdCollection parseResourceIdsOrUids(String paramName, final Map<String


protected TemporalFilter parseTimeStampArg(String paramName, final Map<String, String[]> queryParams) throws InvalidRequestException
{
var builder = parseTimeStampArgToBuilder(paramName, queryParams);
if (builder == null)
return null;
return builder.build();
}


protected TemporalFilter.Builder parseTimeStampArgToBuilder(String paramName, final Map<String, String[]> queryParams) throws InvalidRequestException
{
var timeVal = getSingleParam(paramName, queryParams);
if (timeVal == null)
return null;

try
{
if (timeVal.equals("latest"))
{
return new TemporalFilter.Builder()
.withLatestTime()
.build();
.withLatestTime();
}
else if (timeVal.startsWith("latest/"))
{
return new TemporalFilter.Builder()
.withLatestTime()
.withRangeBeginningNow(Instant.MAX)
.build();
.withLatestTime()
.withRangeBeginningNow(Instant.MAX);
}
else
{
return new TemporalFilter.Builder()
.fromTimeExtent(TimeExtent.parse(timeVal))
.build();
.fromTimeExtent(TimeExtent.parse(timeVal));
}
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.sensorhub.api.database.IObsSystemDatabase;
import org.sensorhub.api.datastore.DataStoreException;
import org.sensorhub.api.datastore.SpatialFilter;
import org.sensorhub.api.datastore.TemporalFilter;
import org.sensorhub.api.datastore.obs.DataStreamKey;
import org.sensorhub.api.datastore.obs.IObsStore;
import org.sensorhub.api.datastore.obs.ObsFilter;
Expand Down Expand Up @@ -477,12 +478,26 @@ protected ObsFilter getFilter(ResourceRef parent, Map<String, String[]> queryPar
// filter on parent if needed
if (parent.internalID != null)
builder.withDataStreams(parent.internalID);


// TODO attach to phenomenonTime
var phenomenonTimeFilterBuilder = new TemporalFilter.Builder();

// phenomenonTime param
var phenomenonTime = parseTimeStampArg("phenomenonTime", queryParams);
var phenomenonTime = parseTimeStampArgToBuilder("phenomenonTime", queryParams);
if (phenomenonTime != null)
builder.withPhenomenonTime(phenomenonTime);

phenomenonTimeFilterBuilder = phenomenonTime;

// chronological order, attached to phenomenonTime filter
var descendingOrder = getSingleParam("order", queryParams);
if (descendingOrder != null && !descendingOrder.isBlank()
&& ("desc".equals(descendingOrder) || "descending".equals(descendingOrder)))
{
phenomenonTimeFilterBuilder.descendingOrder(true);
}

if (phenomenonTime != null || descendingOrder != null)
builder.withPhenomenonTime(phenomenonTimeFilterBuilder.build());

// resultTime param
var resultTime = parseTimeStampArg("resultTime", queryParams);
if (resultTime != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ protected void list(final RequestContext ctx) throws IOException
// stream and serialize all resources to servlet output
var binding = getBinding(ctx, false);
binding.startCollection();


var data = dataStore.selectEntries(filter).toList();

// fetch from DB and temporarily handle paging here
try (var results = postProcessResultList(dataStore.selectEntries(filter), filter))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.sensorhub.api.database.IObsSystemDatabase;
import org.sensorhub.api.command.ICommandStatus.CommandStatusCode;
import org.sensorhub.api.datastore.DataStoreException;
import org.sensorhub.api.datastore.TemporalFilter;
import org.sensorhub.api.datastore.command.CommandFilter;
import org.sensorhub.api.datastore.command.CommandStatusFilter;
import org.sensorhub.api.datastore.command.CommandStreamKey;
Expand Down Expand Up @@ -307,11 +308,24 @@ protected CommandFilter getFilter(ResourceRef parent, Map<String, String[]> quer
// filter on parent if needed
if (parent.internalID != null)
builder.withCommandStreams(parent.internalID);


var issueTimeFilterBuilder = new TemporalFilter.Builder();

// issueTime param
var issueTime = parseTimeStampArg("issueTime", queryParams);
var issueTime = parseTimeStampArgToBuilder("issueTime", queryParams);
if (issueTime != null)
builder.withIssueTime(issueTime);
issueTimeFilterBuilder = issueTime;

// chronological order, attached to issueTime filter
var descendingOrder = getSingleParam("order", queryParams);
if (descendingOrder != null && !descendingOrder.isBlank()
&& ("desc".equals(descendingOrder) || "descending".equals(descendingOrder)))
{
issueTimeFilterBuilder.descendingOrder(true);
}

if (issueTime != null || descendingOrder != null)
builder.withIssueTime(issueTimeFilterBuilder.build());

// status filter params
var statusCodes = parseMultiValuesArg("statusCode", queryParams);
Expand Down
Loading
Loading