diff --git a/gradle/testing/randomization/policies/solr-tests.policy b/gradle/testing/randomization/policies/solr-tests.policy index 61df0871a35..31b9da9807b 100644 --- a/gradle/testing/randomization/policies/solr-tests.policy +++ b/gradle/testing/randomization/policies/solr-tests.policy @@ -37,6 +37,11 @@ grant { permission java.io.FilePermission "/path/to/myinst/conf/solrcore.properties", "read"; // TestConfigSets messes with these (wtf?) permission java.io.FilePermission "/path/to/solr/home/lib", "read"; + + // Needed by org.apache.solr.handler.component.UBIComponentTest + permission java.io.FilePermission "${common-solr.dir}/core/build/resources/test/solr/userfiles/ubi_queries.jsonl", "write"; + permission java.io.FilePermission "/tmp/src/solr/solr/core/build/resources/test/solr/userfiles${/}-", "write"; + permission java.io.FilePermission "/tmp/src/solr/solr/core/build/resources/test/solr/userfiles", "write"; permission java.nio.file.LinkPermission "hard"; diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index 749436a0e8c..13d166cf449 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -1983,7 +1983,7 @@ public SolrRequestHandler registerRequestHandler(String handlerName, SolrRequest /** Register the default search components */ private void loadSearchComponents() { - Map instances = createInstances(SearchComponent.standard_components); + Map instances = createInstances(SearchComponent.STANDARD_COMPONENTS); for (Map.Entry e : instances.entrySet()) e.getValue().setName(e.getKey()); searchComponents.init(instances, this); diff --git a/solr/core/src/java/org/apache/solr/handler/LoggingStream.java b/solr/core/src/java/org/apache/solr/handler/LoggingStream.java new file mode 100644 index 00000000000..8839686c1e7 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/LoggingStream.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.handler; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.lang.invoke.MethodHandles; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Explanation.ExpressionType; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.SolrCore; +import org.noggit.CharArr; +import org.noggit.JSONWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Sends tuples emitted by a wrapped {@link TupleStream} as writes to a log file. The log file will + * be created in the "userfiles" directory and formatted in the JSON w/ Lines format. + * + *

I really want to call this the DogStream, as it matches the CatStream. + * + *

Is this generically useful to be added to the streaming jar and Lang? + * + *

WriterStream? LoggingStream? FileoutputStream? JsonOutputStream? LoggingStream?? + */ +public class LoggingStream extends TupleStream implements Expressible { + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + // field name in summary tuple for #docs updated in batch + public static String BATCH_LOGGED_FIELD_NAME = "batchLogged"; + + private Path chroot; + + /** + * The name of the log file that should be written to. This will be in the same directory that the + * CatStream is allowed to write to. + */ + private String filepath; + + private Path filePath; + + private int updateBatchSize; + + private int batchNumber; + private long totalDocsIndex; + private TupleStream tupleSource; + private List documentBatch = new ArrayList<>(); + + private OutputStream fos; + private final CharArr charArr = new CharArr(1024 * 2); + JSONWriter jsonWriter = new JSONWriter(charArr, -1); + private Writer writer; + + public LoggingStream(StreamExpression expression, StreamFactory factory) throws IOException { + + filepath = factory.getValueOperand(expression, 0); + if (filepath == null) { + throw new IllegalArgumentException("No filepath provided to log stream to"); + } + final String filepathWithoutSurroundingQuotes = stripSurroundingQuotesIfTheyExist(filepath); + if (StrUtils.isNullOrEmpty(filepathWithoutSurroundingQuotes)) { + throw new IllegalArgumentException("No filepath provided to stream"); + } + + this.filepath = filepathWithoutSurroundingQuotes; + + // Extract underlying TupleStream. + List streamExpressions = + factory.getExpressionOperandsRepresentingTypes( + expression, Expressible.class, TupleStream.class); + if (1 != streamExpressions.size()) { + throw new IOException( + String.format( + Locale.ROOT, + "Invalid expression %s - expecting a single stream but found %d", + expression, + streamExpressions.size())); + } + StreamExpression sourceStreamExpression = streamExpressions.get(0); + init(filepathWithoutSurroundingQuotes, factory.constructStream(sourceStreamExpression)); + } + + public LoggingStream(String filepath, TupleStream tupleSource) throws IOException { + + init(filepath, tupleSource); + } + + private void init(String filepath, TupleStream tupleSource) { + this.filepath = filepath; + this.tupleSource = tupleSource; + } + + /** The path of the file being logged to */ + public Path getFilePath() { + return filePath; + } + + @Override + public void open() throws IOException { + filePath = chroot.resolve(filepath).normalize(); + if (!filePath.startsWith(chroot)) { + throw new SolrException( + SolrException.ErrorCode.BAD_REQUEST, "file to log to must be under " + chroot); + } + + fos = new FileOutputStream(filePath.toFile(), true); + writer = new OutputStreamWriter(fos, StandardCharsets.UTF_8); + + tupleSource.open(); + } + + @Override + public Tuple read() throws IOException { + + Tuple tuple = tupleSource.read(); + if (tuple.EOF) { + + return tuple; + } else { + // tupleSource.pushBack(tuple); + uploadBatchToCollection(tuple); + // return createBatchSummaryTuple(b); + } + + // uploadBatchToCollection(documentBatch); + // int b = documentBatch.size(); + // documentBatch.clear(); + int b = 1; + return createBatchSummaryTuple(b); + } + + @Override + public void close() throws IOException { + if (writer != null) { + writer.flush(); + } + if (fos != null) { + fos.flush(); + fos.close(); + } + tupleSource.close(); + } + + @Override + public StreamComparator getStreamSort() { + return tupleSource.getStreamSort(); + } + + @Override + public List 
children() { + ArrayList sourceList = new ArrayList<>(1); + sourceList.add(tupleSource); + return sourceList; + } + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException { + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) + throws IOException { + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + expression.addParameter(filepath); + + if (includeStreams) { + if (tupleSource != null) { + expression.addParameter(((Expressible) tupleSource).toExpression(factory)); + } else { + throw new IOException( + "This LoggingStream contains a non-expressible TupleStream - it cannot be converted to an expression"); + } + } else { + expression.addParameter(""); + } + + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId() + "-datastore"); + + explanation.setFunctionName(String.format(Locale.ROOT, "logging (%s)", filepath)); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(ExpressionType.DATASTORE); + explanation.setExpression("Log tuples into " + filepath); + + // child is a datastore so add it at this point + StreamExplanation child = new StreamExplanation(getStreamNodeId().toString()); + child.setFunctionName(String.format(Locale.ROOT, factory.getFunctionName(getClass()))); + child.setImplementingClass(getClass().getName()); + child.setExpressionType(ExpressionType.DATASTORE); + child.setExpression(toExpression(factory, false).toString()); + child.addChild(tupleSource.toExplanation(factory)); + + explanation.addChild(child); + + return explanation; + } + + @Override + public void setStreamContext(StreamContext context) { + Object solrCoreObj = context.get("solr-core"); + if (solrCoreObj == null || !(solrCoreObj instanceof SolrCore)) { + throw new SolrException( + SolrException.ErrorCode.INVALID_STATE, + "StreamContext must have SolrCore in solr-core key"); + } + final SolrCore core = (SolrCore) context.get("solr-core"); + + this.chroot = core.getCoreContainer().getUserFilesPath(); + if (!Files.exists(chroot)) { + try { + Files.createDirectories(this.chroot); + } catch (IOException ioe) { + throw new SolrException( + SolrException.ErrorCode.INVALID_STATE, + chroot + " directory used to load files must exist but and couldn't be created!"); + } + } + + // Pass down the stream context. + this.tupleSource.setStreamContext(context); + } + + // private SolrInputDocument convertTupleTJson(Tuple tuple) { + // SolrInputDocument doc = new SolrInputDocument(); + // for (String field : tuple.getFields().keySet()) { + // + // if (!(field.equals(CommonParams.VERSION_FIELD) )) { + // Object value = tuple.get(field); + // if (value instanceof List) { + // addMultivaluedField(doc, field, (List) value); + // } else { + // doc.addField(field, value); + // } + // } + // } + // log.debug("Tuple [{}] was converted into SolrInputDocument [{}].", tuple, doc); + // jsonWriter + // return doc; + // } + + private void addMultivaluedField(SolrInputDocument doc, String fieldName, List values) { + for (Object value : values) { + doc.addField(fieldName, value); + } + } + + /** + * This method will be called on every batch of tuples consumed, after converting each tuple in + * that batch to a Solr Input Document. 
+ */ + protected void uploadBatchToCollection(Tuple doc) throws IOException { + charArr.reset(); + // doc.toMap() + // Map m =doc.toMap() + // doc.forEach( + // (s, field) -> { + // if (s.equals("_version_") || s.equals("_roor_")) return; + // if (field instanceof List) { + // if (((List) field).size() == 1) { + // field = ((List) field).get(0); + // } + // } + // field = constructDateStr(field); + // if (field instanceof List) { + // List list = (List) field; + // if (hasdate(list)) { + // ArrayList listCopy = new ArrayList<>(list.size()); + // for (Object o : list) listCopy.add(constructDateStr(o)); + // field = listCopy; + // } + // } + // m.put(s, field); + // }); + // jsonWriter.write(m); + jsonWriter.write(doc); + writer.write(charArr.getArray(), charArr.getStart(), charArr.getEnd()); + writer.append('\n'); + } + + private Tuple createBatchSummaryTuple(int batchSize) { + assert batchSize > 0; + Tuple tuple = new Tuple(); + this.totalDocsIndex += batchSize; + ++batchNumber; + tuple.put(BATCH_LOGGED_FIELD_NAME, batchSize); + tuple.put("totalIndexed", this.totalDocsIndex); + tuple.put("batchNumber", batchNumber); + // if (coreName != null) { + // tuple.put("worker", coreName); + // } + return tuple; + } + + private String stripSurroundingQuotesIfTheyExist(String value) { + if (value.length() < 2) return value; + if ((value.startsWith("\"") && value.endsWith("\"")) + || (value.startsWith("'") && value.endsWith("'"))) { + return value.substring(1, value.length() - 1); + } + + return value; + } +} diff --git a/solr/core/src/java/org/apache/solr/handler/SolrDefaultStreamFactory.java b/solr/core/src/java/org/apache/solr/handler/SolrDefaultStreamFactory.java index d46962189cc..01a8e0bfd63 100644 --- a/solr/core/src/java/org/apache/solr/handler/SolrDefaultStreamFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/SolrDefaultStreamFactory.java @@ -36,6 +36,7 @@ public SolrDefaultStreamFactory() { super(); this.withFunctionName("analyze", AnalyzeEvaluator.class); this.withFunctionName("cat", CatStream.class); + this.withFunctionName("logging", LoggingStream.class); this.withFunctionName("classify", ClassifyStream.class); this.withFunctionName("haversineMeters", HaversineMetersEvaluator.class); this.withFunctionName("wsum", WeightedSumMetric.class); diff --git a/solr/core/src/java/org/apache/solr/handler/component/HighlightComponent.java b/solr/core/src/java/org/apache/solr/handler/component/HighlightComponent.java index d05c44ef539..1caa82033b5 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HighlightComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HighlightComponent.java @@ -197,7 +197,7 @@ public SolrHighlighter getHighlighter(SolrParams params) { public void modifyRequest(ResponseBuilder rb, SearchComponent who, ShardRequest sreq) { if (!rb.doHighlights) return; - // Turn on highlighting only only when retrieving fields + // Turn on highlighting only when retrieving fields if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) { sreq.purpose |= ShardRequest.PURPOSE_GET_HIGHLIGHTS; // should already be true... 
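For context, a minimal sketch of driving the new logging stream directly, mirroring the wiring used by LoggingStreamTest later in this patch (the file name and the echo source are illustrative; the StreamContext must carry a live SolrCore under the "solr-core" key so the stream can resolve the userfiles directory):

    StreamFactory factory = new StreamFactory()
        .withFunctionName("logging", LoggingStream.class)
        .withFunctionName("echo", EchoStream.class);
    try (TupleStream stream =
        new LoggingStream(StreamExpressionParser.parse("logging(bob.jsonl,echo(\"bob\"))"), factory)) {
      stream.setStreamContext(context); // context.put("solr-core", core) was done beforehand
      stream.open();
      Tuple summary = stream.read(); // one summary tuple per input tuple: batchLogged, totalIndexed, batchNumber
    }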
diff --git a/solr/core/src/java/org/apache/solr/handler/component/SearchComponent.java b/solr/core/src/java/org/apache/solr/handler/component/SearchComponent.java index 913078c27ee..b7de8aa620a 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/SearchComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/SearchComponent.java @@ -17,6 +17,7 @@ package org.apache.solr.handler.component; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import org.apache.solr.core.SolrInfoBean; import org.apache.solr.metrics.SolrMetricsContext; @@ -105,20 +106,24 @@ public SolrMetricsContext getSolrMetricsContext() { @Override public void initializeMetrics(SolrMetricsContext parentContext, String scope) { - // By default don't register any metrics - but prepare a child context + // By default, don't register any metrics - but prepare a child context this.solrMetricsContext = parentContext.getChildContext(this); } - public static final Map> standard_components = - Map.of( - HighlightComponent.COMPONENT_NAME, HighlightComponent.class, - QueryComponent.COMPONENT_NAME, QueryComponent.class, - FacetComponent.COMPONENT_NAME, FacetComponent.class, - FacetModule.COMPONENT_NAME, FacetModule.class, - MoreLikeThisComponent.COMPONENT_NAME, MoreLikeThisComponent.class, - StatsComponent.COMPONENT_NAME, StatsComponent.class, - DebugComponent.COMPONENT_NAME, DebugComponent.class, - RealTimeGetComponent.COMPONENT_NAME, RealTimeGetComponent.class, - ExpandComponent.COMPONENT_NAME, ExpandComponent.class, - TermsComponent.COMPONENT_NAME, TermsComponent.class); + public static final Map> STANDARD_COMPONENTS; + + static { + STANDARD_COMPONENTS = new HashMap<>(); + STANDARD_COMPONENTS.put(HighlightComponent.COMPONENT_NAME, HighlightComponent.class); + STANDARD_COMPONENTS.put(QueryComponent.COMPONENT_NAME, QueryComponent.class); + STANDARD_COMPONENTS.put(FacetComponent.COMPONENT_NAME, FacetComponent.class); + STANDARD_COMPONENTS.put(FacetModule.COMPONENT_NAME, FacetModule.class); + STANDARD_COMPONENTS.put(MoreLikeThisComponent.COMPONENT_NAME, MoreLikeThisComponent.class); + STANDARD_COMPONENTS.put(StatsComponent.COMPONENT_NAME, StatsComponent.class); + STANDARD_COMPONENTS.put(DebugComponent.COMPONENT_NAME, DebugComponent.class); + STANDARD_COMPONENTS.put(RealTimeGetComponent.COMPONENT_NAME, RealTimeGetComponent.class); + STANDARD_COMPONENTS.put(ExpandComponent.COMPONENT_NAME, ExpandComponent.class); + STANDARD_COMPONENTS.put(TermsComponent.COMPONENT_NAME, TermsComponent.class); + STANDARD_COMPONENTS.put(UBIComponent.COMPONENT_NAME, UBIComponent.class); + } } diff --git a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java index 2d6b36ac3cb..4399a4ca88b 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/component/SearchHandler.java @@ -126,6 +126,13 @@ public class SearchHandler extends RequestHandlerBase private PluginInfo shfInfo; private SolrCore core; + /** + * The default set of components that every handler gets. You can change this by defining the + * specific components for a handler. It puts the {@link QueryComponent} first as subsequent + * components assume that the QueryComponent ran and populated the document list. + * + * @return A list of component names. 
+ */ protected List getDefaultComponents() { ArrayList names = new ArrayList<>(9); names.add(QueryComponent.COMPONENT_NAME); @@ -137,6 +144,7 @@ protected List getDefaultComponents() { names.add(DebugComponent.COMPONENT_NAME); names.add(ExpandComponent.COMPONENT_NAME); names.add(TermsComponent.COMPONENT_NAME); + names.add(UBIComponent.COMPONENT_NAME); return names; } diff --git a/solr/core/src/java/org/apache/solr/handler/component/UBIComponent.java b/solr/core/src/java/org/apache/solr/handler/component/UBIComponent.java new file mode 100644 index 00000000000..13c892be4d7 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/handler/component/UBIComponent.java @@ -0,0 +1,641 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.component; + +import static org.apache.solr.handler.RequestHandlerBase.isInternalShardRequest; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.LineNumberReader; +import java.lang.invoke.MethodHandles; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.comp.StreamComparator; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.DefaultStreamFactory; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.client.solrj.io.stream.expr.StreamExplanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.util.SimpleOrderedMap; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.PluginInfo; +import org.apache.solr.core.SolrCore; +import org.apache.solr.handler.LoggingStream; +import org.apache.solr.response.ResultContext; +import org.apache.solr.schema.IndexSchema; +import org.apache.solr.search.DocIterator; +import 
org.apache.solr.search.DocList; +import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.util.plugin.SolrCoreAware; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * User Behavior Insights (UBI) is an open standard for gathering query and event data from users + * and storing it in a structured format. UBI can be used for in-session personalization, implicit + * judgements, and powering recommendation systems, among other uses. Learn more about the UBI standard at https://ubisearch.dev. + * + *

The response from Solr is augmented by this component, and optionally the query details can be + * tracked and logged to various systems, including log files or other backend systems. + * + *

Data tracked is a unique query_id for the search request, the end user's query, metadata about + * the query as a JSON map, and the resulting document IDs. + * + *
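+ * <p>A recorded query is written as a single JSON object, roughly like this (a sketch: the
+ * field names are the ones this component emits, the values are illustrative):
+ *
+ * <pre>
+ * {"query_id":"5678","timestamp":"2024-05-01T12:00:00Z","application":"typeahead",
+ *  "user_query":"Apple iPod","doc_ids":"1,2,3"}
+ * </pre>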

You provide a streaming expression that is parsed and loaded by the component to stream query + * data to a target of your choice. If you do not, then the default expression + * 'logging(ubi_queries.jsonl,ubiQuery())' is used, which logs data to the + * $SOLR_HOME/userfiles/ubi_queries.jsonl file. + * + *

Your streaming expression must use the 'ubiQuery()' stream source to retrieve + * the {@link UBIQuery} object that contains the data to record. + * + *
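+ * <p>For example, rather than logging to a file, a custom expression can index each query into
+ * a separate collection; this mirrors the ubi-query-pipeline.expr configset file added in this
+ * change:
+ *
+ * <pre>
+ * commit(ubi_queries,
+ *   update(ubi_queries,
+ *     select(ubiQuery(), query_id as id, timestamp, application, user_query, query_attributes)))
+ * </pre>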

Event data is tracked by letting the user write events directly to the event repository of + * your choice; it could be a Solr collection, a file, or an S3 bucket. Recording events is NOT + * handled by this component. + * + *
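+ * <p>For instance, a client could post each click event straight to its own 'ubi_events'
+ * collection (a sketch only: the shape follows the UBI event schema, and the collection name is
+ * illustrative; none of this is created or written by this component):
+ *
+ * <pre>
+ * {"query_id":"5678","action_name":"click","timestamp":"2024-05-01T12:00:05Z",
+ *  "event_attributes":{"position":2}}
+ * </pre>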

Add the component to a requestHandler in solrconfig.xml like this: + * + *

+ * <searchComponent name="ubi" class="solr.UBIComponent"/>
+ *
+ * <requestHandler name="/select" class="solr.SearchHandler">
+ *   <lst name="defaults">
+ *
+ *     ...
+ *
+ *   </lst>
+ *   <arr name="components">
+ *     <str>ubi</str>
+ *   </arr>
+ * </requestHandler>
+ * + * It can then be enabled at query time by supplying the + * + *
ubi=true
+ * + * query parameter. + * + *
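+ * <p>For example, with plain query parameters (an illustrative request):
+ *
+ * <pre>
+ * /select?q=ipod&ubi=true&user_query=Apple%20iPod&query_id=5678
+ * </pre>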

Ideally this component is used with the JSON Query syntax, as that facilitates passing in the + * additional data to be tracked with a query. Here is an example: + * + *

+ *     {
+ *     "query" : "apple AND ipod",
+ *     "limit":2,
+ *     "start":2,
+ *     "filter": [
+ *        "inStock:true"
+ *      ],
+ *     "params": {
+ *       "ubi": "true",
+ *       "user_query": "Apple iPod",
+ *       "query_attributes": {
+ *         "experiment_name": "super_secret",
+ *         "page": 2,
+ *         "in_stock": "true"
+ *       }
+ *     }
+ *   }
+ * 
+ * + *
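+ * <p>The component then augments the response with a "ubi" section carrying the generated (or
+ * supplied) query_id, roughly:
+ *
+ * <pre>
+ * "ubi":{"query_id":"5678"}
+ * </pre>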

Refer to + * https://solr.apache.org/guide/solr/latest/query-guide/user-behavior-insights.html for more + * details + */ +public class UBIComponent extends SearchComponent implements SolrCoreAware { + + public static final String COMPONENT_NAME = "ubi"; + public static final String QUERY_ID = "query_id"; + public static final String QUERY_ATTRIBUTES = "query_attributes"; + public static final String USER_QUERY = "user_query"; + public static final String APPLICATION = "application"; + public static final String DOC_IDS = "doc_ids"; + + protected PluginInfo info = PluginInfo.EMPTY_INFO; + + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private StreamContext streamContext; + private StreamExpression streamExpression; + private StreamFactory streamFactory; + + protected SolrParams initArgs; + + @Override + public void init(NamedList args) { + this.initArgs = args.toSolrParams(); + } + + @Override + public void inform(SolrCore core) { + log.info("Initializing UBIComponent"); + + CoreContainer coreContainer = core.getCoreContainer(); + SolrClientCache solrClientCache = coreContainer.getSolrClientCache(); + + String expr; + String queryProcessingExpression = null; + if (initArgs != null) { + queryProcessingExpression = initArgs.get("queryProcessingExpression"); + } + + if (queryProcessingExpression == null) { + log.info( + "No 'queryProcessingExpression' file provided to describe processing of UBI query information."); + log.info( + "Writing out UBI query information to local $SOLR_HOME/userfiles/ubi_queries.jsonl file instead."); + // Most simplistic version + // expr = "logging(ubi_queries.jsonl, tuple(query_id=49,user_query=\"RAM memory\"))"; + + // The default version + expr = "logging(ubi_queries.jsonl,ubiQuery())"; + + // feels like 'stream' or 'get' or something should let me create a tuple out of something + // in the streamContext. That would turn the "ubi-query" object in the stream context into a + // nice + // tuple and return it. streamContext(ubi-query)?? + // expr = "logging(ubi_queries.jsonl," + "get(ubi-query)" + ")"; + } else { + + String[] args = {}; // maybe we have variables? 
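+      // readExpression() below replaces $1, $2, ... placeholders in the expression
+      // file with args[1], args[2], ... (index 0 is unused); none are passed yet.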
+ try (LineNumberReader bufferedReader = + new LineNumberReader( + new InputStreamReader( + core.getResourceLoader().openResource(queryProcessingExpression), + StandardCharsets.UTF_8))) { + expr = readExpression(bufferedReader, args); + } catch (IOException ioe) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Error reading file " + queryProcessingExpression, + ioe); + } + } + + streamContext = new StreamContext(); + streamContext.put("solr-core", core); + streamContext.setSolrClientCache(solrClientCache); + + streamExpression = StreamExpressionParser.parse(expr); + if (!streamExpression.toString().contains("ubiQuery")) { + log.error( + "The streaming expression " + + streamExpression + + " must include the 'ubiQuery()' to record UBI queries."); + } + + streamFactory = new DefaultStreamFactory(); + streamFactory.withFunctionName("logging", LoggingStream.class); + streamFactory.withFunctionName("ubiQuery", UBIQueryStream.class); + + if (coreContainer.isZooKeeperAware()) { + String defaultZkHost = core.getCoreContainer().getZkController().getZkServerAddress(); + streamFactory.withDefaultZkHost(defaultZkHost); + } + } + + @Override + public void prepare(ResponseBuilder rb) throws IOException { + SolrParams params = rb.req.getParams(); + if (!params.getBool(COMPONENT_NAME, false)) { + return; + } + rb.setNeedDocList(true); + } + + @Override + public void process(ResponseBuilder rb) throws IOException { + SolrParams params = rb.req.getParams(); + if (!params.getBool(COMPONENT_NAME, false)) { + return; + } + if (!isInternalShardRequest(rb.req)) { // subordinate shard req shouldn't yield logs + storeUbiDetails( + rb, + ubiQuery -> { + try { + DocList docList = ((ResultContext) rb.rsp.getResponse()).getDocList(); + ubiQuery.setDocIds(extractDocIds(docList, rb.req.getSearcher())); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } + + @Override + public int distributedProcess(ResponseBuilder rb) throws IOException { + SolrParams params = rb.req.getParams(); + if (!params.getBool(COMPONENT_NAME, false)) { + return ResponseBuilder.STAGE_DONE; + } + + if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) { + return ResponseBuilder.STAGE_GET_FIELDS; + } + + if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) { + storeUbiDetails( + rb, + ubiQuery -> + ubiQuery.setDocIds( + String.join(",", rb.resultIds.keySet().stream().map(Object::toString).toList()))); + return ResponseBuilder.STAGE_DONE; + } + + return ResponseBuilder.STAGE_DONE; + } + + private static UBIQuery getUbiQuery(ResponseBuilder rb) { + if (rb.rsp.getValues().get("ubi") != null) { + return null; + } + SolrParams params = rb.req.getParams(); + + SolrIndexSearcher searcher = rb.req.getSearcher(); + + String queryId = params.get(QUERY_ID); + UBIQuery ubiQuery = new UBIQuery(queryId); + + ubiQuery.setUserQuery(params.get(USER_QUERY)); + ubiQuery.setApplication(params.get(APPLICATION)); + if (ubiQuery.getApplication() == null) { + ubiQuery.setApplication( + rb.isDistrib + ? rb.req.getCloudDescriptor().getCollectionName() + : searcher.getCore().getName()); + } + + String queryAttributes = params.get(QUERY_ATTRIBUTES); + + if (queryAttributes != null && queryAttributes.toString().startsWith("{")) { + // Look up the original nested JSON format, typically passed in + // via the JSON formatted query. 
+ @SuppressWarnings("rawtypes") + Map jsonProperties = rb.req.getJSON(); + if (jsonProperties.containsKey("params")) { + @SuppressWarnings("rawtypes") + Map paramsProperties = (Map) jsonProperties.get("params"); + if (paramsProperties.containsKey(QUERY_ATTRIBUTES)) { + @SuppressWarnings("rawtypes") + Map queryAttributesAsMap = (Map) paramsProperties.get(QUERY_ATTRIBUTES); + ubiQuery.setQueryAttributes(queryAttributesAsMap); + } + } + } + return ubiQuery; + } + + private void storeUbiDetails(ResponseBuilder rb, Consumer docIdsSetter) + throws IOException { + UBIQuery ubiQuery = getUbiQuery(rb); + if (ubiQuery == null) return; + docIdsSetter.accept(ubiQuery); + addUserBehaviorInsightsToResponse(ubiQuery, rb); + recordQuery(ubiQuery); + } + + private void recordQuery(UBIQuery ubiQuery) throws IOException { + TupleStream stream; + + stream = constructStream(streamFactory, streamExpression); + + streamContext.put("ubi-query", ubiQuery); + stream.setStreamContext(streamContext); + + getTuple(stream); + } + + private void addUserBehaviorInsightsToResponse(UBIQuery ubiQuery, ResponseBuilder rb) { + SimpleOrderedMap ubiResponseInfo = new SimpleOrderedMap<>(); + + ubiResponseInfo.add(QUERY_ID, ubiQuery.getQueryId()); + rb.rsp.add("ubi", ubiResponseInfo); + } + + protected String extractDocIds(DocList dl, SolrIndexSearcher searcher) throws IOException { + IndexSchema schema = searcher.getSchema(); + + if (schema.getUniqueKeyField() == null) { + log.error("Can't track documents for query without unique field."); + return ""; + } + StringBuilder sb = new StringBuilder(); + + Set fields = Collections.singleton(schema.getUniqueKeyField().getName()); + for (DocIterator iter = dl.iterator(); iter.hasNext(); ) { + sb.append(schema.printableUniqueKey(searcher.getDocFetcher().doc(iter.nextDoc(), fields))) + .append(','); + } + String docIds = sb.length() > 0 ? 
sb.substring(0, sb.length() - 1) : ""; + + return docIds; + } + + protected List getTuples(TupleStream tupleStream) throws IOException { + tupleStream.open(); + List tuples = new ArrayList<>(); + for (; ; ) { + Tuple t = tupleStream.read(); + if (t.EOF) { + break; + } else { + tuples.add(t); + } + } + tupleStream.close(); + return tuples; + } + + protected Tuple getTuple(TupleStream tupleStream) throws IOException { + tupleStream.open(); + Tuple t = tupleStream.read(); + tupleStream.close(); + return t; + } + + // this should be a shared utility method + public static String readExpression(LineNumberReader bufferedReader, String[] args) + throws IOException { + + StringBuilder exprBuff = new StringBuilder(); + + boolean comment = false; + while (true) { + String line = bufferedReader.readLine(); + if (line == null) { + break; + } + + if (line.indexOf("/*") == 0) { + comment = true; + continue; + } + + if (line.indexOf("*/") == 0) { + comment = false; + continue; + } + + if (comment || line.startsWith("#") || line.startsWith("//")) { + continue; + } + + // Substitute parameters + if (line.length() > 0) { + for (int i = 1; i < args.length; i++) { + String arg = args[i]; + line = line.replace("$" + i, arg); + } + } + + exprBuff.append(line); + } + + return exprBuff.toString(); + } + + private static TupleStream constructStream( + StreamFactory streamFactory, StreamExpression streamExpression) { + try { + return streamFactory.constructStream(streamExpression); + } catch (IOException exception) { + throw new SolrException( + SolrException.ErrorCode.SERVER_ERROR, + "Error constructing stream for processing UBI data collection using expression " + + streamExpression, + exception); + } + } + + @Override + public String getDescription() { + return "A component that tracks the original user query and the resulting documents returned to understand the user."; + } + + /** + * Handles all the data required for tracking a query using User Behavior Insights. + * + *

Compatible with the + * https://github.com/o19s/ubi/blob/main/schema/1.2.0/query.request.schema.json. + */ + public static class UBIQuery { + + private String application; + private String queryId; + private String userQuery; + private Date timestamp; + + @SuppressWarnings("rawtypes") + private Map queryAttributes; + + private String docIds; + + public UBIQuery(String queryId) { + + if (queryId == null) { + queryId = UUID.randomUUID().toString().toLowerCase(Locale.ROOT); + } + this.queryId = queryId; + this.timestamp = new Date(); + } + + public Date getTimestamp() { + return timestamp; + } + + public void setApplication(String application) { + this.application = application; + } + + public String getApplication() { + return this.application; + } + + public String getQueryId() { + return queryId; + } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + public String getUserQuery() { + return userQuery; + } + + public void setUserQuery(String userQuery) { + this.userQuery = userQuery; + } + + @SuppressWarnings("rawtypes") + public Map getQueryAttributes() { + return queryAttributes; + } + + @SuppressWarnings("rawtypes") + public void setQueryAttributes(Map queryAttributes) { + this.queryAttributes = queryAttributes; + } + + public String getDocIds() { + return docIds; + } + + public void setDocIds(String docIds) { + this.docIds = docIds; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public Map toMap() { + @SuppressWarnings({"rawtypes", "unchecked"}) + Map map = new HashMap(); + map.put(QUERY_ID, this.queryId); + map.put( + "timestamp", + DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(this.timestamp.getTime()))); + if (this.application != null) { + map.put(APPLICATION, this.application); + } + if (this.userQuery != null) { + map.put(USER_QUERY, this.userQuery); + } + if (this.docIds != null) { + map.put(DOC_IDS, this.docIds); + } + if (this.queryAttributes != null) { + + ObjectMapper objectMapper = new ObjectMapper(); + try { + map.put(QUERY_ATTRIBUTES, objectMapper.writeValueAsString(this.queryAttributes)); + } catch (JsonProcessingException e) { + // eat it. + } + } + + return map; + } + } + + /** + * Converts a UBIQuery that is stored in the StreamContext under the key 'ubi-query' into a Tuple + * and returns it. + * + *

I suspect that if I had the right magic with a LetStream or a GetStream, I could somehow + * just use that to say "pluck the 'ubi-query' object out of the StreamContext and call .toTuple + * or make a map of it and that would be my tuple'. + */ + public static class UBIQueryStream extends TupleStream implements Expressible { + + private StreamContext streamContext; + private boolean finished; + + public UBIQueryStream(StreamExpression expression, StreamFactory factory) throws IOException {} + + @Override + public StreamExpression toExpression(StreamFactory factory) throws IOException { + return toExpression(factory, true); + } + + private StreamExpression toExpression(StreamFactory factory, boolean includeStreams) + throws IOException { + // function name + StreamExpression expression = new StreamExpression(factory.getFunctionName(this.getClass())); + + return expression; + } + + @Override + public Explanation toExplanation(StreamFactory factory) throws IOException { + + StreamExplanation explanation = new StreamExplanation(getStreamNodeId().toString()); + explanation.setFunctionName(factory.getFunctionName(this.getClass())); + explanation.setImplementingClass(this.getClass().getName()); + explanation.setExpressionType(Explanation.ExpressionType.STREAM_SOURCE); + explanation.setExpression(toExpression(factory, false).toString()); + + return explanation; + } + + @Override + public void setStreamContext(StreamContext context) { + this.streamContext = context; + } + + @Override + public List children() { + List l = new ArrayList<>(); + return l; + } + + @Override + public void open() throws IOException {} + + @Override + public void close() throws IOException {} + + @SuppressWarnings({"unchecked"}) + @Override + public Tuple read() throws IOException { + + if (finished) { + return Tuple.EOF(); + } else { + finished = true; + + UBIQuery ubiQuery = (UBIQuery) streamContext.get("ubi-query"); + + return new Tuple(ubiQuery.toMap()); + } + } + + /** Return the stream sort - ie, the order in which records are returned */ + @Override + public StreamComparator getStreamSort() { + return null; + } + + @Override + public int getCost() { + return 0; + } + } +} diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-ubi.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-ubi.xml new file mode 100644 index 00000000000..a2381f4a74c --- /dev/null +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-ubi.xml @@ -0,0 +1,50 @@ + + + + + + ${tests.luceneMatchVersion:LATEST} + + + + ubi + + + + + ${solr.data.dir:} + + + + + + + + + + + + text + + + + diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal-field-limiting/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal-field-limiting/conf/solrconfig.xml index 00f1ab3714b..4cfd6a26a9d 100644 --- a/solr/core/src/test-files/solr/configsets/cloud-minimal-field-limiting/conf/solrconfig.xml +++ b/solr/core/src/test-files/solr/configsets/cloud-minimal-field-limiting/conf/solrconfig.xml @@ -34,8 +34,8 @@ ${solr.commitwithin.softcommit:true} - - + + explicit @@ -57,4 +57,3 @@ - diff --git a/solr/core/src/test-files/solr/configsets/ubi-enabled/conf/schema.xml b/solr/core/src/test-files/solr/configsets/ubi-enabled/conf/schema.xml new file mode 100644 index 00000000000..661b02a0f96 --- /dev/null +++ b/solr/core/src/test-files/solr/configsets/ubi-enabled/conf/schema.xml @@ -0,0 +1,24 @@ + + + + + + + + id + diff --git a/solr/core/src/test-files/solr/configsets/ubi-enabled/conf/solrconfig.xml 
b/solr/core/src/test-files/solr/configsets/ubi-enabled/conf/solrconfig.xml new file mode 100644 index 00000000000..ff613fc0da5 --- /dev/null +++ b/solr/core/src/test-files/solr/configsets/ubi-enabled/conf/solrconfig.xml @@ -0,0 +1,54 @@ + + + + + + + + + ${solr.data.dir:} + + + + + ${tests.luceneMatchVersion:LATEST} + + + + ${solr.commitwithin.softcommit:true} + + + + + ubi-query-pipeline.expr + + + + + explicit + true + text + + + ubi + + + + + diff --git a/solr/core/src/test-files/solr/configsets/ubi-enabled/conf/ubi-query-pipeline.expr b/solr/core/src/test-files/solr/configsets/ubi-enabled/conf/ubi-query-pipeline.expr new file mode 100644 index 00000000000..6e09615240a --- /dev/null +++ b/solr/core/src/test-files/solr/configsets/ubi-enabled/conf/ubi-query-pipeline.expr @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +commit(ubi_queries, + update(ubi_queries, + select( + ubiQuery(), + query_id as id, + timestamp, + application, + user_query, + query_attributes + ) + ) +) diff --git a/solr/core/src/test/org/apache/solr/handler/LoggingStreamTest.java b/solr/core/src/test/org/apache/solr/handler/LoggingStreamTest.java new file mode 100644 index 00000000000..fc3e2d1e5c2 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/handler/LoggingStreamTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.handler; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.CsvStream; +import org.apache.solr.client.solrj.io.stream.EchoStream; +import org.apache.solr.client.solrj.io.stream.ListStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupStream; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.Explanation; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.SolrException; +import org.apache.solr.core.CoreDescriptor; +import org.apache.solr.core.SolrCore; +import org.apache.solr.embedded.JettySolrRunner; +import org.junit.BeforeClass; +import org.junit.Test; + +@SolrTestCaseJ4.SuppressSSL +public class LoggingStreamTest extends SolrCloudTestCase { + private static StreamFactory factory; + private static StreamContext context; + private static final String COLLECTION = "streams"; + + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(1) + .addConfig( + "config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf")) + .configure(); + + CollectionAdminRequest.createCollection(COLLECTION, "config", 2, 1, 1, 0) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(COLLECTION, 2, 2 * (1 + 1)); + + String zkHost = cluster.getZkServer().getZkAddress(); + factory = + new StreamFactory() + .withCollectionZkHost(COLLECTION, zkHost) + .withFunctionName("logging", LoggingStream.class) + .withFunctionName("echo", EchoStream.class) + .withFunctionName("parseCSV", CsvStream.class) + .withFunctionName("list", ListStream.class) + .withFunctionName("tuple", TupStream.class); + + final Path dataDir = findUserFilesDataDir(); + Files.createDirectories(dataDir); + + context = new StreamContext(); + context.put("solr-core", findSolrCore()); + SolrClientCache solrClientCache = new SolrClientCache(); + + context.setSolrClientCache(solrClientCache); + } + + @Test + public void testLoggingStreamExpressionToExpression() throws Exception { + String expressionString; + + // Basic test + try (LoggingStream stream = + new LoggingStream( + StreamExpressionParser.parse("logging(bob.jsonl,echo(\"bob\"))"), factory)) { + expressionString = stream.toExpression(factory).toString(); + assertTrue(expressionString.contains("logging(bob.jsonl,")); + assertTrue(expressionString.contains("echo(\"bob")); + } + + // Unwrap double quotes around file name test + try (LoggingStream stream = + new LoggingStream( + StreamExpressionParser.parse("logging(\"outputs/bob.jsonl\",echo(\"bob\"))"), + factory)) { + expressionString = stream.toExpression(factory).toString(); + assertTrue(expressionString.contains("logging(outputs/bob.jsonl,")); + assertTrue(expressionString.contains("echo(\"bob")); + } + } + + @Test + public void testLoggingStreamExpressionToExplanation() throws Exception { + + try (LoggingStream stream = + new LoggingStream( + StreamExpressionParser.parse("logging(bob.jsonl,echo(\"bob\"))"), 
factory)) { + Explanation explanation = stream.toExplanation(factory); + assertEquals("logging (bob.jsonl)", explanation.getFunctionName()); + assertEquals(LoggingStream.class.getName(), explanation.getImplementingClass()); + } + } + + @Test + public void testFileOutputDirectoryPermissions() throws Exception { + + LoggingStream stream = + new LoggingStream( + StreamExpressionParser.parse("logging(/tmp/bob.jsonl,echo(\"bob\"))"), factory); + stream.setStreamContext(context); + + LoggingStream finalStream1 = stream; + SolrException thrown = + assertThrows( + "Attempting to write to /tmp should be prevented", + SolrException.class, + () -> finalStream1.open()); + assertTrue(thrown.getMessage().startsWith("file to log to must be under ")); + + stream = + new LoggingStream( + StreamExpressionParser.parse("logging(../bob.jsonl,echo(\"bob\"))"), factory); + stream.setStreamContext(context); + + LoggingStream finalStream2 = stream; + thrown = + assertThrows( + "Attempting to escape the userfiles directory should be prevented", + SolrException.class, + () -> finalStream2.open()); + assertTrue(thrown.getMessage().startsWith("file to log to must be under ")); + } + + @Test + public void testLoggingStreamCombinedSourcedFromCSV() throws Exception { + String expr = + "logging(parsed_csv_output.jsonl," + + "parseCSV(list(tuple(file=\"file1\", line=\"a,b,c\"), " + + " tuple(file=\"file1\", line=\"1,2,3\")," + + " tuple(file=\"file1\", line=\"\\\"hello, world\\\",9000,20\")," + + " tuple(file=\"file2\", line=\"field_1,field_2,field_3\"), " + + " tuple(file=\"file2\", line=\"8,9,\")))" + + ")"; + + try (LoggingStream stream = new LoggingStream(StreamExpressionParser.parse(expr), factory)) { + stream.setStreamContext(context); + List tuples = getTuples(stream); + assertEquals(tuples.size(), 3); + assertEquals(tuples.get(0).getString("totalIndexed"), "1"); + assertEquals(tuples.get(0).getString("batchLogged"), "1"); + assertEquals(tuples.get(0).getString("batchNumber"), "1"); + + assertEquals(tuples.get(1).getString("totalIndexed"), "2"); + assertEquals(tuples.get(1).getString("batchLogged"), "1"); + assertEquals(tuples.get(1).getString("batchNumber"), "2"); + + assertEquals(tuples.get(2).getString("totalIndexed"), "3"); + assertEquals(tuples.get(2).getString("batchLogged"), "1"); + assertEquals(tuples.get(2).getString("batchNumber"), "3"); + } + } + + private static Path findUserFilesDataDir() { + for (JettySolrRunner jetty : cluster.getJettySolrRunners()) { + for (CoreDescriptor coreDescriptor : jetty.getCoreContainer().getCoreDescriptors()) { + if (coreDescriptor.getCollectionName().equals(COLLECTION)) { + return jetty.getCoreContainer().getUserFilesPath(); + } + } + } + + throw new IllegalStateException("Unable to determine data-dir for: " + COLLECTION); + } + + private static SolrCore findSolrCore() { + for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) { + for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) { + if (solrCore != null) { + return solrCore; + } + } + } + throw new RuntimeException("Didn't find any valid cores."); + } + + protected List getTuples(TupleStream tupleStream) throws IOException { + List tuples = new ArrayList<>(); + + try (tupleStream) { + tupleStream.open(); + for (Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) { + tuples.add(t); + } + } + return tuples; + } +} diff --git a/solr/core/src/test/org/apache/solr/handler/component/UBIComponentDistrQueriesTest.java 
b/solr/core/src/test/org/apache/solr/handler/component/UBIComponentDistrQueriesTest.java new file mode 100644 index 00000000000..c87b8f5c268 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/handler/component/UBIComponentDistrQueriesTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.handler.component; + +import java.util.List; +import java.util.Map; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.cloud.AbstractDistribZkTestBase; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.cluster.api.SimpleMap; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.MapSolrParams; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"}) +public class UBIComponentDistrQueriesTest extends SolrCloudTestCase { + + private static final String COLLECTIONORALIAS = "collection1"; + private static final int TIMEOUT = DEFAULT_TIMEOUT; + private static final String id = "id"; + + private static boolean useAlias; + + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(4) + .addConfig("conf", TEST_PATH().resolve("configsets").resolve("ubi-enabled").resolve("conf")) + .configure(); + + String collection; + useAlias = false; // random().nextBoolean(); + if (useAlias) { + collection = COLLECTIONORALIAS + "_collection"; + } else { + collection = COLLECTIONORALIAS; + } + + CollectionAdminRequest.createCollection(collection, "conf", 2, 1) + .process(cluster.getSolrClient()); + + cluster.waitForActiveCollection(collection, 2, 2); + + AbstractDistribZkTestBase.waitForRecoveriesToFinish( + collection, cluster.getZkStateReader(), false, true, TIMEOUT); + if (useAlias) { + CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection) + .process(cluster.getSolrClient()); + } + + // ------------------- + + CollectionAdminRequest.createCollection( + "ubi_queries", // it seems like a hardcoded name why? 
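+        // (This name matches the target of the ubi-enabled configset's
+        // ubi-query-pipeline.expr, which indexes UBI query records into 'ubi_queries'.)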
+ "_default", + 1, + 1) + .process(cluster.getSolrClient()); + + cluster.waitForActiveCollection("ubi_queries", 1, 1); + + AbstractDistribZkTestBase.waitForRecoveriesToFinish( + "ubi_queries", cluster.getZkStateReader(), false, true, TIMEOUT); + } + + @Before + public void cleanIndex() throws Exception { + new UpdateRequest().deleteByQuery("*:*").commit(cluster.getSolrClient(), COLLECTIONORALIAS); + } + + @Test + public void testUBIQueryStream() throws Exception { + cluster + .getSolrClient(COLLECTIONORALIAS) + .add( + List.of( + new SolrInputDocument("id", "1", "subject", "aa"), + new SolrInputDocument("id", "2" /*"two"*/, "subject", "aa"), + new SolrInputDocument("id", "3", "subject", "aa"))); + cluster.getSolrClient(COLLECTIONORALIAS).commit(true, true); + QueryResponse queryResponse = + cluster + .getSolrClient(COLLECTIONORALIAS) + .query( + new MapSolrParams(Map.of("q", "aa", "df", "subject", "rows", "2", "ubi", "true"))); + String qid = (String) ((SimpleMap) queryResponse.getResponse().get("ubi")).get("query_id"); + assertTrue(qid.length() > 10); + Thread.sleep(10000); // I know what you think of + // TODO check that ids were recorded + QueryResponse queryCheck = + cluster + .getSolrClient("ubi_queries") + .query( + new MapSolrParams( + Map.of( + "q", "id:" + qid // doesn't search it why? is it a race? + ))); + // however I can't see doc ids found there. Shouldn't I ? + assertEquals(1L, queryCheck.getResults().getNumFound()); + assertEquals(queryCheck.getResults().get(0).get("id"), qid); + } +} diff --git a/solr/core/src/test/org/apache/solr/handler/component/UBIComponentStreamingQueriesTest.java b/solr/core/src/test/org/apache/solr/handler/component/UBIComponentStreamingQueriesTest.java new file mode 100644 index 00000000000..45f8d4a9a23 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/handler/component/UBIComponentStreamingQueriesTest.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.solr.handler.component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import org.apache.commons.io.input.ReversedLinesFileReader; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.solr.client.solrj.io.SolrClientCache; +import org.apache.solr.client.solrj.io.Tuple; +import org.apache.solr.client.solrj.io.stream.CloudSolrStream; +import org.apache.solr.client.solrj.io.stream.SelectStream; +import org.apache.solr.client.solrj.io.stream.StreamContext; +import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.UpdateStream; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; +import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; +import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.cloud.AbstractDistribZkTestBase; +import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.core.SolrCore; +import org.apache.solr.embedded.JettySolrRunner; +import org.apache.solr.handler.LoggingStream; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"}) +public class UBIComponentStreamingQueriesTest extends SolrCloudTestCase { + + private static final String COLLECTIONORALIAS = "collection1"; + private static final int TIMEOUT = DEFAULT_TIMEOUT; + + @BeforeClass + public static void setupCluster() throws Exception { + configureCluster(4) + .addConfig( + "conf", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf")) + .configure(); + + String collection; + boolean useAlias = random().nextBoolean(); + if (useAlias) { + collection = COLLECTIONORALIAS + "_collection"; + } else { + collection = COLLECTIONORALIAS; + } + + CollectionAdminRequest.createCollection(collection, "conf", 2, 1) + .process(cluster.getSolrClient()); + + cluster.waitForActiveCollection(collection, 2, 2); + + AbstractDistribZkTestBase.waitForRecoveriesToFinish( + collection, cluster.getZkStateReader(), false, true, TIMEOUT); + if (useAlias) { + CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection) + .process(cluster.getSolrClient()); + } + } + + @Before + public void cleanIndex() throws Exception { + new UpdateRequest().deleteByQuery("*:*").commit(cluster.getSolrClient(), COLLECTIONORALIAS); + } + + /** + * Test using the UBIQuery and UBIQueryStream classes independent of the UBIComponent to stream + * events. 
+   */
+  @Test
+  public void testUBIQueryStream() throws Exception {
+
+    UBIComponent.UBIQuery ubiQuery;
+    StreamExpression expression;
+    TupleStream stream;
+    List tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+
+    try (solrClientCache) {
+      streamContext.setSolrClientCache(solrClientCache);
+      StreamFactory factory =
+          new StreamFactory().withFunctionName("ubiQuery", UBIComponent.UBIQueryStream.class);
+      // Basic test
+      ubiQuery = new UBIComponent.UBIQuery("123");
+
+      expression = StreamExpressionParser.parse("ubiQuery()");
+      streamContext.put("ubi-query", ubiQuery);
+      stream = new UBIComponent.UBIQueryStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(1, tuples.size());
+      assertFields(tuples, "query_id", "timestamp");
+      assertString(tuples.get(0), "query_id", "123");
+
+      assertNotNull(Instant.parse(tuples.get(0).getString("timestamp")));
+
+      // Introduce docIds
+      ubiQuery = new UBIComponent.UBIQuery("678");
+      ubiQuery.setDocIds("1,2,3,doc_52");
+      streamContext.put("ubi-query", ubiQuery);
+      stream = new UBIComponent.UBIQueryStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(1, tuples.size());
+      assertFields(tuples, "query_id", "doc_ids");
+      // TODO: doc_ids is currently emitted as a single comma-separated string; consider an array:
+      // assertEquals(new String[] {"1", "2", "3", "doc_52"}, tuples.get(0).getStrings("doc_ids"));
+      assertEquals("1,2,3,doc_52", tuples.get(0).getString("doc_ids"));
+
+      // Include another field to see what is returned
+      ubiQuery = new UBIComponent.UBIQuery("234");
+      ubiQuery.setApplication("typeahead");
+
+      streamContext.put("ubi-query", ubiQuery);
+      stream = new UBIComponent.UBIQueryStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(1, tuples.size());
+      assertFields(tuples, "query_id", "timestamp", "application");
+      assertString(tuples.get(0), "query_id", "234");
+      assertString(tuples.get(0), "application", "typeahead");
+
+      // Introduce a query_attributes map of data
+      ubiQuery = new UBIComponent.UBIQuery("345");
+
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      Map queryAttributes = new HashMap();
+      queryAttributes.put("attribute1", "one");
+      queryAttributes.put("attribute2", 2);
+      ubiQuery.setQueryAttributes(queryAttributes);
+
+      streamContext.put("ubi-query", ubiQuery);
+      stream = new UBIComponent.UBIQueryStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(1, tuples.size());
+      assertFields(tuples, "query_id", "timestamp", "query_attributes");
+      assertString(tuples.get(0), "query_id", "345");
+      assertString(tuples.get(0), "query_attributes", "{\"attribute1\":\"one\",\"attribute2\":2}");
+    }
+  }
+
+  @Test
+  public void testWritingToLogUbiQueryStream() throws Exception {
+    // Test that we can write out UBIQuery data cleanly to the JSON with Lines (JSONL)
+    // formatted log file.
+    UBIComponent.UBIQuery ubiQuery = new UBIComponent.UBIQuery("345");
+    ubiQuery.setUserQuery("Memory RAM");
+    ubiQuery.setApplication("typeahead");
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    Map queryAttributes = new HashMap();
+    queryAttributes.put("parsed_query", "memory OR ram");
+    queryAttributes.put("experiment", "secret");
+    queryAttributes.put("marginBoost", 2.1);
+    ubiQuery.setQueryAttributes(queryAttributes);
+
+    StreamExpression expression;
+    List tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+
+    try (solrClientCache) {
+      streamContext.setSolrClientCache(solrClientCache);
+      StreamFactory factory =
+          new StreamFactory()
+              .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+              .withFunctionName("search", CloudSolrStream.class)
+              .withFunctionName("ubiQuery", UBIComponent.UBIQueryStream.class)
+              .withFunctionName("logging", LoggingStream.class);
+
+      expression = StreamExpressionParser.parse("logging(test.jsonl,ubiQuery())");
+      streamContext.put("ubi-query", ubiQuery);
+      streamContext.put("solr-core", findSolrCore());
+      LoggingStream stream = new LoggingStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      assertEquals(1, tuples.size());
+      assertFields(tuples, "totalIndexed");
+      assertLong(tuples.get(0), "totalIndexed", 1);
+
+      // Someday when we have a parseJSON() streaming expression we can replace this.
+      Path filePath = stream.getFilePath();
+      try (ReversedLinesFileReader reader =
+          new ReversedLinesFileReader.Builder()
+              .setCharset(StandardCharsets.UTF_8)
+              .setPath(filePath)
+              .get()) {
+        String jsonLine = reader.readLine(); // Read the last line
+        assertNotNull(jsonLine);
+        ObjectMapper objectMapper = new ObjectMapper();
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        Map ubiQueryAsMap = objectMapper.readValue(jsonLine, Map.class);
+        assertEquals(ubiQuery.getQueryId(), ubiQueryAsMap.get("query_id"));
+        assertEquals(ubiQuery.getApplication(), ubiQueryAsMap.get("application"));
+        assertEquals(ubiQuery.getDocIds(), ubiQueryAsMap.get("doc_ids"));
+        assertNotNull(ubiQueryAsMap.get("timestamp"));
+        assertEquals(
+            "{\"experiment\":\"secret\",\"marginBoost\":2.1,\"parsed_query\":\"memory OR ram\"}",
+            ubiQueryAsMap.get("query_attributes"));
+      }
+    }
+  }
+
+  @Test
+  public void testWritingToSolrUbiQueryStream() throws Exception {
+    // Test that we can write out UBIQuery, especially the queryAttributes map, to a Solr
+    // collection.
+
+    UBIComponent.UBIQuery ubiQuery = new UBIComponent.UBIQuery("345");
+    ubiQuery.setUserQuery("Memory RAM");
+    ubiQuery.setApplication("typeahead");
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    Map queryAttributes = new HashMap();
+    queryAttributes.put("parsed_query", "memory OR ram");
+    queryAttributes.put("experiment", "secret");
+    queryAttributes.put("marginBoost", 2.1);
+    ubiQuery.setQueryAttributes(queryAttributes);
+
+    StreamExpression expression;
+    TupleStream stream;
+    List tuples;
+    StreamContext streamContext = new StreamContext();
+    SolrClientCache solrClientCache = new SolrClientCache();
+
+    try (solrClientCache) {
+      streamContext.setSolrClientCache(solrClientCache);
+      StreamFactory factory =
+          new StreamFactory()
+              .withCollectionZkHost(COLLECTIONORALIAS, cluster.getZkServer().getZkAddress())
+              .withFunctionName("search", CloudSolrStream.class)
+              .withFunctionName("update", UpdateStream.class)
+              .withFunctionName("select", SelectStream.class)
+              .withFunctionName("ubiQuery", UBIComponent.UBIQueryStream.class);
+
+      expression =
+          StreamExpressionParser.parse(
+              "update("
+                  + COLLECTIONORALIAS
+                  + ", batchSize=5, select(\n"
+                  + "   ubiQuery(),\n"
+                  + "   query_id as id,\n"
+                  + "   doc_ids,\n"
+                  + "   timestamp,\n"
+                  + "   application,\n"
+                  + "   user_query,\n"
+                  + "   query_attributes\n"
+                  + " ))");
+      streamContext.put("ubi-query", ubiQuery);
+      stream = new UpdateStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+
+      cluster.getSolrClient().commit(COLLECTIONORALIAS);
+
+      assertEquals(1, tuples.size());
+      Tuple t = tuples.get(0);
+      assertFalse(t.EOF);
+      assertEquals(1, t.get("batchIndexed"));
+      assertEquals(1L, t.get("totalIndexed"));
+
+      // Ensure that the destination collection actually has the new UBI query docs.
+      expression =
+          StreamExpressionParser.parse(
+              "search(" + COLLECTIONORALIAS + ", q=*:*, fl=\"id,*\", sort=\"id asc\")");
+      stream = new CloudSolrStream(expression, factory);
+      stream.setStreamContext(streamContext);
+      tuples = getTuples(stream);
+      assertEquals(1, tuples.size());
+
+      Tuple tuple = tuples.get(0);
+      assertEquals(ubiQuery.getQueryId(), tuple.get("id"));
+      assertEquals(ubiQuery.getApplication(), tuple.get("application"));
+      assertEquals(ubiQuery.getUserQuery(), tuple.get("user_query"));
+      assertEquals(ubiQuery.getDocIds(), tuple.get("doc_ids"));
+      assertEquals(ubiQuery.getTimestamp().toInstant(), tuple.getDate("timestamp").toInstant());
+      assertEquals(
+          "{\"experiment\":\"secret\",\"marginBoost\":2.1,\"parsed_query\":\"memory OR ram\"}",
+          tuple.get("query_attributes"));
+    }
+  }
+
+  protected List getTuples(TupleStream tupleStream) throws IOException {
+    List tuples = new ArrayList<>();
+
+    try (tupleStream) {
+      tupleStream.open();
+      for (Tuple t = tupleStream.read(); !t.EOF; t = tupleStream.read()) {
+        tuples.add(t);
+      }
+    }
+    return tuples;
+  }
+
+  public boolean assertString(Tuple tuple, String fieldName, String expected) throws Exception {
+    String actual = (String) tuple.get(fieldName);
+
+    if (!Objects.equals(expected, actual)) {
+      throw new Exception("Strings not equal: " + expected + " : " + actual);
+    }
+
+    return true;
+  }
+
+  public boolean assertLong(Tuple tuple, String fieldName, long l) throws Exception {
+    long lv = (long) tuple.get(fieldName);
+    if (lv != l) {
+      throw new Exception("Longs not equal: " + l + " : " + lv);
+    }
+
+    return true;
+  }
+
+  protected void assertFields(List tuples, String... fields) throws Exception {
+    for (Tuple tuple : tuples) {
+      for (String field : fields) {
+        if (!tuple.getFields().containsKey(field)) {
+          throw new Exception(String.format(Locale.ROOT, "Expected field '%s' not found", field));
+        }
+      }
+    }
+  }
+
+  private static SolrCore findSolrCore() {
+    for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
+      for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
+        if (solrCore != null) {
+          return solrCore;
+        }
+      }
+    }
+    throw new RuntimeException("Didn't find any valid cores.");
+  }
+}
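The test above reads the last line of the JSONL log with ReversedLinesFileReader and Jackson. Outside of tests, the same file can be consumed line by line; the sketch below shows the idea. The class name and default path are illustrative assumptions, and only the Jackson calls mirror the test; note that query_attributes is stored as a JSON string inside each record, so it would need a second parse.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Map;

    // Hypothetical offline reader for the JSONL file the LoggingStream writes:
    // each line is one UBI query record.
    public class UbiQueryLogReader {
      private static final ObjectMapper MAPPER = new ObjectMapper();

      public static void main(String[] args) throws IOException {
        Path log = Path.of(args.length > 0 ? args[0] : "ubi_queries.jsonl"); // path is illustrative
        for (String line : Files.readAllLines(log)) {
          @SuppressWarnings("unchecked")
          Map<String, Object> record = MAPPER.readValue(line, Map.class);
          System.out.println(record.get("query_id") + " -> " + record.get("user_query"));
        }
      }
    }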
diff --git a/solr/core/src/test/org/apache/solr/handler/component/UBIComponentTest.java b/solr/core/src/test/org/apache/solr/handler/component/UBIComponentTest.java
new file mode 100644
index 00000000000..96be7433c98
--- /dev/null
+++ b/solr/core/src/test/org/apache/solr/handler/component/UBIComponentTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.handler.component;
+
+import org.apache.solr.SolrTestCaseJ4;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Tests that the UBI Component augments the response properly */
+public class UBIComponentTest extends SolrTestCaseJ4 {
+
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    initCore("solrconfig-ubi.xml", "schema12.xml");
+    assertNull(h.validateUpdate(adoc("id", "1", "subject", "aa")));
+    assertNull(h.validateUpdate(adoc("id", "two", "subject", "aa")));
+    assertNull(h.validateUpdate(adoc("id", "3", "subject", "aa")));
+    assertU(commit());
+  }
+
+  @Test
+  public void testGeneratingAQueryId() {
+    assertQ(
+        "Make sure we generate a query id",
+        req("q", "aa", "rows", "2", "ubi", "true"),
+        "count(//lst[@name='ubi']/str[@name='query_id'])=1");
+  }
+
+  @Test
+  public void testZeroResultsGeneratesQueryId() {
+    assertQ(
+        "Make sure we generate a query id even when no results are returned",
+        req("q", "abcdefgxyz", "rows", "0", "ubi", "true"),
+        "//*[@numFound='0']",
+        "count(//lst[@name='ubi']/str[@name='query_id'])=1");
+  }
+
+  @Test
+  public void testPassedInQueryIdIsUsed() {
+    assertQ(
+        "Make sure we reuse a passed in query id",
+        req("q", "aa", "rows", "0", "ubi", "true", "query_id", "123abc"),
+        "//lst[@name='ubi']/str[@name='query_id'][.='123abc']");
+  }
+
+  @Test
+  public void testGenerateQueryIdZeroRowsRequested() {
+    assertQ(
+        "Make sure we generate a query id if one is not passed in",
+        req("q", "aa", "rows", "0", "ubi", "true"),
+        "count(//lst[@name='ubi']/str[@name='query_id'])=1");
+  }
+
+  @Test
+  public void testJSONQuerySyntaxWithJustUBI() throws Exception {
+    String response =
+        JQ(
+            req(
+                "json",
+                "{\n"
+                    + "  'query': 'aa',\n"
+                    + "  'fields': '*',\n"
+                    + "  'offset': 0,\n"
+                    + "  'limit': 2,\n"
+                    + "  'params': {\n"
+                    + "    'df': 'subject',\n"
+                    + "    'ubi': 'true'\n"
+                    + "  }\n"
+                    + "}"));
+    assertTrue(response.contains("query_id"));
+  }
+
+  @Test
+  public void testJSONQuerySyntaxWithNestedUBI() throws Exception {
+    assertJQ(
+        req(
+            "json",
+            "{\n"
+                + "  'query': 'aa',\n"
+                + "  'fields': '*',\n"
+                + "  'offset': 0,\n"
+                + "  'limit': 2,\n"
+                + "  'params': {\n"
+                + "    'df': 'subject',\n"
+                + "    'ubi': 'true',\n"
+                + "    'query_id': 'xjy-42-1rj',\n"
+                + "    'user_query': 'aa',\n"
+                + "    'query_attributes': {\n"
+                + "      'page': 2,\n"
+                + "      'filter': 'inStock:true'\n"
+                + "    }\n"
+                + "  }\n"
+                + "}"),
+        "response/numFound==3",
+        "ubi/query_id=='xjy-42-1rj'");
+  }
+
+  @Test
+  public void testDisabling() {
+    assertQ(
+        "Make sure we don't generate a query_id",
+        req("q", "aa", "ubi", "false"),
+        "count(//lst[@name='ubi'])=0");
+  }
+}
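The bats test that follows registers the UBIComponent through the config API with two curl calls. A SolrJ equivalent of the first call might look like the sketch below; GenericSolrRequest and RequestWriter.StringPayloadContentWriter are existing SolrJ types, but the wiring here is an assumption-laden sketch rather than part of the patch, and the URL assumes the stock techproducts example.

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.SolrRequest;
    import org.apache.solr.client.solrj.impl.Http2SolrClient;
    import org.apache.solr.client.solrj.request.GenericSolrRequest;
    import org.apache.solr.client.solrj.request.RequestWriter;
    import org.apache.solr.common.params.CommonParams;
    import org.apache.solr.common.params.ModifiableSolrParams;

    // Hypothetical SolrJ equivalent of the first curl call in the bats test below:
    // register the UBIComponent on the techproducts core via the config API.
    public class RegisterUbiComponent {
      public static void main(String[] args) throws Exception {
        try (SolrClient client =
            new Http2SolrClient.Builder("http://localhost:8983/solr/techproducts").build()) {
          String addComponent =
              "{ \"add-searchcomponent\": { \"name\": \"ubi\", \"class\": \"solr.UBIComponent\" } }";
          GenericSolrRequest req =
              new GenericSolrRequest(SolrRequest.METHOD.POST, "/config", new ModifiableSolrParams());
          req.setContentWriter(
              new RequestWriter.StringPayloadContentWriter(addComponent, CommonParams.JSON_MIME));
          client.request(req); // throws on a non-zero status
        }
      }
    }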
diff --git a/solr/packaging/test/test_ubi.bats b/solr/packaging/test/test_ubi.bats
new file mode 100644
index 00000000000..fcc6bdfcac1
--- /dev/null
+++ b/solr/packaging/test/test_ubi.bats
@@ -0,0 +1,92 @@
+#!/usr/bin/env bats
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+load bats_helper
+
+setup() {
+  common_clean_setup
+}
+
+teardown() {
+  # save a snapshot of SOLR_HOME for failed tests
+  save_home_on_failure
+
+  delete_all_collections
+  SOLR_STOP_WAIT=1 solr stop --all >/dev/null 2>&1
+}
+
+@test "Track query using UBI with log file." {
+  solr start -e techproducts
+
+  run solr healthcheck -c techproducts
+  refute_output --partial 'error'
+
+  run curl -X POST -H 'Content-type:application/json' -d '{
+    "add-searchcomponent": {
+      "name": "ubi",
+      "class": "solr.UBIComponent"
+    }
+  }' "http://localhost:${SOLR_PORT}/api/collections/techproducts/config"
+
+  assert_output --partial '"status":0'
+
+  run curl -X POST -H 'Content-type:application/json' -d '{
+    "update-requesthandler": {
+      "name": "/select",
+      "class": "solr.SearchHandler",
+      "last-components":["ubi"]
+    }
+  }' "http://localhost:${SOLR_PORT}/api/collections/techproducts/config"
+
+  assert_output --partial '"status":0'
+
+  # Simple UBI-enabled query
+  run curl "http://localhost:${SOLR_PORT}/solr/techproducts/select?q=*:*&rows=3&ubi=true&user_query=give%20me%20all&query_id=5678"
+  assert_output --partial '"status":0'
+  assert_output --partial '"query_id":"5678"'
+
+  # Check that the UBI query record was written out to the default location
+  assert_file_exist ${SOLR_TIP}/server/solr/userfiles/ubi_queries.jsonl
+  assert_file_contains ${SOLR_TIP}/server/solr/userfiles/ubi_queries.jsonl '"query_id":"5678"'
+
+  # Rich UBI user-query tracking with a JSON Query
+  run curl -X POST -H 'Content-type:application/json' -d '{
+    "query" : "ram OR memory",
+    "filter": [
+      "inStock:true"
+    ],
+    "limit": 2,
+    "params": {
+      "ubi": "true",
+      "application": "primary_search",
+      "query_id": "xyz890",
+      "user_query": "RAM memory",
+      "query_attributes": {
+        "experiment": "supersecret",
+        "page": 1,
+        "filter": "productStatus:available"
+      }
+    }
+  }' "http://localhost:${SOLR_PORT}/solr/techproducts/select"
+  assert_output --partial '"query_id":"xyz890"'
+
+  # Check that the UBI query record was written out with the additional metadata
+  assert_file_contains ${SOLR_TIP}/server/solr/userfiles/ubi_queries.jsonl '"query_id":"xyz890"'
+  assert_file_contains ${SOLR_TIP}/server/solr/userfiles/ubi_queries.jsonl 'supersecret'
+}
diff --git a/solr/server/resources/log4j2.xml b/solr/server/resources/log4j2.xml
index f5b373338b7..3d4626c15ad 100644
--- a/solr/server/resources/log4j2.xml
+++ b/solr/server/resources/log4j2.xml
@@ -84,4 +84,3 @@
-
diff --git a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
index c0831e55c0a..edaf26cb849 100644
--- a/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
+++ b/solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/TupStream.java
@@ -67,8 +67,8 @@ public TupStream(StreamExpression expression, StreamFactory factory) throws IOEx
       fieldLabels.put(name, name);
 
       StreamExpressionParameter param = np.getParameter();
-      // we're going to split these up here, so we only make the choice once
-      // order of these in read() doesn't matter
+      // We're going to split these up here, so we only make the choice once.
+      // The order of these in read() doesn't matter.
       if (param instanceof StreamExpressionValue) {
         stringParams.put(name, ((StreamExpressionValue) param).getValue());
       } else if (factory.isEvaluator((StreamExpression) param)) {
@@ -140,7 +140,7 @@ public Explanation toExplanation(StreamFactory factory) throws IOException {
   public void setStreamContext(StreamContext context) {
     this.streamContext = context;
 
-    // also set in evalators and streams
+    // also set in evaluators and streams
     for (StreamEvaluator evaluator : evaluatorParams.values()) {
       evaluator.setStreamContext(context);
     }
diff --git a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
index 2b19742ae6f..d293b5f66d3 100644
--- a/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
+++ b/solr/solrj-streaming/src/test/org/apache/solr/client/solrj/io/stream/StreamDecoratorTest.java
@@ -1264,7 +1264,7 @@ public void testDaemonStream() throws Exception {
         .add(id, "10", "a_s", "hello0", "a_i", "1", "a_f", "10")
         .commit(cluster.getSolrClient(), COLLECTIONORALIAS);
 
-    // Now lets clear the existing docs in the queue 9, plus 3 more to get passed the run that was
+    // Now let's clear the existing docs in the queue (9), plus 3 more to get past the run that was
     // blocked. The next run should
     // have the tuples with the updated count.
     for (int i = 0; i < 12; i++) {
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrQuery.java b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrQuery.java
index 76b053a0824..5102f845a9f 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/SolrQuery.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/SolrQuery.java
@@ -818,6 +818,58 @@ public SolrQuery setHighlight(boolean b) {
     return this;
   }
 
+  /**
+   * Enable User Behavior Insights tracking for this query.
+   *
+   * @param b true to enable UBI tracking; false to disable it and remove the related parameters
+   */
+  public SolrQuery setUBITracking(boolean b) {
+    if (b) {
+      this.set("ubi", true);
+    } else {
+      // TODO: consider namespacing all of these under ubi.*
+      // (ubi.application, ubi.query_id, ...) so they can be managed as a group.
+      this.remove("ubi");
+      this.remove("application");
+      this.remove("query_id");
+      this.remove("client_id");
+      this.remove("user_query");
+      this.remove("query_attributes");
+    }
+    return this;
+  }
+
+  /** Determine the status of User Behavior Insights tracking for this query. */
+  public boolean getUBITracking() {
+    return this.getBool("ubi", false);
+  }
+
+  public SolrQuery setApplication(String application) {
+    this.set("application", application);
+    return this;
+  }
+
+  public SolrQuery setQueryId(String queryId) {
+    this.set("query_id", queryId);
+    return this;
+  }
+
+  public SolrQuery setClientId(String clientId) {
+    this.set("client_id", clientId);
+    return this;
+  }
+
+  public SolrQuery setUserQuery(String userQuery) {
+    this.set("user_query", userQuery);
+    return this;
+  }
+
+  // TODO: add setQueryAttributes(Map<String, Object>) once the serialization of the map
+  // into the query_attributes parameter is settled.
+
   /**
    * Add field for MoreLikeThis. Automatically enables MoreLikeThis.
    *
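For reference, the accessors added above would be used along these lines; the values are illustrative and the class is only a harness around the new API.

    import org.apache.solr.client.solrj.SolrQuery;

    public class UbiQueryParamsExample {
      public static void main(String[] args) {
        SolrQuery query = new SolrQuery("memory");
        query.setUBITracking(true); // adds ubi=true to the request
        query.setApplication("primary_search");
        query.setQueryId("xyz890"); // optional; the UBIComponent generates one otherwise
        query.setClientId("user-1234");
        query.setUserQuery("RAM memory"); // the raw string the end user typed
        // setUBITracking(false) removes ubi, application, query_id, client_id,
        // user_query and query_attributes again.
        System.out.println(query); // e.g. q=memory&ubi=true&application=primary_search&...
      }
    }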
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrQueryTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrQueryTest.java
index 2786fd64385..2e72193b96b 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/SolrQueryTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/SolrQueryTest.java
@@ -471,4 +471,17 @@ public void testMoreLikeThis() {
     assertEquals(15, solrQuery.setMoreLikeThisMaxQueryTerms(15).getMoreLikeThisMaxQueryTerms());
     assertEquals(16, solrQuery.setMoreLikeThisCount(16).getMoreLikeThisCount());
   }
+
+  public void testUserBehaviorInsights() {
+    SolrQuery solrQuery = new SolrQuery();
+    solrQuery.setUBITracking(true);
+    assertTrue(solrQuery.getUBITracking());
+
+    assertNull(solrQuery.get("query_id"));
+    solrQuery.setQueryId("12345");
+    assertEquals("12345", solrQuery.get("query_id"));
+
+    // TODO: exercise query_attributes; one possible shape is sketched after this diff.
+  }
 }
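The TODO in testUserBehaviorInsights could be closed once the serialization question is settled. Assuming query_attributes travels as a JSON string parameter (which is how the streaming tests in this patch compare it), a test might look like the sketch below; the class name is illustrative.

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;
    import org.apache.solr.client.solrj.SolrQuery;
    import static org.junit.Assert.assertEquals;

    public class SolrQueryUbiAttributesSketch {
      public void testQueryAttributes() throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        Map<String, Object> attrs = Map.of("page", 1, "filter", "inStock:true");
        SolrQuery solrQuery = new SolrQuery("aa");
        // Serialize the map once and compare against the raw parameter value.
        String json = mapper.writeValueAsString(attrs);
        solrQuery.set("query_attributes", json);
        assertEquals(json, solrQuery.get("query_attributes"));
      }
    }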
diff --git a/solr/webapp/web/partials/query.html b/solr/webapp/web/partials/query.html
index 48e618643a7..d212d85bdd9 100644
--- a/solr/webapp/web/partials/query.html
+++ b/solr/webapp/web/partials/query.html
@@ -548,6 +548,15 @@
+          <!-- nine added lines: markup for the new "ubi" option on the query form
+               (the HTML did not survive extraction and is elided here) -->
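Taken together, the patch can be smoke-tested end to end with a few lines of SolrJ. The base URL and collection below are assumptions (the bats test above uses the techproducts example); the cast through SimpleMap mirrors the cloud test earlier in this patch.

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.impl.Http2SolrClient;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.util.SimpleMap;

    // Hypothetical end-to-end check: issue a UBI-tracked query against a core with the
    // UBIComponent registered and read back the generated query_id.
    public class UbiQuickCheck {
      public static void main(String[] args) throws Exception {
        try (Http2SolrClient client =
            new Http2SolrClient.Builder("http://localhost:8983/solr/techproducts").build()) {
          SolrQuery query = new SolrQuery("memory").setRows(2);
          query.setUBITracking(true);
          query.setUserQuery("RAM memory");
          QueryResponse rsp = client.query(query);
          // The UBIComponent appends a "ubi" section holding the query_id.
          String queryId = (String) ((SimpleMap) rsp.getResponse().get("ubi")).get("query_id");
          System.out.println("query_id = " + queryId);
        }
      }
    }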