Skip to content

Commit

Permalink
Merge pull request #1 from datafusion-contrib/json_contains
Browse files Browse the repository at this point in the history
add boilerplate and `json_obj_contains`
  • Loading branch information
samuelcolvin authored Apr 22, 2024
2 parents cb1ba7a + dcca8e1 commit a27344c
Show file tree
Hide file tree
Showing 10 changed files with 466 additions and 0 deletions.
90 changes: 90 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
name: CI

on:
push:
branches:
- main
tags:
- '**'
pull_request: {}

jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt, clippy

- id: cache-rust
uses: Swatinem/rust-cache@v2

- uses: pre-commit/action@v3.0.0
with:
extra_args: --all-files --verbose
env:
PRE_COMMIT_COLOR: always
SKIP: test

test:
name: test rust-${{ matrix.rust-version }}
strategy:
fail-fast: false
matrix:
rust-version: [stable, nightly]

runs-on: ubuntu-latest

env:
RUST_VERSION: ${{ matrix.rust-version }}

steps:
- uses: actions/checkout@v3

- uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ matrix.rust-version }}

- id: cache-rust
uses: Swatinem/rust-cache@v2

- run: cargo test --all-features
# - uses: taiki-e/install-action@cargo-llvm-cov
#
# - run: cargo llvm-cov --all-features --codecov --output-path codecov.json
#
# - uses: codecov/codecov-action@v3
# with:
# files: codecov.json
# env_vars: RUST_VERSION

# https://github.com/marketplace/actions/alls-green#why used for branch protection checks
check:
if: always()
needs: [test, lint]
runs-on: ubuntu-latest
steps:
- name: Decide whether the needed jobs succeeded or failed
uses: re-actors/alls-green@release/v1
with:
jobs: ${{ toJSON(needs) }}

release:
needs: [check]
if: "success() && startsWith(github.ref, 'refs/tags/')"
runs-on: ubuntu-latest
environment: release

steps:
- uses: actions/checkout@v2

- name: install rust stable
uses: dtolnay/rust-toolchain@stable

- uses: Swatinem/rust-cache@v2

- run: cargo publish
env:
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/target
Cargo.lock
32 changes: 32 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
fail_fast: true

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.0.1
hooks:
- id: check-yaml
- id: check-toml
- id: end-of-file-fixer
- id: trailing-whitespace
- id: check-added-large-files

- repo: local
hooks:
- id: format-check
name: Format Check
entry: cargo fmt
types: [rust]
language: system
pass_filenames: false
- id: clippy
name: Clippy
entry: cargo clippy
types: [rust]
language: system
pass_filenames: false
- id: test
name: Test
entry: cargo test
types: [rust]
language: system
pass_filenames: false
1 change: 1 addition & 0 deletions .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
max_width = 120
27 changes: 27 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
name = "datafusion-functions-json"
version = "0.1.0"
edition = "2021"

[dependencies]
arrow-schema = "51.0.0"
datafusion-common = "37.0.0"
datafusion-expr = "37.0.0"
jiter = { git = "https://github.com/pydantic/jiter.git", branch = "next_skip" }
paste = "1.0.14"
log = "0.4.21"
datafusion-execution = "37.0.0"

[dev-dependencies]
arrow = "51.0.0"
datafusion = "37.0.0"
tokio = { version = "1.37.0", features = ["full"] }

[lints.clippy]
dbg_macro = "warn"
print_stdout = "warn"

# in general we lint against the pedantic group, but we will whitelist
# certain lints which we don't want to enforce (for now)
pedantic = { level = "warn", priority = -1 }
missing_errors_doc = "allow"
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# datafusion-functions-json

methods to implement:

* [x] `json_obj_contains(json: str, key: str) -> bool` - true if a JSON object has a specific key
* [ ] `json_obj_contains_all(json: str, keys: list[str]) -> bool` - true if a JSON object has all of a list of keys
* [ ] `json_obj_contains_any(json: str, keys: list[str]) -> bool` - true if a JSON object has any of a list of keys
* [ ] `json_obj_keys(json: str) -> list[str]` - get the keys of a JSON object
* [ ] `json_obj_values(json: str) -> list[Any]` - get the values of a JSON object
* [ ] `json_is_obj(json: str) -> bool` - true if the JSON is an object
* [ ] `json_array_contains(json: str, key: Any) -> bool` - true if a JSON array has a specific value
* [ ] `json_array_items(json: str) -> list[Any]` - get the items of a JSON array
* [ ] `json_is_array(json: str) -> bool` - true if the JSON is an array
* [ ] `json_get(json: str, key: str | int) -> Any` - get the value of a key in a JSON object or array
* [ ] `json_get_path(json: str, key: list[str | int]) -> Any` - is this possible?
* [ ] `json_length(json: str) -> int` - get the length of a JSON object or array
* [ ] `json_valid(json: str) -> bool` - true if the JSON is valid
* [ ] `json_cast(json: str) -> Any` - cast the JSON to a native type???
103 changes: 103 additions & 0 deletions src/json_obj_contains.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use crate::macros::make_udf_function;
use arrow_schema::DataType;
use arrow_schema::DataType::{LargeUtf8, Utf8};
use datafusion_common::arrow::array::{as_string_array, ArrayRef, BooleanArray};
use datafusion_common::{exec_err, plan_err, Result, ScalarValue};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use jiter::Jiter;
use std::any::Any;
use std::sync::Arc;

// Invokes the crate-local `make_udf_function!` macro (imported from `crate::macros`;
// its definition is not part of this file) for the `JsonObjContains` implementation.
// NOTE(review): `lib.rs` re-exports `json_obj_contains` from this module and calls
// `json_obj_contains_udf()` to obtain an `Arc<ScalarUDF>`; presumably both items are
// generated by this invocation — confirm against the macro definition.
make_udf_function!(
JsonObjContains,
json_obj_contains,
json_data key, // arg name
"Does the string exist as a top-level key within the JSON value?", // doc
json_obj_contains_udf // internal function name
);

/// State for the `json_obj_contains` scalar UDF; the `ScalarUDFImpl` implementation
/// for this type lives further down in this file.
#[derive(Debug)]
pub(super) struct JsonObjContains {
/// Argument signature handed out by `ScalarUDFImpl::signature`.
signature: Signature,
/// Names handed out by `ScalarUDFImpl::aliases`.
aliases: Vec<String>,
}

impl JsonObjContains {
    /// Builds the UDF state: a two-argument signature in which both arguments share one
    /// of the string types (`Utf8` or `LargeUtf8`), immutable volatility, and the two
    /// names the function is known by.
    pub fn new() -> Self {
        let string_types = vec![Utf8, LargeUtf8];
        let signature = Signature::uniform(2, string_types, Volatility::Immutable);

        let aliases = ["json_obj_contains", "json_object_contains"]
            .into_iter()
            .map(String::from)
            .collect();

        Self { signature, aliases }
    }
}

impl ScalarUDFImpl for JsonObjContains {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"json_obj_contains"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
match arg_types[0] {
Utf8 | LargeUtf8 => Ok(DataType::Boolean),
_ => {
plan_err!("The json_obj_contains function can only accept Utf8 or LargeUtf8.")
}
}
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let json_haystack = match &args[0] {
ColumnarValue::Array(array) => as_string_array(array),
ColumnarValue::Scalar(_) => {
return exec_err!("json_obj_contains first argument: unexpected argument type, expected string array")
}
};

let needle = match &args[1] {
ColumnarValue::Scalar(ScalarValue::Utf8(Some(s))) => s.clone(),
_ => return exec_err!("json_obj_contains second argument: unexpected argument type, expected string"),
};

let array = json_haystack
.iter()
.map(|opt_json| opt_json.map(|json| jiter_json_contains(json.as_bytes(), &needle)))
.collect::<BooleanArray>();

Ok(ColumnarValue::from(Arc::new(array) as ArrayRef))
}

fn aliases(&self) -> &[String] {
&self.aliases
}
}

/// Reports whether `json_data` is a JSON object that has `expected_key` among its
/// top-level keys.
///
/// Keys are visited in document order and each non-matching key's value is skipped.
/// Returns `false` when the data is not an object, when the object is exhausted without
/// a match, or when a parse/skip error occurs before a match is found.
fn jiter_json_contains(json_data: &[u8], expected_key: &str) -> bool {
    let mut parser = Jiter::new(json_data, false);
    let mut at_first_key = true;

    loop {
        // The first key comes from opening the object, later keys from `next_key`.
        let step = if at_first_key {
            at_first_key = false;
            parser.next_object()
        } else {
            parser.next_key()
        };

        match step {
            Ok(Some(key)) if key == expected_key => return true,
            Ok(Some(_)) => {}
            // End of object, not an object at all, or a parse error.
            Ok(None) | Err(_) => return false,
        }

        // Step over the value belonging to the non-matching key.
        if parser.next_skip().is_err() {
            return false;
        }
    }
}
26 changes: 26 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use datafusion_common::Result;
use datafusion_execution::FunctionRegistry;
use datafusion_expr::ScalarUDF;
use log::debug;
use std::sync::Arc;

mod json_obj_contains;
mod macros;

/// Public namespace re-exporting `json_obj_contains` from the private
/// `json_obj_contains` module.
pub mod functions {
pub use crate::json_obj_contains::json_obj_contains;
}

/// Register every JSON UDF provided by this crate with `registry`.
///
/// If a registration replaces an already-registered UDF, the replaced function's name
/// is logged at debug level. The first registration error aborts and is returned.
pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
    let udfs: Vec<Arc<ScalarUDF>> = vec![json_obj_contains::json_obj_contains_udf()];

    for udf in udfs {
        if let Some(replaced) = registry.register_udf(udf)? {
            debug!("Overwrite existing UDF: {}", replaced.name());
        }
    }

    Ok(())
}
Loading

0 comments on commit a27344c

Please sign in to comment.