-
Notifications
You must be signed in to change notification settings - Fork 932
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update how to manage host UDF instance #17770
Conversation
Signed-off-by: Chong Gao <res_life@163.com>
@revans2 Help review the solution first. |
/ok to test |
/ok to test |
/ok to test |
Passed the building locally. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the changes here is necessary for auto-closing the aggregation instance. The cloned instances are automatically deleted by libcudf. The original instance is closed by the close
method in the derived Java wrapper class. So the changes should only be in the plugin. We only need to refactor the close
method, moving it to the base Java wrapper class.
cpp/include/cudf/aggregation.hpp
Outdated
@@ -610,7 +610,7 @@ class host_udf_base; | |||
* @return A HOST_UDF aggregation object | |||
*/ | |||
template <typename Base = aggregation> | |||
std::unique_ptr<Base> make_host_udf_aggregation(std::unique_ptr<host_udf_base> host_udf); | |||
std::unique_ptr<Base> make_host_udf_aggregation(host_udf_base* host_udf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am no C++ expert here, but this feels like a step backwards in safety? A unique_ptr
makes it clear that host_udf
will be owned by aggregation
returned and it will be responsible for closing it. This new API has no such contract and all I can assume is "be careful that host_udf is not closed before the aggregation is or else really bad things will happen".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it's a backward, we can revert it. But for this PR, it's also applicable to free UDF instance automatically.
This make_host_udf_aggregation is used for how to creating a Aggregation. We can pass in a unique_ptr<host_udf_base>.
There are two palce to create aggregation, one is here, just revert it.
As before, create aggregation via a clone.
Another palce is via Java JNI:
In class HostUDFAggregation
:
long createNativeInstance() {
// The created host Agg instance takes the ownership of the passed in UDF instance.
// When a host Agg instance is released, the UDF instance in it is also released.
return Aggregation.createHostUDFAgg(wrapper.createUDFInstance());
}
If the code is like above, dynamically create UDF instance, and let Agg take the owner of life cycle, then we are free to close it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change makes the API less safe because it doesn't prevent one from managing a single resource with more than one unique pointers:
{
auto* x = new X;
auto y1 = make_host_udf_aggregation(x);
auto y2 = make_host_udf_aggregation(x);
} // double free when out of scope
@@ -307,8 +307,8 @@ JNIEXPORT jlong JNICALL Java_ai_rapids_cudf_Aggregation_createHostUDFAgg(JNIEnv* | |||
JNI_NULL_CHECK(env, udf_native_handle, "udf_native_handle is null", 0); | |||
try { | |||
cudf::jni::auto_set_device(env); | |||
auto const udf_ptr = reinterpret_cast<cudf::host_udf_base const*>(udf_native_handle); | |||
auto output = cudf::make_host_udf_aggregation(udf_ptr->clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is wrong with this code? We would clone the underlying host_udf to be used, which then let the java caller be free to close the HostUDFWrapper whenever they wanted to.
I am also fine with the host_udf_base*
being created on demand like with the createUDFInstance()
API and passed down to this code. But in that case we should not need to modify the C++ API to make is less secure. Instead we should wrap it in a unique_ptr and pass that to cudf::make_host_udf_aggregation
so the aggregation itself can delete it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function Java_ai_rapids_cudf_Aggregation_createHostUDFAgg
is only called by HostUDFAggregation
, in function:
long createNativeInstance() {
// The created host Agg instance takes the ownership of the passed in UDF instance.
// When a host Agg instance is released, the UDF instance in it is also released.
return Aggregation.createHostUDFAgg(wrapper.createUDFInstance());
}
Refer to the above code, the passed in udf_native_handle is created by wrapper.createUDFInstance
, if document the createUDFInstance
as private function, then it's secure. And the new version directly use the passed in UDF instance, it's more effective without clone.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see no reason why so we only create the native UDF instance only on demand but not upon initialization because the native instance will be used eventually, isn't it? Or did I missed something here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
only create the native UDF instance only on demand
This is reasonable, in this PR NVIDIA/spark-rapids#11638, I have to create a UDF instance, and pass into GroupByAggregation, and have another place to close the instance in HyperLogLogPlusPlusHostUDF
Below code is memory leaking hll.
override lazy val groupByAggregate: GroupByAggregation = {
val hll =new HyperLogLogPlusPlusHostUDF(AggregationType.GroupBy, precision)
GroupByAggregation.hostUDF(hll)
}
If we create the UDF instance lazily, and bind the UDF instance to a Aggregation instance like this PR, then we are free to release the UDF instance.
This PR is leveraging existing mechanism to release UDF instance.
Yes, the cloned instances are hold by a std::unique_ptr<aggregation> host_udf_aggregation::clone() const The created aggregation instance by Java JNI will also be auto closed by Java API. Refer to the description. |
The original code: class host_udf_aggregation final : public groupby_aggregation,
public reduce_aggregation,
public segmented_reduce_aggregation {
public:
std::unique_ptr<host_udf_base> udf_ptr;
explicit host_udf_aggregation(std::unique_ptr<host_udf_base> udf_ptr_);
}; The aggregation and the host UDF are bound into one resource, I mean life cycles are the same. // take the ownership and put into a unique ptr
host_udf_aggregation(host_udf_base* raw_udf_ptr_) {
// call to original constructor
host_udf_aggregation(std::unique_ptr(raw_udf_ptr_));
} Then the creation via Java JNI is from the below code: long createNativeInstance() {
// The created host Agg instance takes the ownership of the passed in UDF instance.
// When a host Agg instance is released, the UDF instance in it is also released.
return Aggregation.createHostUDFAgg(wrapper.createUDFInstance());
} I'll update the code tomorrow and remove unsecure chagnes. |
Now the code is almost done, please review first @ttnghia, @revans2. It will like: case class CudfMergeHLLPP(override val dataType: DataType,
precision: Int)
override lazy val groupByAggregate: GroupByAggregation = {
// the created hll contains a native UDF instance.
val hll = new HyperLogLogPlusPlusHostUDF(AggregationType.GroupByMerge, precision)
// here the UDF instance in hll is not closed, it's leaked here.
GroupByAggregation.hostUDF(hll)
}
} def performGroupByAggregation(preProcessed: ColumnarBatch): ColumnarBatch = {
withResource(new NvtxRange("groupby", NvtxColor.BLUE)) { _ =>
withResource(GpuColumnVector.from(preProcessed)) { preProcessedTbl =>
val groupOptions = cudf.GroupByOptions.builder()
.withIgnoreNullKeys(false)
.withKeysSorted(doSortAgg)
.build()
val cudfAggsOnColumn = cudfAggregates.zip(aggOrdinals).map {
case (cudfAgg, ord) => cudfAgg.groupByAggregate.onColumn(ord)
}
// perform the aggregate
val aggTbl = preProcessedTbl
.groupBy(groupOptions, groupingOrdinals: _*)
.aggregate(cudfAggsOnColumn.toSeq: _*)
withResource(aggTbl) { _ =>
GpuColumnVector.from(aggTbl, postStepDataTypes.toArray)
}
}
}
} Iterate all the If cudfAgg.groupByAggregate is instance of `HostUDFAggregation` {
agg.asInstance[HostUDFAggregation].getWrapper.closeUDF();
} We can put the above code into table.aggregate, but we MUST check all the aggregate variations. |
Your last commit makes sense and clarified the changes. This way is a clean solution indeed 👍 |
/ok to test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please polish the docs. Also please wait for @revans2 to see if he has other comments before merging.
/ok to test |
Tests passed locally with Spark-Rapids PR and JNI PR. |
/ok to test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a blind approval from my side to help unblock @res-life per his request. As Chong has already got approval from Nghia
/ok to test |
auto to_del = reinterpret_cast<cudf::host_udf_base*>(ptr); | ||
delete to_del; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we can colse all kind of derived UDF instances.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, it's in the derived class, link.
*/ | ||
public abstract class HostUDFWrapper implements AutoCloseable { | ||
public final long udfNativeHandle; | ||
public abstract class HostUDFWrapper { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pardon my ignorance, but why is this not an AutoCloseable
any more?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previsously HostUDFWrapper
has a udfNativeHandle
(UDF native instance), so it's AutoCloseable
which is expected to release the UDF instance in it.
Now, the udfNativeHandle
is removed, instead we create a function createUDFInstance
to lazily create this udfNativeHandle
and use closeUDFInstance
it.
Now the createUDFInstance
and closeUDFInstance
are in the same class, and after UDF is created and used, we directly remove it, so this will reduce the possiblity of memory leak, and make the API more simple.
long udf = 0; | ||
try { | ||
udf = wrapper.createUDFInstance(); | ||
return Aggregation.createHostUDFAgg(udf); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UDF instance is hold by Aggregation instance via a unique_ptr
, the Aggregation instance is responsible for the life cycle of UDF instance.
Here pass in a UDF instance and createHostUDFAgg
clone it and construct a Aggregation instance, in the finally block, we close the original UDF instance.
/merge |
/ok to test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks fine. I have some minor nits, but they are not important.
Description
Update how to manage host UDF instance:
Usage:
Now dynamically create a UDF instance, and finally close it after a Aggregation is created.
In this way, the changes are all in cuDF JNI scope. We do not need to introduce any resource management code in Spark-Rapids repo.
Checklist