Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added aggregate & conditonal readers for parquet #172

Merged
merged 5 commits into from
Nov 6, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,26 @@ object DataReaders {
aggregateParams: AggregateParams[T]
): AggregateCSVProductReader[T] = csvProduct(path, key, aggregateParams)

/**
* Creates a [[AggregateParquetProductReader]]
*/
def parquetProduct[T <: Product : Encoder : WeakTypeTag](
path: Option[String] = None,
key: T => String = randomKey _,
aggregateParams: AggregateParams[T]
): AggregateParquetProductReader[T] = new AggregateParquetProductReader[T](
readPath = path, key = key, aggregateParams = aggregateParams
)

/**
* Creates a [[AggregateParquetProductReader]], but is called parquetCase so it's easier to understand
*/
def parquetCase[T <: Product : Encoder : WeakTypeTag](
path: Option[String] = None,
key: T => String = randomKey _,
aggregateParams: AggregateParams[T]
): AggregateParquetProductReader[T] = parquetProduct(path, key, aggregateParams)

}

/**
Expand Down Expand Up @@ -232,6 +252,27 @@ object DataReaders {
conditionalParams: ConditionalParams[T]
): ConditionalCSVProductReader[T] = csvProduct(path, key, conditionalParams)

/**
* Creates a [[ConditionalParquetProductReader]]
*/
def parquetProduct[T <: Product : Encoder : WeakTypeTag]
(
path: Option[String] = None,
key: T => String = randomKey _,
conditionalParams: ConditionalParams[T]
): ConditionalParquetProductReader[T] = new ConditionalParquetProductReader[T](
readPath = path, key = key, conditionalParams = conditionalParams
)

/**
* Creates a [[ConditionalParquetProductReader]], but is called parquetCase so is easier to understand
*/
def parquetCase[T <: Product : Encoder : WeakTypeTag](
path: Option[String] = None,
key: T => String = randomKey _,
conditionalParams: ConditionalParams[T]
): ConditionalParquetProductReader[T] = parquetProduct(path, key, conditionalParams)

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,33 @@ class ParquetProductReader[T <: Product : Encoder]
maybeRepartition(data, params)
}
}

/**
* Data Reader for Parquet events, where there may be multiple records for a given key. Each parquet record
* will be automatically converted to type T that defines an [[Encoder]].
* @param readPath default path to data
* @param key function for extracting key from record
* @param aggregateParams params for time-based aggregation
* @tparam T
*/
class AggregateParquetProductReader[T <: Product : Encoder : WeakTypeTag]
(
readPath: Option[String],
key: T => String,
val aggregateParams: AggregateParams[T]
)extends ParquetProductReader[T](readPath, key) with AggregateDataReader[T]

/**
* Data Reader for Parquet events, when computing conditional probabilities. There may be multiple records for
* a given key. Each parquet record will be automatically converted to type T that defines an [[Encoder]].
* @param readPath default path to data
* @param key function for extracting key from record
* @param conditionalParams params for conditional aggregation
* @tparam T
*/
class ConditionalParquetProductReader[T <: Product : Encoder : WeakTypeTag]
(
readPath: Option[String],
key: T => String,
val conditionalParams: ConditionalParams[T]
)extends ParquetProductReader[T](readPath, key) with ConditionalDataReader[T]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please fix scalastyle warnings

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tovbinm Seems like we have are going to have similar logic for testing these readers. Will look at the possibility of extracting these as Table Tests and update the PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any tests included? Why not?