[rrd4j] Improved the internal data structure (#16389)

Signed-off-by: Jörg Sautter <joerg.sautter@gmx.net>
Signed-off-by: Ciprian Pascu <contact@ciprianpascu.ro>
This commit is contained in:
joerg1985 2024-02-17 19:53:07 +01:00 committed by Ciprian Pascu
parent a7eb60c15d
commit a2b03aca60

View File

@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
@ -102,6 +103,15 @@ import org.slf4j.LoggerFactory;
QueryablePersistenceService.class }, configurationPid = "org.openhab.rrd4j", configurationPolicy = ConfigurationPolicy.OPTIONAL)
public class RRD4jPersistenceService implements QueryablePersistenceService {
private record Key(long timestamp, String name) implements Comparable<Key> {
@Override
public int compareTo(Key other) {
int c = Long.compare(timestamp, other.timestamp);
return (c == 0) ? Objects.compare(name, other.name, String::compareTo) : c;
}
}
public static final String SERVICE_ID = "rrd4j";
private static final String DEFAULT_OTHER = "default_other";
@ -116,7 +126,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
private final Map<String, RrdDefConfig> rrdDefs = new ConcurrentHashMap<>();
private final ConcurrentSkipListMap<Long, Map<String, Double>> storageMap = new ConcurrentSkipListMap<>();
private final ConcurrentSkipListMap<Key, Double> storageMap = new ConcurrentSkipListMap<>(Key::compareTo);
private static final String DATASOURCE_STATE = "state";
@ -318,7 +328,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
}
long now = System.currentTimeMillis() / 1000;
Double oldValue = storageMap.computeIfAbsent(now, t -> new ConcurrentHashMap<>()).put(name, value);
Double oldValue = storageMap.put(new Key(now, name), value);
if (oldValue != null && !oldValue.equals(value)) {
logger.debug(
"Discarding value {} for item {} with timestamp {} because a new value ({}) arrived with the same timestamp.",
@ -327,14 +337,14 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
}
private void doStore(boolean force) {
long now = System.currentTimeMillis() / 1000;
while (!storageMap.isEmpty()) {
long timestamp = storageMap.firstKey();
long now = System.currentTimeMillis() / 1000;
if (now > timestamp || force) {
Key key = storageMap.firstKey();
if (now > key.timestamp || force) {
// no new elements can be added for this timestamp because we are already past that time or the service
// requires forced storing
Map<String, Double> values = storageMap.pollFirstEntry().getValue();
values.forEach((name, value) -> writePointToDatabase(name, value, timestamp));
Double value = storageMap.pollFirstEntry().getValue();
writePointToDatabase(key.name, value, key.timestamp);
} else {
return;
}