-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
41 lines (35 loc) · 1.14 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.artifacts import create_markdown_artifact
import httpx
@task(retries=3, log_prints=True, cache_key_fn=task_input_hash)
def fetch_weather(lat: float = 38.9, lon: float = -77.0) -> float:
    """Fetch the first hourly ``temperature_2m`` forecast value for a location.

    Queries the Open-Meteo forecast endpoint for the given coordinates and
    returns the first entry of the hourly ``temperature_2m`` series.

    The task is declared with ``retries=3`` and uses ``task_input_hash`` as its
    cache key function, so the cache key is derived from ``lat`` and ``lon``.

    Args:
        lat: Latitude sent as the ``latitude`` query parameter.
        lon: Longitude sent as the ``longitude`` query parameter.

    Returns:
        The first hourly temperature value converted to ``float``.
        NOTE(review): the unit is whatever the API returns by default
        (presumably degrees Celsius) -- confirm against the API docs.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
        httpx.HTTPError: On transport-level failures raised by ``httpx.get``.
        KeyError, IndexError: If the response JSON lacks the expected series.
    """
    base_url = "https://api.open-meteo.com/v1/forecast"
    response = httpx.get(
        base_url,
        params={"latitude": lat, "longitude": lon, "hourly": "temperature_2m"},
    )
    # Fail loudly on HTTP errors instead of trying to parse an error payload,
    # which would otherwise surface as an opaque KeyError/JSON decode error.
    response.raise_for_status()
    forecasted_temp = float(response.json()["hourly"]["temperature_2m"][0])
    print(f"Forecasted temperature: {forecasted_temp} degrees")
    return forecasted_temp
@task()
def save_weather(temp: float):
    """Write the temperature to ``weather.csv`` and return a status message.

    The file is opened in write mode relative to the current working
    directory, so any previous contents are replaced.

    Args:
        temp: Temperature value to persist.

    Returns:
        The literal string ``"Temperature saved!"``.
    """
    # Render the value first so the file handle is held only for the write.
    rendered = f"{temp}"
    with open("weather.csv", "w") as csv_file:
        csv_file.write(rendered)
    return "Temperature saved!"
@flow()
def pipeline(lat: float = 38.9, lon: float = -78.0):
    """Fetch a forecast, publish it as a markdown artifact, and save it.

    Steps, in order:
      1. Call ``fetch_weather`` with the given coordinates.
      2. Create a markdown artifact under the key ``weather-result``
         containing the fetched temperature.
      3. Call ``save_weather`` with the temperature.

    Args:
        lat: Latitude forwarded to ``fetch_weather``.
        lon: Longitude forwarded to ``fetch_weather``.

    Returns:
        Whatever ``save_weather`` returns for the fetched temperature.
    """
    forecast = fetch_weather(lat, lon)

    # Build the artifact body: a heading followed by the raw temperature.
    report = f"""# Weather Forecast
{forecast}
"""
    create_markdown_artifact(
        key="weather-result",
        markdown=report,
        description="Weather forecast",
    )

    return save_weather(forecast)
# Script entry point: when run directly (not imported), serve the flow as a
# deployment named "deploy-1".
if __name__ == "__main__":
    pipeline.serve(name="deploy-1")