create latest_by_offset() udaf
Tim Fox committed Mar 16, 2020
1 parent 395626c commit e2ce900
Showing 5 changed files with 416 additions and 65 deletions.
132 changes: 68 additions & 64 deletions docs/developer-guide/syntax-reference.rst
@@ -2093,70 +2093,74 @@ convention is followed.
Aggregate functions
===================

+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| Function | Example | Input Type | Description |
+========================+============================+============+=====================================================================+
| COLLECT_LIST | ``COLLECT_LIST(col1)`` | Stream, | Return an array containing all the values of ``col1`` from each |
| | | Table | input row (for the specified grouping and time window, if any). |
| | | | Currently only works for simple types (not Map, Array, or Struct). |
| | | | This version limits the size of the result Array to a maximum of |
| | | | 1000 entries and any values beyond this limit are silently ignored. |
| | | | When used with a ``session`` window, two session windows can |
| | | | sometimes be merged into one when a late-arriving record with a |
| | | | timestamp between the two windows is processed. In this case, the |
| | | | 1000-record limit is calculated by first considering all the |
| | | | records from the first window, then the late-arriving record, then |
| | | | the records from the second window, in the order they were |
| | | | originally processed. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| COLLECT_SET | ``COLLECT_SET(col1)`` | Stream | Return an array containing the distinct values of ``col1`` from |
| | | | each input row (for the specified grouping and time window, if any). |
| | | | Currently only works for simple types (not Map, Array, or Struct). |
| | | | This version limits the size of the result Array to a maximum of |
| | | | 1000 entries and any values beyond this limit are silently ignored. |
| | | | When used with a ``session`` window, two session windows can |
| | | | sometimes be merged into one when a late-arriving record with a |
| | | | timestamp between the two windows is processed. In this case, the |
| | | | 1000-record limit is calculated by first considering all the |
| | | | records from the first window, then the late-arriving record, then |
| | | | the records from the second window, in the order they were |
| | | | originally processed. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| COUNT | ``COUNT(col1)``, | Stream, | Count the number of rows. When ``col1`` is specified, the count |
| | ``COUNT(*)`` | Table | returned will be the number of rows where ``col1`` is non-null. |
| | | | When ``*`` is specified, the count returned will be the total |
| | | | number of rows. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| HISTOGRAM | ``HISTOGRAM(col1)`` | Stream, | Return a map containing the distinct String values of ``col1`` |
| | | Table | mapped to the number of times each one occurs for the given window. |
| | | | This version limits the number of distinct values which can be |
| | | | counted to 1000, beyond which any additional entries are ignored. |
| | | | When used with a ``session`` window, two session windows can |
| | | | sometimes be merged into one when a late-arriving record with a |
| | | | timestamp between the two windows is processed. In this case, the |
| | | | 1000-record limit is calculated by first considering all the |
| | | | records from the first window, then the late-arriving record, then |
| | | | the records from the second window, in the order they were |
| | | | originally processed. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| AVG | ``AVG(col1)`` | Stream, | Return the average value for a given column. |
| | | Table | Note: rows where ``col1`` is null are ignored. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| MAX | ``MAX(col1)`` | Stream | Return the maximum value for a given column and window. |
| | | | Note: rows where ``col1`` is null will be ignored. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| MIN | ``MIN(col1)`` | Stream | Return the minimum value for a given column and window. |
| | | | Note: rows where ``col1`` is null will be ignored. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| SUM | ``SUM(col1)`` | Stream, | Return the sum of the column values. |
| | | Table | Note: rows where ``col1`` is null will be ignored. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| TOPK | ``TOPK(col1, k)`` | Stream | Return the Top *K* values for the given column and window. |
| | | | Note: rows where ``col1`` is null will be ignored. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| TOPKDISTINCT | ``TOPKDISTINCT(col1, k)`` | Stream | Return the distinct Top *K* values for the given column and window. |
| | | | Note: rows where ``col1`` is null will be ignored. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
| LATEST_BY_OFFSET | ``LATEST_BY_OFFSET(col1)`` | Stream | Return the latest value for a given column, as determined by |
| | | | offset: a row with a greater offset is considered later. |
| | | | Note: rows where ``col1`` is null will be ignored. |
+------------------------+----------------------------+------------+---------------------------------------------------------------------+
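
The ``LATEST_BY_OFFSET`` semantics described in the table can be illustrated with a minimal Java sketch. It is not the implementation added by this commit: the class name ``LatestByOffsetSketch`` and its methods are hypothetical, and the sketch assumes the values are handed to the aggregate in ascending offset order, so the most recent non-null value is simply kept.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not the ksql UDAF from this commit.
final class LatestByOffsetSketch {

  private LatestByOffsetSketch() {
  }

  // Folds one incoming value into the running aggregate.
  // A null input is ignored, so the previous aggregate is kept.
  static <T> T aggregate(final T current, final T aggregate) {
    return current != null ? current : aggregate;
  }

  // Applies aggregate() to values listed in ascending offset order
  // (assumption: the caller supplies them in that order).
  static <T> T latest(final List<T> valuesInOffsetOrder) {
    T result = null;
    for (final T value : valuesInOffsetOrder) {
      result = aggregate(value, result);
    }
    return result;
  }

  public static void main(final String[] args) {
    // The trailing null is ignored, so the value at the third offset wins.
    System.out.println(latest(Arrays.asList(10, null, 30, null))); // prints 30
  }
}
```

If every input is null, the sketch returns null, since no row ever replaces the initial aggregate.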

For more information, see :ref:`aggregate-streaming-data-with-ksql`.
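
The 1000-entry limit and the session-window merge order described for ``COLLECT_LIST`` can be sketched in the same way. This is a simplified model under stated assumptions, not the ksql code: ``CollectListSketch``, ``LIMIT``, ``aggregate`` and ``merge`` are hypothetical names, and the merge is modeled as replaying the first window, then the late-arriving record, then the second window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the ksql COLLECT_LIST implementation.
final class CollectListSketch {

  // Maximum number of entries kept, as stated in the function table.
  static final int LIMIT = 1000;

  private CollectListSketch() {
  }

  // Appends a value unless the limit is reached; extra values are silently dropped.
  static <T> List<T> aggregate(final T value, final List<T> aggregate) {
    if (aggregate.size() < LIMIT) {
      aggregate.add(value);
    }
    return aggregate;
  }

  // Merges two aggregates: all of the first, then the second, subject to the limit.
  static <T> List<T> merge(final List<T> first, final List<T> second) {
    final List<T> merged = new ArrayList<>(first);
    for (final T value : second) {
      aggregate(value, merged);
    }
    return merged;
  }

  public static void main(final String[] args) {
    final List<Integer> firstWindow = new ArrayList<>();
    for (int i = 0; i < 999; i++) {
      aggregate(i, firstWindow);
    }
    final List<Integer> lateRecord = new ArrayList<>();
    aggregate(-1, lateRecord);
    final List<Integer> secondWindow = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
      aggregate(2000 + i, secondWindow);
    }
    // First window, then the late record, then the second window.
    final List<Integer> merged = merge(merge(firstWindow, lateRecord), secondWindow);
    System.out.println(merged.size()); // prints 1000
  }
}
```

In this run the late-arriving record takes the last free slot, and all five records from the second window are dropped.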

@@ -52,7 +52,7 @@ class UdafLoader {

  void loadUdafFromClass(final Class<?> theClass, final String path) {
    final UdafDescription udafAnnotation = theClass.getAnnotation(UdafDescription.class);

    final List<UdafFactoryInvoker> invokers = new ArrayList<>();
    for (final Method method : theClass.getMethods()) {
      if (method.getAnnotation(UdafFactory.class) != null) {