[rrd4j] Use RrdDbPool to prevent ClosedByInterruptException (#13332)

Using the pool prevents exceptions like:

```
java.nio.channels.ClosedByInterruptException: null
	at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:199) ~[?:?]
	at sun.nio.ch.FileChannelImpl.endBlocking(FileChannelImpl.java:162) ~[?:?]
	at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:388) ~[?:?]
	at org.rrd4j.core.RrdNioBackend.<init>(RrdNioBackend.java:94) ~[?:?]
	at org.rrd4j.core.RrdNioBackendFactory.open(RrdNioBackendFactory.java:163) ~[?:?]
	at org.rrd4j.core.RrdBackendFactory.getBackend(RrdBackendFactory.java:521) ~[?:?]
	at org.rrd4j.core.RrdDb.<init>(RrdDb.java:627) ~[?:?]
	at org.rrd4j.core.RrdDb.of(RrdDb.java:500) ~[?:?]
	at org.openhab.persistence.rrd4j.internal.RRD4jPersistenceService.getDB(RRD4jPersistenceService.java:323) ~[?:?]
	at org.openhab.persistence.rrd4j.internal.RRD4jPersistenceService.store(RRD4jPersistenceService.java:141) ~[?:?]
	at org.openhab.persistence.rrd4j.internal.RRD4jPersistenceService.lambda$0(RRD4jPersistenceService.java:211) ~[?:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
	at java.lang.Thread.run(Thread.java:829) [?:?]
```

Fixes #13297

Also includes a few code improvements.

Signed-off-by: Wouter Born <github@maindrain.net>
This commit is contained in:
Wouter Born 2022-08-29 17:15:17 +02:00 committed by GitHub
parent 07e2b69e79
commit 8eb0d2c16e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 145 additions and 123 deletions

View File

@ -17,6 +17,7 @@ import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import java.util.Locale;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.core.persistence.HistoricItem;
import org.openhab.core.types.State;
@ -26,6 +27,7 @@ import org.openhab.core.types.State;
* @author Kai Kreuzer - Initial contribution
*
*/
@NonNullByDefault
public class RRD4jItem implements HistoricItem {
private final String name;

View File

@ -12,12 +12,14 @@
*/
package org.openhab.persistence.rrd4j.internal;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -74,6 +76,8 @@ import org.rrd4j.DsType;
import org.rrd4j.core.FetchData;
import org.rrd4j.core.FetchRequest;
import org.rrd4j.core.RrdDb;
import org.rrd4j.core.RrdDb.Builder;
import org.rrd4j.core.RrdDbPool;
import org.rrd4j.core.RrdDef;
import org.rrd4j.core.Sample;
import org.slf4j.Logger;
@ -107,13 +111,23 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
private static final String DATASOURCE_STATE = "state";
public static final String DB_FOLDER = getUserPersistenceDataFolder() + File.separator + "rrd4j";
private static final Path DB_FOLDER = Path.of(OpenHAB.getUserDataFolder(), "persistence", "rrd4j").toAbsolutePath();
private static final RrdDbPool DATABASE_POOL = new RrdDbPool();
private final Logger logger = LoggerFactory.getLogger(RRD4jPersistenceService.class);
private final Map<String, ScheduledFuture<?>> scheduledJobs = new HashMap<>();
protected final ItemRegistry itemRegistry;
private final ItemRegistry itemRegistry;
public static Path getDatabasePath(String name) {
return DB_FOLDER.resolve(name + ".rrd");
}
public static RrdDbPool getDatabasePool() {
return DATABASE_POOL;
}
@Activate
public RRD4jPersistenceService(final @Reference ItemRegistry itemRegistry) {
@ -137,89 +151,92 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
return;
}
final String name = alias == null ? item.getName() : alias;
RrdDb db = getDB(name);
if (db != null) {
ConsolFun function = getConsolidationFunction(db);
long now = System.currentTimeMillis() / 1000;
if (function != ConsolFun.AVERAGE) {
try {
// we store the last value again, so that the value change
// in the database is not interpolated, but
// happens right at this spot
if (now - 1 > db.getLastUpdateTime()) {
// only do it if there is not already a value
double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
if (!Double.isNaN(lastValue)) {
Sample sample = db.createSample();
sample.setTime(now - 1);
sample.setValue(DATASOURCE_STATE, lastValue);
sample.update();
logger.debug("Stored '{}' as value '{}' in rrd4j database (again)", name, lastValue);
}
if (db == null) {
return;
}
ConsolFun function = getConsolidationFunction(db);
long now = System.currentTimeMillis() / 1000;
if (function != ConsolFun.AVERAGE) {
try {
// we store the last value again, so that the value change
// in the database is not interpolated, but
// happens right at this spot
if (now - 1 > db.getLastUpdateTime()) {
// only do it if there is not already a value
double lastValue = db.getLastDatasourceValue(DATASOURCE_STATE);
if (!Double.isNaN(lastValue)) {
Sample sample = db.createSample();
sample.setTime(now - 1);
sample.setValue(DATASOURCE_STATE, lastValue);
sample.update();
logger.debug("Stored '{}' as value '{}' in rrd4j database (again)", name, lastValue);
}
} catch (IOException e) {
logger.debug("Error storing last value (again): {}", e.getMessage());
}
} catch (IOException e) {
logger.debug("Error storing last value (again): {}", e.getMessage());
}
}
try {
Sample sample = db.createSample();
sample.setTime(now);
Double value = null;
if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
NumberItem nItem = (NumberItem) item;
QuantityType<?> qState = (QuantityType<?>) item.getState();
Unit<? extends Quantity<?>> unit = nItem.getUnit();
if (unit != null) {
QuantityType<?> convertedState = qState.toUnit(unit);
if (convertedState != null) {
value = convertedState.doubleValue();
} else {
logger.warn(
"Failed to convert state '{}' to unit '{}'. Please check your item definition for correctness.",
qState, unit);
}
} else {
value = qState.doubleValue();
}
} else {
DecimalType state = item.getStateAs(DecimalType.class);
if (state != null) {
value = state.toBigDecimal().doubleValue();
}
}
try {
Sample sample = db.createSample();
sample.setTime(now);
Double value = null;
if (item instanceof NumberItem && item.getState() instanceof QuantityType) {
NumberItem nItem = (NumberItem) item;
QuantityType<?> qState = (QuantityType<?>) item.getState();
Unit<? extends Quantity<?>> unit = nItem.getUnit();
if (unit != null) {
QuantityType<?> convertedState = qState.toUnit(unit);
if (convertedState != null) {
value = convertedState.doubleValue();
} else {
logger.warn(
"Failed to convert state '{}' to unit '{}'. Please check your item definition for correctness.",
qState, unit);
}
} else {
value = qState.doubleValue();
}
} else {
DecimalType state = item.getStateAs(DecimalType.class);
if (state != null) {
value = state.toBigDecimal().doubleValue();
}
if (value != null) {
if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
// adjusted by stepsize
value = value * db.getRrdDef().getStep();
}
if (value != null) {
if (db.getDatasource(DATASOURCE_STATE).getType() == DsType.COUNTER) { // counter values must be
// adjusted by stepsize
value = value * db.getRrdDef().getStep();
}
sample.setValue(DATASOURCE_STATE, value);
sample.update();
logger.debug("Stored '{}' as value '{}' in rrd4j database", name, value);
sample.setValue(DATASOURCE_STATE, value);
sample.update();
logger.debug("Stored '{}' as value '{}' in rrd4j database", name, value);
}
} catch (IllegalArgumentException e) {
String message = e.getMessage();
if (message != null && message.contains("at least one second step is required")) {
// we try to store the value one second later
ScheduledFuture<?> job = scheduledJobs.get(name);
if (job != null) {
job.cancel(true);
scheduledJobs.remove(name);
}
} catch (IllegalArgumentException e) {
String message = e.getMessage();
if (message != null && message.contains("at least one second step is required")) {
// we try to store the value one second later
ScheduledFuture<?> job = scheduledJobs.get(name);
if (job != null) {
job.cancel(true);
scheduledJobs.remove(name);
}
job = scheduler.schedule(() -> store(item, name), 1, TimeUnit.SECONDS);
scheduledJobs.put(name, job);
} else {
logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
}
} catch (Exception e) {
job = scheduler.schedule(() -> store(item, name), 1, TimeUnit.SECONDS);
scheduledJobs.put(name, job);
} else {
logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
}
try {
db.close();
} catch (IOException e) {
logger.debug("Error closing rrd4j database: {}", e.getMessage());
}
} catch (Exception e) {
logger.warn("Could not persist '{}' to rrd4j database: {}", name, e.getMessage());
}
try {
db.close();
} catch (IOException e) {
logger.debug("Error closing rrd4j database: {}", e.getMessage());
}
}
@ -305,6 +322,12 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
} catch (IOException e) {
logger.warn("Could not query rrd4j database for item '{}': {}", itemName, e.getMessage());
return List.of();
} finally {
try {
db.close();
} catch (IOException e) {
logger.debug("Error closing rrd4j database: {}", e.getMessage());
}
}
}
@ -315,20 +338,24 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
protected synchronized @Nullable RrdDb getDB(String alias) {
RrdDb db = null;
File file = new File(DB_FOLDER + File.separator + alias + ".rrd");
Path path = getDatabasePath(alias);
try {
if (file.exists()) {
Builder builder = RrdDb.getBuilder();
builder.setPool(DATABASE_POOL);
if (Files.exists(path)) {
// recreate the RrdDb instance from the file
db = RrdDb.of(file.getAbsolutePath());
builder.setPath(path.toString());
db = builder.build();
} else {
File folder = new File(DB_FOLDER);
if (!folder.exists()) {
folder.mkdirs();
if (!Files.exists(DB_FOLDER)) {
Files.createDirectories(DB_FOLDER);
}
RrdDef rrdDef = getRrdDef(alias, file);
RrdDef rrdDef = getRrdDef(alias, path);
if (rrdDef != null) {
// create a new database file
db = RrdDb.of(rrdDef);
builder.setRrdDef(rrdDef);
db = builder.build();
} else {
logger.debug(
"Did not create rrd4j database for item '{}' since no rrd definition could be determined. This is likely due to an unsupported item type.",
@ -336,10 +363,10 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
}
}
} catch (IOException e) {
logger.error("Could not create rrd4j database file '{}': {}", file.getAbsolutePath(), e.getMessage());
logger.error("Could not create rrd4j database file '{}': {}", path, e.getMessage());
} catch (RejectedExecutionException e) {
// this happens if the system is shut down
logger.debug("Could not create rrd4j database file '{}': {}", file.getAbsolutePath(), e.getMessage());
logger.debug("Could not create rrd4j database file '{}': {}", path, e.getMessage());
}
return db;
}
@ -376,8 +403,8 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
return useRdc;
}
private @Nullable RrdDef getRrdDef(String itemName, File file) {
RrdDef rrdDef = new RrdDef(file.getAbsolutePath());
private @Nullable RrdDef getRrdDef(String itemName, Path path) {
RrdDef rrdDef = new RrdDef(path.toString());
RrdDefConfig useRdc = getRrdDefConfig(itemName);
if (useRdc != null) {
rrdDef.setStep(useRdc.step);
@ -416,8 +443,6 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
} else if (item instanceof NumberItem) {
if (unit != null) {
return new QuantityType(value, unit);
} else {
return new DecimalType(value);
}
}
return new DecimalType(value);
@ -434,10 +459,6 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
return SUPPORTED_TYPES.contains(ItemUtil.getMainItemType(item.getType()));
}
private static String getUserPersistenceDataFolder() {
return OpenHAB.getUserDataFolder() + File.separator + "persistence";
}
@Activate
protected void activate(final Map<String, Object> config) {
modified(config);
@ -496,7 +517,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
while (keys.hasNext()) {
String key = keys.next();
if (key.equals("service.pid") || key.equals("component.name")) {
if ("service.pid".equals(key) || "component.name".equals(key)) {
// ignore service.pid and name
continue;
}
@ -527,11 +548,11 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
}
try {
if (property.equals("def")) {
if ("def".equals(property)) {
rrdDef.setDef(value);
} else if (property.equals("archives")) {
} else if ("archives".equals(property)) {
rrdDef.addArchives(value);
} else if (property.equals("items")) {
} else if ("items".equals(property)) {
rrdDef.addItems(value);
} else {
logger.debug("Unknown property {} : {}", property, value);
@ -552,7 +573,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
}
}
private class RrdArchiveDef {
private static class RrdArchiveDef {
public @Nullable ConsolFun fcn;
public double xff;
public int steps, rows;
@ -591,13 +612,13 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
return;
}
if (opts[0].equals("ABSOLUTE")) { // dsType
if ("ABSOLUTE".equals(opts[0])) { // dsType
dsType = DsType.ABSOLUTE;
} else if (opts[0].equals("COUNTER")) {
} else if ("COUNTER".equals(opts[0])) {
dsType = DsType.COUNTER;
} else if (opts[0].equals("DERIVE")) {
} else if ("DERIVE".equals(opts[0])) {
dsType = DsType.DERIVE;
} else if (opts[0].equals("GAUGE")) {
} else if ("GAUGE".equals(opts[0])) {
dsType = DsType.GAUGE;
} else {
logger.warn("{}: dsType {} not supported", name, opts[0]);
@ -605,13 +626,13 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
heartbeat = Integer.parseInt(opts[1]);
if (opts[2].equals("U")) {
if ("U".equals(opts[2])) {
min = Double.NaN;
} else {
min = Double.parseDouble(opts[2]);
}
if (opts[3].equals("U")) {
if ("U".equals(opts[3])) {
max = Double.NaN;
} else {
max = Double.parseDouble(opts[3]);
@ -634,17 +655,17 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
}
RrdArchiveDef arc = new RrdArchiveDef();
if (opts[0].equals("AVERAGE")) {
if ("AVERAGE".equals(opts[0])) {
arc.fcn = ConsolFun.AVERAGE;
} else if (opts[0].equals("MIN")) {
} else if ("MIN".equals(opts[0])) {
arc.fcn = ConsolFun.MIN;
} else if (opts[0].equals("MAX")) {
} else if ("MAX".equals(opts[0])) {
arc.fcn = ConsolFun.MAX;
} else if (opts[0].equals("LAST")) {
} else if ("LAST".equals(opts[0])) {
arc.fcn = ConsolFun.LAST;
} else if (opts[0].equals("FIRST")) {
} else if ("FIRST".equals(opts[0])) {
arc.fcn = ConsolFun.FIRST;
} else if (opts[0].equals("TOTAL")) {
} else if ("TOTAL".equals(opts[0])) {
arc.fcn = ConsolFun.TOTAL;
} else {
logger.warn("{}: consolidation function {} not supported", name, opts[0]);
@ -657,10 +678,7 @@ public class RRD4jPersistenceService implements QueryablePersistenceService {
}
public void addItems(String itemsString) {
String splitItems[] = itemsString.split(",");
for (String item : splitItems) {
itemNames.add(item);
}
Collections.addAll(itemNames, itemsString.split(","));
}
public boolean appliesTo(String item) {

View File

@ -17,7 +17,6 @@ import static java.util.Map.entry;
import java.awt.Color;
import java.awt.Font;
import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
@ -50,6 +49,7 @@ import org.osgi.service.http.HttpService;
import org.osgi.service.http.NamespaceException;
import org.rrd4j.ConsolFun;
import org.rrd4j.core.RrdDb;
import org.rrd4j.core.RrdDb.Builder;
import org.rrd4j.graph.RrdGraph;
import org.rrd4j.graph.RrdGraphConstants.FontTag;
import org.rrd4j.graph.RrdGraphDef;
@ -120,9 +120,7 @@ public class RRD4jChartServlet implements Servlet, ChartProvider {
try {
logger.debug("Starting up rrd chart servlet at {}", SERVLET_NAME);
httpService.registerServlet(SERVLET_NAME, this, new Hashtable<>(), httpService.createDefaultHttpContext());
} catch (NamespaceException e) {
logger.error("Error during servlet startup", e);
} catch (ServletException e) {
} catch (NamespaceException | ServletException e) {
logger.error("Error during servlet startup", e);
}
}
@ -184,13 +182,17 @@ public class RRD4jChartServlet implements Servlet, ChartProvider {
protected void addLine(RrdGraphDef graphDef, Item item, int counter) {
Color color = LINECOLORS[counter % LINECOLORS.length];
String label = itemUIRegistry.getLabel(item.getName());
String rrdName = RRD4jPersistenceService.DB_FOLDER + File.separator + item.getName() + ".rrd";
String rrdName = RRD4jPersistenceService.getDatabasePath(item.getName()).toString();
ConsolFun consolFun;
if (label != null && label.contains("[") && label.contains("]")) {
label = label.substring(0, label.indexOf('['));
}
try {
RrdDb db = RrdDb.of(rrdName);
Builder builder = RrdDb.getBuilder();
builder.setPool(RRD4jPersistenceService.getDatabasePool());
builder.setPath(rrdName);
RrdDb db = builder.build();
consolFun = db.getRrdDef().getArcDefs()[0].getConsolFun();
db.close();
} catch (IOException e) {