Skip to content

Commit

Permalink
Aggregations: support for pipeline aggs and filter (#1296)
Browse files Browse the repository at this point in the history
* Aggregations: support for pipeline aggs and filter

Both can be defined in a container.
Only bucket selector pipeline aggregation for now.
  • Loading branch information
rbayet authored and romainruaud committed Feb 12, 2019
1 parent d014740 commit 32b86e5
Show file tree
Hide file tree
Showing 25 changed files with 953 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,27 @@ class Builder
*/
private $builders;

/**
* @var PipelineBuilderInterface[]
*/
private $pipelineBuilders;

/**
* Constructor.
*
* @param QueryBuilder $queryBuilder Query builder used to build
* Queries inside sort orders.
* @param BuilderInterface[] $builders Aggregation builder implementations
* @param QueryBuilder $queryBuilder Query builder used to build
* Queries inside sort orders.
* @param BuilderInterface[] $builders Aggregation builder implementations
* @param PipelineBuilderInterface[] $pipelineBuilders Pipeline aggregation builder implementations
*/
public function __construct(QueryBuilder $queryBuilder, array $builders = [])
{
public function __construct(
QueryBuilder $queryBuilder,
array $builders = [],
array $pipelineBuilders = []
) {
$this->queryBuilder = $queryBuilder;
$this->builders = $builders;
$this->pipelineBuilders = $pipelineBuilders;
}

/**
Expand All @@ -62,8 +72,8 @@ public function buildAggregations(array $buckets = [])
foreach ($buckets as $bucket) {
$bucketType = $bucket->getType();
$builder = $this->getBuilder($bucketType);
$aggregation = $builder->buildBucket($bucket);
$subAggregations = isset($aggregation['aggregations']) ? $aggregation['aggregations'] : [];
$aggregation = $builder->buildBucket($bucket);
$subAggregations = $aggregation['aggregations'] ?? [];

if (!empty($bucket->getChildBuckets())) {
$subAggregations = array_merge($subAggregations, $this->buildAggregations($bucket->getChildBuckets()));
Expand All @@ -73,6 +83,13 @@ public function buildAggregations(array $buckets = [])
$subAggregations[$metric->getName()] = [$metric->getType() => ['field' => $metric->getField()]];
}

foreach ($bucket->getPipelines() as $pipeline) {
$pipelineType = $pipeline->getType();
$pipelineBuilder = $this->getPipelineBuilder($pipelineType);
$pipelineAgg = $pipelineBuilder->buildPipeline($pipeline);
$subAggregations[$pipeline->getName()] = $pipelineAgg;
}

if (!empty($subAggregations)) {
$aggregation['aggregations'] = $subAggregations;
}
Expand Down Expand Up @@ -119,4 +136,20 @@ private function getBuilder($bucketType)

return $this->builders[$bucketType];
}

/**
* Retrieve the builder used to convert a pipeline into an ES aggregation.
*
* @param string $pipelineType Pipeline type to be built.
*
* @return PipelineBuilderInterface
*/
private function getPipelineBuilder($pipelineType)
{
if (!isset($this->pipelineBuilders[$pipelineType])) {
throw new \InvalidArgumentException("No builder found for pipeline aggregation type {$pipelineType}.");
}

return $this->pipelineBuilders[$pipelineType];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ interface BuilderInterface
/**
* Build the ES aggregation from a search request bucket.
*
* @param BucketInterface $bucket Bucketto be built.
* @param BucketInterface $bucket Bucket to be built.
*
* @return array
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php
/**
* DISCLAIMER
*
* Do not edit or add to this file if you wish to upgrade this module to newer
* versions in the future.
*
*
* @category Smile
* @package Smile\ElasticsuiteCore
* @author Richard BAYET <richard.bayet@smile.fr>
* @copyright 2019 Smile
* @license Open Software License ("OSL") v. 3.0
*/

namespace Smile\ElasticsuiteCore\Search\Adapter\Elasticsuite\Request\Aggregation\PipelineBuilder;

use Smile\ElasticsuiteCore\Search\Adapter\Elasticsuite\Request\Aggregation\PipelineBuilderInterface;
use Smile\ElasticsuiteCore\Search\Request\PipelineInterface;

/**
* Build a bucket selector ES pipeline aggregation.
*
* @category Smile
* @package Smile\ElasticsuiteCore
*/
class BucketSelector implements PipelineBuilderInterface
{
/**
* Build the pipeline aggregation.
*
* @param PipelineInterface $pipeline Bucket selector pipeline.
*
* @return array
*/
public function buildPipeline(PipelineInterface $pipeline)
{
if ($pipeline->getType() !== PipelineInterface::TYPE_BUCKET_SELECTOR) {
throw new \InvalidArgumentException("Query builder : invalid aggregation type {$pipeline->getType()}.");
}

$aggParams = [
'buckets_path' => $pipeline->getBucketsPath(),
'script' => $pipeline->getScript(),
'gap_policy' => $pipeline->getGapPolicy(),
];

return ['bucket_selector' => $aggParams];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php
/**
* DISCLAIMER
*
* Do not edit or add to this file if you wish to upgrade this module to newer
* versions in the future.
*
*
* @category Smile
* @package Smile\ElasticsuiteCore
* @author Richard BAYET <richard.bayet@smile.fr>
* @copyright 2019 Smile
* @license Open Software License ("OSL") v. 3.0
*/

namespace Smile\ElasticsuiteCore\Search\Adapter\Elasticsuite\Request\Aggregation;

use Smile\ElasticsuiteCore\Search\Request\PipelineInterface;

/**
* Build Elasticsearch pipeline aggregation from search request PipelineInterface.
*
* @category Smile
* @package Smile\ElasticsuiteCore
*/
interface PipelineBuilderInterface
{
/**
* Build the ES aggregation from a search request pipeline.
*
* @param PipelineInterface $pipeline Pipeline to be built.
*
* @return array
*/
public function buildPipeline(PipelineInterface $pipeline);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,28 @@ class AggregationBuilder
*/
private $metricFactory;

/**
* @var PipelineFactory
*/
private $pipelineFactory;

/**
* Constructor.
*
* @param AggregationFactory $aggregationFactory Factory used to instantiate buckets.
* @param MetricFactory $metricFactory Factory used to instantiate metrics.
* @param PipelineFactory $pipelineFactory Factory used to instantiate pipelines.
* @param QueryBuilder $queryBuilder Factory used to create queries inside filtered or nested aggs.
*/
public function __construct(
AggregationFactory $aggregationFactory,
MetricFactory $metricFactory,
PipelineFactory $pipelineFactory,
QueryBuilder $queryBuilder
) {
$this->aggregationFactory = $aggregationFactory;
$this->metricFactory = $metricFactory;
$this->pipelineFactory = $pipelineFactory;
$this->queryBuilder = $queryBuilder;
}

Expand Down Expand Up @@ -93,7 +101,7 @@ public function buildAggregations(ContainerConfigurationInterface $containerConf
private function buildAggregation(ContainerConfigurationInterface $containerConfig, $filters, $bucketParams)
{
$bucketType = $bucketParams['type'];
$fieldName = isset($bucketParams['field']) ? $bucketParams['field'] : $bucketParams['name'];
$fieldName = $bucketParams['field'] ?? $bucketParams['name'];

try {
$field = $containerConfig->getMapping()->getField($fieldName);
Expand All @@ -107,6 +115,11 @@ private function buildAggregation(ContainerConfigurationInterface $containerConf
$bucketParams['field'] = $fieldName;
}

// Merge container/aggregation defined aggregation filters with global request filters.
$filters = array_merge($filters, $bucketParams['filters'] ?? []);
unset($bucketParams['filters']);

// Ensure any globally applied (attribute layered navigation) filter is NOT applied on the (most likely) originating agg.
$bucketFilters = array_diff_key($filters, [$fieldName => true]);
if (!empty($bucketFilters)) {
$bucketParams['filter'] = $this->createFilter($containerConfig, $bucketFilters);
Expand All @@ -125,6 +138,8 @@ private function buildAggregation(ContainerConfigurationInterface $containerConf
$bucketParams['nestedFilter'] = $nestedFilter;
}

$bucketParams = $this->createPipelines($bucketParams);

return $this->aggregationFactory->create($bucketType, $bucketParams);
}

Expand All @@ -141,4 +156,23 @@ private function createFilter(ContainerConfigurationInterface $containerConfig,
{
return $this->queryBuilder->create($containerConfig, $filters, $currentPath);
}

/**
* Parse bucket params and create PipelineInterface instances
*
* @param array $bucketParams Bucket params.
*
* @return array
*/
private function createPipelines($bucketParams)
{
if (isset($bucketParams['pipelines'])) {
foreach ($bucketParams['pipelines'] as &$pipelineParams) {
$pipelineType = $pipelineParams['type'];
$pipelineParams = $this->pipelineFactory->create($pipelineType, $pipelineParams);
}
}

return $bucketParams;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Smile\ElasticsuiteCore\Search\Request\BucketInterface;
use Smile\ElasticsuiteCore\Search\Request\QueryInterface;
use Smile\ElasticsuiteCore\Search\Request\MetricInterface;
use Smile\ElasticsuiteCore\Search\Request\PipelineInterface;

/**
* Abstract bucket implementation.
Expand Down Expand Up @@ -46,6 +47,11 @@ abstract class AbstractBucket implements BucketInterface
*/
private $childBuckets;

/**
* @var PipelineInterface[]
*/
private $pipelines;

/**
* @var string
*/
Expand All @@ -64,19 +70,21 @@ abstract class AbstractBucket implements BucketInterface
/**
* Constructor.
*
* @param string $name Bucket name.
* @param string $field Bucket field.
* @param MetricInterface[] $metrics Bucket metrics.
* @param BucketInterface[] $childBuckets Child buckets.
* @param string $nestedPath Nested path for nested bucket.
* @param QueryInterface $filter Bucket filter.
* @param QueryInterface $nestedFilter Nested filter for the bucket.
* @param string $name Bucket name.
* @param string $field Bucket field.
* @param MetricInterface[] $metrics Bucket metrics.
* @param BucketInterface[] $childBuckets Child buckets.
* @param PipelineInterface[] $pipelines Bucket pipelines.
* @param string $nestedPath Nested path for nested bucket.
* @param QueryInterface $filter Bucket filter.
* @param QueryInterface $nestedFilter Nested filter for the bucket.
*/
public function __construct(
$name,
$field,
array $metrics = [],
array $childBuckets = [],
array $pipelines = [],
$nestedPath = null,
QueryInterface $filter = null,
QueryInterface $nestedFilter = null
Expand All @@ -85,6 +93,7 @@ public function __construct(
$this->field = $field;
$this->metrics = $metrics;
$this->childBuckets = $childBuckets;
$this->pipelines = $pipelines;
$this->nestedPath = $nestedPath;
$this->filter = $filter;
$this->nestedFilter = $nestedFilter;
Expand Down Expand Up @@ -154,4 +163,12 @@ public function getChildBuckets()
{
return $this->childBuckets;
}

/**
* {@inheritDoc}
*/
public function getPipelines()
{
return $this->pipelines;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Smile\ElasticsuiteCore\Search\Request\BucketInterface;
use Smile\ElasticsuiteCore\Search\Request\QueryInterface;
use Smile\ElasticsuiteCore\Search\Request\MetricInterface;
use Smile\ElasticsuiteCore\Search\Request\PipelineInterface;

/**
* Date historgram bucket implementation.
Expand All @@ -30,28 +31,43 @@ class DateHistogram extends Histogram
/**
* Constructor.
*
* @param string $name Bucket name.
* @param string $field Bucket field.
* @param MetricInterface[] $metrics Bucket metrics.
* @param BucketInterface[] $childBuckets Child buckets.
* @param string $nestedPath Nested path for nested bucket.
* @param QueryInterface $filter Bucket filter.
* @param QueryInterface $nestedFilter Nested filter for the bucket.
* @param integer $interval Histogram interval.
* @param integer $minDocCount Histogram min doc count.
* @SuppressWarnings(PHPMD.ExcessiveParameterList)
*
* @param string $name Bucket name.
* @param string $field Bucket field.
* @param MetricInterface[] $metrics Bucket metrics.
* @param BucketInterface[] $childBuckets Child buckets.
* @param PipelineInterface[] $pipelines Bucket pipelines.
* @param string $nestedPath Nested path for nested bucket.
* @param QueryInterface $filter Bucket filter.
* @param QueryInterface $nestedFilter Nested filter for the bucket.
* @param integer $interval Histogram interval.
* @param integer $minDocCount Histogram min doc count.
*/
public function __construct(
$name,
$field,
array $metrics = [],
array $childBuckets = [],
array $pipelines = [],
$nestedPath = null,
QueryInterface $filter = null,
QueryInterface $nestedFilter = null,
$interval = "1d",
$minDocCount = 0
) {
parent::__construct($name, $field, $metrics, $childBuckets, $nestedPath, $filter, $nestedFilter, $interval, $minDocCount);
parent::__construct(
$name,
$field,
$metrics,
$childBuckets,
$pipelines,
$nestedPath,
$filter,
$nestedFilter,
$interval,
$minDocCount
);
}

/**
Expand Down
Loading

0 comments on commit 32b86e5

Please sign in to comment.