Skip to content

Commit

Permalink
feat: Add Cube UDTF (#3935)
Browse files Browse the repository at this point in the history
Add support for cube udtf
  • Loading branch information
vpapavas authored Nov 27, 2019
1 parent 4856e8b commit 6be8e7c
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udtf;

import io.confluent.ksql.util.KsqlConstants;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

@UdtfDescription(name = "cube_explode", author = KsqlConstants.CONFLUENT_AUTHOR,
    description =
        "Takes as argument a list of columns and outputs all possible combinations of them. "
            + "It produces 2^d new rows where d is the number of columns given as parameter. "
            + "Duplicate entries for columns with null value are skipped.")
public class Cube {

  /**
   * Explodes {@code columns} into all cube combinations.
   *
   * @param columns the input row's column values; may contain nulls
   * @param <T> the column value type
   * @return up to 2^d rows, each the same length as {@code columns}, where every
   *         subset of positions is replaced by null; empty list if {@code columns} is null.
   *         Rows whose null pattern would duplicate another row (because the source
   *         column is itself null) are emitted only once.
   */
  @Udtf
  public <T> List<List<T>> cube(final List<T> columns) {
    if (columns == null) {
      return Collections.emptyList();
    }
    return createAllCombinations(columns);
  }

  private <T> List<List<T>> createAllCombinations(final List<T> columns) {
    final int size = columns.size();
    final int combinations = 1 << size;

    // When a column value is null there is only a single possibility for the output
    // value [null] instead of two [null, original]. To avoid emitting duplicate rows
    // we build nullMask: a binary number with set bits at non-null column indices
    // (bit (size - 1 - i) corresponds to column i).
    int nullMask = 0;
    for (int i = 0; i < size; i++) {
      if (columns.get(i) != null) {
        nullMask |= 1 << (size - 1 - i);
      }
    }

    final List<List<T>> result = new ArrayList<>(combinations);
    // bitMask is a binary number where a set bit means the column governed by that
    // bit is kept; a clear bit means it is replaced by null. Iteration starts at the
    // empty mask (all-null row), which is required for correctness.
    for (int bitMask = 0; bitMask < combinations; bitMask++) {
      // Clearing the bits of originally-null columns yields the canonical mask for
      // this row; if that differs from bitMask, this row is a logical duplicate of
      // the canonical one and must be skipped.
      if ((bitMask & nullMask) != bitMask) {
        continue;
      }
      final List<T> row = new ArrayList<>(size);
      // Column j is governed by bit (size - 1 - j). Appending in forward order is
      // O(d) per row, unlike inserting at index 0 which is O(d^2) on an ArrayList.
      for (int j = 0; j < size; j++) {
        row.add((bitMask & (1 << (size - 1 - j))) == 0 ? null : columns.get(j));
      }
      result.add(row);
    }
    return result;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.function.udtf;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

import com.google.common.collect.Lists;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.Test;

public class CubeTest {

  private final Cube udtf = new Cube();

  @Test
  public void shouldCubeSingleColumn() {
    // When:
    final List<List<Object>> rows = udtf.cube(Arrays.<Object>asList(1));

    // Then:
    assertThat(rows.size(), is(2));
    assertThat(rows.get(0), is(Collections.singletonList(null)));
    assertThat(rows.get(1), is(Lists.newArrayList(1)));
  }

  @Test
  public void shouldCubeSingleNullColumn() {
    // When:
    final List<List<Object>> rows = udtf.cube(Arrays.<Object>asList((Object) null));

    // Then:
    assertThat(rows.size(), is(1));
    assertThat(rows.get(0), is(Collections.singletonList(null)));
  }

  @Test
  public void shouldCubeColumnsWithDifferentTypes() {
    // When:
    final List<List<Object>> rows = udtf.cube(Arrays.<Object>asList(1, "foo"));

    // Then:
    assertThat(rows.size(), is(4));
    assertThat(rows.get(0), is(Arrays.asList(null, null)));
    assertThat(rows.get(1), is(Arrays.asList(null, "foo")));
    assertThat(rows.get(2), is(Arrays.asList(1, null)));
    assertThat(rows.get(3), is(Arrays.asList(1, "foo")));
  }

  @Test
  public void shouldHandleOneNull() {
    // When:
    final List<List<Object>> rows = udtf.cube(Arrays.<Object>asList(1, null));

    // Then:
    assertThat(rows.size(), is(2));
    assertThat(rows.get(0), is(Arrays.asList(null, null)));
    assertThat(rows.get(1), is(Arrays.asList(1, null)));
  }

  @Test
  public void shouldHandleAllNulls() {
    // When:
    final List<List<Object>> rows = udtf.cube(Arrays.<Object>asList(null, null));

    // Then:
    assertThat(rows.size(), is(1));
    assertThat(rows.get(0), is(Arrays.asList(null, null)));
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
{
"comments": [
"Tests for the CUBE table function"
],
"tests": [
{
"name": "cube two int columns",
"statements": [
"CREATE STREAM TEST (col1 INT, col2 INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT cube_explode(as_array(col1, col2)) VAL FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"col1": 1, "col2": 2}},
{"topic": "test_topic", "key": 1, "value": {"col1": 1, "col2": null}}
],
"outputs": [
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [null, null]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [null, 2]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [1, null]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [1, 2]}},
{"topic": "OUTPUT", "key": "1", "value": {"VAL": [null, null]}},
{"topic": "OUTPUT", "key": "1", "value": {"VAL": [1, null]}}

]
},
{
"name": "cube three columns",
"statements": [
"CREATE STREAM TEST (col1 VARCHAR, col2 VARCHAR, col3 VARCHAR) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT cube_explode(as_array(col1, col2, col3)) VAL FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"col1": "one", "col2": "two", "col3" : "three"}}
],
"outputs": [
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [null, null, null]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [null, null, "three"]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [null, "two", null]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL": [null, "two", "three"]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL": ["one", null, null]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL": ["one", null, "three"]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL": ["one", "two", null]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL": ["one", "two", "three"]}}
]
},
{
"name": "cube two columns with udf on third",
"statements": [
"CREATE STREAM TEST (col1 VARCHAR, col2 VARCHAR, col3 INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT cube_explode(as_array(col1, col2)) VAL1, ABS(col3) VAL2 FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"col1": "one", "col2": "two", "col3" : 3}}
],
"outputs": [
{"topic": "OUTPUT", "key": "0", "value": {"VAL1": [null, null], "VAL2": 3.0}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL1": [null, "two"], "VAL2": 3.0}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL1": ["one", null], "VAL2": 3.0}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL1": ["one", "two"], "VAL2": 3.0}}
]
},
{
"name": "cube two columns twice",
"statements": [
"CREATE STREAM TEST (col1 VARCHAR, col2 VARCHAR, col3 INT, col4 INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE STREAM OUTPUT AS SELECT cube_explode(as_array(col1, col2)) VAL1, cube_explode(as_array(col3, col4)) VAL2 FROM TEST;"
],
"inputs": [
{"topic": "test_topic", "key": 0, "value": {"col1": "one", "col2": "two", "col3" : 3, "col4" : 4}}
],
"outputs": [
{"topic": "OUTPUT", "key": "0", "value": {"VAL1": [null, null], "VAL2": [null, null]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL1": [null, "two"], "VAL2": [null, 4]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL1": ["one", null], "VAL2": [3, null]}},
{"topic": "OUTPUT", "key": "0", "value": {"VAL1": ["one", "two"], "VAL2": [3, 4]}}
]
}
]
}

0 comments on commit 6be8e7c

Please sign in to comment.