Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,19 @@ public void validate(UDFParameterValidator validator) throws Exception {
"Parameter \"compute\" is illegal. Please use \"batch\" (for default) or \"stream\".",
validator.getParameters().getStringOrDefault("compute", BATCH_COMPUTE))
.validate(
params -> (double) params[0] < (double) params[1],
"parameter $q1$ should be smaller than $q3$",
params ->
Double.isFinite((double) params[0])
&& Double.isFinite((double) params[1])
&& (double) params[0] < (double) params[1],
"parameter $q1$ and $q3$ should be finite, and $q1$ should be smaller than $q3$",
validator.getParameters().getDoubleOrDefault("q1", -1),
validator.getParameters().getDoubleOrDefault("q3", 1));
if (validator
.getParameters()
.getStringOrDefault("compute", BATCH_COMPUTE)
.equalsIgnoreCase(STREAM_COMPUTE)) {
validator.validateRequiredAttribute("q1").validateRequiredAttribute("q3");
}
}

@Override
Expand All @@ -84,21 +93,29 @@ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurati

@Override
public void transform(Row row, PointCollector collector) throws Exception {
if (row.isNull(0)) {
return;
}
if (compute.equalsIgnoreCase(STREAM_COMPUTE) && q3 > q1) {
double v = Util.getValueAsDouble(row);
if (v < q1 - 1.5 * iqr || v > q3 + 1.5 * iqr) {
if (Double.isFinite(v) && (v < q1 - 1.5 * iqr || v > q3 + 1.5 * iqr)) {
collector.putDouble(row.getTime(), v);
}
} else if (compute.equalsIgnoreCase(BATCH_COMPUTE)) {
double v = Util.getValueAsDouble(row);
value.add(v);
timestamp.add(row.getTime());
if (Double.isFinite(v)) {
value.add(v);
timestamp.add(row.getTime());
}
}
}

@Override
public void terminate(PointCollector collector) throws Exception {
if (compute.equalsIgnoreCase(BATCH_COMPUTE)) {
if (value.isEmpty()) {
return;
}
q1 = Quantiles.quartiles().index(1).compute(value);
q3 = Quantiles.quartiles().index(3).compute(value);
iqr = q3 - q1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

/** This function detects outliers that lie outside the range average +/- k * sigma. */
public class UDTFKSigma implements UDTF {
private static final int DEFAULT_WINDOW_SIZE = 10000;

private double mean = 0.0;
private double variance = 0.0;
private double sumX2 = 0.0;
Expand All @@ -51,10 +53,10 @@ public void validate(UDFParameterValidator validator) throws Exception {
.validate(
x -> (int) x > 0,
"Window size should be larger than 0.",
validator.getParameters().getIntOrDefault("window", 10))
validator.getParameters().getIntOrDefault("window", DEFAULT_WINDOW_SIZE))
.validate(
x -> (double) x > 0,
"Parameter k should be larger than 0.",
x -> Double.isFinite((double) x) && (double) x > 0,
"Parameter k should be finite and larger than 0.",
validator.getParameters().getDoubleOrDefault("k", 3));
}

Expand All @@ -66,13 +68,20 @@ public void beforeStart(UDFParameters udfParameters, UDTFConfigurations udtfConf
.setOutputDataType(udfParameters.getDataType(0));
this.multipleK = udfParameters.getDoubleOrDefault("k", 3);
this.dataType = udfParameters.getDataType(0);
this.windowSize = udfParameters.getIntOrDefault("window", 10000);
this.windowSize = udfParameters.getIntOrDefault("window", DEFAULT_WINDOW_SIZE);
this.mean = 0.0;
this.variance = 0.0;
this.sumX1 = 0.0;
this.sumX2 = 0.0;
this.v = new CircularQueue<>(windowSize);
this.t = new LongCircularQueue(windowSize);
}

@Override
public void transform(Row row, PointCollector collector) throws Exception {
if (row.isNull(0)) {
return;
}
double value = Util.getValueAsDouble(row);
long timestamp = row.getTime();
if (Double.isFinite(value) && !Double.isNaN(value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.type.Type;

import java.util.ArrayList;
import java.util.List;

/** This function is used to detect density anomalies in time series. */
public class UDTFLOF implements UDTF {
private int multipleK;
private int dim;
private static final String DEFAULT_METHOD = "default";
private static final String METHOD_SERIES = "series";
private String method = DEFAULT_METHOD;
private int window;

Expand Down Expand Up @@ -119,7 +123,27 @@ private double dist(Double[] nnk, Double[] x) {

@Override
public void validate(UDFParameterValidator validator) throws Exception {
validator.validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE);
validator.validateInputSeriesNumber(1, Integer.MAX_VALUE);
for (int i = 0; i < validator.getParameters().getChildExpressionsSize(); i++) {
validator.validateInputSeriesDataType(i, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE);
}
validator
.validate(
k -> (int) k > 0,
"Parameter k should be a positive integer.",
validator.getParameters().getIntOrDefault("k", 3))
.validate(
window -> (int) window > 0,
"Parameter window should be a positive integer.",
validator.getParameters().getIntOrDefault("window", 10000))
.validate(
method -> isValidMethod((String) method),
"Method should be default or series.",
validator.getParameters().getStringOrDefault("method", DEFAULT_METHOD));
}

/**
 * Tells whether {@code method} names one of the supported methods, ignoring case.
 *
 * @param method the value of the "method" attribute; {@code null} is reported as invalid
 * @return {@code true} if {@code method} matches {@code DEFAULT_METHOD} or {@code METHOD_SERIES}
 */
private static boolean isValidMethod(String method) {
  if (DEFAULT_METHOD.equalsIgnoreCase(method)) {
    return true;
  }
  return METHOD_SERIES.equalsIgnoreCase(method);
}

@Override
Expand All @@ -137,24 +161,31 @@ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurati

@Override
public void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
if (this.method.equals(DEFAULT_METHOD)) {
int size = rowWindow.windowSize();
Double[][] knn = new Double[size][dim];
long[] timestamp = new long[size];
int i = 0;
if (this.method.equalsIgnoreCase(DEFAULT_METHOD)) {
int size = 0;
Double[][] knn = new Double[rowWindow.windowSize()][dim];
long[] timestamp = new long[rowWindow.windowSize()];
int row = 0;
while (row < rowWindow.windowSize()) {
timestamp[i] = rowWindow.getRow(row).getTime();
Double[] values = new Double[dim];
boolean valid = true;
for (int j = 0; j < dim; j++) {
if (!rowWindow.getRow(row).isNull(j)) {
knn[i][j] = Util.getValueAsDouble(rowWindow.getRow(i), j);
} else {
i--;
size--;
if (rowWindow.getRow(row).isNull(j)) {
valid = false;
break;
}
double value = Util.getValueAsDouble(rowWindow.getRow(row), j);
if (!Double.isFinite(value)) {
valid = false;
break;
}
values[j] = value;
}
if (valid) {
timestamp[size] = rowWindow.getRow(row).getTime();
knn[size] = values;
size++;
}
i++;
row++;
}
if (size > multipleK) {
Expand All @@ -168,39 +199,35 @@ public void transform(RowWindow rowWindow, PointCollector collector) throws Exce
}
}
}
} else if (this.method.equals("series")) {
} else if (this.method.equalsIgnoreCase(METHOD_SERIES)) {
int size = rowWindow.windowSize() - window + 1;
if (size > 0) {
Double[][] knn = new Double[size][window];
long[] timestamp = new long[rowWindow.windowSize()];
double temp;
int i = 0;
List<Long> timestamp = new ArrayList<>();
List<Double> values = new ArrayList<>();
int row = 0;
while (row < rowWindow.windowSize()) {
timestamp[i] = rowWindow.getRow(row).getTime();
if (!rowWindow.getRow(row).isNull(0)) {
temp = Util.getValueAsDouble(rowWindow.getRow(row), 0);
for (int p = 0; p < window; p++) {
if (i - p < 0) {
break;
}
if (i - p < size) {
knn[i - p][p] = temp;
}
double value = Util.getValueAsDouble(rowWindow.getRow(row), 0);
if (Double.isFinite(value)) {
timestamp.add(rowWindow.getRow(row).getTime());
values.add(value);
}
} else {
i--;
size--;
}
i++;
row++;
}
size = values.size() - window + 1;
if (size > multipleK) {
Double[][] knn = new Double[size][window];
for (int i = 0; i < size; i++) {
for (int p = 0; p < window; p++) {
knn[i][p] = values.get(i + p);
}
}
double[] lof = new double[size];
for (int m = 0; m < size; m++) {
try {
lof[m] = getLOF(knn, knn[m], size);
collector.putDouble(timestamp[m], lof[m]);
collector.putDouble(timestamp.get(m), lof[m]);
} catch (Exception e) {
throw new UDFException(LibraryUdfMessages.FAIL_TO_GET_LOF + m, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws Exc

@Override
public void transform(Row row, PointCollector collector) throws Exception {
detector.insert(row.getTime(), Util.getValueAsDouble(row));
if (!row.isNull(0)) {
double v = Util.getValueAsDouble(row);
if (Double.isFinite(v)) {
detector.insert(row.getTime(), v);
}
}
while (detector.hasNext()) {
collector.putBoolean(detector.getOutTime(), detector.getOutValue());
detector.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.collector.PointCollector;
import org.apache.iotdb.udf.api.customizer.config.UDTFConfigurations;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.udf.api.customizer.strategy.RowByRowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
Expand All @@ -47,38 +48,66 @@ public class UDTFOutlier implements UDTF {
private ArrayList<Double> currentValueWindow = new ArrayList<>();
private Map<Long, Double> outliers = new HashMap<>();

/**
 * Checks the input series and the attributes of this function.
 *
 * <p>Exactly one input series of type INT32, INT64, FLOAT or DOUBLE is accepted. The attributes
 * {@code k} (default 3), {@code w} (default 1000) and {@code s} (default 500) must be positive
 * integers; {@code r} (default 5) must be finite and non-negative.
 *
 * @param validator gives access to the input series description and the attributes
 * @throws Exception propagated from the validator calls
 */
@Override
public void validate(UDFParameterValidator validator) throws Exception {
  final UDFParameters attributes = validator.getParameters();
  validator
      .validateInputSeriesNumber(1)
      .validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE)
      .validate(
          neighbours -> (int) neighbours >= 1,
          "Parameter k should be a positive integer.",
          attributes.getIntOrDefault("k", 3))
      .validate(
          radius -> {
            final double value = (double) radius;
            return Double.isFinite(value) && value >= 0;
          },
          "Parameter r should be finite and non-negative.",
          attributes.getDoubleOrDefault("r", 5))
      .validate(
          windowSize -> (int) windowSize >= 1,
          "Parameter w should be a positive integer.",
          attributes.getIntOrDefault("w", 1000))
      .validate(
          slide -> (int) slide >= 1,
          "Parameter s should be a positive integer.",
          attributes.getIntOrDefault("s", 500));
}

@Override
public void beforeStart(UDFParameters udfParameters, UDTFConfigurations udtfConfigurations)
throws Exception {
udtfConfigurations
.setAccessStrategy(new RowByRowAccessStrategy())
.setOutputDataType(udfParameters.getDataType(0));
.setOutputDataType(Type.DOUBLE);
this.k = udfParameters.getIntOrDefault("k", 3);
this.r = udfParameters.getDoubleOrDefault("r", 5);
this.w = udfParameters.getIntOrDefault("w", 1000);
this.s = udfParameters.getIntOrDefault("s", 500);

this.i = 0;

udtfConfigurations.setAccessStrategy(new RowByRowAccessStrategy());
udtfConfigurations.setOutputDataType(Type.DOUBLE);
currentTimeWindow.clear();
currentValueWindow.clear();
outliers.clear();
}

@Override
public void transform(Row row, PointCollector collector) throws Exception {
if (!row.isNull(0)) {
if (i >= w && (i - w) % s == 0) {
detect();
}
if (row.isNull(0)) {
return;
}
double v = Util.getValueAsDouble(row);
if (!Double.isFinite(v)) {
return;
}
if (i >= w && (i - w) % s == 0) {
detect();
}

if (i >= w) {
currentValueWindow.remove(0);
currentTimeWindow.remove(0);
}
currentTimeWindow.add(row.getTime());
currentValueWindow.add(Util.getValueAsDouble(row));
i += 1;
if (i >= w) {
currentValueWindow.remove(0);
currentTimeWindow.remove(0);
}
currentTimeWindow.add(row.getTime());
currentValueWindow.add(v);
i += 1;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,17 @@ public class UDTFRange implements UDTF {
public void validate(UDFParameterValidator validator) throws Exception {
validator
.validateInputSeriesNumber(1)
.validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE);
.validateInputSeriesDataType(0, Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE)
.validateRequiredAttribute("lower_bound")
.validateRequiredAttribute("upper_bound")
.validate(
params ->
Double.isFinite((double) params[0])
&& Double.isFinite((double) params[1])
&& (double) params[0] < (double) params[1],
"parameter \"lower_bound\" and \"upper_bound\" should be finite, and \"lower_bound\" should be smaller than \"upper_bound\".",
validator.getParameters().getDouble("lower_bound"),
validator.getParameters().getDouble("upper_bound"));
}

@Override
Expand All @@ -56,6 +66,9 @@ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurati

@Override
public void transform(Row row, PointCollector collector) throws Exception {
if (row.isNull(0)) {
return;
}
int intValue;
long longValue;
float floatValue;
Expand All @@ -77,13 +90,14 @@ public void transform(Row row, PointCollector collector) throws Exception {
break;
case FLOAT:
floatValue = row.getFloat(0);
if (floatValue > upperBound || floatValue < lowerBound) {
if (Float.isFinite(floatValue) && (floatValue > upperBound || floatValue < lowerBound)) {
collector.putFloat(timestamp, floatValue);
}
break;
case DOUBLE:
doubleValue = row.getDouble(0);
if (doubleValue > upperBound || doubleValue < lowerBound) {
if (Double.isFinite(doubleValue)
&& (doubleValue > upperBound || doubleValue < lowerBound)) {
collector.putDouble(timestamp, doubleValue);
}
break;
Expand Down
Loading
Loading