Skip to content

Commit

Permalink
Allow more flexible database purge policy configuration (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
earocorn authored Sep 13, 2024
1 parent c48274b commit 15efc0d
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import org.slf4j.Logger;

import java.util.Collection;

/**
* <p>
Expand All @@ -35,6 +36,7 @@ public interface IObsSystemDbAutoPurgePolicy
* for this aging policy
* @param db
* @param log
* @param systemUIDs
*/
public void trimStorage(IObsSystemDatabase db, Logger log);
public void trimStorage(IObsSystemDatabase db, Logger log, Collection<String> systemUIDs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.sensorhub.api.config.DisplayInfo;
import org.sensorhub.api.database.IObsSystemDbAutoPurgePolicy;

import java.util.ArrayList;
import java.util.List;

/**
* <p>
Expand All @@ -36,6 +38,10 @@ public abstract class HistoricalObsAutoPurgeConfig
@DisplayInfo(label="Purge Execution Period", desc="Execution period of the purge policy (in seconds)")
public double purgePeriod = 3600.0;


@DisplayInfo.Required
@DisplayInfo.FieldType(DisplayInfo.FieldType.Type.SYSTEM_UID)
@DisplayInfo(label="System UIDs", desc="Unique IDs of system drivers to purge")
public List<String> systemUIDs = new ArrayList<>(List.of("*"));

public abstract IObsSystemDbAutoPurgePolicy getPolicy();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.sensorhub.api.datastore.system.SystemFilter;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import org.sensorhub.api.database.IObsSystemDatabase;
import org.slf4j.Logger;
import org.vast.util.DateTimeFormat;
Expand Down Expand Up @@ -52,47 +53,62 @@ public class MaxAgeAutoPurgePolicy implements IObsSystemDbAutoPurgePolicy


@Override
public void trimStorage(IObsSystemDatabase db, Logger log)
public void trimStorage(IObsSystemDatabase db, Logger log, Collection<String> systemUIDs)
{
// remove all systems, datastreams, commandstreams and fois whose validity time period
// ended before (now - max age)
var oldestRecordTime = Instant.now().minusSeconds((long)config.maxRecordAge);

long numProcRemoved = db.getSystemDescStore().removeEntries(new SystemFilter.Builder()
.withValidTime(new TemporalFilter.Builder()
.withOperator(RangeOp.CONTAINS)
.withRange(Instant.MIN, oldestRecordTime)
.build())
.withUniqueIDs(systemUIDs)
.includeMembers(true)
.build());

long numFoisRemoved = db.getFoiStore().removeEntries(new FoiFilter.Builder()
.withValidTime(new TemporalFilter.Builder()
.withOperator(RangeOp.CONTAINS)
.withRange(Instant.MIN, oldestRecordTime)
.build())
.withUniqueIDs(systemUIDs)
.includeMembers(true)
.build());

long numDsRemoved = db.getDataStreamStore().removeEntries(new DataStreamFilter.Builder()
.withValidTime(new TemporalFilter.Builder()
.withOperator(RangeOp.CONTAINS)
.withRange(Instant.MIN, oldestRecordTime)
.build())
.withSystems(new SystemFilter.Builder()
.withUniqueIDs(systemUIDs)
.includeMembers(true)
.build())
.build());

long numCsRemoved = db.getCommandStreamStore().removeEntries(new CommandStreamFilter.Builder()
.withValidTime(new TemporalFilter.Builder()
.withOperator(RangeOp.CONTAINS)
.withRange(Instant.MIN, oldestRecordTime)
.build())
.withSystems(new SystemFilter.Builder()
.withUniqueIDs(systemUIDs)
.includeMembers(true)
.build())
.build());

// for each remaining datastream, remove all obs with a timestamp older than
// the latest result time minus the max age
long numObsRemoved = 0;
var allDataStreams = db.getDataStreamStore().selectEntries(db.getDataStreamStore().selectAllFilter()).iterator();
while (allDataStreams.hasNext())
var dataStreams = db.getDataStreamStore()
.selectEntries(new DataStreamFilter.Builder()
.withSystems(new SystemFilter.Builder()
.withUniqueIDs(systemUIDs).includeMembers(true).build()).build()).iterator();
while (dataStreams.hasNext())
{
var dsEntry = allDataStreams.next();
var dsEntry = dataStreams.next();
var dsID = dsEntry.getKey().getInternalID();
var resultTimeRange = dsEntry.getValue().getResultTimeRange();

Expand All @@ -109,10 +125,13 @@ public void trimStorage(IObsSystemDatabase db, Logger log)
// for each remaining command stream, remove all commands and status with a timestamp older than
// the latest issue time minus the max age
long numCmdRemoved = 0;
var allCmdStreams = db.getCommandStreamStore().selectEntries(db.getCommandStreamStore().selectAllFilter()).iterator();
while (allCmdStreams.hasNext())
var cmdStreams = db.getCommandStreamStore().selectEntries(
new CommandStreamFilter.Builder()
.withSystems(new SystemFilter.Builder()
.withUniqueIDs(systemUIDs).includeMembers(true).build()).build()).iterator();
while (cmdStreams.hasNext())
{
var dsEntry = allCmdStreams.next();
var dsEntry = cmdStreams.next();
var dsID = dsEntry.getKey().getInternalID();
var issueTimeRange = dsEntry.getValue().getIssueTimeRange();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,27 @@ protected void doStart() throws SensorHubException
{
throw new DataStoreException("Cannot instantiate underlying database " + config.dbConfig.moduleClass, e);
}

if(!config.autoPurgeConfig.isEmpty())
autoPurgeTimer = new Timer();

// start auto-purge timer thread if policy is specified and enabled
if (config.autoPurgeConfig != null && config.autoPurgeConfig.enabled)
for(var autoPurgeConfig : config.autoPurgeConfig)
{
final IObsSystemDbAutoPurgePolicy policy = config.autoPurgeConfig.getPolicy();
autoPurgeTimer = new Timer();
TimerTask task = new TimerTask() {
public void run()
{
if (!db.isReadOnly())
policy.trimStorage(db, logger);
}
};

autoPurgeTimer.schedule(task, 0, (long)(config.autoPurgeConfig.purgePeriod*1000));
if (autoPurgeConfig != null && autoPurgeConfig.enabled)
{
var uids = Collections.unmodifiableCollection(autoPurgeConfig.systemUIDs);
final IObsSystemDbAutoPurgePolicy policy = autoPurgeConfig.getPolicy();
TimerTask task = new TimerTask() {
public void run()
{
if (!db.isReadOnly())
policy.trimStorage(db, logger, uids);
}
};

autoPurgeTimer.schedule(task, 0, (long)(autoPurgeConfig.purgePeriod*1000));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.sensorhub.impl.database.system;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.sensorhub.api.config.DisplayInfo;
Expand Down Expand Up @@ -46,7 +48,7 @@ public class SystemDriverDatabaseConfig extends DatabaseConfig


@DisplayInfo(label="Automatic Purge Policy", desc="Policy for automatically purging historical data")
public HistoricalObsAutoPurgeConfig autoPurgeConfig;
public List<HistoricalObsAutoPurgeConfig> autoPurgeConfig = new ArrayList<>();


@DisplayInfo(desc="Minimum period between database commits (in ms)")
Expand Down

0 comments on commit 15efc0d

Please sign in to comment.