[influxdb] Fix queries with data migrated from InfluxDB1 without item tags (#10937)

Signed-off-by: Joan Pujol <joanpujol@gmail.com>
This commit is contained in:
Joan Pujol 2021-07-06 20:01:15 +02:00 committed by GitHub
parent cdb8d46fa8
commit 26258e8ef5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 27 additions and 9 deletions

View File

@ -32,4 +32,5 @@ public class InfluxDBConstants {
public static final String TAG_CATEGORY_NAME = "category";
public static final String TAG_TYPE_NAME = "type";
public static final String TAG_LABEL_NAME = "label";
public static final String FIELD_MEASUREMENT_NAME = "_measurement";
}

View File

@ -66,10 +66,18 @@ public class Influx2FilterCriteriaQueryCreatorImpl implements FilterCriteriaQuer
String itemName = criteria.getItemName();
if (itemName != null) {
String measurementName = calculateMeasurementName(itemName);
boolean needsToUseItemTagName = !measurementName.equals(itemName);
flux = flux.filter(measurement().equal(measurementName));
if (!measurementName.equals(itemName)) {
flux = flux.filter(tag("item").equal(itemName));
if (needsToUseItemTagName) {
flux = flux.filter(tag(TAG_ITEM_NAME).equal(itemName));
}
if (needsToUseItemTagName)
flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2,
TAG_ITEM_NAME });
else
flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2 });
}
if (criteria.getState() != null && criteria.getOperator() != null) {

View File

@ -79,8 +79,10 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM origin.sampleItem;"));
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")"));
assertThat(queryV2,
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])"));
}
@Test
@ -112,7 +114,8 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
String expectedQueryV2 = String.format(
"from(bucket:\"origin\")\n\t" + "|> range(start:%s, stop:%s)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")",
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])",
INFLUX2_DATE_FORMATTER.format(now.toInstant()), INFLUX2_DATE_FORMATTER.format(tomorrow.toInstant()));
assertThat(queryV2, equalTo(expectedQueryV2));
}
@ -130,6 +133,7 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
assertThat(queryV2,
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t"
+ "|> filter(fn: (r) => (r[\"_field\"] == \"value\" and r[\"_value\"] <= 90))"));
}
@ -144,7 +148,8 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
String queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t" + "|> limit(n:10, offset:20)"));
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t" + "|> limit(n:10, offset:20)"));
}
@Test
@ -159,6 +164,7 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
assertThat(queryV2,
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])\n\t"
+ "|> sort(desc:false, columns:[\"_time\"])"));
}
@ -189,7 +195,8 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
assertThat(queryV2,
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"measurementName\")\n\t"
+ "|> filter(fn: (r) => r[\"item\"] == \"sampleItem\")"));
+ "|> filter(fn: (r) => r[\"item\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\", \"item\"])"));
when(metadataRegistry.get(metadataKey))
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
@ -198,7 +205,9 @@ public class InfluxFilterCriteriaQueryCreatorImplTest {
assertThat(queryV1, equalTo("SELECT \"value\"::field,\"item\"::tag FROM origin.sampleItem;"));
queryV2 = instanceV2.createQuery(criteria, RETENTION_POLICY);
assertThat(queryV2, equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")"));
assertThat(queryV2,
equalTo("from(bucket:\"origin\")\n\t" + "|> range(start:-100y)\n\t"
+ "|> filter(fn: (r) => r[\"_measurement\"] == \"sampleItem\")\n\t"
+ "|> keep(columns:[\"_measurement\", \"_time\", \"_value\"])"));
}
}