Skip to content

Commit

Permalink
Add DocSelectFieldsActivity
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias-weber committed Jan 22, 2025
1 parent a3542ab commit 9146805
Show file tree
Hide file tree
Showing 13 changed files with 142 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ assert canFuse(
}
}
} catch ( Exception e ) {
e.printStackTrace();
ctx.throwException( e );
} finally {
executedContext.getIterator().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.engine.storage.writer.DocWriter;

@ActivityDefinition(type = "docExtract", displayName = "Extract Collection in Polypheny", categories = { ActivityCategory.EXTRACT, ActivityCategory.DOCUMENT },
@ActivityDefinition(type = "docExtract", displayName = "Extract Collection", categories = { ActivityCategory.EXTRACT, ActivityCategory.DOCUMENT },
inPorts = {},
outPorts = { @OutPort(type = PortType.DOC) })

Expand Down Expand Up @@ -92,7 +92,7 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
@Override
public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
LogicalCollection collection = settings.get( COLL_KEY, EntityValue.class ).getCollection();
AlgTraitSet traits = AlgTraitSet.createEmpty().plus( ModelTrait.DOCUMENT );
AlgTraitSet traits = cluster.traitSetOf( ModelTrait.DOCUMENT );
return new LogicalDocumentScan( cluster, traits, collection );
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright 2019-2025 The Polypheny Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.polypheny.db.workflow.dag.activities.impl;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.NotImplementedException;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.logical.document.LogicalDocumentProject;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.rex.RexNameRef;
import org.polypheny.db.rex.RexNode;
import org.polypheny.db.workflow.dag.activities.Activity;
import org.polypheny.db.workflow.dag.activities.Activity.ActivityCategory;
import org.polypheny.db.workflow.dag.activities.Activity.PortType;
import org.polypheny.db.workflow.dag.activities.ActivityException;
import org.polypheny.db.workflow.dag.activities.Fusable;
import org.polypheny.db.workflow.dag.activities.Pipeable;
import org.polypheny.db.workflow.dag.activities.TypePreview;
import org.polypheny.db.workflow.dag.activities.TypePreview.DocType;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.InPort;
import org.polypheny.db.workflow.dag.annotations.ActivityDefinition.OutPort;
import org.polypheny.db.workflow.dag.annotations.FieldSelectSetting;
import org.polypheny.db.workflow.dag.settings.FieldSelectValue;
import org.polypheny.db.workflow.dag.settings.SettingDef.Settings;
import org.polypheny.db.workflow.dag.settings.SettingDef.SettingsPreview;
import org.polypheny.db.workflow.engine.execution.context.ExecutionContext;
import org.polypheny.db.workflow.engine.execution.context.PipeExecutionContext;
import org.polypheny.db.workflow.engine.execution.pipe.InputPipe;
import org.polypheny.db.workflow.engine.execution.pipe.OutputPipe;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;

@ActivityDefinition(type = "docSelectFields", displayName = "Select Document Fields", categories = { ActivityCategory.TRANSFORM, ActivityCategory.DOCUMENT, },
        inPorts = { @InPort(type = PortType.DOC, description = "The input collection.") },
        outPorts = { @OutPort(type = PortType.DOC, description = "Collection containing only the specified fields of the input table. The '_id' field is always included.") },
        shortDescription = "Select a subset of (nested) fields of the input collection."
)
@FieldSelectSetting(key = "fields", displayName = "Select Fields", reorder = false,
        shortDescription = "Specify which fields to include. Alternatively, you can include any field not present in the list of excluded fields.")

@SuppressWarnings("unused")
public class DocSelectFieldsActivity implements Activity, Fusable, Pipeable {

    /**
     * The output is always a plain document collection, independent of the input type or settings.
     */
    @Override
    public List<TypePreview> previewOutTypes( List<TypePreview> inTypes, SettingsPreview settings ) throws ActivityException {
        return DocType.of().asOutTypes();
    }


    /**
     * Execution is delegated to the fused plan provided by {@link Fusable}.
     */
    @Override
    public void execute( List<CheckpointReader> inputs, Settings settings, ExecutionContext ctx ) throws Exception {
        Fusable.super.execute( inputs, settings, ctx );
    }


    /**
     * Builds a document projection over the single input.
     * In "include unspecified" mode only the excluded fields are projected away;
     * otherwise a name reference is created for every included (dot-separated, possibly nested) field.
     */
    @Override
    public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
        AlgDataType docType = getDocType();
        FieldSelectValue selection = settings.get( "fields", FieldSelectValue.class );
        AlgNode input = inputs.get( 0 );
        List<String> excluded = selection.getExclude();

        if ( selection.includeUnspecified() ) {
            // No explicit include list: keep everything except the excluded fields.
            return LogicalDocumentProject.create( input, Map.of(), excluded );
        }

        // Map each included field name to a (nested) name reference into the document type.
        Map<String, RexNode> refsByField = selection.getInclude().stream().collect( Collectors.toMap(
                f -> f,
                f -> RexNameRef.create( List.of( f.split( "\\." ) ), null, docType )
        ) );
        return LogicalDocumentProject.create( input, refsByField, excluded );
    }


    /**
     * The locked output type is the generic document type, as the concrete set of fields
     * is only known at execution time.
     */
    @Override
    public AlgDataType lockOutputType( List<AlgDataType> inTypes, Settings settings ) throws Exception {
        return getDocType();
    }


    @Override
    public void pipe( List<InputPipe> inputs, OutputPipe output, Settings settings, PipeExecutionContext ctx ) throws Exception {
        throw new NotImplementedException( "Pipe is not yet implemented." );
    }


    @Override
    public Optional<Boolean> canPipe( List<TypePreview> inTypes, SettingsPreview settings ) {
        return Optional.of( false ); // TODO: implement pipe
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.polypheny.db.algebra.AlgNode;
import org.polypheny.db.algebra.logical.document.LogicalDocumentValues;
import org.polypheny.db.algebra.type.AlgDataType;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.schema.trait.ModelTrait;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.type.entity.PolyString;
import org.polypheny.db.type.entity.PolyValue;
Expand Down Expand Up @@ -118,7 +120,13 @@ public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster
settings.get( "count", IntValue.class ).getValue(),
settings.get( "fixSeed", BoolValue.class ).getValue()
);
return LogicalDocumentValues.create( cluster, values );
return new LogicalDocumentValues( cluster, cluster.traitSetOf( ModelTrait.DOCUMENT ), values );
}


@Override
public Optional<Boolean> canFuse( List<TypePreview> inTypes, SettingsPreview settings ) {
return settings.get( "count", IntValue.class ).map( v -> v.getValue() <= 250 ); // otherwise the amount of generated code grows too big
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.engine.storage.writer.LpgWriter;

@ActivityDefinition(type = "lpgExtract", displayName = "Extract Graph in Polypheny", categories = { ActivityCategory.EXTRACT, ActivityCategory.GRAPH },
@ActivityDefinition(type = "lpgExtract", displayName = "Extract Graph", categories = { ActivityCategory.EXTRACT, ActivityCategory.GRAPH },
inPorts = {},
outPorts = { @OutPort(type = PortType.LPG) })

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.polypheny.db.workflow.engine.storage.writer.CheckpointWriter;
import org.polypheny.db.workflow.engine.storage.writer.RelWriter;

@ActivityDefinition(type = "query", displayName = "Query", categories = { TRANSFORM, RELATIONAL, DOCUMENT, GRAPH },
@ActivityDefinition(type = "query", displayName = "Query Transform", categories = { TRANSFORM, RELATIONAL, DOCUMENT, GRAPH },
inPorts = {
@InPort(type = PortType.ANY, description = "The input data to be queried. Can have any data model."),
@InPort(type = PortType.ANY, isOptional = true, description = "An optional second input. Note: Not all query languages support inputs with differing data models.")
Expand Down Expand Up @@ -95,7 +95,7 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
AlgDataType outType = ActivityUtils.addPkCol( pair.left );
ctx.logInfo( "Adding primary key column to type." );
RelWriter writer = ctx.createRelWriter( 0, outType, true );
for (List<PolyValue> row : pair.right) {
for ( List<PolyValue> row : pair.right ) {
writer.wWithoutPk( row );
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;
import org.polypheny.db.workflow.engine.storage.writer.RelWriter;

@ActivityDefinition(type = "relExtract", displayName = "Extract Table in Polypheny", categories = { ActivityCategory.EXTRACT, ActivityCategory.RELATIONAL },
@ActivityDefinition(type = "relExtract", displayName = "Extract Table", categories = { ActivityCategory.EXTRACT, ActivityCategory.RELATIONAL },
inPorts = {},
outPorts = { @OutPort(type = PortType.REL) })

Expand Down Expand Up @@ -104,7 +104,7 @@ public void execute( List<CheckpointReader> inputs, Settings settings, Execution
@Override
public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster ) throws Exception {
LogicalTable table = settings.get( TABLE_KEY, EntityValue.class ).getTable();
AlgTraitSet traits = AlgTraitSet.createEmpty().plus( ModelTrait.RELATIONAL );
AlgTraitSet traits = cluster.traitSetOf( ModelTrait.RELATIONAL );

AlgNode scan = new LogicalRelScan( cluster, traits, table );
return ActivityUtils.addPkCol( scan, cluster );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -127,6 +128,12 @@ public AlgNode fuse( List<AlgNode> inputs, Settings settings, AlgCluster cluster
}


@Override
public Optional<Boolean> canFuse( List<TypePreview> inTypes, SettingsPreview settings ) {
return settings.get( "rowCount", IntValue.class ).map( v -> v.getValue() <= 250 ); // otherwise the amount of generated code grows too big
}


private static AlgDataType getType() {
AlgDataTypeFactory typeFactory = AlgDataTypeFactory.DEFAULT;
return typeFactory.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,16 @@
import org.polypheny.db.workflow.engine.execution.context.PipeExecutionContext;
import org.polypheny.db.workflow.engine.execution.pipe.InputPipe;
import org.polypheny.db.workflow.engine.execution.pipe.OutputPipe;
import org.polypheny.db.workflow.engine.storage.StorageManager;
import org.polypheny.db.workflow.engine.storage.reader.CheckpointReader;

@ActivityDefinition(type = "reorderCols", displayName = "Select / Reorder Columns", categories = { ActivityCategory.TRANSFORM, ActivityCategory.RELATIONAL },
inPorts = { @InPort(type = PortType.REL) },
outPorts = { @OutPort(type = PortType.REL) }
inPorts = { @InPort(type = PortType.REL, description = "The input table") },
outPorts = { @OutPort(type = PortType.REL, description = "A Table containing the selected subset of columns from the input table in the specified order.") },
shortDescription = "Select and reorder the columns of the input table."
)
@FieldSelectSetting(key = "cols", displayName = "Columns", reorder = true, defaultAll = true)
@FieldSelectSetting(key = "cols", displayName = "Columns", reorder = true, defaultAll = true,
shortDescription = "Specify the names of the columns to include. Alternatively, you can include all columns except for the excluded ones. The \"" + StorageManager.PK_COL + "\" column must always be included.")

@SuppressWarnings("unused")
public class ReorderColsActivity implements Activity, Fusable, Pipeable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public static long estimateByteSize( PolyValue[] tuple ) {
size += DEFAULT_BYTE_SIZE;
}
}
return size;
return Math.max( DEFAULT_BYTE_SIZE, size );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import org.polypheny.db.catalog.entity.logical.LogicalCollection;
import org.polypheny.db.catalog.logistic.DataModel;
import org.polypheny.db.plan.AlgCluster;
import org.polypheny.db.plan.AlgTraitSet;
import org.polypheny.db.processing.ImplementationContext.ExecutedContext;
import org.polypheny.db.schema.trait.ModelTrait;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.type.entity.PolyString;
import org.polypheny.db.type.entity.PolyValue;
Expand Down Expand Up @@ -88,8 +86,7 @@ public Iterable<PolyDocument> getDocIterable() {

@Override
public AlgNode getAlgNode( AlgCluster cluster ) {
AlgTraitSet traits = AlgTraitSet.createEmpty().plus( ModelTrait.DOCUMENT );
return new LogicalDocumentScan( cluster, traits, entity );
return LogicalDocumentScan.create( cluster, entity );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public long getEdgeCount() {

@Override
public AlgNode getAlgNode( AlgCluster cluster ) {
AlgTraitSet traits = AlgTraitSet.createEmpty().plus( ModelTrait.GRAPH );
AlgTraitSet traits = cluster.traitSetOf( ModelTrait.GRAPH );
return new LogicalLpgScan( cluster, traits, entity, entity.getTupleType() );
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public long getRowCount() {

@Override
public AlgNode getAlgNode( AlgCluster cluster ) {
AlgTraitSet traits = AlgTraitSet.createEmpty().plus( ModelTrait.RELATIONAL );
AlgTraitSet traits = cluster.traitSetOf( ModelTrait.RELATIONAL );
return new LogicalRelScan( cluster, traits, entity );
}

Expand Down

0 comments on commit 9146805

Please sign in to comment.