Migrate from PrestoSQL to Trino
archongum committed May 16, 2022
1 parent 757adff commit 6a2b5f0
Showing 24 changed files with 1,275 additions and 0 deletions.
29 changes: 29 additions & 0 deletions .gitignore
@@ -0,0 +1,29 @@
.idea
*.iml

# Compiled class file
*.class

# Log file
*.log

# BlueJ files
*.ctxt

# Mobile Tools for Java (J2ME)
.mtj.tmp/

# Package Files #
*.jar
*.war
*.ear
*.zip
*.tar.gz
*.rar

# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*

# Build output directories
target/
out/
29 changes: 29 additions & 0 deletions README.md
@@ -0,0 +1,29 @@
# Installation
1. `mvn clean assembly:assembly`
2. Copy `trino-udf-*-jar-with-dependencies.jar` to `${TRINO_HOME}/plugin/custom-functions/` on all Trino nodes
   (create the directory if it does not exist).
3. Restart the Trino cluster.

# Versions
- JDK-11
- Trino-380

# Functions
## Scalar Functions
| Function                 | Return Type | Argument Types | Description                                                              | Usage                                    |
|--------------------------|-------------|----------------|--------------------------------------------------------------------------|------------------------------------------|
| first_day                | date        | date           | First day of the month                                                   | first_day(current_date)                  |
| last_day                 | date        | date           | Last day of the month                                                    | last_day(current_date)                   |
| yesterday                | date        |                | Yesterday's date                                                         | yesterday()                              |
| last_second              | timestamp   | date           | Last second of the given date                                            | last_second(current_date)                |
| yesterday_last_second    | timestamp   |                | Last second of yesterday                                                 | yesterday_last_second()                  |
| to_datetime              | timestamp   | date, varchar  | Combines a date and a time string into a timestamp                       | to_datetime(current_date, '23:59:59')    |
| array_max_count_element  | T           | array(T)       | Most frequent element (nulls not counted; ties return one of the tied values) | array_max_count_element(array['1','2']) |
| rand                     | double      | varchar        | Returns a double in [0, 1]                                               | rand(varchar)                            |

## Aggregate Functions
| Function                   | Return Type | Argument Types | Description                                                                  | Usage                           |
|----------------------------|-------------|----------------|------------------------------------------------------------------------------|---------------------------------|
| max_count_element          | VARCHAR     | VARCHAR        | Most frequent value (nulls not counted; ties return one of the tied values)   | max_count_element(name)         |
| array_agg_distinct         | INTEGER     | array(VARCHAR) | Counts distinct array elements. Input: array(VARCHAR); output: integer.       | array_agg_distinct(ids)         |
| array_agg_distinct_integer | INTEGER     | array(BIGINT)  | Counts distinct array elements. Input: array(BIGINT); output: integer.        | array_agg_distinct_integer(ids) |
76 changes: 76 additions & 0 deletions pom.xml
@@ -0,0 +1,76 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.github.archongum</groupId>
<artifactId>trino-udf</artifactId>
<version>4</version>

<properties>
<trino.version>380</trino.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.version>3.8.0</maven.compiler.version>
<jackson.version>2.13.1</jackson.version>
<junit.version>5.4.2</junit.version>
</properties>


<dependencies>
<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-spi</artifactId>
<version>${trino.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-array</artifactId>
<version>${trino.version}</version>
<exclusions>
<exclusion>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.version}</version>
<configuration>
<encoding>utf-8</encoding>
</configuration>
</plugin>

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
45 changes: 45 additions & 0 deletions src/main/java/com/github/archongum/trino/UdfPlugin.java
@@ -0,0 +1,45 @@
/*
* Copyright 2013-2016 Qubole
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.archongum.trino;

import java.util.HashSet;
import java.util.Set;
import com.github.archongum.trino.udf.aggregate.ArrayAggDistinctAggregation;
import com.github.archongum.trino.udf.aggregate.ArrayAggDistinctIntegerAggregation;
import com.github.archongum.trino.udf.aggregate.MaxCountElementAggregation;
import com.github.archongum.trino.udf.scalar.ArrayMaxCountElementFunction;
import com.github.archongum.trino.udf.scalar.CommonFunctions;
import com.github.archongum.trino.udf.scalar.DateTimeFunctions;
import io.trino.spi.Plugin;


/**
* @author Archon, September 20, 2018
*/
public class UdfPlugin implements Plugin {
@Override
public Set<Class<?>> getFunctions()
{
Set<Class<?>> set = new HashSet<>();
set.add(ArrayMaxCountElementFunction.class);
set.add(CommonFunctions.class);
set.add(DateTimeFunctions.class);
set.add(MaxCountElementAggregation.class);
set.add(ArrayAggDistinctAggregation.class);
set.add(ArrayAggDistinctIntegerAggregation.class);
return set;
}
}
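
The scalar classes registered above (CommonFunctions, DateTimeFunctions, ArrayMaxCountElementFunction) are not included in this excerpt. As a rough sketch only, not the project's actual code, a scalar UDF such as `first_day` could be declared along these lines; the class name and method body are assumptions, while the annotations come from Trino's SPI, which passes DATE values as days since the epoch:

```java
package com.github.archongum.trino.udf.scalar;

import java.time.LocalDate;
import io.trino.spi.function.Description;
import io.trino.spi.function.ScalarFunction;
import io.trino.spi.function.SqlType;
import io.trino.spi.type.StandardTypes;

// Hypothetical sketch; the real DateTimeFunctions class is not shown in this commit excerpt.
public final class DateTimeFunctionsSketch {
    private DateTimeFunctionsSketch() {}

    @ScalarFunction("first_day")
    @Description("First day of the month")
    @SqlType(StandardTypes.DATE)
    public static long firstDay(@SqlType(StandardTypes.DATE) long date) {
        // Trino represents DATE as the number of days since 1970-01-01.
        return LocalDate.ofEpochDay(date).withDayOfMonth(1).toEpochDay();
    }
}
```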
69 changes: 69 additions & 0 deletions src/main/java/com/github/archongum/trino/udf/aggregate/ArrayAggDistinctAggregation.java
@@ -0,0 +1,69 @@
package com.github.archongum.trino.udf.aggregate;

import java.util.HashSet;
import java.util.Set;
import com.github.archongum.trino.udf.aggregate.state.SetState;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.function.AggregationFunction;
import io.trino.spi.function.CombineFunction;
import io.trino.spi.function.Description;
import io.trino.spi.function.InputFunction;
import io.trino.spi.function.OutputFunction;
import io.trino.spi.function.SqlType;
import io.trino.spi.type.StandardTypes;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.VarcharType.VARCHAR;


/**
* @author Archon, October 21, 2021
*/
@AggregationFunction("array_agg_distinct")
@Description("Count distinct array elements. input: array(VARCHAR), output: integer.")
public class ArrayAggDistinctAggregation {

@InputFunction
public static void input(SetState state, @SqlType("array(VARCHAR)") Block block) {
if (block.getPositionCount() == 0) {
return;
}

Set<String> set = state.getSet();
if (set == null) {
set = new HashSet<>();
state.setSet(set);
}

for (int i = 0; i < block.getPositionCount(); i++) {
if (block.isNull(i)) {
continue;
}
String curElement = VARCHAR.getSlice(block, i).toStringUtf8();
set.add(curElement);
}
}

@CombineFunction
public static void combine(SetState state, SetState otherState) {
Set<String> prev = state.getSet();
Set<String> input = otherState.getSet();
if (prev == null) {
state.setSet(input);
} else {
if (input != null && !input.isEmpty()) {
prev.addAll(input);
}
}
}

@OutputFunction(StandardTypes.INTEGER)
public static void output(SetState state, BlockBuilder out) {
Set<String> set = state.getSet();
if (set == null || set.isEmpty()) {
out.appendNull();
} else {
INTEGER.writeLong(out, set.size());
}
}
}
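
The SetState interface imported above lives in the `...aggregate.state` package and is not part of this excerpt. Below is a minimal sketch of the shape this aggregation relies on, assuming the real project also supplies a custom state serializer and factory (which Trino requires for non-primitive state fields, typically wired up via `@AccumulatorStateMetadata`):

```java
package com.github.archongum.trino.udf.aggregate.state;

import java.util.Set;
import io.trino.spi.function.AccumulatorState;

// Hypothetical shape of SetState; the real interface (plus its serializer and
// factory, declared through @AccumulatorStateMetadata) is not shown in this diff.
public interface SetState extends AccumulatorState {
    Set<String> getSet();

    void setSet(Set<String> value);
}
```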
69 changes: 69 additions & 0 deletions src/main/java/com/github/archongum/trino/udf/aggregate/ArrayAggDistinctIntegerAggregation.java
@@ -0,0 +1,69 @@
package com.github.archongum.trino.udf.aggregate;

import java.util.HashSet;
import java.util.Set;
import com.github.archongum.trino.udf.aggregate.state.SetStateLong;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.function.AggregationFunction;
import io.trino.spi.function.CombineFunction;
import io.trino.spi.function.Description;
import io.trino.spi.function.InputFunction;
import io.trino.spi.function.OutputFunction;
import io.trino.spi.function.SqlType;
import io.trino.spi.type.StandardTypes;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.IntegerType.INTEGER;


/**
* @author Archon, October 21, 2021
*/
@AggregationFunction("array_agg_distinct_integer")
@Description("Count distinct array elements. input: array(BIGINT), output: integer.")
public class ArrayAggDistinctIntegerAggregation {

@InputFunction
public static void input(SetStateLong state, @SqlType("array(BIGINT)") Block block) {
if (block.getPositionCount() == 0) {
return;
}

Set<Long> set = state.getSet();
if (set == null) {
set = new HashSet<>();
state.setSet(set);
}

for (int i = 0; i < block.getPositionCount(); i++) {
if (block.isNull(i)) {
continue;
}
Long curElement = BIGINT.getLong(block, i);
set.add(curElement);
}
}

@CombineFunction
public static void combine(SetStateLong state, SetStateLong otherState) {
Set<Long> prev = state.getSet();
Set<Long> input = otherState.getSet();
if (prev == null) {
state.setSet(input);
} else {
if (input != null && !input.isEmpty()) {
prev.addAll(input);
}
}
}

@OutputFunction(StandardTypes.INTEGER)
public static void output(SetStateLong state, BlockBuilder out) {
Set<Long> set = state.getSet();
if (set == null || set.isEmpty()) {
out.appendNull();
} else {
INTEGER.writeLong(out, set.size());
}
}
}
71 changes: 71 additions & 0 deletions src/main/java/com/github/archongum/trino/udf/aggregate/MaxCountElementAggregation.java
@@ -0,0 +1,71 @@
package com.github.archongum.trino.udf.aggregate;

import com.github.archongum.trino.udf.aggregate.state.MapState;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.function.AggregationFunction;
import io.trino.spi.function.CombineFunction;
import io.trino.spi.function.Description;
import io.trino.spi.function.InputFunction;
import io.trino.spi.function.OutputFunction;
import io.trino.spi.function.SqlType;
import io.trino.spi.type.StandardTypes;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import static io.trino.spi.type.VarcharType.VARCHAR;


/**
* @author Archon, August 30, 2019
*/
@AggregationFunction("max_count_element")
@Description("Get maximum count element (null is not counting; if has multiple return one of them)")
public class MaxCountElementAggregation {
@InputFunction
public static void input(MapState state, @SqlType(StandardTypes.VARCHAR) Slice value) {
Map<String, Long> map = state.getMap();
if (map == null) {
map = new HashMap<>(16);
state.setMap(map);
}
String v = value.toStringUtf8();
Long cnt = map.get(v);
if (cnt == null) {
map.put(v, 1L);
} else {
map.put(v, cnt+1);
}
}

@CombineFunction
public static void combine(MapState state, MapState otherState) {
if (state.getMap() == null && otherState.getMap() == null) {
return;
}
if (otherState.getMap() == null && state.getMap() != null) {
otherState.setMap(state.getMap());
return;
}
if (state.getMap() == null && otherState.getMap() != null) {
state.setMap(otherState.getMap());
return;
}

otherState.getMap().forEach((k, v) -> state.getMap().merge(k, v, Long::sum));
}

@OutputFunction(StandardTypes.VARCHAR)
public static void output(MapState state, BlockBuilder out)
{
if (state.getMap() == null || state.getMap().isEmpty()) {
out.appendNull();
} else {
VARCHAR.writeSlice(out,
Slices.utf8Slice(state.getMap().entrySet().stream().max(Entry.comparingByValue()).get().getKey()));
}
}
}
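
Likewise, the MapState interface used here is not shown. Under the same assumptions as the SetState sketch above, it would look roughly like this:

```java
package com.github.archongum.trino.udf.aggregate.state;

import java.util.Map;
import io.trino.spi.function.AccumulatorState;

// Hypothetical shape of MapState, mirroring the SetState sketch above.
public interface MapState extends AccumulatorState {
    Map<String, Long> getMap();

    void setMap(Map<String, Long> value);
}
```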