15
15
import java .util .Objects ;
16
16
import java .util .function .BiConsumer ;
17
17
import java .util .function .Consumer ;
18
- import java .util .function .Supplier ;
19
18
import java .util .stream .Collectors ;
20
19
import java .util .stream .IntStream ;
21
20
24
23
import org .apache .commons .lang3 .StringUtils ;
25
24
import org .opensearch .common .collect .Tuple ;
26
25
import org .opensearch .core .common .util .CollectionUtils ;
26
+ import org .opensearch .cluster .service .ClusterService ;
27
27
import org .opensearch .env .Environment ;
28
- import org .opensearch .index .mapper .MapperService ;
28
+ import org .opensearch .index .mapper .IndexFieldMapper ;
29
29
import org .opensearch .ingest .AbstractProcessor ;
30
30
import org .opensearch .ingest .IngestDocument ;
31
31
import org .opensearch .ingest .IngestDocumentWrapper ;
35
35
import com .google .common .collect .ImmutableMap ;
36
36
37
37
import lombok .extern .log4j .Log4j2 ;
38
+ import org .opensearch .neuralsearch .util .ProcessorDocumentUtils ;
38
39
39
40
/**
40
41
* The abstract class for text processing use cases. Users provide a field name map and a model id.
@@ -60,6 +61,7 @@ public abstract class InferenceProcessor extends AbstractProcessor {
60
61
protected final MLCommonsClientAccessor mlCommonsClientAccessor ;
61
62
62
63
private final Environment environment ;
64
+ private final ClusterService clusterService ;
63
65
64
66
public InferenceProcessor (
65
67
String tag ,
@@ -69,18 +71,19 @@ public InferenceProcessor(
69
71
String modelId ,
70
72
Map <String , Object > fieldMap ,
71
73
MLCommonsClientAccessor clientAccessor ,
72
- Environment environment
74
+ Environment environment ,
75
+ ClusterService clusterService
73
76
) {
74
77
super (tag , description );
75
78
this .type = type ;
76
79
if (StringUtils .isBlank (modelId )) throw new IllegalArgumentException ("model_id is null or empty, cannot process it" );
77
80
validateEmbeddingConfiguration (fieldMap );
78
-
79
81
this .listTypeNestedMapKey = listTypeNestedMapKey ;
80
82
this .modelId = modelId ;
81
83
this .fieldMap = fieldMap ;
82
84
this .mlCommonsClientAccessor = clientAccessor ;
83
85
this .environment = environment ;
86
+ this .clusterService = clusterService ;
84
87
}
85
88
86
89
private void validateEmbeddingConfiguration (Map <String , Object > fieldMap ) {
@@ -117,12 +120,12 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
117
120
public void execute (IngestDocument ingestDocument , BiConsumer <IngestDocument , Exception > handler ) {
118
121
try {
119
122
validateEmbeddingFieldsValue (ingestDocument );
120
- Map <String , Object > ProcessMap = buildMapWithProcessorKeyAndOriginalValue (ingestDocument );
121
- List <String > inferenceList = createInferenceList (ProcessMap );
123
+ Map <String , Object > processMap = buildMapWithTargetKeyAndOriginalValue (ingestDocument );
124
+ List <String > inferenceList = createInferenceList (processMap );
122
125
if (inferenceList .size () == 0 ) {
123
126
handler .accept (ingestDocument , null );
124
127
} else {
125
- doExecute (ingestDocument , ProcessMap , inferenceList , handler );
128
+ doExecute (ingestDocument , processMap , inferenceList , handler );
126
129
}
127
130
} catch (Exception e ) {
128
131
handler .accept (null , e );
@@ -225,7 +228,7 @@ private List<DataForInference> getDataForInference(List<IngestDocumentWrapper> i
225
228
List <String > inferenceList = null ;
226
229
try {
227
230
validateEmbeddingFieldsValue (ingestDocumentWrapper .getIngestDocument ());
228
- processMap = buildMapWithProcessorKeyAndOriginalValue (ingestDocumentWrapper .getIngestDocument ());
231
+ processMap = buildMapWithTargetKeyAndOriginalValue (ingestDocumentWrapper .getIngestDocument ());
229
232
inferenceList = createInferenceList (processMap );
230
233
} catch (Exception e ) {
231
234
ingestDocumentWrapper .update (ingestDocumentWrapper .getIngestDocument (), e );
@@ -273,7 +276,7 @@ private void createInferenceListForMapTypeInput(Object sourceValue, List<String>
273
276
}
274
277
275
278
@ VisibleForTesting
276
- Map <String , Object > buildMapWithProcessorKeyAndOriginalValue (IngestDocument ingestDocument ) {
279
+ Map <String , Object > buildMapWithTargetKeyAndOriginalValue (IngestDocument ingestDocument ) {
277
280
Map <String , Object > sourceAndMetadataMap = ingestDocument .getSourceAndMetadata ();
278
281
Map <String , Object > mapWithProcessorKeys = new LinkedHashMap <>();
279
282
for (Map .Entry <String , Object > fieldMapEntry : fieldMap .entrySet ()) {
@@ -331,54 +334,16 @@ private void buildMapWithProcessorKeyAndOriginalValueForMapType(
331
334
332
335
private void validateEmbeddingFieldsValue (IngestDocument ingestDocument ) {
333
336
Map <String , Object > sourceAndMetadataMap = ingestDocument .getSourceAndMetadata ();
334
- for (Map .Entry <String , Object > embeddingFieldsEntry : fieldMap .entrySet ()) {
335
- Object sourceValue = sourceAndMetadataMap .get (embeddingFieldsEntry .getKey ());
336
- if (sourceValue != null ) {
337
- String sourceKey = embeddingFieldsEntry .getKey ();
338
- Class <?> sourceValueClass = sourceValue .getClass ();
339
- if (List .class .isAssignableFrom (sourceValueClass ) || Map .class .isAssignableFrom (sourceValueClass )) {
340
- validateNestedTypeValue (sourceKey , sourceValue , () -> 1 );
341
- } else if (!String .class .isAssignableFrom (sourceValueClass )) {
342
- throw new IllegalArgumentException ("field [" + sourceKey + "] is neither string nor nested type, cannot process it" );
343
- } else if (StringUtils .isBlank (sourceValue .toString ())) {
344
- throw new IllegalArgumentException ("field [" + sourceKey + "] has empty string value, cannot process it" );
345
- }
346
- }
347
- }
348
- }
349
-
350
- @ SuppressWarnings ({ "rawtypes" , "unchecked" })
351
- private void validateNestedTypeValue (String sourceKey , Object sourceValue , Supplier <Integer > maxDepthSupplier ) {
352
- int maxDepth = maxDepthSupplier .get ();
353
- if (maxDepth > MapperService .INDEX_MAPPING_DEPTH_LIMIT_SETTING .get (environment .settings ())) {
354
- throw new IllegalArgumentException ("map type field [" + sourceKey + "] reached max depth limit, cannot process it" );
355
- } else if ((List .class .isAssignableFrom (sourceValue .getClass ()))) {
356
- validateListTypeValue (sourceKey , sourceValue , maxDepthSupplier );
357
- } else if (Map .class .isAssignableFrom (sourceValue .getClass ())) {
358
- ((Map ) sourceValue ).values ()
359
- .stream ()
360
- .filter (Objects ::nonNull )
361
- .forEach (x -> validateNestedTypeValue (sourceKey , x , () -> maxDepth + 1 ));
362
- } else if (!String .class .isAssignableFrom (sourceValue .getClass ())) {
363
- throw new IllegalArgumentException ("map type field [" + sourceKey + "] has non-string type, cannot process it" );
364
- } else if (StringUtils .isBlank (sourceValue .toString ())) {
365
- throw new IllegalArgumentException ("map type field [" + sourceKey + "] has empty string, cannot process it" );
366
- }
367
- }
368
-
369
- @ SuppressWarnings ({ "rawtypes" })
370
- private void validateListTypeValue (String sourceKey , Object sourceValue , Supplier <Integer > maxDepthSupplier ) {
371
- for (Object value : (List ) sourceValue ) {
372
- if (value instanceof Map ) {
373
- validateNestedTypeValue (sourceKey , value , () -> maxDepthSupplier .get () + 1 );
374
- } else if (value == null ) {
375
- throw new IllegalArgumentException ("list type field [" + sourceKey + "] has null, cannot process it" );
376
- } else if (!(value instanceof String )) {
377
- throw new IllegalArgumentException ("list type field [" + sourceKey + "] has non string value, cannot process it" );
378
- } else if (StringUtils .isBlank (value .toString ())) {
379
- throw new IllegalArgumentException ("list type field [" + sourceKey + "] has empty string, cannot process it" );
380
- }
381
- }
337
+ String indexName = sourceAndMetadataMap .get (IndexFieldMapper .NAME ).toString ();
338
+ ProcessorDocumentUtils .validateMapTypeValue (
339
+ FIELD_MAP_FIELD ,
340
+ sourceAndMetadataMap ,
341
+ fieldMap ,
342
+ indexName ,
343
+ clusterService ,
344
+ environment ,
345
+ false
346
+ );
382
347
}
383
348
384
349
protected void setVectorFieldsToDocument (IngestDocument ingestDocument , Map <String , Object > processorMap , List <?> results ) {
0 commit comments