Skip to content

Commit

Permalink
Externalising configuration for indexation workers
Browse files Browse the repository at this point in the history
  • Loading branch information
goulven authored and goulven committed Oct 3, 2024
1 parent 84dc67a commit 4810c5e
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 14 deletions.
2 changes: 1 addition & 1 deletion api/src/main/java/org/open4goods/api/config/ApiConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ String logsFolder(@Autowired final ApiProperties config) {

@Bean DataFragmentStoreService dataFragmentStoreService(
@Autowired final ApiProperties config, @Autowired final SerialisationService serialisationService, @Autowired StandardiserService standardiserService, @Autowired AggregationFacadeService generationService, @Autowired ProductRepository aggregatedDataRepository) {
return new DataFragmentStoreService(standardiserService, generationService, aggregatedDataRepository);
return new DataFragmentStoreService(standardiserService, generationService, aggregatedDataRepository, config.getIndexationConfig());
}

/**
Expand Down
19 changes: 19 additions & 0 deletions api/src/main/java/org/open4goods/api/config/yml/ApiProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.List;

import org.apache.commons.lang3.ArrayUtils;
import org.open4goods.api.controller.api.IndexationController;
import org.open4goods.commons.config.BrandsConfiguration;
import org.open4goods.commons.config.yml.DevModeConfiguration;
import org.open4goods.commons.config.yml.GithubConfiguration;
Expand Down Expand Up @@ -193,6 +194,12 @@ public class ApiProperties {
private List<String> corsAllowedHosts = new ArrayList<>();


/**
* Configuration for indexation (number of threads, batch size, ...)
*/
private IndexationConfig indexationConfig = new IndexationConfig();


/**
* Duration of the pause to apply beetween 2 subsequent GenAI generation
*/
Expand Down Expand Up @@ -614,6 +621,18 @@ public void setGenAiPauseDurationMs(long genAiPauseDurationMs) {



public IndexationConfig getIndexationConfig() {
return indexationConfig;
}



public void setIndexationConfig(IndexationConfig indexationConfig) {
this.indexationConfig = indexationConfig;
}






Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.open4goods.api.config.yml;

public class IndexationConfig {

/**
* Max size of the blocking queue
*/
int queueMaxSize = 5000;

/**
* Bulk size (applied for update, means on fetching and on insertion in elastic cluster)
*/

int bulkPageSize = 150;

/**
* Number ofconccurent workers
*/
int workers = 2;

/**
* Duration of pause when no elements in the queue
*/
int pauseDuration = 4000;


public int getBulkPageSize() {
return bulkPageSize;
}
public void setBulkPageSize(int dequeueSize) {
this.bulkPageSize = dequeueSize;
}
public int getWorkers() {
return workers;
}
public void setWorkers(int workers) {
this.workers = workers;
}
public int getPauseDuration() {
return pauseDuration;
}
public void setPauseDuration(int pauseDuration) {
this.pauseDuration = pauseDuration;
}
public int getQueueMaxSize() {
return queueMaxSize;
}
public void setQueueMaxSize(int queueMaxSize) {
this.queueMaxSize = queueMaxSize;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.lang3.StringUtils;
import org.open4goods.api.config.yml.ApiProperties;
import org.open4goods.api.config.yml.IndexationConfig;
import org.open4goods.api.services.AggregationFacadeService;
import org.open4goods.commons.dao.ProductRepository;
import org.open4goods.commons.exceptions.AggregationSkipException;
Expand All @@ -32,7 +34,6 @@
*
* It also provides mechanism to stop indexation (and keep data in the persisted
* file), and to perform "direct" updates without giving up to the file buffer
* TODO : Could also have a thread pool here to increase performances
* @author Goulven.Furet
*
*/
Expand All @@ -52,8 +53,7 @@ public class DataFragmentStoreService {
private final AtomicBoolean serviceShutdown = new AtomicBoolean(false);

// The queue implementation
// TODO : Limit from conf
private BlockingQueue<DataFragment> queue = new LinkedBlockingQueue<>(15000);
private BlockingQueue<DataFragment> queue;

private ProductRepository aggregatedDataRepository;

Expand All @@ -62,24 +62,21 @@ public class DataFragmentStoreService {

/**
*
* @param indexationConfig
* @param queueFolder The folder where indexation queued datas will be stored
*/
public DataFragmentStoreService(StandardiserService standardiserService, AggregationFacadeService generationService, ProductRepository aggregatedDataRepository) {
public DataFragmentStoreService(StandardiserService standardiserService, AggregationFacadeService generationService, ProductRepository aggregatedDataRepository, IndexationConfig indexationConfig) {


this.standardiserService = standardiserService;
this.aggregatedDataRepository = aggregatedDataRepository;
this.generationService=generationService;

// TODO : from conf
int dequeueSize = 150;
int workers = 2;
int pauseDuration = 4000;
//
logger.info("Starting file queue consumer thread, with bulk page size of {} items", dequeueSize );
//
for (int i = 0; i < workers; i++) {
Thread.startVirtualThread(new DataFragmentAggregationWorker(this, dequeueSize, pauseDuration,"dequeue-worker-"+i));
this.queue = new LinkedBlockingQueue<>(indexationConfig.getQueueMaxSize());

for (int i = 0; i < indexationConfig.getWorkers(); i++) {
logger.info("Starting file queue consumer thread {}, with bulk page size of {} items",i, indexationConfig.getBulkPageSize() );
Thread.startVirtualThread(new DataFragmentAggregationWorker(this, indexationConfig.getBulkPageSize(), indexationConfig.getPauseDuration(),"dequeue-worker-"+i));
}


Expand Down

0 comments on commit 4810c5e

Please sign in to comment.