
[SPARK-17876] Write StructuredStreaming WAL to a stream instead of materializing all at once #15437

Closed · wants to merge 5 commits
Conversation

@brkyvz (Contributor) commented Oct 11, 2016

What changes were proposed in this pull request?

The CompactibleFileStreamLog materializes the whole metadata log in memory as a String. This can cause issues when there are lots of files that are being committed, especially during a compaction batch.
You may come across stacktraces that look like:

```
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.lang.StringCoding.encode(StringCoding.java:350)
at java.lang.String.getBytes(String.java:941)
at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.serialize(FileStreamSinkLog.scala:127)
```

The safer approach is to write directly to an output stream, so that the entire log never has to be materialized as one huge string.
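The streaming approach can be sketched as follows. This is a minimal, self-contained approximation, not the actual patch: `serializeData` and the `"v1"` version header are stand-ins for the real CompactibleFileStreamLog internals.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.charset.StandardCharsets.UTF_8

object StreamingLogWriter {
  // Stand-in for the per-entry serializer in CompactibleFileStreamLog.
  def serializeData(entry: String): String = entry

  // Write one line per entry directly to the stream, so the full log
  // is never held in memory as a single String.
  def serialize(entries: Seq[String], out: OutputStream): Unit = {
    out.write("v1".getBytes(UTF_8))
    entries.foreach { entry =>
      out.write('\n')
      out.write(serializeData(entry).getBytes(UTF_8))
    }
    // No explicit flush: the caller is expected to close the stream.
  }
}
```

Because each entry is encoded and written independently, peak memory is bounded by the largest single entry rather than by the whole batch.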

How was this patch tested?

Existing unit tests

@SparkQA commented Oct 11, 2016

Test build #66754 has finished for PR 15437 at commit 4d50be5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@brkyvz (Contributor, Author) commented Oct 11, 2016

cc @tdas @zsxwing Would one of you want to look at this?

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.io.IOUtils

nit: use org.apache.commons.io.IOUtils instead. Hadoop's IOUtils is annotated @InterfaceStability.Evolving, which means it can break compatibility in a minor release.

val lines = new String(bytes, UTF_8).split("\n")
if (lines.length == 0) {
override def deserialize(in: InputStream): Array[T] = {
val lines = IOUtils.lineIterator(in, UTF_8).asScala

nit: why not use Source.getLines, since this is Scala?
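For illustration, the reviewer's suggestion would look roughly like this. This is a hedged sketch using only the Scala standard library; the real `deserialize` in CompactibleFileStreamLog does more, such as version checking.

```scala
import java.io.InputStream
import java.nio.charset.StandardCharsets.UTF_8
import scala.io.Source

object LineReader {
  // Lazily iterate over lines without materializing the whole log,
  // using scala.io.Source instead of commons-io's lineIterator.
  def readLines(in: InputStream): Iterator[String] =
    Source.fromInputStream(in, UTF_8.name()).getLines()
}
```

Either iterator is lazy; the choice is mainly about avoiding the Java-to-Scala collection conversion that `IOUtils.lineIterator(...).asScala` requires.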

out.write('\n')
out.write(serializeData(data).getBytes(UTF_8))
}
out.flush()

nit: no need to flush, since the stream will be closed immediately afterwards.

@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.streaming

import java.io.OutputStream

nit: unused import

@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.streaming

import java.io.OutputStream

nit: unused import

@zsxwing zsxwing left a comment


Looks pretty good. Just some nits.

@brkyvz (Contributor, Author) commented Oct 12, 2016

Thanks @zsxwing, addressed your comments.

@SparkQA commented Oct 12, 2016

Test build #66855 has finished for PR 15437 at commit cece672.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 13, 2016

Test build #66856 has finished for PR 15437 at commit 9c4fe72.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member) commented Oct 13, 2016

LGTM. Thanks! Merging to master and 2.0.

@asfgit asfgit closed this in edeb51a Oct 13, 2016
asfgit pushed a commit that referenced this pull request Oct 13, 2016
…terializing all at once


Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15437 from brkyvz/ser-to-stream.

(cherry picked from commit edeb51a)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…terializing all at once


Author: Burak Yavuz <brkyvz@gmail.com>

Closes apache#15437 from brkyvz/ser-to-stream.
@brkyvz brkyvz deleted the ser-to-stream branch February 3, 2019 20:54