[SPARK-17417][Core] Fix # of partitions for Reliable RDD checkpointing #15370
Conversation
Test build #66419 has finished for PR 15370 at commit
@@ -72,7 +72,8 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
      .sortBy(_.toString)
    // Fail fast if input files are invalid
    inputFiles.zipWithIndex.foreach { case (path, i) =>
      if (!path.toString.endsWith(ReliableCheckpointRDD.checkpointFileName(i))) {
      if (path.getName != ReliableCheckpointRDD.checkpointFileName(i) &
You may be right that you can assume this code won't encounter files with older, shorter split numbers. But it seems easier to just look for a file name ending in "part-[0-9]+" here rather than special-case an "old format".
For validating the file name, "part-[0-9]+" is definitely concise; however, we would lose consistency in the understanding here. Validation would be flexible, but when actually getting the files we would always look for a 10-digit formatting scheme, which would fail whenever the part files don't match. So adding a "part-[0-9]+" check alone won't help.
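The two positions can be reconciled if validation and lookup both compare the numeric index rather than the exact padded string. A minimal Java sketch of that idea (hypothetical class and method names; the real code is Scala):

```java
import java.util.regex.Pattern;

public class CheckpointNameCheck {
    // Accepts any zero-padded "part-N" name, old 5-digit or longer,
    // then compares the parsed index numerically, so validation stays
    // consistent with however the name was generated.
    private static final Pattern PART_FILE = Pattern.compile("part-[0-9]+");

    static boolean isValidPartFile(String name, int expectedIndex) {
        if (!PART_FILE.matcher(name).matches()) {
            return false;
        }
        // "part-00007" and "part-0000000007" both parse to 7.
        return Integer.parseInt(name.substring("part-".length())) == expectedIndex;
    }

    public static void main(String[] args) {
        System.out.println(isValidPartFile("part-00007", 7));       // true
        System.out.println(isValidPartFile("part-0000000007", 7));  // true
        System.out.println(isValidPartFile("part-00007x", 7));      // false
    }
}
```

This is only a sketch of the review suggestion, not the change that was ultimately merged.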
Why can't you accept both types of file name this way everywhere they're looked for / read? That seems best of all. Maybe I am missing something.
@@ -83,8 +84,9 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
   * Return the locations of the checkpoint file associated with the given partition.
   */
  protected override def getPreferredLocations(split: Partition): Seq[String] = {
    val status = fs.getFileStatus(
      new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index)))
    val path = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
These next 2 changes probably aren't needed then
This is to determine the actual path to the part file. If we plan not to parse the old format, then we can hardcode it to the new format, because if we keep it flexible we will have to deal with figuring out which formatting was used, or else we would be dealing with FileNotFound exceptions.
Do we expect to be able to recover checkpointed RDDs across Spark versions? My impression is that that isn't something that's really supported.
I don't think it's really supported; if this much can be more flexible without adding much of any complexity though, all the better.
  private def checkpointFileName(partitionIndex: Int): String = {
    "part-%05d".format(partitionIndex)
  private def checkpointFileName(partitionIndex: Int, oldFormat: Boolean = false): String = {
    if (oldFormat) "part-%05d".format(partitionIndex) else "part-%010d".format(partitionIndex)
Just output 10 digits always here I think. If there are instances that look for this name, they can be made more flexible.
This is again required for getting the actual path to the part file while reconstructing the RDD. If we do not plan to parse the old format, we can default it to 10 digits.
How about, instead of hardcoding this to 5 or 10 digits, we use java.text.NumberFormat, set the minimum to 5 digits, but allow it to go over? That way it automatically handles backward compatibility and will handle bigger numbers. We would have to change anywhere that is expecting exactly 5 digits, but I haven't seen any such code other than what's touched here.

Using the NumberFormat will simplify the case and address both types.
  // Use a common file numbering format which defaults to 5 digits for saving various part files.
  val numFormatter = NumberFormat.getIntegerInstance()
  numFormatter.setMinimumIntegerDigits(5)
  numFormatter.setMaximumIntegerDigits(10)
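The proposal above can be sketched in plain Java (hypothetical class and method names). One caveat worth noting: getIntegerInstance() enables digit grouping by default (e.g. "123,456"), which would corrupt file names, so a real implementation would have to disable it:

```java
import java.text.NumberFormat;

public class PartFileFormatter {
    // Pad to at least 5 digits, but let larger indices grow naturally,
    // so existing 5-digit names stay unchanged.
    private static final NumberFormat NUM_FORMATTER = NumberFormat.getIntegerInstance();
    static {
        NUM_FORMATTER.setMinimumIntegerDigits(5);
        // Grouping separators are on by default and must be turned off
        // for file names.
        NUM_FORMATTER.setGroupingUsed(false);
    }

    // NumberFormat is not thread-safe, hence the synchronization --
    // one of the objections later raised against this approach.
    static synchronized String checkpointFileName(int partitionIndex) {
        return "part-" + NUM_FORMATTER.format(partitionIndex);
    }

    public static void main(String[] args) {
        System.out.println(checkpointFileName(7));       // part-00007
        System.out.println(checkpointFileName(123456));  // part-123456
    }
}
```

Again, this illustrates the intermediate proposal, not the final merged change.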
I don't think we really need to set the max. Every place this is used, we pass in an id that is already restricted to an int. If we did happen to change those other interfaces to take, say, a long, this would then break again. I'm thinking we should just remove it.
okay.
Test build #66454 has finished for PR 15370 at commit
Test build #66456 has finished for PR 15370 at commit
I think this is getting overly complex. Everywhere that creates a file name just creates it with 10 digits, just as it creates with 5 digits now. Everywhere that reads the file name should simply assume it's of the form "part-[0-9]+". I don't think it needs a formatter or more complexity than that.
@@ -71,6 +72,11 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()

   // Use a common file numbering format which defaults to 5 digits for saving various part files.
   val numFormatter = NumberFormat.getIntegerInstance()
This is a global utility class; it shouldn't have an object so specific to one usage
This is used by multiple different classes for formatting the number, so it seems fine for it to be in Utils. How about instead we have a function in here, formatPartitionNumber(int id), which internally uses the number formatter?
If we assume a file name of the form "part-[0-9]+"
Also, with the NumberFormat implementation, files continue to be named with up to 5 digits by default. Only when you exceed 99999 does it start using 6 digits, 7 digits, and so on. This takes care of the old format as well and handles the case of exceeding the current limit.
Yeah, the name should be zero-padded to 10 digits to make the lexical ordering match the intended numerical ordering. It may not matter, but it seems a bit safer. OK, right: the code path doesn't just list a dir for files, it tries to reconstruct the names. Well, in that case, I think I'd vote for not trying to be 'compatible' here. It should assume the new 10-digit-padded name, or else this is going to add complexity and overhead in several places, checking for two possible files. As for the other related changes, I tend to like the idea of making these similar changes to other outputs, but I'm not as clear about the implications of that and how they're read. Those outputs would be used across versions. At least, we'd have to make sure the lexical ordering stays correct. I'm inclined to leave it if it's not a problem, but am not sure.
I don't consider this complex; it's simply one number formatter and a call to format. Padding to 10 digits isn't necessary in 99% of the cases, so always using 10 just means longer file names and more digits. Also, if you use the number formatter in the other cases, you don't have to worry about breaking backwards compatibility: it still uses 5 digits until you actually need more. I think we should leave this as the number formatter. I also think that if we are using it for all the part-file formatting, leaving it in Utils makes sense. We could rename it slightly to make its usage more apparent.
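The ordering concern is easy to demonstrate: with a 5-digit minimum that grows unpadded, lexical order diverges from numerical order once indices pass 99999, while uniform 10-digit padding keeps the two aligned. A small Java sketch (hypothetical helper name):

```java
import java.util.Arrays;

public class OrderingDemo {
    // Plain string sort, as the checkpoint code does with sortBy(_.toString).
    static String[] lexicallySorted(String... names) {
        String[] copy = names.clone();
        Arrays.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        // With a 5-digit minimum, indices beyond 99999 break lexical order:
        System.out.println(Arrays.toString(
            lexicallySorted("part-99999", "part-100000")));
        // -> [part-100000, part-99999]  (numerically wrong)

        // Zero-padding every name to 10 digits keeps lexical == numerical:
        System.out.println(Arrays.toString(
            lexicallySorted(String.format("part-%010d", 99999),
                            String.format("part-%010d", 100000))));
        // -> [part-0000099999, part-0000100000]  (correct)
    }
}
```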
  /**
   * Return the checkpoint file name for the given partition.
   * Old format is supported so that we can successfully parse previously checkpointed files.
I'd say just remove this comment
If you still believe this is too complex, can you please clarify why, so we can discuss? This is basically 5 lines of code (yes, lines of code aren't a good complexity indicator, but...): 3 of them create the formatter, one changes the sort to ignore the prefix, and one actually uses the formatter, which defaults to the same behavior as before.
The central problem with the formatter is just that it doesn't output padding, and I don't see a reason to make the lexical ordering no longer match the numerical ordering, as it does now. You can make it output padding, but then we have exactly the behavior given by three simple characters in a format string. It seems appealing to centralize this, but it's yet another random utility method in a heavily-overloaded class, it adds a small indirection, and NumberFormat isn't even thread-safe. This seems like more than enough reason. Merely adjusting the format pattern seems correct and simple, if indeed there's no need to look for 5-digit file names; that should be true of checkpoint files. I'm not sure about the others, and therefore not as clear that that change should be made here.
We can do this without the formatter as well. The issue lay in sorting the file names as strings, and fixing that will solve the problem. That way, even if we have more than 100000 files, they will still be parsed correctly. I will push the changes shortly.
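The fix described here, sorting by the parsed partition number instead of the raw string, can be sketched in Java (hypothetical helper name; the actual change is to the Scala sortBy call):

```java
import java.util.Arrays;
import java.util.Comparator;

public class NumericSort {
    // Sort part-file names by their numeric index rather than by raw
    // string, so mixed-width names still come back in partition order.
    static String[] sortByPartitionIndex(String... names) {
        String[] copy = names.clone();
        Arrays.sort(copy, Comparator.comparingLong(
            (String name) -> Long.parseLong(name.substring("part-".length()))));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(
            sortByPartitionIndex("part-100000", "part-00007", "part-99999")));
        // -> [part-00007, part-99999, part-100000]
    }
}
```

This keeps the on-disk naming unchanged and only fixes the reconstruction order, which is why no format migration is needed.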
Force-pushed from cb3271a to d9403f8.
Test build #66502 has finished for PR 15370 at commit
All tests passed. Error unrelated.
retest this please
Ah, you are right; sorry, I totally missed that this is purely a sorting problem. I was thinking the %05d was causing an issue, but it doesn't.
Test build #66514 has finished for PR 15370 at commit
Oh man I guess I overlooked that basic point from the beginning too -- it's already generating partition numbers greater than 99999 where needed, and just needs to be sorted correctly.
+1
## What changes were proposed in this pull request?

Currently the number of partition files is limited to 100000 files (%05d format). If there are more part files than that, the logic goes for a toss while recreating the RDD, as it sorts them by string. More details can be found in the JIRA description [here](https://issues.apache.org/jira/browse/SPARK-17417).

## How was this patch tested?

I tested this patch by checkpointing an RDD, then manually renaming part files to the old format, and trying to access the RDD. It was successfully created from the old format. Also verified loading a sample parquet file, saving it as multiple formats - CSV, JSON, Text, Parquet, ORC - and reading them successfully back from the saved files. I couldn't launch the unit test from my local box, so will wait for the Jenkins output.

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #15370 from dhruve/bug/SPARK-17417.

(cherry picked from commit 4bafaca)
Signed-off-by: Tom Graves <tgraves@yahoo-inc.com>