-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathTransientViewDumper.py
executable file
·116 lines (101 loc) · 4.01 KB
/
TransientViewDumper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File : ampel/contrib/hu/t3/TransientViewDumper.py
# License : BSD-3-Clause
# Author : Jakob van Santen <jakob.van.santen@desy.de>
# Date : 15.08.2018
# Last Modified Date: 02.05.2023
# Last Modified By : Simeon Reusch <simeon.reusch@desy.de>
import uuid
from collections.abc import Generator
from gzip import GzipFile
from io import BytesIO
from urllib.parse import ParseResult, urlparse, urlunparse
from xml.etree import ElementTree
import requests
from requests.auth import HTTPBasicAuth
from ampel.abstract.AbsPhotoT3Unit import AbsPhotoT3Unit
from ampel.secret.NamedSecret import NamedSecret
from ampel.struct.T3Store import T3Store
from ampel.struct.UnitResult import UnitResult
from ampel.types import T3Send, UBson
from ampel.view.TransientView import TransientView
from .DCachePublisher import dump_json
def strip_auth_from_url(url):
    """Split embedded credentials off a URL.

    :param url: URL that may carry credentials in its network location,
        e.g. ``"https://user:secret@host/path"``.
    :returns: ``(url_without_credentials, auth)``, where ``auth`` is the
        ``(username, password)`` tuple produced by
        ``requests.utils.get_auth_from_url``. If the network location
        contains no ``"@"`` there is nothing to strip and ``(url, None)`` is
        returned.
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)
    # Split at the *last* "@" (as urllib does when separating userinfo from
    # the host) so a raw "@" inside the password cannot leak into the host.
    # The explicit ``sep`` check handles credential-free URLs; ``str.index``
    # would raise ValueError there, not KeyError.
    _userinfo, sep, hostport = netloc.rpartition("@")
    if not sep:
        return url, None
    auth = requests.utils.get_auth_from_url(url)
    stripped = urlunparse(
        ParseResult(scheme, hostport, path, params, query, fragment)
    )
    return stripped, auth
def strip_path_from_url(url):
    """Return only the scheme and network location of *url*, rooted at ``/``.

    Path, params, query and fragment are all discarded, e.g.
    ``"https://host/a/b?x=1#f"`` -> ``"https://host/"``. The network
    location (including any embedded credentials or port) is kept as-is.
    """
    parsed = urlparse(url)
    root_only = parsed._replace(path="/", params="", query="", fragment="")
    return urlunparse(root_only)
class TransientViewDumper(AbsPhotoT3Unit):
    """Dump all TransientViews of a T3 run as gzipped, newline-delimited JSON.

    With ``outputfile`` set, the dump is written locally to
    ``<outputfile>.json.gz``. Otherwise it is gzipped into an in-memory
    buffer and uploaded via HTTP PUT (WebDAV) to
    ``<desycloud resource>/AMPEL/<desycloud_folder>/<filename>.json.gz``.
    A public link share (``shareType=3``) is then requested from the
    server's OCS sharing API, and the returned link, if any, is logged.
    """

    version = 0.1
    require = ("desycloud",)

    # If this is passed, files are always saved locally
    outputfile: None | str = None

    # Credentials, passed as keyword arguments to HTTPBasicAuth
    desycloud_auth: NamedSecret[dict] = NamedSecret[dict](label="desycloud")
    # Remote folder below /AMPEL/ that receives the upload
    desycloud_folder: str = "dumps"
    # Remote base file name (without extension). When unset, a fresh UUID1 is
    # generated per instance in post_init. A class-level str(uuid.uuid1())
    # default would be evaluated once at import time, so every instance
    # created in the same process would upload to (and overwrite) one file.
    desycloud_filename: None | str = None

    def post_init(self) -> None:
        """Open the gzip output target and, for uploads, set up HTTP state."""
        if self.outputfile:
            self.outfile = GzipFile(self.outputfile + ".json.gz", mode="w")
            return

        filename = self.desycloud_filename or str(uuid.uuid1())
        self.buffer = BytesIO()
        self.outfile = GzipFile(
            filename=filename + ".json",
            fileobj=self.buffer,
            mode="w",
        )
        self.path = f"/AMPEL/{self.desycloud_folder}/{filename}.json.gz"
        self.session = requests.Session()
        assert self.resource
        self.webdav_base = self.resource["desycloud"]
        # strip_path_from_url yields "<scheme>://<host>/"; drop the trailing
        # slash so the joined endpoint is ".../ocs/..." rather than "//ocs".
        self.ocs_base = (
            strip_path_from_url(self.webdav_base).rstrip("/")
            + "/ocs/v1.php/apps/files_sharing/api/v1"
        )

    def process(
        self, transients: Generator[TransientView, T3Send, None], t3s: T3Store
    ) -> UBson | UnitResult:
        """Write one JSON line per transient, then save locally or upload.

        :raises requests.HTTPError: if the upload or the share request fails.
        """
        count = 0
        try:
            for count, tran_view in enumerate(transients, 1):  # noqa: B007
                self.outfile.write(dump_json(tran_view))
                self.outfile.write(b"\n")
        finally:
            # Always finish the gzip stream (flush + trailer), even when
            # iterating or serializing a transient raised.
            self.outfile.close()
        self.logger.info(f"Total number of transients written: {count}")

        if self.outputfile:
            self.logger.info(self.outputfile + ".json.gz")
            return None

        assert isinstance(self.buffer, BytesIO)
        payload = self.buffer.getvalue()  # copy the buffer only once
        mb = len(payload) / 2.0**20
        self.logger.info(f"{mb:.1f} MB of gzipped JSONy goodness")
        self._upload_and_share(payload)
        return None

    def _upload_and_share(self, payload: bytes) -> None:
        """PUT *payload* to self.path and log the public share link, if any."""
        auth = HTTPBasicAuth(**self.desycloud_auth.get())
        self.session.put(
            self.webdav_base + self.path,
            data=payload,
            auth=auth,
        ).raise_for_status()

        response = self.session.post(
            self.ocs_base + "/shares",
            data=dict(path=self.path, shareType=3),
            auth=auth,
            # OCS endpoints require this header (CSRF protection)
            headers={"OCS-APIRequest": "true"},
        )
        response.raise_for_status()
        # NOTE: the response comes from the configured (trusted) server;
        # xml.etree is not hardened against malicious XML.
        element = ElementTree.fromstring(response.text).find("data/url")
        # Element truthiness reflects the number of *children*, and the <url>
        # leaf has none, so presence must be tested against None explicitly.
        if element is not None and element.text:
            self.logger.info(element.text)