#6 refactor shuffle-daos by abstracting shuffle IO for supporting both synchronous and asynchronous DAOS Object API (#8)

* reconstruct project and add new shuffle-daos plugin

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* corrected location of scalastyle-config

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* use daos-java version of 1.1.4

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* refactor shuffle-daos by abstracting shuffle IO for supporting both synchronous and asynchronous DAOS Object API

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* fix travis warning and unused import in shuffle-hadoop

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* fix travis build issue

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* fix travis build

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* maven quiet build

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* fix travis outofmemory issue

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* fix travis outofmemory issue

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* fix travis outofmemory issue

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>

* reduce data size to avoid outofmemory in test code

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
jiafuzha authored Feb 22, 2021
1 parent 99e53f0 commit 7894526
Showing 20 changed files with 1,749 additions and 1,412 deletions.
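Editorial note: the central idea of this commit, per its message, is one shuffle-IO abstraction behind which either the synchronous or the asynchronous DAOS Object API can sit. The sketch below illustrates that shape only; every name in it (ShuffleReader, SyncShuffleReader, AsyncShuffleReader, submitNextFetch) is hypothetical and not the PR's actual code — the real abstraction is the DaosReader interface shown in the diff further down.

// Hypothetical sketch of the sync/async abstraction this commit introduces.
// Callers program against one interface; the choice of blocking vs.
// pipelined asynchronous DAOS fetches lives in the implementation.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

interface ShuffleReader {
  ByteBuffer nextBuf() throws IOException; // next chunk of shuffled data
  void close(boolean force);
}

final class SyncShuffleReader implements ShuffleReader {
  @Override
  public ByteBuffer nextBuf() {
    // a real implementation would issue a blocking DAOS Object fetch here
    return ByteBuffer.allocate(0);
  }

  @Override
  public void close(boolean force) {
    // release any buffers held by this reader
  }
}

final class AsyncShuffleReader implements ShuffleReader {
  // one fetch kept in flight so the next read overlaps with consumption
  private CompletableFuture<ByteBuffer> inFlight = submitNextFetch();

  @Override
  public ByteBuffer nextBuf() throws IOException {
    try {
      ByteBuffer ready = inFlight.join(); // wait for the outstanding fetch
      inFlight = submitNextFetch();       // immediately pipeline the next one
      return ready;
    } catch (RuntimeException e) {
      throw new IOException("async fetch failed", e);
    }
  }

  private CompletableFuture<ByteBuffer> submitNextFetch() {
    // a real implementation would submit a non-blocking DAOS Object fetch here
    return CompletableFuture.completedFuture(ByteBuffer.allocate(0));
  }

  @Override
  public void close(boolean force) {
    if (force) {
      inFlight.cancel(true); // drop the outstanding fetch
    }
  }
}

Spark-side code then depends only on the interface; which implementation gets instantiated can be driven by configuration, in the same spirit as the SHUFFLE_DAOS_READ_FROM_OTHER_THREAD option visible in ReaderConfig below.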
3 changes: 1 addition & 2 deletions .travis.yml
@@ -8,5 +8,4 @@ jobs:
 install:
   - #empty install step
 script:
-  - cd ${TRAVIS_BUILD_DIR}/oap-shuffle/remote-shuffle/
-  - mvn -q test
+  - mvn -q clean install
2 changes: 1 addition & 1 deletion pom.xml
@@ -259,8 +259,8 @@
 </dependencyManagement>

 <modules>
-    <module>shuffle-hadoop</module>
     <module>shuffle-daos</module>
+    <module>shuffle-hadoop</module>
 </modules>

 </project>
3 changes: 3 additions & 0 deletions shuffle-daos/pom.xml
@@ -121,6 +121,9 @@
       <plugin>
         <groupId>org.scalatest</groupId>
         <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <argLine>-Xmx2048m</argLine>
+        </configuration>
         <executions>
           <execution>
             <id>test</id>
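The added <argLine>-Xmx2048m</argLine> raises the heap of the JVM that scalatest-maven-plugin forks for tests, which is what the "fix travis outofmemory issue" commits above address. A quick, hypothetical way to confirm the forked test JVM actually received the flag (illustration only, not part of the PR):

// HeapCheck is a hypothetical snippet; run it inside any forked test JVM.
public class HeapCheck {
  public static void main(String[] args) {
    long maxHeapMiB = Runtime.getRuntime().maxMemory() / (1024 * 1024);
    System.out.println("max heap = " + maxHeapMiB + " MiB"); // ~2048 with -Xmx2048m
  }
}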
290 changes: 127 additions & 163 deletions shuffle-daos/src/main/java/org/apache/spark/shuffle/daos/DaosReader.java
@@ -24,221 +24,185 @@
 package org.apache.spark.shuffle.daos;

 import io.daos.obj.DaosObject;
-import io.daos.obj.IODataDesc;
-import io.netty.util.internal.ObjectPool;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.shuffle.ShuffleReadMetricsReporter;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.BlockManagerId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
+import scala.Tuple3;

+import java.io.IOException;
+import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;

 /**
- * A class with {@link DaosObject} wrapped to read data from DAOS in either caller's thread or
- * dedicated executor thread. The actual read is performed by {@link DaosObject#fetch(IODataDesc)}.
+ * An abstract interface with {@link DaosObject} wrapped to read data from DAOS.
  */
-public class DaosReader {
+public interface DaosReader {

-  private DaosObject object;
-
-  private Map<DaosShuffleInputStream.BufferSource, Integer> bufferSourceMap = new ConcurrentHashMap<>();
-
-  private BoundThreadExecutors executors;
-
-  private Map<DaosReader, Integer> readerMap;
-
-  private static Logger logger = LoggerFactory.getLogger(DaosReader.class);
+  DaosObject getObject();

   /**
-   * construct DaosReader with <code>object</code> and dedicated read <code>executors</code>.
+   * release resources bound with this reader.
    *
-   * @param object
-   *     opened DaosObject
-   * @param executors
-   *     null means read in caller's thread. Submit {@link ReadTask} to dedicate executor retrieved by
-   *     {@link #nextReaderExecutor()} otherwise.
+   * @param force
+   *     force close even if there is an on-going read
    */
-  public DaosReader(DaosObject object, BoundThreadExecutors executors) {
-    this.object = object;
-    this.executors = executors;
-  }
-
-  public DaosObject getObject() {
-    return object;
-  }
-
-  public boolean hasExecutors() {
-    return executors != null;
-  }
+  void close(boolean force);

   /**
-   * next executor. null if there is no executors being set.
+   * set global <code>readerMap</code> and hook this reader for releasing resources.
    *
-   * @return shareable executor instance. null means no executor set.
+   * @param readerMap
+   *     global reader map
    */
-  public BoundThreadExecutors.SingleThreadExecutor nextReaderExecutor() {
-    if (executors != null) {
-      return executors.nextExecutor();
-    }
-    return null;
-  }
+  void setReaderMap(Map<DaosReader, Integer> readerMap);

   /**
-   * release resources of all {@link org.apache.spark.shuffle.daos.DaosShuffleInputStream.BufferSource}
-   * bound with this reader.
+   * prepare the read with some parameters.
+   *
+   * @param partSizeMap
+   * @param maxBytesInFlight
+   *     how many bytes can be read concurrently
+   * @param maxReqSizeShuffleToMem
+   *     maximum size of data that can be put in memory
+   * @param metrics
    */
-  public void close() {
-    // force releasing
-    bufferSourceMap.forEach((k, v) -> k.cleanup(true));
-    bufferSourceMap.clear();
-    if (readerMap != null) {
-      readerMap.remove(this);
-      readerMap = null;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "DaosReader{" +
-        "object=" + object +
-        '}';
-  }
+  void prepare(LinkedHashMap<Tuple2<Long, Integer>, Tuple3<Long, BlockId, BlockManagerId>> partSizeMap,
+               long maxBytesInFlight, long maxReqSizeShuffleToMem, ShuffleReadMetricsReporter metrics);

   /**
-   * register buffer source for resource cleanup.
+   * current map/reduce id being requested.
    *
-   * @param source
-   *     BufferSource instance
+   * @return map/reduce id tuple
    */
-  public void register(DaosShuffleInputStream.BufferSource source) {
-    bufferSourceMap.put(source, 1);
-  }
+  Tuple2<Long, Integer> curMapReduceId();

   /**
-   * unregister buffer source if <code>source</code> is release already.
+   * get an available buffer after iterating the current buffer, the next buffer in the current desc
+   * and the next desc.
    *
-   * @param source
-   *     BufferSource instance
+   * @return buffer with data read from DAOS
+   * @throws IOException
    */
-  public void unregister(DaosShuffleInputStream.BufferSource source) {
-    bufferSourceMap.remove(source);
-  }
+  ByteBuf nextBuf() throws IOException;

   /**
-   * set global <code>readMap</code> and hook this reader for releasing resources.
+   * is all data from the current map output read and data from the next map reached?
    *
-   * @param readerMap
-   *     global reader map
+   * @return true or false
    */
-  public void setReaderMap(Map<DaosReader, Integer> readerMap) {
-    readerMap.put(this, 0);
-    this.readerMap = readerMap;
-  }
+  boolean isNextMap();

   /**
-   * Task to read from DAOS. Task itself is cached to reduce GC time.
-   * To reuse task for different reads, prepare and reset {@link ReadTaskContext} by calling
-   * {@link #newInstance(ReadTaskContext)}
+   * the upper layer should call this method to read more map output.
    */
-  static final class ReadTask implements Runnable {
-    private ReadTaskContext context;
-    private final ObjectPool.Handle<ReadTask> handle;
+  void setNextMap(boolean b);

-    private static final ObjectPool<ReadTask> objectPool = ObjectPool.newPool(handle -> new ReadTask(handle));
+  /**
+   * check if all data from the current map output is read.
+   */
+  void checkPartitionSize() throws IOException;

-    private static final Logger log = LoggerFactory.getLogger(ReadTask.class);
+  /**
+   * check if all map outputs are read.
+   *
+   * @throws IOException
+   */
+  void checkTotalPartitions() throws IOException;

-    static ReadTask newInstance(ReadTaskContext context) {
-      ReadTask task = objectPool.get();
-      task.context = context;
-      return task;
-    }
+  /**
+   * reader configurations, please check configs prefixed with SHUFFLE_DAOS_READ in {@link package$#MODULE$}.
+   */
+  final class ReaderConfig {
+    private long minReadSize;
+    private long maxBytesInFlight;
+    private long maxMem;
+    private int readBatchSize;
+    private int waitDataTimeMs;
+    private int waitTimeoutTimes;
+    private boolean fromOtherThread;

-    private ReadTask(ObjectPool.Handle<ReadTask> handle) {
-      this.handle = handle;
-    }
+    private static final Logger log = LoggerFactory.getLogger(ReaderConfig.class);
+
+    public ReaderConfig() {
+      this(true);
+    }
+
+    private ReaderConfig(boolean load) {
+      if (load) {
+        initialize();
+      }
+    }

-    @Override
-    public void run() {
-      boolean cancelled = context.cancelled;
-      try {
-        if (!cancelled) {
-          context.object.fetch(context.desc);
-        }
-      } catch (Exception e) {
-        log.error("failed to read for " + context.desc, e);
-      } finally {
-        // release desc buffer and keep data buffer
-        context.desc.release(cancelled);
-        context.signal();
-        context = null;
-        handle.recycle(this);
-      }
-    }
-  }
+    private void initialize() {
+      SparkConf conf = SparkEnv.get().conf();
+      minReadSize = (long) conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_MINIMUM_SIZE()) * 1024;
+      this.maxBytesInFlight = -1L;
+      this.maxMem = -1L;
+      this.readBatchSize = (int) conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_BATCH_SIZE());
+      this.waitDataTimeMs = (int) conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_WAIT_DATA_MS());
+      this.waitTimeoutTimes = (int) conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_WAIT_DATA_TIMEOUT_TIMES());
+      this.fromOtherThread = (boolean) conf.get(package$.MODULE$.SHUFFLE_DAOS_READ_FROM_OTHER_THREAD());
+      if (log.isDebugEnabled()) {
+        log.debug("minReadSize: " + minReadSize);
+        log.debug("maxBytesInFlight: " + maxBytesInFlight);
+        log.debug("maxMem: " + maxMem);
+        log.debug("readBatchSize: " + readBatchSize);
+        log.debug("waitDataTimeMs: " + waitDataTimeMs);
+        log.debug("waitTimeoutTimes: " + waitTimeoutTimes);
+        log.debug("fromOtherThread: " + fromOtherThread);
+      }
+    }

-  /**
-   * Context for read task. It holds all other object to read and sync between caller thread and read thread.
-   * It should be cached in caller thread for reusing.
-   */
-  static final class ReadTaskContext extends LinkedTaskContext {
-
-    /**
-     * constructor with all parameters. Some of them can be reused later.
-     *
-     * @param object
-     *     DAOS object to fetch data from DAOS
-     * @param counter
-     *     counter to indicate how many data ready for being consumed
-     * @param takeLock
-     *     lock to work with <code>notEmpty</code> condition to signal caller thread there is data ready to be consumed
-     * @param notEmpty
-     *     condition to signal there is some data ready
-     * @param desc
-     *     desc object to describe which part of data to be fetch and hold returned data
-     * @param mapReduceId
-     *     to track which map reduce ID this task fetches data for
-     */
-    ReadTaskContext(DaosObject object, AtomicInteger counter, Lock takeLock, Condition notEmpty,
-                    IODataDesc desc, Object mapReduceId) {
-      super(object, counter, takeLock, notEmpty);
-      this.desc = desc;
-      this.morePara = mapReduceId;
-    }
+    public ReaderConfig copy(long maxBytesInFlight, long maxMem) {
+      ReaderConfig rc = new ReaderConfig(false);
+      rc.maxMem = maxMem;
+      rc.minReadSize = minReadSize;
+      rc.readBatchSize = readBatchSize;
+      rc.waitDataTimeMs = waitDataTimeMs;
+      rc.waitTimeoutTimes = waitTimeoutTimes;
+      rc.fromOtherThread = fromOtherThread;
+      if (maxBytesInFlight < rc.minReadSize) {
+        rc.maxBytesInFlight = minReadSize;
+      } else {
+        rc.maxBytesInFlight = maxBytesInFlight;
+      }
+      return rc;
+    }

-    @Override
-    public ReadTaskContext getNext() {
-      return (ReadTaskContext) next;
-    }
+    public int getReadBatchSize() {
+      return readBatchSize;
+    }

-    public Tuple2<Long, Integer> getMapReduceId() {
-      return (Tuple2<Long, Integer>) morePara;
-    }
-  }
+    public int getWaitDataTimeMs() {
+      return waitDataTimeMs;
+    }

-  /**
-   * Thread factory for DAOS read tasks.
-   */
-  protected static class ReadThreadFactory implements ThreadFactory {
-    private AtomicInteger id = new AtomicInteger(0);
-
-    @Override
-    public Thread newThread(Runnable runnable) {
-      Thread t;
-      String name = "daos_read_" + id.getAndIncrement();
-      if (runnable == null) {
-        t = new Thread(name);
-      } else {
-        t = new Thread(runnable, name);
-      }
-      t.setDaemon(true);
-      t.setUncaughtExceptionHandler((thread, throwable) ->
-          logger.error("exception occurred in thread " + name, throwable));
-      return t;
-    }
-  }
+    public int getWaitTimeoutTimes() {
+      return waitTimeoutTimes;
+    }
+
+    public long getMaxBytesInFlight() {
+      return maxBytesInFlight;
+    }
+
+    public long getMaxMem() {
+      return maxMem;
+    }
+
+    public long getMinReadSize() {
+      return minReadSize;
+    }
+
+    public boolean isFromOtherThread() {
+      return fromOtherThread;
+    }
+  }
 }
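Taken together, the new interface implies a read loop of roughly the shape below. This is a hedged sketch, not code from the PR: it assumes prepare(...) has already been called, that nextBuf() returns null once all requested data is consumed, and that the caller releases each buffer — the real contract in DaosShuffleInputStream may differ.

// Hypothetical consumer of the DaosReader interface shown above.
import io.netty.buffer.ByteBuf;
import java.io.IOException;

final class DaosReadLoop {
  static void readAll(DaosReader reader) throws IOException {
    ByteBuf buf;
    while ((buf = reader.nextBuf()) != null) {
      try {
        // hand buf to the record deserializer here
      } finally {
        buf.release();                 // assumes buffer ownership passes to the caller
      }
      if (reader.isNextMap()) {        // one map output fully consumed
        reader.checkPartitionSize();   // verify all of its data arrived
        reader.setNextMap(false);      // reset before reading the next map output
      }
    }
    reader.checkTotalPartitions();     // verify every map output was read
    reader.close(false);               // non-forced close
  }
}

Note also the ReaderConfig.copy(maxBytesInFlight, maxMem) helper in the diff: it clamps maxBytesInFlight up to minReadSize, so per-stream copies never fetch in units smaller than the configured SHUFFLE_DAOS_READ_MINIMUM_SIZE.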