Skip to content

Commit

Permalink
GoogleTaxonomy + Batch layout
Browse files Browse the repository at this point in the history
  • Loading branch information
goulven authored and goulven committed Dec 30, 2023
1 parent 6e092da commit afebbb3
Show file tree
Hide file tree
Showing 12 changed files with 461 additions and 64 deletions.
25 changes: 23 additions & 2 deletions api/src/main/java/org/open4goods/api/config/ApiConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.open4goods.crawler.services.fetching.CsvDatasourceFetchingService;
import org.open4goods.crawler.services.fetching.WebDatasourceFetchingService;
import org.open4goods.dao.ProductRepository;
import org.open4goods.exceptions.InvalidParameterException;
import org.open4goods.model.constants.CacheConstants;
import org.open4goods.model.constants.Currency;
import org.open4goods.model.constants.TimeConstants;
Expand All @@ -34,6 +35,7 @@
import org.open4goods.services.BrandService;
import org.open4goods.services.DataSourceConfigService;
import org.open4goods.services.EvaluationService;
import org.open4goods.services.GoogleTaxonomyService;
import org.open4goods.services.Gs1PrefixService;
import org.open4goods.services.ImageMagickService;
import org.open4goods.services.RemoteFileCachingService;
Expand Down Expand Up @@ -149,6 +151,23 @@ AiService aiService (AiAgent nudgerAgent, VerticalsConfigService verticalService
return new AiService(nudgerAgent, verticalService, spelEvaluationService);
}

/**
 * Builds the {@link GoogleTaxonomyService} and preloads the Google product taxonomy files.
 *
 * Each taxonomy file is loaded independently: a failure on one file is logged and does
 * not prevent the remaining files from being loaded. The service is returned even when
 * some (or all) files could not be loaded.
 *
 * @param remoteFileCachingService the service handed to the {@link GoogleTaxonomyService} constructor
 * @return the taxonomy service, with the taxonomy files that could be loaded
 */
@Bean
public GoogleTaxonomyService googleTaxonomyService(@Autowired RemoteFileCachingService remoteFileCachingService) {
    GoogleTaxonomyService gts = new GoogleTaxonomyService(remoteFileCachingService);

    // TODO : From conf
    // TODO : Add others
    // Each entry is { taxonomy file URL, language key }.
    // The en-US file was previously registered under "fr"; it is now registered under "en".
    // NOTE(review): confirm that callers passing a hard-coded "fr" language to the
    // taxonomy service still behave as expected with this registration.
    final String[][] taxonomySources = {
        { "https://www.google.com/basepages/producttype/taxonomy-with-ids.fr-FR.txt", "fr" },
        { "https://www.google.com/basepages/producttype/taxonomy-with-ids.en-US.txt", "en" }
    };

    for (String[] source : taxonomySources) {
        try {
            gts.loadGoogleTaxonUrl(source[0], source[1]);
        } catch (Exception e) {
            // Best effort : keep loading the other taxonomy files
            logger.error("Error loading google taxonomy {} ({})", source[0], source[1], e);
        }
    }

    return gts;
}

@Bean
AiAgent nudgerAgent(@Autowired ChatLanguageModel chatLanguageModel) {
return AiServices.builder(AiAgent.class)
Expand Down Expand Up @@ -178,8 +197,10 @@ RealtimeAggregationService realtimeAggregationService( @Autowired EvaluationServ
@Autowired DataSourceConfigService dataSourceConfigService,
@Autowired VerticalsConfigService configService,
@Autowired BarcodeValidationService barcodeValidationService,
@Autowired BrandService brandservice) {
return new RealtimeAggregationService(evaluationService, referentielService, standardiserService, autowireBeanFactory, aggregatedDataRepository, apiProperties, gs1prefixService, dataSourceConfigService, configService, barcodeValidationService,brandservice);
@Autowired BrandService brandservice,
@Autowired GoogleTaxonomyService gts
) {
return new RealtimeAggregationService(evaluationService, referentielService, standardiserService, autowireBeanFactory, aggregatedDataRepository, apiProperties, gs1prefixService, dataSourceConfigService, configService, barcodeValidationService,brandservice, gts);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@

import org.open4goods.api.services.BatchService;
import org.open4goods.config.yml.ui.VerticalConfig;
import org.open4goods.dao.ProductRepository;
import org.open4goods.exceptions.InvalidParameterException;
import org.open4goods.exceptions.ResourceNotFoundException;
import org.open4goods.model.constants.RolesConstants;
import org.open4goods.services.SerialisationService;
import org.open4goods.services.VerticalsConfigService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
Expand Down Expand Up @@ -39,6 +43,11 @@ public class BatchController {

private final BatchService batchService;


@Autowired
private ProductRepository repository;


public BatchController(BatchService batchService, SerialisationService serialisationService, VerticalsConfigService verticalsConfigService) {
this.serialisationService = serialisationService;
this.service = verticalsConfigService;
Expand Down Expand Up @@ -85,5 +94,12 @@ public void scoreVerticals() throws InvalidParameterException, IOException {
public void sanitize() throws InvalidParameterException, IOException {
batchService.sanitize();
}

/**
 * Launches the sanitisation of a single product.
 *
 * The product is fetched from the repository by its GTIN and handed to
 * {@link BatchService#sanitizeOne}.
 *
 * NOTE(review): this is a GET mapping that triggers a processing operation; confirm
 * whether access control is applied elsewhere for this endpoint.
 *
 * @param gtin the GTIN of the product, taken from the request path
 * @throws InvalidParameterException declared by the endpoint contract
 * @throws IOException declared by the endpoint contract
 * @throws ResourceNotFoundException presumably when no product matches the GTIN (verify against the repository)
 */
@GetMapping("/sanitisation/{gtin}")
@Operation(summary="Launch sanitisation of the product identified by the given GTIN")
public void sanitizeOne(@PathVariable String gtin ) throws InvalidParameterException, IOException, ResourceNotFoundException {
    batchService.sanitizeOne(repository.getById(gtin));
}


}
67 changes: 15 additions & 52 deletions api/src/main/java/org/open4goods/api/services/BatchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ public BatchService(
this.realtimeAggregationService = realtimeAggregationService;
}




/**
* update all verticals. Scheduled
Expand All @@ -82,53 +80,6 @@ public void scoreAll() {



// /**
// * Update a vertical
// * @param verticalId
// */
// public void fullUpdate(String verticalId) {
// VerticalConfig vertical = verticalsService.getConfigById(verticalId).orElseThrow();
// fullUpdate(vertical);
// }

// /**
// * Update verticals with the batch Aggragator
// * @throws AggregationSkipException
// */
// public void fullUpdate(VerticalConfig vertical) {
//
// logger.info("Full update for {}", vertical.getId());
// ScoringBatchedAggregator batchAgg = batchAggregationService.getAggregator(vertical);
// RealTimeAggregator rtAgg = realtimeAggregationService.getAggregator(vertical);
//
// Stream<Product> productsStream = dataRepository.getProductsMatchingVertical(vertical);
//
// List<Product> productBag = new ArrayList<>();
// logger.info("Starting realtime aggregation");
// // Realtime aggregation
// productsStream.forEach(data -> {
// try {
// dedicatedLogger.debug("Realtime aggregation for {}", data);
// //TODO : Bad design
// productBag.add( rtAgg.build(data.getFragment(), data));
// } catch (AggregationSkipException e) {
// dedicatedLogger.warn("Error on realtimeaggregation aggregation for {}", data, e);
// }
// });
//
// dedicatedLogger.info("Starting batch aggregation");
// // Batched (scoring) aggregation
// batchAgg.update(productBag);
//
// // TODO : Bulk size from conf
// Lists.partition(productBag, 200).forEach(p -> {
// dedicatedLogger.info("Indexing {} products", p.size());
// dataRepository.index(p);
// });
//
// }
//


/**
* Score verticals with the batch Aggregator
Expand All @@ -140,7 +91,7 @@ public void batchScore(VerticalConfig vertical) {

ScoringBatchedAggregator batchAgg = batchAggregationService.getScoringAggregator(vertical);

List<Product> productBag = dataRepository.getProductsMatchingVertical(vertical).toList();
List<Product> productBag = dataRepository.getProductsMatchingCategories(vertical).toList();
// Batched (scoring) aggregation
batchAgg.update(productBag);
logger.info("Score batching : indexing {} products", productBag.size());
Expand All @@ -164,8 +115,20 @@ public void sanitize() {
batchAgg.update(p);
dataRepository.index(p);
});
logger.info("started : Sanitisation batching for all items");

logger.info("done: Sanitisation batching for all items");
}

/**
 * Runs the full sanitisation aggregator on a single product, then indexes it.
 *
 * @param product the product to sanitise and index
 */
public void sanitizeOne(Product product) {
    logger.info("started : Sanitisation batching for {}",product);

    // Apply the sanitisation rules to this product only
    final SanitisationBatchedAggregator sanitiser = batchAggregationService.getFullSanitisationAggregator();
    sanitiser.update(product);

    // Push the updated product to the repository
    dataRepository.index(product);

    logger.info("done : Sanitisation batching for {}", product);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.open4goods.api.services.aggregation.services.realtime.MediaAggregationService;
import org.open4goods.api.services.aggregation.services.realtime.NamesAggregationService;
import org.open4goods.api.services.aggregation.services.realtime.PriceAggregationService;
import org.open4goods.api.services.aggregation.services.realtime.VerticalRealTimeAggregationService;
import org.open4goods.api.services.aggregation.services.realtime.TaxonomyRealTimeAggregationService;
import org.open4goods.config.yml.ui.VerticalConfig;
import org.open4goods.config.yml.ui.VerticalProperties;
import org.open4goods.dao.ProductRepository;
Expand All @@ -25,6 +25,7 @@
import org.open4goods.services.BrandService;
import org.open4goods.services.DataSourceConfigService;
import org.open4goods.services.EvaluationService;
import org.open4goods.services.GoogleTaxonomyService;
import org.open4goods.services.Gs1PrefixService;
import org.open4goods.services.StandardiserService;
import org.open4goods.services.VerticalsConfigService;
Expand Down Expand Up @@ -67,14 +68,18 @@ public class RealtimeAggregationService {
private BarcodeValidationService barcodeValidationService;

private BrandService brandService;

private GoogleTaxonomyService taxonomyService;

public RealtimeAggregationService(EvaluationService evaluationService,
ReferentielService referentielService, StandardiserService standardiserService,
AutowireCapableBeanFactory autowireBeanFactory, ProductRepository aggregatedDataRepository,
ApiProperties apiProperties, Gs1PrefixService gs1prefixService,
DataSourceConfigService dataSourceConfigService, VerticalsConfigService configService,
BarcodeValidationService barcodeValidationService,
BrandService brandService) {
BrandService brandService,
GoogleTaxonomyService taxonomyService
) {
super();
this.evaluationService = evaluationService;
this.referentielService = referentielService;
Expand All @@ -87,7 +92,7 @@ public RealtimeAggregationService(EvaluationService evaluationService,
verticalConfigService = configService;
this.brandService=brandService;
this.barcodeValidationService = barcodeValidationService;

this.taxonomyService = taxonomyService;

aggregator = getAggregator(configService.getConfigById(VerticalsConfigService.MAIN_VERTICAL_NAME).get());

Expand Down Expand Up @@ -131,7 +136,7 @@ RealTimeAggregator getAggregator(VerticalConfig config) {

services.add(new BarCodeAggregationService(apiProperties.logsFolder(), gs1prefixService,barcodeValidationService, apiProperties.isDedicatedLoggerToConsole()));

services.add(new VerticalRealTimeAggregationService( apiProperties.logsFolder(), verticalConfigService, apiProperties.isDedicatedLoggerToConsole()));
services.add(new TaxonomyRealTimeAggregationService( apiProperties.logsFolder(), verticalConfigService, taxonomyService, apiProperties.isDedicatedLoggerToConsole()));

services.add(new AttributeRealtimeAggregationService(verticalConfigService, brandService, apiProperties.logsFolder(), apiProperties.isDedicatedLoggerToConsole()));

Expand Down Expand Up @@ -159,6 +164,9 @@ RealTimeAggregator getAggregator(VerticalConfig config) {

services.add(new MediaAggregationService(config, apiProperties.logsFolder(), apiProperties.isDedicatedLoggerToConsole()));




final RealTimeAggregator ret = new RealTimeAggregator(services);

autowireBeanFactory.autowireBean(ret);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ public void onDataFragment(final DataFragment dataFragment, final Product produc


// Removing
product.getAttributes().setUnmapedAttributes(product.getAttributes().getUnmapedAttributes().stream().filter(e -> !toRemoveFromUnmatched.contains(e.getName())) .collect(Collectors.toSet()));
product.getAttributes().setUnmapedAttributes(product.getAttributes().getUnmapedAttributes().stream()
// TODO : Should be from path
// TODO : apply from sanitisation
.filter(e -> !e.getName().contains("CATEGORY"))
.filter(e -> !toRemoveFromUnmatched.contains(e.getName()))
.collect(Collectors.toSet()));



Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,87 @@
package org.open4goods.api.services.aggregation.services.realtime;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.open4goods.api.services.aggregation.AbstractRealTimeAggregationService;
import org.open4goods.config.yml.ui.VerticalConfig;
import org.open4goods.model.data.DataFragment;
import org.open4goods.model.data.UnindexedKeyVal;
import org.open4goods.model.product.Product;
import org.open4goods.services.GoogleTaxonomyService;
import org.open4goods.services.VerticalsConfigService;

/**
* Service in charge of mapping product categories to verticals, and of resolving the Google taxonomy id of a product
* @author goulven
*
*/
public class VerticalRealTimeAggregationService extends AbstractRealTimeAggregationService {
public class TaxonomyRealTimeAggregationService extends AbstractRealTimeAggregationService {

private VerticalsConfigService verticalService;
private GoogleTaxonomyService taxonomyService;

public VerticalRealTimeAggregationService( final String logsFolder, final VerticalsConfigService verticalService,boolean toConsole) {
/**
 * Creates the aggregation service.
 *
 * @param logsFolder forwarded to the parent aggregation service constructor
 * @param verticalService the verticals configuration service kept by this service
 * @param taxonomyService the service used to resolve Google taxonomy ids
 * @param toConsole forwarded to the parent aggregation service constructor
 */
public TaxonomyRealTimeAggregationService(
        final String logsFolder,
        final VerticalsConfigService verticalService,
        GoogleTaxonomyService taxonomyService,
        boolean toConsole) {
    super(logsFolder, toConsole);
    this.taxonomyService = taxonomyService;
    this.verticalService = verticalService;
}

@Override
public void onDataFragment(final DataFragment input, final Product output) {
    // Step 1 : vertical assignment from the fragment categories
    setVerticalFromCategories(input, output);

    // Step 2 : Google taxonomy detection; the product is left untouched when nothing is resolved
    final Integer resolvedTaxonomyId = googleTaxonomy(input);
    if (resolvedTaxonomyId == null) {
        return;
    }

    output.setGoogleTaxonomyId(resolvedTaxonomyId);
}



/**
 * Tries to detect the Google taxonomy id of a data fragment.
 *
 * The value of every attribute whose name contains "CATEGORY" is submitted to the
 * taxonomy service, and the non-null results are collected.
 *
 * @param input the data fragment whose attributes are inspected
 * @return the resolved id when exactly one was found, the id picked by
 *         {@code selectDeepest} when several were found, or null when none was found
 */
private Integer googleTaxonomy(final DataFragment input) {
    final List<Integer> resolvedIds = new ArrayList<>();

    //TODO : equivalent in a batch service, for stock processing
    for (var attribute : input.getAttributes()) {
        final String attributeName = attribute.getName();
        if (!attributeName.contains("CATEGORY")) {
            continue;
        }

        final Integer resolved = taxonomyService.resolve(attribute.getValue());
        if (resolved != null) {
            resolvedIds.add(resolved);
        }
    }

    if (resolvedIds.isEmpty()) {
        return null;
    }

    if (resolvedIds.size() == 1) {
        return resolvedIds.get(0);
    }

    // TODO : The language (should not be needed), will bug when other languages
    return taxonomyService.selectDeepest("fr", resolvedIds);
}


/**
* Defines a vertical and a taxonomy id from the config based matching
* @param input
* @param output
*/
private void setVerticalFromCategories(final DataFragment input, final Product output) {
String category = input.getCategory();


Expand Down Expand Up @@ -55,7 +111,6 @@ public void onDataFragment(final DataFragment input, final Product output) {
dedicatedLogger.info("No category in {}, removing vertical", output);
output.setVertical(null);
}

}

}
22 changes: 22 additions & 0 deletions commons/src/main/java/org/open4goods/config/TestConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.open4goods.config;

import org.open4goods.services.GoogleTaxonomyService;
import org.open4goods.services.RemoteFileCachingService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration declaring a {@link RemoteFileCachingService} bean and a
 * {@link GoogleTaxonomyService} bean built on top of it.
 */
@Configuration
public class TestConfig {

    /** Path handed to the {@link RemoteFileCachingService} constructor. */
    // TODO : from env variable
    private static final String TMP_FOLDER = "/tmp";

    /**
     * @return a caching service created with the hard-coded {@code /tmp} path
     */
    @Bean
    public RemoteFileCachingService remoteFileCachingService() {
        return new RemoteFileCachingService(TMP_FOLDER);
    }

    /**
     * @param remoteFileCachingService the caching service given to the taxonomy service
     * @return a taxonomy service; no taxonomy file is loaded here
     */
    @Bean
    public GoogleTaxonomyService googleTaxonomyService(@Autowired RemoteFileCachingService remoteFileCachingService) {
        final GoogleTaxonomyService taxonomyService = new GoogleTaxonomyService(remoteFileCachingService);
        return taxonomyService;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public ProductRepository() {
* @param v
* @return
*/
public Stream<Product> getProductsMatchingVertical(VerticalConfig v) {
public Stream<Product> getProductsMatchingCategories(VerticalConfig v) {
Criteria c = new Criteria("datasourceCategories").in(v.getMatchingCategories())
// TODO : Add exclusion
// .and(new Criteria("datasourceCategories").notIn(v.getMatchingCategories()))
Expand Down
Loading

0 comments on commit afebbb3

Please sign in to comment.