feat: Broadcast plan over port 3238 #3756
CodSpeed Performance Report
Merging #3756 will improve performance by 10.31%
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3756 +/- ##
==========================================
+ Coverage 77.31% 77.66% +0.34%
==========================================
Files 734 735 +1
Lines 93737 95495 +1758
==========================================
+ Hits 72475 74167 +1692
- Misses 21262 21328 +66
daft/dataframe/dataframe.py
Outdated
@@ -177,6 +199,7 @@ def explain(
file (Optional[io.IOBase]): Location to print the output to, or defaults to None which defaults to the default location for
print (in Python, that should be sys.stdout)
"""
self.explain_broadcast()
Is this correct? Why would we broadcast to the dashboard if we're not executing a query?
Hmm, I was thinking if a user just wanted to see what plan would be produced without executing a query, they could also use the dashboard UI to view the query plan.
In this case, the UI would show:
- the query plan
- the time it took to produce the plan
- "N/A" for the time it took to execute the query (since `.explain` doesn't execute queries)
daft/dataframe/dataframe.py
Outdated
@@ -2818,6 +2841,7 @@ def collect(self, num_preview_rows: Optional[int] = 8) -> "DataFrame":
DataFrame: DataFrame with materialized results.
"""
self._materialize_results()
self.explain_broadcast()
Maybe consider making this a decorator for all "execution" methods (`show`, `collect`, `write_parquet`, etc.)
Sounds good. A decorator makes sense here. Will update in the coming commit.
Updated to use decorator instead.
I will need to add this to more APIs. Do we have an exhaustive list somewhere of all the APIs we would want to add this decorator to?
Can't merge this because of the `requests` dependency
@@ -125,6 +125,10 @@ class DaftContext:

_runner: Runner | None = None

_enable_broadcast: bool = False
_broadcast_addr: str = "127.0.0.1"
_broadcast_port: int = 3238
Please put this behind a DashboardConfig dataclass?
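A minimal sketch of what that grouping could look like. The `DashboardConfig` name comes from the comment above; the field names without the underscore prefix and the `url` helper are assumptions made for illustration, not Daft's actual API:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DashboardConfig:
    """Hypothetical container for the three broadcast settings in the diff."""

    enabled: bool = False
    addr: str = "127.0.0.1"
    port: int = 3238

    @property
    def url(self) -> str:
        # Illustrative helper (an assumption): the address a plan would be sent to.
        return f"http://{self.addr}:{self.port}"


config = DashboardConfig(enabled=True)
print(config.url)
```

`DaftContext` could then hold a single `DashboardConfig` field instead of three loose attributes.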
@@ -197,6 +201,31 @@ def get_context() -> DaftContext:
return _DaftContext


def broadcast_metrics(
This should really be more of an "enable dashboard" function
@@ -65,6 +67,17 @@
ManyColumnsInputType = Union[ColumnInputType, Iterable[ColumnInputType]]


def BroadcastMetrics(func):
nit: functions are usually snake-cased
def BroadcastMetrics(func):
"""Calls `self._explain_broadcast()` prior to executing the rest of the function."""

def wrapper(self, *args, **kwargs):
nit: use `@functools.wraps` to maintain expected behavior (https://stackoverflow.com/questions/308999/what-does-functools-wraps-do)
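As a sketch of both nits applied together (snake-cased name plus `functools.wraps`): the order of the two calls inside `wrapper` mirrors the diff, and `FakeDataFrame` is a hypothetical stand-in used only to make the example runnable, not Daft's `DataFrame`.

```python
import functools


def broadcast_metrics(func):
    """Calls `self._explain_broadcast()` after running the wrapped method."""

    @functools.wraps(func)  # keeps func.__name__ and func.__doc__ on the wrapper
    def wrapper(self, *args, **kwargs):
        result = func(self, *args, **kwargs)
        self._explain_broadcast()
        return result

    return wrapper


class FakeDataFrame:
    """Stand-in for illustration only."""

    def __init__(self):
        self.broadcasts = 0

    def _explain_broadcast(self):
        # Counts calls instead of sending anything.
        self.broadcasts += 1

    @broadcast_metrics
    def collect(self):
        """Materialize results."""
        return self
```

Without `functools.wraps`, `FakeDataFrame.collect.__name__` would be `"wrapper"` and the docstring would be lost, which affects `help()` output and generated API docs.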
@@ -158,6 +171,44 @@ def _result(self) -> Optional[PartitionSet]:
else:
return self._result_cache.value

def _explain_broadcast(self):
"""Broadcast the mermaid-formatted plan on the given port (assuming metrics-broadcasting is enabled)."""
import requests
I don't think we have `requests` as a dependency of Daft. This will likely cause issues when run in some users' environments
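One way to avoid the extra dependency is the standard library's `urllib.request`. The sketch below is an assumption about how that could look, not what the PR does: the `mermaid-plan` payload key and both function names are hypothetical.

```python
import json
import urllib.request


def build_plan_request(addr: str, port: int, mermaid_plan: str) -> urllib.request.Request:
    """Build a JSON POST request carrying the plan (payload key is illustrative)."""
    body = json.dumps({"mermaid-plan": mermaid_plan}).encode("utf-8")
    return urllib.request.Request(
        f"http://{addr}:{port}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_plan(req: urllib.request.Request, timeout: float = 1.0) -> bool:
    """Send the request; return False instead of raising if nothing is listening."""
    try:
        with urllib.request.urlopen(req, timeout=timeout):
            return True
    except OSError:  # URLError and socket timeouts are OSError subclasses
        return False
```

Swallowing connection errors keeps a missing dashboard from breaking the user's query, at the cost of silent failures; logging the error would be a reasonable middle ground.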

def wrapper(self, *args, **kwargs):
result = func(self, *args, **kwargs)
self._explain_broadcast()
Does this mean that the dashboard only receives the broadcast message after the method finishes execution? Seems wrong.
Another way you can potentially solve this is with a context manager in each dataframe execution method. Something like this:
    def write_parquet():
        df = ...  # build plan
        with ctx.dashboard.record_execution(df):
            ...
        return df
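A rough sketch of how such a `record_execution` context manager might behave. The `Dashboard` class here is hypothetical and appends events to a list rather than sending them anywhere; the point is that the "start" event is emitted before execution begins and the "end" event is emitted even if execution raises.

```python
import time
from contextlib import contextmanager


class Dashboard:
    """Hypothetical recorder; stores events locally instead of broadcasting."""

    def __init__(self):
        self.events = []

    @contextmanager
    def record_execution(self, plan: str):
        # Emitted before the body of the `with` block runs.
        self.events.append(("start", plan))
        started = time.perf_counter()
        try:
            yield
        finally:
            # Emitted after the body finishes, including on exceptions.
            elapsed = time.perf_counter() - started
            self.events.append(("end", plan, elapsed))


dashboard = Dashboard()
with dashboard.record_execution("plan-1"):
    pass  # query execution would happen here
```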
"plan-time-end": str(plan_time_end), | ||
},
)
print(f"Query ID: {id}")
rm print
Closing for a couple reasons:
Overview
This PR adds the ability to broadcast the serialized Mermaid query plan over port 3238.
Receiving services can listen for this TCP broadcast on port 3238, read the data, and process/present it.
This will be useful for the dashboard, which will (through another proxy service) listen on port 3238 for the plan so that it can display it to the end user.
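For illustration, a receiving service could be as small as the sketch below. It assumes the plan arrives as a JSON body in an HTTP POST, which this description does not spell out; `PlanHandler`, `make_server`, and the `received` list are hypothetical names.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # plans collected by the handler, for inspection


class PlanHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # Read exactly Content-Length bytes and decode the JSON payload.
        length = int(self.headers.get("Content-Length", 0))
        received.append(json.loads(self.rfile.read(length)))
        self.send_response(200)
        self.end_headers()

    def log_message(self, *args):
        # Silence the default per-request logging to stderr.
        pass


def make_server(port: int = 3238) -> HTTPServer:
    """Create a listener bound to localhost on the given port."""
    return HTTPServer(("127.0.0.1", port), PlanHandler)
```

Calling `make_server().serve_forever()` would block and collect every posted plan into `received`.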
Notes
The number 3238 is `daft` spelled on a telephone's alphanumeric keypad.
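The mapping can be checked with a few lines, using the standard letter layout of a phone keypad (the helper name is just for illustration):

```python
# Standard telephone keypad letter groups.
KEYPAD = {
    "2": "abc", "3": "def", "4": "ghi", "5": "jkl",
    "6": "mno", "7": "pqrs", "8": "tuv", "9": "wxyz",
}
LETTER_TO_DIGIT = {
    letter: digit for digit, letters in KEYPAD.items() for letter in letters
}


def to_keypad_digits(word: str) -> str:
    """Translate a word into the digits that spell it on a phone keypad."""
    return "".join(LETTER_TO_DIGIT[char] for char in word.lower())


print(to_keypad_digits("daft"))  # → 3238
```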