Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ParallelCountBytes #983

Merged
merged 4 commits into from
May 31, 2016
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.base.Stopwatch;
import com.google.common.io.BaseEncoding;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -49,8 +50,16 @@ public class ParallelCountBytes {

/**
* WorkUnit holds a buffer and the instructions for what to put in it.
*
* <p>Use it like this:
* <ol>
* <li> call()
* <li> the data is now in buf, you can access it directly
* <li> if need more, call resetForIndex(...) and go back to the top.
* <li> else, call close()
* </ol>
*/
private class WorkUnit implements Callable<WorkUnit> {
private static class WorkUnit implements Callable<WorkUnit>, Closeable {
public final ByteBuffer buf;
final SeekableByteChannel chan;
final int blockSize;
Expand All @@ -70,7 +79,7 @@ public WorkUnit call() throws IOException {
return this;
}
chan.position(pos);
// read until buffer is full, or EOF
// read until buffer it is full, or EOF

This comment was marked as spam.

This comment was marked as spam.

while (chan.read(buf) > 0) {};
return this;
}
Expand All @@ -80,16 +89,16 @@ public WorkUnit resetForIndex(int blockIndex) {
buf.flip();
return this;
}

public void close() throws IOException {
chan.close();

This comment was marked as spam.

This comment was marked as spam.

}
}

/**
* See the class documentation.
*/
public static void main(String[] args) throws IOException {
new ParallelCountBytes().start(args);
}

public void start(String[] args) throws IOException {
public static void main(String[] args) throws Exception {
if (args.length == 0 || args[0].equals("--help")) {
help();
return;
Expand All @@ -100,16 +109,15 @@ public void start(String[] args) throws IOException {
}

/**
* Print the length of the indicated file.
* Print the length and MD5 of the indicated file.
*
* <p>This uses the normal Java NIO Api, so it can take advantage of any installed
* NIO Filesystem provider without any extra effort.
*/
private void countFile(String fname) throws IOException{
private static void countFile(String fname) throws Exception {
// large buffers pay off
final int bufSize = 50 * 1024 * 1024;
Queue<Future<WorkUnit>> work = new ArrayDeque<>();
try {
Path path = Paths.get(new URI(fname));
long size = Files.size(path);
System.out.println(fname + ": " + size + " bytes.");
Expand All @@ -125,14 +133,15 @@ private void countFile(String fname) throws IOException{
for (blockIndex = 0; blockIndex < nThreads; blockIndex++) {
work.add(exec.submit(new WorkUnit(Files.newByteChannel(path), bufSize, blockIndex)));
}
while (true) {
while (!work.isEmpty()) {
WorkUnit full = work.remove().get();
md.update(full.buf.array(), 0, full.buf.position());
total += full.buf.position();
if (full.buf.hasRemaining()) {
break;
full.close();
} else {
work.add(exec.submit(full.resetForIndex(blockIndex++)));
}
work.add(exec.submit(full.resetForIndex(blockIndex++)));
}
exec.shutdown();

Expand All @@ -141,12 +150,9 @@ private void countFile(String fname) throws IOException{
String hex = String.valueOf(BaseEncoding.base16().encode(md.digest()));
System.out.println("The MD5 is: 0x" + hex);
if (total != size) {
System.out.println("Wait, this doesn't match! We saw " + total + " bytes, " +
"yet the file size is listed at " + size + " bytes.");
System.out.println("Wait, this doesn't match! We saw " + total + " bytes, "
+ "yet the file size is listed at " + size + " bytes.");
}
} catch (Exception ex) {
System.out.println(fname + ": " + ex.toString());
}
}

private static void help() {
Expand Down