diff --git a/NAMESPACE b/NAMESPACE index c5a67095ac..9b5930ece7 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -51,6 +51,7 @@ S3method(cube, data.table) S3method(rollup, data.table) export(frollmean) export(frollsum) +export(frollmax) export(frollapply) export(nafill) export(setnafill) diff --git a/NEWS.md b/NEWS.md index 79fc00a5a1..b2ee2ea71e 100644 --- a/NEWS.md +++ b/NEWS.md @@ -14,6 +14,8 @@ # 2: ``` +2. New window function `frollmax` computes the rolling maximum. Request came from @gpierard who needs left-aligned, adaptive, rolling max, [#5438](https://github.com/Rdatatable/data.table/issues/5438). Adaptive rolling functions did not have support for `align="left"`, therefore we added this feature as well for all adaptive rolling functions. We measure adaptive `frollmax` to be up to 50 times faster than the next fastest solution using `max` and grouping `by=.EACHI`. + ## NOTES 1. `transform` method for data.table sped up substantially when creating new columns on large tables. Thanks to @OfekShilon for the report and PR. The implemented solution was proposed by @ColeMiller1. diff --git a/R/froll.R b/R/froll.R index df901f0b84..27085f99f8 100644 --- a/R/froll.R +++ b/R/froll.R @@ -2,8 +2,25 @@ froll = function(fun, x, n, fill=NA, algo=c("fast", "exact"), align=c("right", " stopifnot(!missing(fun), is.character(fun), length(fun)==1L, !is.na(fun)) algo = match.arg(algo) align = match.arg(align) + leftadaptive = isTRUE(adaptive) && align=="left" ## support for left added in #5441 + if (leftadaptive) { + rev2 = function(x) if (is.list(x)) sapply(x, rev, simplify=FALSE) else rev(x) + verbose = getOption("datatable.verbose") + if (verbose) + catf("froll: adaptive=TRUE && align='left' pre-processing for align='right'\n") + ## TODO test atomic x but list of lists n (multiple windows)! + x = rev2(x) + n = rev2(n) + align = "right" + } ans = .Call(CfrollfunR, fun, x, n, fill, algo, align, na.rm, hasNA, adaptive) - ans + if (!leftadaptive) + ans + else { + if (verbose) + catf("froll: adaptive=TRUE && align='left' post-processing from align='right'\n") + rev2(ans) + } } frollmean = function(x, n, fill=NA, algo=c("fast", "exact"), align=c("right", "left", "center"), na.rm=FALSE, hasNA=NA, adaptive=FALSE) { @@ -12,9 +29,14 @@ frollmean = function(x, n, fill=NA, algo=c("fast", "exact"), align=c("right", "l frollsum = function(x, n, fill=NA, algo=c("fast","exact"), align=c("right", "left", "center"), na.rm=FALSE, hasNA=NA, adaptive=FALSE) { froll(fun="sum", x=x, n=n, fill=fill, algo=algo, align=align, na.rm=na.rm, hasNA=hasNA, adaptive=adaptive) } -frollapply = function(x, n, FUN, ..., fill=NA, align=c("right", "left", "center")) { +frollmax = function(x, n, fill=NA, algo=c("fast", "exact"), align=c("right", "left", "center"), na.rm=FALSE, hasNA=NA, adaptive=FALSE) { + froll(fun="max", x=x, n=n, fill=fill, algo=algo, align=align, na.rm=na.rm, hasNA=hasNA, adaptive=adaptive) +} +frollapply = function(x, n, FUN, ..., fill=NA, align=c("right", "left", "center"), adaptive) { FUN = match.fun(FUN) align = match.arg(align) + if (!missing(adaptive)) + stopf("frollapply does not support 'adaptive' argument") rho = new.env() ans = .Call(CfrollapplyR, FUN, x, n, fill, align, rho) ans diff --git a/inst/tests/froll.Rraw b/inst/tests/froll.Rraw index f6a4f96a80..54c67e5628 100644 --- a/inst/tests/froll.Rraw +++ b/inst/tests/froll.Rraw @@ -563,8 +563,8 @@ if (FALSE) { #### adaptive limitations test(6000.145, frollmean(1:2, 1:2, adaptive=TRUE, align="right"), c(1, 1.5)) -test(6000.146, frollmean(1:2, 1:2, adaptive=TRUE, align="center"), error="using adaptive TRUE and align argument different than 'right' is not implemented") -test(6000.147, frollmean(1:2, 1:2, adaptive=TRUE, align="left"), error="using adaptive TRUE and align argument different than 'right' is not implemented") +test(6000.146, frollmean(1:2, 1:2, adaptive=TRUE, align="center"), error="using adaptive TRUE and align 'center' is not implemented") +##test(6000.147, frollmean(1:2, 1:2, adaptive=TRUE, align="left"), error="using adaptive TRUE and align argument different than 'right' is not implemented") ## support added in #5441, TODO add tests test(6000.148, frollmean(list(1:2, 1:3), list(1:2), adaptive=TRUE), error="adaptive rolling function can only process 'x' having equal length of elements, like data.table or data.frame. If you want to call rolling function on list having variable length of elements call it for each field separately") #### adaptive exact diff --git a/man/froll.Rd b/man/froll.Rd index d6cb75067f..c718bafa0b 100644 --- a/man/froll.Rd +++ b/man/froll.Rd @@ -7,7 +7,9 @@ \alias{rollmean} \alias{frollmean} \alias{rollsum} +\alias{rollmax} \alias{frollsum} +\alias{frollmax} \alias{rollapply} \alias{frollapply} \title{Rolling functions} @@ -19,7 +21,9 @@ frollmean(x, n, fill=NA, algo=c("fast", "exact"), align=c("right", "left", "center"), na.rm=FALSE, hasNA=NA, adaptive=FALSE) frollsum(x, n, fill=NA, algo=c("fast","exact"), align=c("right", "left", "center"), na.rm=FALSE, hasNA=NA, adaptive=FALSE) -frollapply(x, n, FUN, \dots, fill=NA, align=c("right", "left", "center")) +frollmax(x, n, fill=NA, algo=c("fast","exact"), + align=c("right", "left", "center"), na.rm=FALSE, hasNA=NA, adaptive=FALSE) +frollapply(x, n, FUN, \dots, fill=NA, align=c("right", "left", "center"), adaptive) } \arguments{ \item{x}{ Vector, \code{data.frame} or \code{data.table} of integer, numeric or logical columns over which to calculate the windowed aggregations. May also be a list, in which case the rolling function is applied to each of its elements. } @@ -45,36 +49,69 @@ frollapply(x, n, FUN, \dots, fill=NA, align=c("right", "left", "center")) conveniently within \code{data.table} syntax. Argument \code{n} allows multiple values to apply rolling functions on - multiple window sizes. If \code{adaptive=TRUE}, then \code{n} must be a list. - Each list element must be integer vector of window sizes corresponding - to every single observation in each column; see Examples. + multiple window sizes. If \code{adaptive=TRUE}, then \code{n} must be a list, + see \emph{Adaptive rolling functions} section below for details. - When \code{algo="fast"} an \emph{"on-line"} algorithm is used, and - all of \code{NaN, +Inf, -Inf} are treated as \code{NA}. - Setting \code{algo="exact"} will make rolling functions to use a more - computationally-intensive algorithm that suffers less from floating point - rounding error (the same consideration applies to \code{\link[base]{mean}}). - \code{algo="exact"} also handles \code{NaN, +Inf, -Inf} consistently to - base R. In case of some functions (like \emph{mean}), it will additionally + When multiple columns or multiple window widths are provided, then they + are run in parallel. The exception is for \code{algo="exact"}, which runs in + parallel already. See \code{\link{setDTthreads}} for defaults and further details on parallelism in data.table. +} +\section{\code{hasNA} argument}{ + \code{hasNA} can be used to speed up processing in cases when it is known + whether \code{x} contains infinite values \code{NA, NaN, +Inf, -Inf}. + \itemize{ + \item{ Default \code{hasNA=NA} will use faster \code{NA} agnostic implementation, + but when \code{NA}s are detected it will re-run \code{NA} aware implementation. } + \item{ \code{hasNA=TRUE} will use \code{NA} aware implementation straightaway. } + \item{ \code{hasNA=FALSE} will use faster \code{NA} agnostic implementation. + Then depending on the rolling function it will either + \itemize{ + \item{ (\emph{mean, sum}) detect \code{NA}s, raise warning, re-run \code{NA} aware. } + \item{ (\emph{max}) not detect \code{NA}s and may silently produce an incorrect + answer. }} + Therefore \code{hasNA=FALSE} should be used with care. + } + } +} +\section{Implementation}{ + \itemize{ + \item{ \code{algo="fast"} uses \emph{"on-line"} algorithm, and + all of \code{NaN, +Inf, -Inf} are treated as \code{NA}. Not all functions + have \emph{fast} implementation available. As of now + \emph{max} and \code{adaptive=TRUE} do not have it, therefore it will + automatically fall back to \emph{exact} implementation. \code{datatable.verbose} + option can be used to check that. } + \item{ \code{algo="exact"} will make rolling functions use a more + computationally-intensive algorithm. For each observation from input vector + it will compute a function on a window from scratch (complexity \eqn{O(n^2)}). + Depending on the function, this algorithm may suffers less from + floating point rounding error (the same consideration applies to base \code{\link[base]{mean}}). + Algorithm also handles \code{NaN, +Inf, -Inf} consistently to + base R, unless - for some functions (e.g. \emph{max}) - \code{hasNA} is \code{FALSE} + but NAs are present. In case of some functions (like \emph{mean}), it will additionally make extra pass to perform floating point error correction. Error corrections might not be truly exact on some platforms (like Windows) - when using multiple threads. - + when using multiple threads. } + } +} +\section{Adaptive rolling functions}{ Adaptive rolling functions are a special case where each - observation has its own corresponding rolling window width. Due to the logic - of adaptive rolling functions, the following restrictions apply: + observation has its own corresponding rolling window width. \code{n} + argument must be a list, then each list element must be an integer vector + of window sizes corresponding to every single observation in each column; + see Examples. Due to the logic or implementation of adaptive rolling + functions, the following restrictions apply: \itemize{ - \item \code{align} only \code{"right"}. + \item \code{align} does not support \code{"center"}. \item if list of vectors is passed to \code{x}, then all vectors within it must have equal length. + \item functionality is not suported in \code{frollapply}. } - - When multiple columns or multiple windows width are provided, then they - are run in parallel. The exception is for \code{algo="exact"}, which runs in - parallel already. - +} +\section{\code{frollapply}}{ \code{frollapply} computes rolling aggregate on arbitrary R functions. - The input \code{x} (first argument) to the function \code{FUN} + \code{adaptive} argument is not supported. The input + \code{x} (first argument) to the function \code{FUN} is coerced to \emph{numeric} beforehand and \code{FUN} has to return a scalar \emph{numeric} value. Checks for that are made only during the first iteration when \code{FUN} is evaluated. Edge cases can be @@ -84,32 +121,34 @@ frollapply(x, n, FUN, \dots, fill=NA, align=c("right", "left", "center")) because there is no thread-safe API to R's C \code{eval}. Nevertheless we've seen the computation speed up vis-a-vis versions implemented in base R. } -\value{ - A list except when the input is a \code{vector} and - \code{length(n)==1} in which case a \code{vector} is returned. -} -\note{ +\section{\code{zoo} package users notice}{ Users coming from most popular package for rolling functions \code{zoo} might expect following differences in \code{data.table} implementation. \itemize{ - \item rolling function will always return result of the same length as input. - \item \code{fill} defaults to \code{NA}. + \item rolling functions will always return result of the same length + as input. + \item \code{fill} defaults to \code{NA}. \item \code{fill} accepts only constant values. It does not support for \emph{na.locf} or other functions. - \item \code{align} defaults to \code{"right"}. + \item \code{align} defaults to \code{"right"}. \item \code{na.rm} is respected, and other functions are not needed when input contains \code{NA}. - \item integers and logical are always coerced to double. + \item integers and logical are always coerced to double. \item when \code{adaptive=FALSE} (default), then \code{n} must be a numeric vector. List is not accepted. \item when \code{adaptive=TRUE}, then \code{n} must be vector of length equal to \code{nrow(x)}, or list of such vectors. \item \code{partial} window feature is not supported, although it can - be accomplished by using \code{adaptive=TRUE}, see - examples. \code{NA} is always returned for incomplete windows. + be accomplished by using \code{adaptive=TRUE}, see examples. + \code{NA} is always returned for incomplete windows. } - +} +\value{ + A list except when the input is a \code{vector} and + \code{length(n)==1} in which case a \code{vector} is returned. +} +\note{ Be aware that rolling functions operates on the physical order of input. If the intent is to roll values in a vector by a logical window, for example an hour, or a day, one has to ensure that there are no gaps in diff --git a/src/data.table.h b/src/data.table.h index 0a6eb207a8..a6213b7313 100644 --- a/src/data.table.h +++ b/src/data.table.h @@ -206,6 +206,9 @@ void frollmeanExact(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool void frollsum(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int k, int align, double fill, bool narm, int hasna, bool verbose); void frollsumFast(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose); void frollsumExact(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose); +void frollmax(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int k, int align, double fill, bool narm, int hasna, bool verbose); +void frollmaxFast(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose); +void frollmaxExact(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose); void frollapply(double *x, int64_t nx, double *w, int k, ans_t *ans, int align, double fill, SEXP call, SEXP rho, bool verbose); // frolladaptive.c @@ -215,6 +218,9 @@ void fadaptiverollmeanExact(double *x, uint64_t nx, ans_t *ans, int *k, double f void fadaptiverollsum(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose); void fadaptiverollsumFast(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose); void fadaptiverollsumExact(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose); +void fadaptiverollmax(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose); +//void fadaptiverollmaxFast(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose); // does not exists as of now +void fadaptiverollmaxExact(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose); // frollR.c SEXP frollfunR(SEXP fun, SEXP obj, SEXP k, SEXP fill, SEXP algo, SEXP align, SEXP narm, SEXP hasNA, SEXP adaptive); diff --git a/src/froll.c b/src/froll.c index 3ab7bd927a..1c0fa25371 100644 --- a/src/froll.c +++ b/src/froll.c @@ -49,7 +49,7 @@ void frollmeanFast(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool long double w = 0.0; // sliding window aggregate bool truehasna = hasna>0; // flag to re-run with NA support if NAs detected if (!truehasna) { - int i; // iterator declared here because it is being used after for loop + int i; // iterator declared here because it is being used after for loop, it is int32 rather than int64 because first loop over k-1 observations cannot exceed int32 range for (i=0; idbl_v[i] = fill; // answers are fill for partial window @@ -217,6 +217,7 @@ void frollmeanExact(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool } /* fast rolling sum */ +// see frollmean code for comments describing online implementations for rolling functions void frollsum(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int k, int align, double fill, bool narm, int hasna, bool verbose) { if (nx < k) { if (verbose) @@ -397,6 +398,278 @@ void frollsumExact(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool } } +/* fast rolling max */ +// see frollmean code for comments describing online implementations for rolling functions +void frollmax(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int k, int align, double fill, bool narm, int hasna, bool verbose) { + if (nx < k) { + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: window width longer than input vector, returning all %f vector\n"), __func__, fill); + for (int i=0; idbl_v[i] = fill; + } + return; + } + double tic = 0; + if (verbose) + tic = omp_get_wtime(); + if (algo==0) { + frollmaxFast(x, nx, ans, k, fill, narm, hasna, verbose); + } else { // algo==1 + frollmaxExact(x, nx, ans, k, fill, narm, hasna, verbose); + } + if (ans->status < 3 && align < 1) { + int k_ = align==-1 ? k-1 : floor(k/2); + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: align %d, shift answer by %d\n"), __func__, align, -k_); + memmove((char *)ans->dbl_v, (char *)ans->dbl_v + (k_*sizeof(double)), (nx-k_)*sizeof(double)); + for (uint64_t i=nx-k_; idbl_v[i] = fill; + } + } + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: processing algo %u took %.3fs\n"), __func__, algo, omp_get_wtime()-tic); +} +inline void windowmax(double *x, uint64_t o, int k, /* max of current window */ double *w, /* index of w within window */ uint64_t *iw) { + for (int i=0; i= w[0]: x[%d-%d+1] >= w[0]: %f >= %f: %d\n", i, o, x[o], i, k, x[o+i-k+1], w[0], x[o+i-k+1] >= w[0]); + if (x[o+i-k+1] >= w[0]) { // what if that is never satisfied? test! + iw[0] = o+i-k+1; + w[0] = x[iw[0]]; + } + } +} +inline void windowmaxnarm(double *x, uint64_t o, int k, bool narm, /* NA counter */ int *nc, double *w, uint64_t *iw) { + for (int i=0; i= w[0]: x[%d-%d+1] >= w[0]: %f >= %f: %d\n", i, o, x[o], i, k, x[o+i-k+1], w[0], x[o+i-k+1] >= w[0]); + if (R_FINITE(x[o+i-k+1])) { + if (x[o+i-k+1] >= w[0]) { // what if that is never satisfied? test! + iw[0] = o+i-k+1; + w[0] = x[iw[0]]; + } + } else if (narm) { + nc[0]++; + } else { + w[0] = NA_REAL; + } + } +} +/* fast rolling max - fast + * fast online algorithm do single pass over elements keeping track of recent max and its index + * if index of max is within progressing window then it keeps running single pass + * whenever max is leaving the window (index of max is outside of iterator minus window size) then new maximum is computed via nested loop on current complete window + * new max is used to continue outer single pass as long as new max index is not leaving the running window + * should scale well for bigger window size, may carry overhead for small window, needs benchmarking + * TODO NA handling + */ +void frollmaxFast(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose) { + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", window %d, hasna %d, narm %d\n"), "frollmaxFast", (uint64_t)nx, k, hasna, (int)narm); + double w = R_NegInf; // window max + uint64_t cmax = 0; // counter of nested loops for verbose + uint64_t iw = 0; // index of window max + bool truehasna = hasna>0; + if (!truehasna) { + int i; + for (i=0; i= w) { // >= rather than > because we track most recent maximum using iw + iw = i; + w = x[iw]; + } + ans->dbl_v[i] = fill; + } + if (!truehasna && !R_FINITE(x[i])) { + if (x[i] >= w) { + iw = i; + w = x[iw]; + } else { + // ensure iw ok, case when all window was -Inf or NA? + } + ans->dbl_v[i] = w; + } else { + truehasna = true; + } + if (!truehasna) { + for (uint64_t i=k; i= w) { + //Rprintf("iteration %d, new max %f at %d, old max %f at %d\n", i, x[i], i, w, iw); + iw = i; + w = x[iw]; + } else { + if (iw > i-k) { // max is still within window + // w and iw are still ok + //Rprintf("iteration %d, max %f at %d bigger than current value %f\n", i, w, iw, x[i]); + } else { // max left the window, need to find max in this window + //Rprintf("iteration %d, max %f at %d left the window, call windowmax from %d of size %d\n", i, w, iw, i, k); + w = R_NegInf; + iw = i-k; + windowmax(x, i, k, &w, &iw); + //Rprintf("iteration %d, windowmax found new max %f at %d\n", i, w, iw); + cmax++; + } + } + ans->dbl_v[i] = w; + } + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: windowmax called %"PRIu64" time(s)\n"), __func__, cmax); + if (truehasna) { + if (hasna==-1) { + ans->status = 2; + snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__); + } + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__); + w = R_NegInf; + cmax = 0; + } + } else { + if (hasna==-1) { + ans->status = 2; + snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__); + } + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, skip non-NA attempt and run with extra care for NAs\n"), __func__); + w = R_NegInf; + cmax = 0; + } + } + if (truehasna) { + int nc = 0; + int i; + for (i=0; i= w) { + iw = i; + w = x[iw]; + } + } else { + nc++; + } + ans->dbl_v[i] = fill; + } + if (R_FINITE(x[i])) { + if (x[i] >= w) { + iw = i; + w = x[iw]; + } + } else { + nc++; + } + if (nc == 0) { + ans->dbl_v[i] = w; + } else if (nc == k) { + ans->dbl_v[i] = narm ? R_NegInf : NA_REAL; + } else { + ans->dbl_v[i] = narm ? w : NA_REAL; + } + for (uint64_t i=k; i= w) { + iw = i; + w = x[iw]; + } else { + if (iw > i-k) { // max is still within window + // w and iw are still ok + } else { // max left the window, need to find max in this window + w = R_NegInf; + iw = i-k; + windowmaxnarm(x, i, k, narm, &nc, &w, &iw); + //Rprintf("iteration %d, windowmaxnarm found new max %f at %d\n", i, w, iw); + cmax++; + } + } + ans->dbl_v[i] = w; + } else { + nc++; + } + if (!R_FINITE(x[i-k])) { // value leaving rolling window is non-finite, decrement NF counter + nc--; + } + if (nc == 0) { + ans->dbl_v[i] = w; + } else if (nc == k) { + ans->dbl_v[i] = narm ? R_NegInf : NA_REAL; + } else { + ans->dbl_v[i] = narm ? w : NA_REAL; + } + } + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: windowmaxnarm called %"PRIu64" time(s)\n"), __func__, cmax); + } +} +/* fast rolling max - exact + * for each observation in x compute max in window from scratch + * no proper support for NAs yet + */ +void frollmaxExact(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose) { + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", window %d, hasna %d, narm %d\n"), "frollmaxExact", (uint64_t)nx, k, hasna, (int)narm); + for (int i=0; idbl_v[i] = fill; + } + bool truehasna = hasna>0; + if (!truehasna || !narm) { + #pragma omp parallel for num_threads(getDTthreads(nx, true)) + for (uint64_t i=k-1; i w) + w = x[i+j]; + } + if (R_FINITE(w)) { + ans->dbl_v[i] = w; + } else { + if (!narm) { + ans->dbl_v[i] = w; + } + truehasna = true; + } + } + if (truehasna) { + if (hasna==-1) { + ans->status = 2; + snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__); + } + if (verbose) { + if (narm) { + snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__); + } else { + snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, na.rm was FALSE so in 'exact' implementation NAs were handled already, no need to re-run\n"), __func__); + } + } + } + } + if (truehasna && narm) { + #pragma omp parallel for num_threads(getDTthreads(nx, true)) + for (uint64_t i=k-1; i w) { + w = x[i+j]; + } + } + if (nc < k) { + ans->dbl_v[i] = w; + } else { + ans->dbl_v[i] = R_NegInf; + } + } + } +} + /* fast rolling any R function * not plain C, not thread safe * R eval() allocates diff --git a/src/frollR.c b/src/frollR.c index 74cc7dd4ef..ee4d7fd391 100644 --- a/src/frollR.c +++ b/src/frollR.c @@ -108,8 +108,8 @@ SEXP frollfunR(SEXP fun, SEXP obj, SEXP k, SEXP fill, SEXP algo, SEXP align, SEX else error(_("Internal error: invalid %s argument in %s function should have been caught earlier. Please report to the data.table issue tracker."), "align", "rolling"); // # nocov - if (badaptive && ialign!=1) - error(_("using adaptive TRUE and align argument different than 'right' is not implemented")); + if (badaptive && ialign==0) // support for left added in #5441 + error(_("using adaptive TRUE and align 'center' is not implemented")); SEXP ans = PROTECT(allocVector(VECSXP, nk * nx)); protecti++; // allocate list to keep results if (verbose) @@ -132,11 +132,13 @@ SEXP frollfunR(SEXP fun, SEXP obj, SEXP k, SEXP fill, SEXP algo, SEXP align, SEX dx[i] = REAL(VECTOR_ELT(x, i)); // assign source columns to C pointers } - enum {MEAN, SUM} sfun; + enum {MEAN, SUM, MAX} sfun; if (!strcmp(CHAR(STRING_ELT(fun, 0)), "mean")) { sfun = MEAN; } else if (!strcmp(CHAR(STRING_ELT(fun, 0)), "sum")) { sfun = SUM; + } else if (!strcmp(CHAR(STRING_ELT(fun, 0)), "max")) { + sfun = MAX; } else { error(_("Internal error: invalid %s argument in %s function should have been caught earlier. Please report to the data.table issue tracker."), "fun", "rolling"); // # nocov } @@ -152,7 +154,7 @@ SEXP frollfunR(SEXP fun, SEXP obj, SEXP k, SEXP fill, SEXP algo, SEXP align, SEX int ihasna = // plain C tri-state boolean as integer LOGICAL(hasna)[0]==NA_LOGICAL ? 0 : // hasna NA, default, no info about NA LOGICAL(hasna)[0]==TRUE ? 1 : // hasna TRUE, might be some NAs - -1; // hasna FALSE, there should be no NAs + -1; // hasna FALSE, there should be no NAs // or there must be no NAs for rollmax #5441 unsigned int ialgo; // decode algo to integer if (!strcmp(CHAR(STRING_ELT(algo, 0)), "fast")) @@ -193,6 +195,12 @@ SEXP frollfunR(SEXP fun, SEXP obj, SEXP k, SEXP fill, SEXP algo, SEXP align, SEX else fadaptiverollsum(ialgo, dx[i], inx[i], &dans[i*nk+j], ikl[j], dfill, bnarm, ihasna, verbose); break; + case MAX : + if (!badaptive) + frollmax(ialgo, dx[i], inx[i], &dans[i*nk+j], iik[j], ialign, dfill, bnarm, ihasna, verbose); + else + fadaptiverollmax(ialgo, dx[i], inx[i], &dans[i*nk+j], ikl[j], dfill, bnarm, ihasna, verbose); + break; default: error(_("Internal error: Unknown sfun value in froll: %d"), sfun); // #nocov } diff --git a/src/frolladaptive.c b/src/frolladaptive.c index e005cef693..f0a40fb9ca 100644 --- a/src/frolladaptive.c +++ b/src/frolladaptive.c @@ -364,3 +364,80 @@ void fadaptiverollsumExact(double *x, uint64_t nx, ans_t *ans, int *k, double fi } } } + +/* fast adaptive rolling max */ +void fadaptiverollmax(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) { + double tic = 0; + if (verbose) + tic = omp_get_wtime(); + if (algo==0 && verbose) { + //fadaptiverollmaxFast(x, nx, ans, k, fill, narm, hasna, verbose); // fadaptiverollmaxFast does not exists as of now + snprintf(end(ans->message[0]), 500, _("%s: algo %u not implemented, fall back to %u\n"), __func__, algo, (unsigned int) 1); + } + fadaptiverollmaxExact(x, nx, ans, k, fill, narm, hasna, verbose); + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: processing algo %u took %.3fs\n"), __func__, algo, omp_get_wtime()-tic); +} +//void fadaptiverollmaxFast(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose); // does not exists as of now +/* fast adaptive rolling max - exact + * for hasNA=FALSE it will not detect if any NAs were in the input, therefore could produce incorrect result, well documented + */ +void fadaptiverollmaxExact(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) { + if (verbose) + snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollmaxExact", (uint64_t)nx, hasna, (int) narm); + if (hasna==-1) { // fastest we can get for adaptive max as there is no algo='fast', therefore we drop any NA checks when hasNA=FALSE + #pragma omp parallel for num_threads(getDTthreads(nx, true)) + for (uint64_t i=0; idbl_v[i] = fill; + } else { + double w = R_NegInf; + for (int j=-k[i]+1; j<=0; j++) { //Rprintf("x[%d+%d] > w: %f > %f: %d\n", i, j, x[i+j], w, x[i+j] > w); + if (x[i+j] > w) + w = x[i+j]; + } + ans->dbl_v[i] = w; + } + } + } else { + if (narm) { + #pragma omp parallel for num_threads(getDTthreads(nx, true)) + for (uint64_t i=0; idbl_v[i] = fill; + } else { + int nc = 0; + double w = R_NegInf; + for (int j=-k[i]+1; j<=0; j++) { + if (ISNAN(x[i+j])) + nc++; + else if (x[i+j] > w) + w = x[i+j]; + } + if (nc < k[i]) + ans->dbl_v[i] = w; + else + ans->dbl_v[i] = R_NegInf; + } + } + } else { + #pragma omp parallel for num_threads(getDTthreads(nx, true)) + for (uint64_t i=0; idbl_v[i] = fill; + } else { + double w = R_NegInf; + for (int j=-k[i]+1; j<=0; j++) { + if (ISNAN(x[i+j])) { + w = NA_REAL; + break; + } else if (x[i+j] > w) { + w = x[i+j]; + } + } + ans->dbl_v[i] = w; + } + } + } + } +}