Hive insert overwrite when using S3 storage #17307
Conversation
Force-pushed from 2e5a6aa to be04a09
@jainxrohit Can you please review this?
@NikhilCollooru: Sorry, these two PRs are not solving the same problem. This PR is to support INSERT (overwrite) into an existing partition on S3. It doesn't fix anything related to drop partition.
Okay, I see it now. This PR is about retaining the directory location/path when the partition is overwritten. Can you briefly explain what the other commits are?
The other commits add test infrastructure to run docker-based tests using the docker-java API. The Minio container is used as an S3 substitute, and the HiveHadoop container runs the hadoop/hive servers.
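As a rough illustration of the kind of setup those commits add, here is a minimal Testcontainers sketch that starts MinIO as an S3 substitute on a shared docker network; the image tag, credentials, and port are illustrative assumptions, not the PR's actual values.

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

public final class MinioContainerSketch
{
    private MinioContainerSketch() {}

    public static void main(String[] args)
    {
        Network network = Network.newNetwork();

        // Start MinIO on a shared network so other containers (e.g. HiveHadoop) can reach it by alias
        try (GenericContainer<?> minio = new GenericContainer<>("minio/minio:latest")
                .withNetwork(network)
                .withNetworkAliases("minio")
                .withEnv("MINIO_ACCESS_KEY", "accesskey")   // illustrative credentials
                .withEnv("MINIO_SECRET_KEY", "secretkey")
                .withCommand("server", "/data")
                .withExposedPorts(9000)) {
            minio.start();
            // The mapped port is what an S3 client running in the test JVM would point at
            System.out.println("S3 endpoint: http://" + minio.getHost() + ":" + minio.getMappedPort(9000));
        }
    }
}

A HiveHadoop container would be attached to the same network in a similar way.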
@NikhilCollooru, feel free to review the PR. Once you feel it's in good shape, I can help take a look.
presto-hive/src/test/java/com/facebook/presto/hive/BaseTestHiveInsertOverwrite.java
Force-pushed from 5e8ccf9 to 99a028b
Thanks @NikhilCollooru for the review. I addressed the comments.
LGTM
first 4 commits LGTM; still reviewing
public abstract class BaseTestContainer
        implements AutoCloseable
{
    private final Logger log;
Use a static field with direct assignment from Logger.get(BaseTestContainer.class).
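Presumably something along these lines, assuming the airlift-style Logger used elsewhere in Presto; this is a sketch of the suggestion, not the actual change.

import com.facebook.airlift.log.Logger;

public abstract class BaseTestContainer
        implements AutoCloseable
{
    // Static field assigned directly, instead of an instance field set in the constructor
    private static final Logger log = Logger.get(BaseTestContainer.class);
}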
    private final Optional<Network> network;
    private final int startupRetryLimit;

    private GenericContainer<?> container;
final
import static java.lang.String.format;

public class Minio
What does minio stand for? I guess it's MinIOContainer?
This is in the containers subpackage. Should we rename it?
ya sure
import java.util.Optional;
import java.util.Set;

public class HiveHadoop
HiveHadoopContainer?
import static java.util.Objects.requireNonNull;
import static org.testcontainers.containers.Network.newNetwork;

public class HiveMinioDataLake
MinIO
Are you asking to change the name to HiveMinIODataLake?
yes
import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior;
import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.APPEND;
import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.ERROR;
import static com.facebook.presto.hive.HiveSessionProperties.InsertExistingPartitionsBehavior.OVERWRITE;
Let's move InsertExistingPartitionsBehavior to this class so that the hive config doesn't depend on hive session properties.
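A sketch of what that relocation might look like; the enum values are taken from the static imports above, and the rest of the class is elided.

public class HiveClientConfig
{
    // Defined here so that HiveSessionProperties can depend on HiveClientConfig,
    // rather than the config depending on session properties
    public enum InsertExistingPartitionsBehavior
    {
        APPEND,
        OVERWRITE,
        ERROR
    }

    // ... remaining config properties ...
}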
@@ -93,6 +98,7 @@
    private boolean createEmptyBucketFiles = true;
    private boolean insertOverwriteImmutablePartitions;
    private boolean failFastOnInsertIntoImmutablePartitionsEnabled = true;
    private Optional<InsertExistingPartitionsBehavior> insertExistingPartitionsBehavior = Optional.empty();
It's weird to have an Optional here. Can we infer the default through getDefaultInsertExistingPartitionsBehavior? Also, I guess our goal here is to deprecate isInsertOverwriteImmutablePartitionEnabled?
isInsertOverwriteImmutablePartitionEnabled is still a valid config. In HiveSessionProperties, we set the default by calling getInsertExistingPartitionsBehavior(), which is the same as getDefaultInsertExistingPartitionsBehavior() except that getInsertExistingPartitionsBehavior() also considers the explicit value, if any, set by the user.
What I meant was: once we have insertExistingPartitionsBehavior as an option in the config, there is no need to have isInsertOverwriteImmutablePartitionEnabled anymore. Having overlapping configs will confuse users.
I don't think we can infer the default value for this through getDefaultInsertExistingPartitionsBehavior(). If we do, and immutable-partitions is set to true later, we will return APPEND which is not valid.
Then can we make it non-optional? Having an Optional here makes the config ambiguous; check my other comment on failing hard.
private static InsertExistingPartitionsBehavior getDefaultInsertExistingPartitionsBehavior(HiveClientConfig hiveClientConfig)
{
    if (!hiveClientConfig.isImmutablePartitions()) {
        return APPEND;
    }

    return hiveClientConfig.isInsertOverwriteImmutablePartitionEnabled() ? OVERWRITE : ERROR;
}
Part of this function is still valuable, especially for the case where APPEND is specified on immutable partitions (which should throw an error).
In the default case, getInsertExistingPartitionsBehavior() is the same as this method. We will still throw an error for APPEND on immutable partitions; there is no change in that.
@@ -2091,6 +2125,27 @@ else if (partitionUpdate.getUpdateMode() == NEW || partitionUpdate.getUpdateMode
            .collect(toList())));
    }

    private void removeNonCurrentQueryFiles(ConnectorSession session, Path partitionPath)
Can we add a javadoc to explain why we need to clean up files here?
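For instance, a javadoc along these lines (wording derived from the PR description, not the author's actual comment) could capture the intent:

/**
 * Called after an INSERT OVERWRITE that wrote directly into the existing
 * partition directory (write mode DIRECT_TO_TARGET_EXISTING_DIRECTORY).
 * The directory still contains the files of the previous partition contents,
 * so every file whose name prefix or suffix does not match the current query ID
 * is deleted, leaving only the files written by this query.
 */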
if (handle.getLocationHandle().getWriteMode() == DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
    removeNonCurrentQueryFiles(session, partitionUpdate.getTargetPath());
}
Can we also explain a bit more of this branch? We are going to overwrite a partition, but this seems to be deleting the files.
        metastore.addPartition(
                session,
                handle.getSchemaName(),
                handle.getTableName(),
                table.getStorage().getLocation(),
                false,
                partition,
                partitionUpdate.getWritePath(),
                partitionStatistics);
    }
}
else { // New partition
    metastore.addPartition(
            session,
            handle.getSchemaName(),
            handle.getTableName(),
            table.getStorage().getLocation(),
            false,
            partition,
            partitionUpdate.getWritePath(),
            partitionStatistics);
}
It would be good to simplify the logic, as this is duplicated code. The old logic is a good example.
Force-pushed from 99a028b to dce9a5e
Thanks @highker. Addressed all the comments. Please review.
@nmahadevuni, can you separate out the first 4 commits into a separate PR? Those are good and we can merge faster. For the last one, I might have more comments.
Rebase maybe?
Force-pushed from dce9a5e to dd5e9ab
nits only
@@ -53,6 +61,24 @@
        "hive.optimized-reader.enabled"})
public class HiveClientConfig
{
    public enum InsertExistingPartitionsBehavior
Can we move it closer to the setter and getter, like how other enums are defined in the class?
@@ -604,6 +631,19 @@ public HiveClientConfig setInsertOverwriteImmutablePartitionEnabled(boolean inse
        return this;
    }

    public InsertExistingPartitionsBehavior getInsertExistingPartitionsBehavior()
    {
        return insertExistingPartitionsBehavior.orElse(immutablePartitions ? (isInsertOverwriteImmutablePartitionEnabled() ? OVERWRITE : ERROR) : APPEND);
Let's fail hard if the two configs do not match each other, instead of relying on an empty Optional.
If these configs don't match, we will throw an error at the checkArgument call on line 628. I have removed the Optional, though.
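A self-contained illustration of that kind of fail-hard check; the method, names, and message are assumptions for the sketch, not the PR's exact code.

import static com.google.common.base.Preconditions.checkArgument;

public final class InsertBehaviorConfigCheckSketch
{
    private InsertBehaviorConfigCheckSketch() {}

    // Reject APPEND when partitions are declared immutable
    static void validate(boolean immutablePartitions, String insertExistingPartitionsBehavior)
    {
        checkArgument(
                !immutablePartitions || !"APPEND".equals(insertExistingPartitionsBehavior),
                "insertExistingPartitionsBehavior must not be APPEND when partitions are immutable");
    }

    public static void main(String[] args)
    {
        validate(false, "APPEND");    // ok: partitions are mutable
        validate(true, "OVERWRITE");  // ok: explicit overwrite on immutable partitions
        validate(true, "APPEND");     // throws IllegalArgumentException
    }
}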
@@ -604,6 +631,19 @@ public HiveClientConfig setInsertOverwriteImmutablePartitionEnabled(boolean inse
        return this;
Let's make isInsertOverwriteImmutablePartitionEnabled a LegacyConfig.
My bad; actually I meant having a @Deprecated annotation for it.
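Roughly what that might look like on the legacy getter and setter; the property name and the airlift @Config usage are assumptions for the sketch.

import com.facebook.airlift.configuration.Config;

public class HiveClientConfigDeprecationSketch
{
    private boolean insertOverwriteImmutablePartitions;

    @Deprecated
    public boolean isInsertOverwriteImmutablePartitionEnabled()
    {
        return insertOverwriteImmutablePartitions;
    }

    @Deprecated
    @Config("hive.insert-overwrite-immutable-partitions-enabled")   // hypothetical property name
    public HiveClientConfigDeprecationSketch setInsertOverwriteImmutablePartitionEnabled(boolean enabled)
    {
        this.insertOverwriteImmutablePartitions = enabled;
        return this;
    }
}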
                partitionStatistics);

        // New partition or overwriting existing partition by staging and moving the new partition
        if (!existingPartition || (existingPartition && handle.getLocationHandle().getWriteMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY)) {
This looks like a bug? You mean if (!existingPartition || partitionUpdate.getUpdateMode() == OVERWRITE)?
If partitionUpdate.getUpdateMode() != OVERWRITE, we will throw an error on line 2089. If we just check for (partitionUpdate.getUpdateMode() == OVERWRITE) here, we would add the partition even when writeMode is DIRECT_TO_TARGET_EXISTING_DIRECTORY, which is wrong, since for DIRECT_TO_TARGET_EXISTING_DIRECTORY the partition already exists and we are only removing old files. I simplified the condition to if (!existingPartition || handle.getLocationHandle().getWriteMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY).
If that is the case, we can still simplify the condition to if (!existingPartition || handle.getLocationHandle().getWriteMode() != DIRECT_TO_TARGET_EXISTING_DIRECTORY)?
Please fix the test failure; it seems to be related.
Force-pushed from 32c890c to d269b17
This implementation writes to the target partition directory directly. After a successful write, all files within the partition whose name prefix or suffix does not match the current query ID are removed. Cherry-pick of trinodb/trino@96e77e7. Co-authored-by: Arkadiusz Czajkowski <arek@starburstdata.com>
Force-pushed from d269b17 to 7860880
The test failure is most likely because the container stopped before the actual tests ran. This is happening intermittently. I'm now running them in a separate profile in a new job "hive tests / hive-dockerized-tests" in "hive-tests.yml". The "test other modules / test-other-modules" failure is not related.
It timed out. It should have no issues running. Can you please restart just that one?
@highker test-other-modules passed now.
Thank you @NikhilCollooru and @highker for the review.
Cherry-pick of trinodb/trino#9234
Co-authored-by: Arkadiusz Czajkowski <arek@starburstdata.com>
This PR fixes the insert overwrite operation on S3 backing storage when the below property is set.
The applied fix writes to the partition target directory directly. After successfully writing the new files, the coordinator deletes all files within the partition whose name prefix or suffix does not match the current query ID.
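A rough sketch of that cleanup using the Hadoop FileSystem API; the class name, method name, and parameters are illustrative, not the PR's exact code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public final class PartitionCleanupSketch
{
    private PartitionCleanupSketch() {}

    // Delete every file in the partition directory that was not written by the current query,
    // identified by the query ID appearing as a prefix or suffix of the file name.
    public static void removeNonCurrentQueryFiles(Configuration conf, Path partitionPath, String queryId)
            throws IOException
    {
        FileSystem fileSystem = partitionPath.getFileSystem(conf);
        for (FileStatus status : fileSystem.listStatus(partitionPath)) {
            String fileName = status.getPath().getName();
            if (!fileName.startsWith(queryId) && !fileName.endsWith(queryId)) {
                fileSystem.delete(status.getPath(), false);
            }
        }
    }
}

The design choice, per the description above, is to write new files directly into the target directory and clean up the stale ones afterwards, rather than staging the partition elsewhere and moving it.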
Test plan - Test containers for HMS and MinIO allow S3-related issues to be tested and debugged easily from the IDE; the tests are based on the above local dockerized S3 data lake.