Skip to content

Commit

Permalink
Expose getObjAsByteBuffer API (apache#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
yeyuqiang authored Jan 12, 2021
1 parent 618fc2d commit d268544
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 8 deletions.
54 changes: 48 additions & 6 deletions core/src/main/java/org/apache/spark/io/pmem/MyPlasmaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,9 @@ public ByteBuffer getChildObject(String parentObjectId, int index) {
return buffer;
}

/**
* record the total child number
*/
public void recordChildObjectNumber(String parentObjectId, int num) {
public void recordChildObjectMetaData(String parentObjectId, int num, int len) {
put(paddingParentObjectId(parentObjectId).getBytes(),
ByteBuffer.allocate(4).putInt(num).array(), null);
new ChildObjectMetaData(num, len).getBytes(), null);
}

public int getChildObjectNumber(String parentObjectId) {
Expand All @@ -61,7 +58,20 @@ public int getChildObjectNumber(String parentObjectId) {
if (buffer == null) {
return -1;
}
return buffer.getInt();
ChildObjectMetaData metaData = new ChildObjectMetaData(buffer);
release(paddingParentObjectId(parentObjectId).getBytes());
return metaData.getChildObjectNum();
}

/**
 * Returns the length of the last child object recorded for the given parent object.
 *
 * <p>The value is read from the metadata record stored under the padded parent object id.
 *
 * @param parentObjectId id of the parent object whose metadata is looked up
 * @return the recorded length of the last child object, or {@code -1} when no metadata
 *         record could be fetched
 */
public int getLastChildObjectLen(String parentObjectId) {
  // Compute the padded id once so the get and the release address the same object.
  byte[] metaDataId = paddingParentObjectId(parentObjectId).getBytes();
  ByteBuffer buffer = getObjAsByteBuffer(metaDataId, 0, false);
  if (buffer == null) {
    return -1;
  }
  try {
    // Read the value BEFORE releasing: the metadata wrapper only keeps a reference to the
    // fetched buffer, so reading after release would touch memory the store may reclaim.
    // NOTE(review): assumes release() can invalidate the returned buffer - confirm.
    return new ChildObjectMetaData(buffer).getChildObjectLen();
  } finally {
    release(metaDataId);
  }
}

String paddingParentObjectId(String parentObjectId) {
Expand Down Expand Up @@ -96,6 +106,38 @@ public byte[] toBytes() {
}
}

/**
 * Use 8 bytes to record child object metadata.
 * The first 4 bytes record the total number of child objects.
 * The last 4 bytes record the length of the last child object.
 *
 * <p>An instance either builds a fresh 8-byte record from the two values, or wraps an
 * existing buffer (without copying it) and decodes the two values from it on demand.
 */
class ChildObjectMetaData {
  /** Byte offset of the child object count inside the record. */
  private static final int NUM_OFFSET = 0;
  /** Byte offset of the last child object's length inside the record. */
  private static final int LEN_OFFSET = 4;
  /** Total size of the record in bytes. */
  private static final int METADATA_SIZE = 8;

  final ByteBuffer buf;

  /**
   * Creates a new record holding the given values.
   *
   * @param num total number of child objects
   * @param len length of the last child object
   */
  public ChildObjectMetaData(int num, int len) {
    buf = ByteBuffer.allocate(METADATA_SIZE);
    buf.putInt(num);
    buf.putInt(len);
  }

  /**
   * Wraps an existing record. The buffer is referenced, not copied, so it must stay valid
   * for as long as the getters of this instance are called.
   *
   * @param buf buffer whose first bytes contain the record
   * @throws NullPointerException if {@code buf} is null
   */
  public ChildObjectMetaData(ByteBuffer buf) {
    if (buf == null) {
      throw new NullPointerException("buf");
    }
    this.buf = buf;
  }

  /** Returns the total number of child objects (absolute read, position is not moved). */
  public int getChildObjectNum() {
    return buf.getInt(NUM_OFFSET);
  }

  /** Returns the length of the last child object (absolute read, position is not moved). */
  public int getChildObjectLen() {
    return buf.getInt(LEN_OFFSET);
  }

  /**
   * Returns the 8-byte record as a new array.
   *
   * <p>The bytes are copied with absolute reads instead of {@code buf.array()}, because
   * {@code array()} throws for direct or read-only buffers and exposes the whole backing
   * array (not just the record) for sliced or wrapped buffers.
   */
  public byte[] getBytes() {
    byte[] bytes = new byte[METADATA_SIZE];
    for (int i = 0; i < METADATA_SIZE; i++) {
      bytes[i] = buf.get(i);
    }
    return bytes;
  }

}

/**
* Holds a global plasma client instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void write(byte[] b, int off, int len) {
return;
}
int bytesToWrite = len;
int lastObjectLen = 0;
while (bytesToWrite > 0) {
int remainBytesInBuf = buffer.remaining();
if (remainBytesInBuf <= bytesToWrite) {
Expand All @@ -85,13 +86,14 @@ public void write(byte[] b, int off, int len) {
} else {
buffer.put(b, off, bytesToWrite);
off += bytesToWrite;
lastObjectLen = bytesToWrite;
bytesToWrite = 0;
}
writeToPlasma();
buffer.clear();
currChildObjectNumber++;
}
client.recordChildObjectNumber(parentObjectId, currChildObjectNumber);
writeMetaData(currChildObjectNumber, lastObjectLen);
}

private void writeToPlasma() {
Expand All @@ -102,6 +104,11 @@ private void writeToPlasma() {
}
}

/**
 * Records the metadata of the current parent object through the plasma client.
 *
 * @param num total number of child objects written so far
 * @param len length of the last child object
 */
private void writeMetaData(int num, int len) {
  // Pure delegation: the client owns the on-store encoding of the record.
  client.recordChildObjectMetaData(parentObjectId, num, len);
}

private byte[] shrinkLastObjBuffer() {
byte[] lastObjBytes = new byte[buffer.position()];
buffer.flip();
Expand Down
21 changes: 20 additions & 1 deletion core/src/main/java/org/apache/spark/io/pmem/PlasmaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@

package org.apache.spark.io.pmem;

import java.nio.ByteBuffer;

/**
* Utility for operating on objects stored in the plasma server.
*/
public class PlasmaUtils {

private static MyPlasmaClient client = MyPlasmaClientHolder.get();
private static final int DEFAULT_BUFFER_SIZE = 4096;

public static boolean contains(String objectId) {
int num = client.getChildObjectNumber(objectId);
Expand All @@ -33,7 +36,23 @@ public static void remove(String objectId) {
client.release(childObjectId.toBytes());
client.delete(childObjectId.toBytes());
}
client.release(client.paddingParentObjectId(objectId).getBytes());
client.delete(client.paddingParentObjectId(objectId).getBytes());
}

/**
 * Reads all child objects of {@code objectId} and concatenates them, in index order, into a
 * single newly allocated heap buffer.
 *
 * <p>The size is derived from the stored metadata: every child except the last is counted as
 * {@code DEFAULT_BUFFER_SIZE} bytes; the last child is counted with its recorded length when
 * that length is positive, otherwise as a full {@code DEFAULT_BUFFER_SIZE} chunk.
 *
 * <p>The returned buffer is not flipped: its position is left after the last byte written.
 * Use {@link ByteBuffer#array()} or flip it before reading relatively.
 *
 * @param objectId id of the parent object to read
 * @return a heap buffer containing the bytes of all child objects
 * @throws IllegalArgumentException if no metadata exists for {@code objectId} or the total
 *         size does not fit in a single {@code ByteBuffer}
 * @throws IllegalStateException if a child object announced by the metadata cannot be fetched
 */
public static ByteBuffer getObjAsByteBuffer(String objectId) {
  int num = client.getChildObjectNumber(objectId);
  if (num < 0) {
    // Previously this surfaced as an opaque "capacity < 0" failure from ByteBuffer.allocate.
    throw new IllegalArgumentException(
        "No child object metadata found for object: " + objectId);
  }
  if (num == 0) {
    return ByteBuffer.allocate(0);
  }

  int len = client.getLastChildObjectLen(objectId);
  // Compute in long so that a very large child count cannot silently wrap around.
  long totalBufferSize;
  if (len > 0) {
    totalBufferSize = (long) (num - 1) * DEFAULT_BUFFER_SIZE + len;
  } else {
    totalBufferSize = (long) num * DEFAULT_BUFFER_SIZE;
  }
  if (totalBufferSize > Integer.MAX_VALUE) {
    throw new IllegalArgumentException(
        "Object " + objectId + " is too large for a single ByteBuffer: "
            + totalBufferSize + " bytes");
  }

  ByteBuffer buffer = ByteBuffer.allocate((int) totalBufferSize);
  for (int i = 0; i < num; i++) {
    ByteBuffer buf = client.getChildObject(objectId, i);
    if (buf == null) {
      throw new IllegalStateException(
          "Missing child object " + i + " of " + num + " for object: " + objectId);
    }
    buffer.put(buf);
  }
  return buffer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,20 @@ public void testMultiThreadReadWrite() throws InterruptedException {
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
}

/**
 * Writes a block through {@code PlasmaOutputStream}, reads it back in one piece with
 * {@code PlasmaUtils.getObjAsByteBuffer} and checks the bytes round-trip unchanged, then
 * removes the block and verifies it is gone.
 */
@Test
public void testGetObjAsByteBuffer() throws IOException {
  final String blockId = "block_id_" + random.nextInt(10000000);
  // NOTE(review): 2.7 presumably sizes the payload so the last chunk is partial - confirm.
  final byte[] expected = prepareByteBlockToWrite(2.7);

  // Store the payload under the generated block id.
  PlasmaOutputStream out = new PlasmaOutputStream(blockId);
  out.write(expected);

  // The concatenated buffer must contain exactly the bytes that were written.
  byte[] actual = PlasmaUtils.getObjAsByteBuffer(blockId).array();
  assertArrayEquals(expected, actual);

  // Clean up and make sure the object is no longer reported as present.
  PlasmaUtils.remove(blockId);
  assertFalse(PlasmaUtils.contains(blockId));
}

@AfterClass
public static void tearDown() {
try {
Expand Down

0 comments on commit d268544

Please sign in to comment.