-
Notifications
You must be signed in to change notification settings - Fork 14.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dataproc metastore assets #21267
Dataproc metastore assets #21267
Conversation
ti = TaskInstance(task=operator, execution_date=dttm) | ||
backup_conf = ti.xcom_pull(task_ids=operator.task_id, key="backup_conf") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's should use XCom.get_one()
instead of creating ad hoc TaskInstance
objects to pull the necessary XCom
for operator links. Check out #21285. Using the XCom.get_one()
interface is also more straightforward and pertinent in these case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@josh-fell To adjust changes for the Metastore links we need to wait when your PR will be merged 😄 you changed the types in the get_one
method also 😄 without this pre-commit
will fail
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lwyszomi - merged :). You can rebase now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@potiuk Thank you, we will update also other PRs where we added new links :)
context, | ||
key="service_conf", | ||
value={"region": self.region, "service_id": self.service_id, "project_id": self.project_id}, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if it would make sense to add class/static method on operator link to handle this logic? Something like:
DataprocMetastoreServiceLink.persist(task_instance=self)
Why? First this make it more obvious why we save this XCom (to use it in link), second it somehow nicely encapsulates the logic into link class (for example you can make the key a class attribute so it will not be hardcoded in two places). WDYT? CC @potiuk @josh-fell
82f4ac5
to
d5bbfb1
Compare
@staticmethod | ||
def persist(context: "Context", task_instance: "DataprocMetastoreCreateBackupOperator"): | ||
task_instance.xcom_push( | ||
context=context, | ||
key=DataprocMetastoreBackupLink.key, | ||
value={ | ||
"region": task_instance.region, | ||
"service_id": task_instance.service_id, | ||
"backup_id": task_instance.backup_id, | ||
"project_id": task_instance.project_id, | ||
}, | ||
) | ||
|
||
def get_link(self, operator: BaseOperator, dttm: datetime): | ||
backup_conf = XCom.get_one( | ||
dag_id=operator.dag.dag_id, | ||
task_id=operator.task_id, | ||
execution_date=dttm, | ||
key=DataprocMetastoreBackupLink.key, | ||
) | ||
return ( | ||
METASTORE_BACKUP_LINK.format( | ||
region=backup_conf["region"], | ||
service_id=backup_conf["service_id"], | ||
backup_id=backup_conf["backup_id"], | ||
project_id=backup_conf["project_id"], | ||
) | ||
if backup_conf | ||
else "" | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this! I'm wondering if we can create some abstraction for dataproc link. What do you think? Would it be worth to have something generic that be used to create another links?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have just had some discussion about it. We think we can improve it to have more generic solution as you suggest. I will implement those changes, so we can later check is it a good direction for further services as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! In general once this PR is merge I would suggest to open an email thread on dev list / discussions about making this a common pattern for links.
d5bbfb1
to
40d8a6a
Compare
Add links to assets for Dataproc Metastore
Co-authored-by: Wojciech Januszek januszek@google.com
Co-authored-by: Lukasz Wyszomirski wyszomirski@google.com
Co-authored-by: Maksim Yermakou maksimy@google.com
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.