feat: core interface #2

Closed · wants to merge 2 commits
39 changes: 39 additions & 0 deletions exmaple/app.py
@@ -0,0 +1,39 @@
from pyspark.sql import DataFrame

from sparkle import Sparkle
from sparkle.data_reader import IcebergReader, KafkaReader, MultiDataReader
from sparkle.data_writer import MultiDataWriter, IcebergWriter, KafkaWriter
from sparkle.models import InputField, Trigger, TriggerMethod


app = Sparkle(
    reader=MultiDataReader(
        IcebergReader(database_name="db1"),
        KafkaReader(server="localhost:9092"),
    ),
    writer=MultiDataWriter(
        IcebergWriter(database_name="db1"),
        IcebergWriter(database_name="db2"),
        KafkaWriter(server="localhost:9092"),
    ),
)


@app.pipeline(
    name="orders",
    inputs=[
        InputField("new_orders", KafkaReader, topic="com-sap-new-orders-v0.1"),
        InputField("history_orders", IcebergReader, table="orders-v0.1"),
    ],
)
def orders(
    trigger: Trigger, new_orders: DataFrame, history_orders: DataFrame
) -> DataFrame:
    """The pipeline to process the new and historical orders."""
    match trigger.method:
        case TriggerMethod.PROCESS:
            return new_orders.union(history_orders)
        case TriggerMethod.REPROCESS:
            return new_orders
        case _:
            raise ValueError(f"Unsupported trigger method: {trigger.method}")
25 changes: 24 additions & 1 deletion pdm.lock

Some generated files are not rendered by default.

7 changes: 5 additions & 2 deletions pyproject.toml
@@ -5,7 +5,9 @@ description = "✨ A meta framework for Apache Spark, helping data engineers to
 authors = [
     {name = "Reza (Shahin) Khanipour", email = "shahin@DataChef.co"},
 ]
-dependencies = []
+dependencies = [
+    "pyspark>=3.5.2",
+]
 requires-python = ">=3.11"
 readme = "README.md"
 license = {text = "Apache-2.0"}
@@ -48,8 +50,9 @@ select = [
     "E",  # pycodestyle
     "F",  # pyflakes
     "UP", # pyupgrade
-    "D"   # pydocstyle
+    "D",  # pydocstyle
 ]
+extend-ignore = ["D100", "E501"]

 [tool.ruff.lint.pydocstyle]
 convention = "google"
5 changes: 5 additions & 0 deletions src/sparkle/__init__.py
@@ -1 +1,6 @@
"""A meta framework for Apache Spark."""

from sparkle.core import Sparkle

__version__ = "0.0.0"
__all__ = ["Sparkle"]
105 changes: 105 additions & 0 deletions src/sparkle/core.py
@@ -0,0 +1,105 @@
"""The core module of Sparkle."""

import logging
from typing import Any
from collections.abc import Callable
from pyspark.sql import SparkSession

from .models import Pipeline, Trigger, InputField, TriggerMethod
from .data_reader import DataReader
from .data_writer import DataWriter


logger = logging.getLogger(__name__)


class Sparkle:
    """The core class of Sparkle, responsible for defining and running pipelines.

    Attributes:
        reader: The data reader instance to read data from the source.
        writer: The data writer instance to write data to the destination.
        _pipelines: The dictionary storing the defined pipelines by name.

    Methods:
        add_pipeline_rule: Add a trigger rule for the given pipeline.
        pipeline: A decorator to define a processing job for the given pipeline.
        run: Process a trigger request with the given Spark session.
    """

    def __init__(self, reader: DataReader, writer: DataWriter) -> None:
        """Initialize the Sparkle instance with the given reader and writer."""
        self.reader = reader
        self.writer = writer
        self._pipelines: dict[str, Pipeline] = {}

    def add_pipeline_rule(
        self,
        pipeline_name: str,
        description: str | None,
        func: Callable,
        method: TriggerMethod,
        inputs: list[InputField],
        options: dict[str, Any],
    ):
        """Add a trigger rule for the given pipeline."""
        if pipeline_name in self._pipelines:
            raise RuntimeError(f"Pipeline `{pipeline_name}` is already defined.")

        self._pipelines[pipeline_name] = Pipeline(
            name=pipeline_name,
            description=description,
            inputs=inputs,
            func=func,
            method=method,
            options=options,
        )

    def pipeline(
        self,
        name: str,
        inputs: list[InputField],
        method: TriggerMethod = TriggerMethod.PROCESS,
        **options,
    ) -> Callable:
        """A decorator to define a processing job for the given pipeline."""

        def decorator(func):
            self.add_pipeline_rule(
                pipeline_name=name,
                description=func.__doc__,
                func=func,
                inputs=inputs,
                method=method,
                options=options,
            )

            return func

        return decorator

    def run(self, trigger: Trigger, session: SparkSession) -> None:
        """Process a trigger request with the given Spark session."""
        logger.info(
            f"Pipeline `{trigger.pipeline_name}` is triggered with `{trigger.method}` method."
        )

        if requested_pipeline := self._pipelines.get(trigger.pipeline_name):
            dataframes = self.reader.read(requested_pipeline.inputs, session)
            requested_pipeline.func(trigger, **dataframes)
        else:
            raise NotImplementedError(
                f"Pipeline `{trigger.pipeline_name}` is not defined."
            )
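The `sparkle.models` module referenced by these imports is not part of this diff. A rough sketch of the shapes implied by how `Pipeline`, `Trigger`, `InputField`, and `TriggerMethod` are used in `core.py`, `data_reader.py`, and the example app is given below; field names and defaults are guesses, not the PR's actual definitions.

# Hypothetical sketch of src/sparkle/models.py, inferred from usage only.
from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum
from typing import Any


class TriggerMethod(Enum):
    """How a pipeline run was requested."""

    PROCESS = "process"
    REPROCESS = "reprocess"


class InputField:
    """Declares a named input and the reader type expected to provide it."""

    def __init__(self, name: str, type: type, **options: Any) -> None:
        self.name = name
        self.type = type        # e.g. KafkaReader or IcebergReader
        self.options = options  # e.g. topic=... or table=...


@dataclass
class Trigger:
    """A request to run a named pipeline."""

    pipeline_name: str
    method: TriggerMethod


@dataclass
class Pipeline:
    """A registered pipeline definition."""

    name: str
    description: str | None
    inputs: list[InputField]
    func: Callable
    method: TriggerMethod
    options: dict[str, Any]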
100 changes: 100 additions & 0 deletions src/sparkle/data_reader.py
@@ -0,0 +1,100 @@
"""This module contains the `DataReader` protocol and its implementations."""

from typing import Protocol
from pyspark.sql import DataFrame, SparkSession

from .models import InputField


class DataReader(Protocol):
    """A protocol to define the interface of a data reader."""

    def read(
        self, inputs: list[InputField], spark_session: SparkSession
    ) -> dict[str, DataFrame]:
        """Read data from the source and return the dataframes."""
        ...


class IcebergReader(DataReader):
    """A data reader to read data from Iceberg databases."""

    def __init__(self, database_name: str) -> None:
        """Initialize the IcebergReader with the given database name."""
        super().__init__()
        self.database_name = database_name

    def read(
        self, inputs: list[InputField], spark_session: SparkSession
    ) -> dict[str, DataFrame]:
        """Read data from the Iceberg database and return the dataframes."""
        raise NotImplementedError


class SqlReader(DataReader):
    """A data reader to read data from SQL databases."""

    def __init__(
        self,
        username: str,
        password: str,
        server_name: str = "localhost",
    ) -> None:
        """Initialize the SqlReader with the given credentials."""
        super().__init__()
        self.username = username
        self.password = password
        self.server_name = server_name

    def read(
        self, inputs: list[InputField], spark_session: SparkSession
    ) -> dict[str, DataFrame]:
        """Read data from the SQL database and return the dataframes."""
        raise NotImplementedError


class KafkaReader(DataReader):
    """A data reader to read data from Kafka."""

    def __init__(self, server: str) -> None:
        """Initialize the KafkaReader with the given server."""
        super().__init__()
        self.server = server

    def read(
        self, inputs: list[InputField], spark_session: SparkSession
    ) -> dict[str, DataFrame]:
        """Read data from the Kafka server and return the dataframes."""
        for input in inputs:
            if _ := input.options.get("topic"):
                # The `topic` option is present, but the Kafka read itself is
                # not implemented yet.
                raise NotImplementedError
            else:
                raise ValueError(
                    "Option `topic` must be provided in the `InputField` with KafkaReader types."
                )

        raise NotImplementedError


class MultiDataReader(DataReader):
    """A data reader to read data from multiple data readers."""

    def __init__(self, *readers: DataReader) -> None:
        """Initialize the MultiDataReader with the given data readers."""
        super().__init__()
        self._readers = list(readers)

    def read(
        self, inputs: list[InputField], spark_session: SparkSession
    ) -> dict[str, DataFrame]:
        """Read data from all configured data readers and return the dataframes."""
        dataframes = {}

        for reader in self._readers:
            reader_inputs = [
                input for input in inputs if input.type == reader.__class__
            ]
            dfs = reader.read(reader_inputs, spark_session)
            dataframes.update(dfs)

        return dataframes
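To make the dispatch rule above concrete: `MultiDataReader` routes each `InputField` to the reader whose class matches the field's declared `type` and merges the resulting dictionaries. A short illustration follows; since the concrete readers in this diff still raise `NotImplementedError`, this shows the intended behaviour rather than something that runs today, and the assumed result keys are a guess (presumably the input names).

# Illustration of the type-based routing in MultiDataReader.read,
# reusing the names defined in this module.
inputs = [
    InputField("new_orders", KafkaReader, topic="com-sap-new-orders-v0.1"),
    InputField("history_orders", IcebergReader, table="orders-v0.1"),
]
reader = MultiDataReader(
    IcebergReader(database_name="db1"),
    KafkaReader(server="localhost:9092"),
)
# IcebergReader receives only `history_orders`, KafkaReader only `new_orders`;
# the merged result is expected to map input names to DataFrames, e.g.
# {"new_orders": <DataFrame>, "history_orders": <DataFrame>}.
# dataframes = reader.read(inputs, spark_session)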
52 changes: 52 additions & 0 deletions src/sparkle/data_writer.py
@@ -0,0 +1,52 @@
"""A module to define data writers."""

from typing import Protocol
from pyspark.sql import DataFrame, SparkSession


class DataWriter(Protocol):
    """A protocol to define the interface of a data writer."""

    def write(self, df: DataFrame, spark_session: SparkSession) -> None:
        """Write data to the destination."""
        ...


class KafkaWriter(DataWriter):
    """A data writer to write data to Kafka."""

    def __init__(self, server: str) -> None:
        """Initialize the KafkaWriter with the given server."""
        super().__init__()
        self._server = server

    def write(self, df: DataFrame, spark_session: SparkSession) -> None:
        """Write data to the Kafka server."""
        # Placeholder: the configured server and the target topic are not
        # wired into the write yet.
        df.write.format("kafka").save()


class IcebergWriter(DataWriter):
    """A data writer to write data to Iceberg databases."""

    def __init__(self, database_name: str) -> None:
        """Initialize the IcebergWriter with the given database name."""
        super().__init__()
        self._database_name = database_name

    def write(self, df: DataFrame, spark_session: SparkSession) -> None:
        """Write data to the Iceberg database."""
        df.write.format("iceberg").save()


class MultiDataWriter(DataWriter):
    """A data writer to write data to multiple data writers."""

    def __init__(self, *writers: DataWriter) -> None:
        """Initialize the MultiDataWriter with the given data writers."""
        super().__init__()
        self._writers = list(writers)

    def write(self, df: DataFrame, spark_session: SparkSession) -> None:
        """Write data to the destinations."""
        for writer in self._writers:
            writer.write(df, spark_session)
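Both single-destination `write` implementations are effectively placeholders: the Kafka write ignores the configured server, and Spark's Kafka sink additionally expects a `value` column plus the `kafka.bootstrap.servers` and `topic` options. A possible shape of `KafkaWriter.write` is sketched below as an assumption rather than part of this PR; the payload serialization and the topic name are hypothetical and would presumably come from pipeline or writer configuration.

def write(self, df: DataFrame, spark_session: SparkSession) -> None:
    """Write data to the Kafka server."""
    # Sketch only: serialize each row into a `value` column and point the
    # sink at the configured broker; the topic name here is hypothetical.
    (
        df.selectExpr("to_json(struct(*)) AS value")
        .write.format("kafka")
        .option("kafka.bootstrap.servers", self._server)
        .option("topic", "orders")
        .save()
    )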