
[SPARK-12197] [SparkCore] Kryo & Avro - Support Schema Repo #13761

Closed
wants to merge 3 commits

Conversation

RotemShaul

[SPARK-12197] [SparkCore] Kryo & Avro - Support Schema Repo

What changes were proposed in this pull request?

Adding SchemaRepository for Avro Schemas when using Spark-Core and GenericRecords

How was this patch tested?

Unit tests and manual tests committed.
Also, we've done this change in our private repo and used it in our app.
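The core idea of the proposal can be illustrated language-agnostically: instead of serializing the full Avro schema alongside every record, register the schema in a repository and ship only a stable id. The actual PR is Scala inside Spark Core; the following Python sketch is hypothetical and only shows the register/lookup contract a schema repository would need.

```python
import hashlib
import json

class SchemaRepository:
    """Hypothetical in-memory schema repository keyed by a fingerprint.

    A real repository would be a shared service; this sketch only
    illustrates the register/lookup contract."""

    def __init__(self):
        self._by_id = {}

    def register(self, schema):
        # Fingerprint the canonical (sorted-key) JSON text for a stable id.
        text = json.dumps(schema, sort_keys=True).encode("utf-8")
        schema_id = hashlib.sha256(text).hexdigest()[:16]
        self._by_id[schema_id] = schema
        return schema_id

    def lookup(self, schema_id):
        # A real implementation would fetch unknown ids over the network.
        return self._by_id[schema_id]

repo = SchemaRepository()
event_v1 = {"type": "record", "name": "Event",
            "fields": [{"name": "key", "type": "string"}]}
schema_id = repo.register(event_v1)
assert repo.lookup(schema_id) == event_v1
```

Because the fingerprint is derived from the canonical schema text, registering the same schema twice yields the same id, which is what makes per-record ids cheap.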

@AmplabJenkins

Can one of the admins verify this patch?

@RotemShaul
Author

RotemShaul commented Jun 18, 2016

Hi, the JIRA task was closed as 'won't fix' because the previous PR had conflicts and old PRs were being cleaned up, so I created a new, clean one.

@hvanhovell
Contributor

Don't Datasets and Encoders make this less relevant? What would be the use case here?

@RotemShaul
Author

RotemShaul commented Jun 20, 2016

Not as far as I understand (correct me if I'm wrong). I'll explain the use case:

  1. We do not want to serialize the Avro schema of our processed events,
     only its id.
  2. Our job can process events with different schemas in a single RDD.
     We don't use Avro specific records, so they can't be the element type
     of a Dataset; it would still have to be Dataset[GenericRecord], not
     Dataset[Event]. Also, Avro's GenericRecord and SpecificRecord are not
     case classes.
  3. Most important: we do not know the schemas ahead of time, and we must
     be able to process an RDD containing multiple different schemas,
     since they are constantly evolving.

The whole point of using Avro GenericRecords instead of specific records is that we don't know the schemas ahead of time; we process events, each with a different schema version, following Avro's schema resolution and evolution rules.

Our specific use case is a kind of sessionization of events by key. We don't do analytics or aggregations: we take an input RDD[GenericRecord], return RDD[Frame[K, Iterator[GenericRecord]]], and store that output somewhere.

Since we only do a groupBy, we don't care about the content of the event body. (We're infrastructure; the key is a single field in a header that doesn't change.) We never access any body fields, so we don't care about the evolving body schema.

If we were to decode each GenericRecord into an Avro-generated SpecificRecord (or some case class), we would have to know all the fields ahead of time, and encoding into that class would fail, or be partial, whenever events add new fields or drop old ones.

If a Dataset fundamentally has a tabular format behind the scenes, we can't express our data that way: our 'table' is dynamic, with each event having a different schema. Taking the superset schema would introduce new empty columns and effectively change the events.
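The mixed-schema stream described above is exactly what per-record schema ids enable. As a hedged, language-agnostic sketch (the real PR works at the Kryo/Avro level in Scala; here JSON stands in for the Avro payload and a plain dict for the repository), each serialized record carries only a short schema id, and records with different schema versions can coexist in one stream:

```python
import hashlib
import json

def fingerprint(schema):
    """Stable id for a schema (a stand-in for Avro's schema fingerprint)."""
    return hashlib.sha256(json.dumps(schema, sort_keys=True).encode()).hexdigest()[:16]

# Hypothetical repository shared by writers and readers: id -> schema.
repo = {}

def serialize(record, schema):
    sid = fingerprint(schema)
    repo.setdefault(sid, schema)
    # Only the 16-character id travels with each record, not the full schema.
    return sid + json.dumps(record)

def deserialize(blob):
    sid, payload = blob[:16], blob[16:]
    return repo[sid], json.loads(payload)

# Two schema versions in the same stream, neither known "ahead of time".
v1 = {"name": "Event", "fields": ["key"]}
v2 = {"name": "Event", "fields": ["key", "extra"]}
stream = [serialize({"key": "a"}, v1),
          serialize({"key": "b", "extra": 1}, v2)]
schema, rec = deserialize(stream[1])
assert rec == {"key": "b", "extra": 1} and schema == v2
```

The groupBy-by-key use case only needs the header field, so the reader can resolve the schema lazily, or not at all, for records whose bodies it never inspects.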


@srowen
Member

srowen commented Jun 20, 2016

I generally wouldn't open a PR three times for one issue when it's not getting traction, which is why the JIRA was closed. This is decent discussion, but if nobody's on board with the change this time, please let's leave it closed.

@RotemShaul
Author

Sure, I thought it was closed because the PR got old and had conflicts.
This PR basically generalizes an already implemented solution to the problem of schema serialization overhead. The solution introduced in Spark 1.5 solves it for known, static schemas (via the spark.avro.registeredSchemas property); this adds the ability to handle dynamic schemas by using a schema repository.
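The static-versus-dynamic distinction can be sketched in a few lines (hypothetical names; `known` plays the role of schemas pre-registered at startup, as with spark.avro.registeredSchemas, while the fetch function stands in for a network call to a schema registry):

```python
# Schemas known at startup, pre-registered like spark.avro.registeredSchemas.
known = {"a1b2": {"name": "Event", "version": 1}}

def fetch_from_remote_repo(schema_id):
    # Stand-in for a network call to a hypothetical schema registry service.
    remote = {"c3d4": {"name": "Event", "version": 2}}
    return remote[schema_id]

cache = dict(known)

def resolve(schema_id):
    # Dynamic case: a schema id first seen at runtime is fetched once,
    # then served from the cache like the static ones.
    if schema_id not in cache:
        cache[schema_id] = fetch_from_remote_repo(schema_id)
    return cache[schema_id]

assert resolve("a1b2")["version"] == 1   # static, pre-registered
assert resolve("c3d4")["version"] == 2   # dynamic, fetched then cached
assert "c3d4" in cache
```

The point of the PR, as described here, is that the static path already exists in Spark; the dynamic fallback is the missing half.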

@rxin
Contributor

rxin commented Jul 23, 2016

@RotemShaul is this something doable by implementing a custom serializer outside Spark?

@RotemShaul
Author

RotemShaul commented Jul 23, 2016

@rxin
Indeed it is, but then you lose the GenericAvroSerializer abilities that come out of the box with Spark (caching and registering of static schemas).

Since Spark already chose to (partially) support Avro within Spark Core, it makes sense to me that it should also support schema repositories, as they are very common among Avro users dealing with schema evolution. That partial support is actually what sparked the idea, 'if they support registering Avro schemas, why not go all the way?', and that's why I created the PR in the first place.

Users of Avro GenericRecords with Spark Core will always face the problem of serializing schemas; some can solve it with static schemas, others need the dynamic solution. It makes sense that Spark Core either provides a solution for both use cases or for neither (leaving it to a custom serializer).

Just my opinion. At my current workplace I took your GenericAvroSerializer, added a few lines of code, and used it as a custom serializer. But it could be generalized, hence the PR.
So providing it outside Spark would duplicate your current implementation, and I'm not really sure where it would live.
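The "custom serializer outside Spark" approach being discussed amounts to wrapping the existing serializer so that the schema is swapped for a repository id on the way out. A minimal, hypothetical sketch of that wrapping pattern (all class and field names invented; the real GenericAvroSerializer is Scala and operates on Kryo buffers):

```python
class BaseSerializer:
    """Stand-in for the existing serializer: inlines the full schema."""
    def write(self, record, schema):
        return {"schema": schema, "record": record}

class RepoAwareSerializer(BaseSerializer):
    """Hypothetical extension: replace the inlined schema with a repo id."""
    def __init__(self, repo):
        self.repo = repo
        self._next = 0

    def write(self, record, schema):
        blob = super().write(record, schema)
        sid = str(self._next)
        self._next += 1
        # Move the schema into the repository; only the id travels.
        self.repo[sid] = blob.pop("schema")
        blob["schema_id"] = sid
        return blob

repo = {}
s = RepoAwareSerializer(repo)
out = s.write({"key": "a"}, {"name": "Event"})
assert out == {"record": {"key": "a"}, "schema_id": "0"}
assert repo["0"] == {"name": "Event"}
```

The duplication concern in the comment above is visible even in this toy: the wrapper only works by reaching into the base serializer's output format, which is exactly the coupling that an in-tree generalization would avoid.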


@maropu maropu mentioned this pull request Apr 23, 2017
maropu added a commit to maropu/spark that referenced this pull request Apr 23, 2017
@asfgit asfgit closed this in e9f9715 Apr 24, 2017
peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
This PR proposes to close stale PRs. Currently, we have 400+ open PRs, and some of them are stale: their JIRA tickets have either already been closed or do not exist (and they do not seem to be minor issues).

// Open PRs whose JIRA tickets have been already closed
Closes apache#11785
Closes apache#13027
Closes apache#13614
Closes apache#13761
Closes apache#15197
Closes apache#14006
Closes apache#12576
Closes apache#15447
Closes apache#13259
Closes apache#15616
Closes apache#14473
Closes apache#16638
Closes apache#16146
Closes apache#17269
Closes apache#17313
Closes apache#17418
Closes apache#17485
Closes apache#17551
Closes apache#17463
Closes apache#17625

// Open PRs whose JIRA tickets do not exist and which are not minor issues
Closes apache#10739
Closes apache#15193
Closes apache#15344
Closes apache#14804
Closes apache#16993
Closes apache#17040
Closes apache#15180
Closes apache#17238

N/A

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes apache#17734 from maropu/resolved_pr.

Change-Id: Id2e590aa7283fe5ac01424d30a40df06da6098b5