-
Notifications
You must be signed in to change notification settings - Fork 0
/
s3_xcom_backend.py
50 lines (39 loc) · 1.74 KB
/
s3_xcom_backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from typing import Any
from airflow.models.xcom import BaseXCom
import os
import pandas as pd
import uuid
class S3XComBackend(BaseXCom):
    """XCom backend that offloads pandas DataFrames to S3 as CSV objects.

    When a task pushes a ``pd.DataFrame``, it is written as CSV (without its
    index) to ``s3://<BUCKET_NAME>/data_<uuid4>``. Only the reference string
    ``xcom_s3://data_<uuid4>`` is stored in the XCom table. Pulling such a
    reference reads the CSV back into a DataFrame. Every other value is
    delegated unchanged to ``BaseXCom``.

    Note: the CSV round trip drops the DataFrame index and lets pandas
    re-infer column dtypes on read. The pulled frame may therefore differ
    from the pushed one.

    Configuration is read from environment variables once, when this class
    is defined:

    * ``BUCKET_NAME``: target bucket, required to push or pull DataFrames.
    * ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY``: passed to pandas and
      s3fs as ``key`` / ``secret``. They are ``None`` when unset.
    * ``DELETE_XCOM_AT_COMPLETION``: when "true", "1" or "yes"
      (case-insensitive), the S3 object is removed right after it is read.
    """

    PREFIX = "xcom_s3://"
    BUCKET_NAME = os.getenv("BUCKET_NAME")
    AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
    # Case-insensitive so that "true"/"1"/"yes" enable deletion as well as "True".
    DELETE_XCOM_AT_COMPLETION = os.getenv(
        "DELETE_XCOM_AT_COMPLETION", "False"
    ).strip().lower() in ("true", "1", "yes")

    @staticmethod
    def _storage_options() -> dict:
        """Return the credential mapping passed to pandas and s3fs."""
        return {
            "key": S3XComBackend.AWS_ACCESS_KEY_ID,
            "secret": S3XComBackend.AWS_SECRET_ACCESS_KEY,
        }

    @staticmethod
    def _object_path(key: str) -> str:
        """Return ``<bucket>/<key>``, failing fast if no bucket is configured.

        Raises:
            ValueError: if ``BUCKET_NAME`` is unset or empty.
        """
        if not S3XComBackend.BUCKET_NAME:
            # Without this check the path would silently become "None/<key>".
            raise ValueError(
                "S3XComBackend requires the BUCKET_NAME environment variable"
            )
        return f"{S3XComBackend.BUCKET_NAME}/{key}"

    @staticmethod
    def serialize_value(value: Any):
        """Serialize *value* for storage in the XCom table.

        A DataFrame is uploaded to S3 as CSV (index dropped) under a fresh
        ``data_<uuid4>`` key. It is replaced by ``PREFIX + key`` before being
        handed to ``BaseXCom.serialize_value``. Other values pass through
        untouched.

        Raises:
            ValueError: if a DataFrame is pushed while ``BUCKET_NAME`` is unset.
        """
        if isinstance(value, pd.DataFrame):
            key = f"data_{uuid.uuid4()}"
            value.to_csv(
                f"s3://{S3XComBackend._object_path(key)}",
                index=False,
                storage_options=S3XComBackend._storage_options(),
            )
            value = S3XComBackend.PREFIX + key
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result) -> Any:
        """Rebuild the pushed value from an XCom row.

        The row is first decoded by ``BaseXCom.deserialize_value``. If that
        yields a string starting with ``PREFIX``, the remainder is treated as
        an S3 key and the CSV stored there is returned as a DataFrame.
        Anything else is returned as decoded.

        Raises:
            ValueError: if an S3 reference is pulled while ``BUCKET_NAME`` is unset.
        """
        result = BaseXCom.deserialize_value(result)
        if not (isinstance(result, str) and result.startswith(S3XComBackend.PREFIX)):
            return result

        # Strip only the leading prefix; str.replace would also remove any
        # later occurrence of it inside the key.
        key = result[len(S3XComBackend.PREFIX):]
        path = S3XComBackend._object_path(key)
        frame = pd.read_csv(
            f"s3://{path}",
            storage_options=S3XComBackend._storage_options(),
        )

        if S3XComBackend.DELETE_XCOM_AT_COMPLETION:
            # Imported lazily so s3fs is only needed when deletion is enabled.
            import s3fs

            # NOTE(review): the object is deleted on the first pull. Any later
            # pull of the same XCom (e.g. by a second downstream task or a
            # retry) will not find it. Confirm each reference is read once.
            fs = s3fs.S3FileSystem(**S3XComBackend._storage_options())
            fs.rm_file(path)
        return frame