grib2timeseriesold.py
import os
import glob
import chardet
import numpy as np
import pandas as pd
import pygrib
from datetime import datetime
from collections import defaultdict
import logging
import warnings
import gc
warnings.filterwarnings("ignore", category=DeprecationWarning)
# GRIB2 Time Series Processor v9
# Program to convert GRIB2 files to CSV files with a time series for each point
# By Marcello Novak, 2023
# Flag for testing reasons
isLinux = False
# Directory Configuration, change as needed
DATA_DIR = os.path.join(".", "data_test") if not isLinux else "./data_test"
OUTPUT_DIR = os.path.join(".", "output") if not isLinux else "./output"
LOG_DIR = os.path.join(".", "logs") if not isLinux else "./logs"
# Ensuring the directories exist, creating them if they don't
if not os.path.exists(LOG_DIR):
    print(f"Log directory not found, creating {LOG_DIR}...")
    os.makedirs(LOG_DIR)
if not os.path.exists(OUTPUT_DIR):
    print(f"Output directory not found, creating {OUTPUT_DIR}...")
    os.makedirs(OUTPUT_DIR)
# home/chennon/eager/data/hrrr/final
# home/chennon/eager/.../vars.txt
# Logging Configuration
current_time_str = datetime.now().strftime("%Y%m%dT%H%M%S")
log_filename = os.path.join(LOG_DIR, f"{current_time_str}.log")
logging.basicConfig(filename=log_filename, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Other Configuration
BUFFER_SIZE = 10 # Write after processing 10 files
data_buffer = {} # Buffer for data to be written to disk
existing_point_files = set() # Set for existing point files
# Initialize cache variables
latlons_cache = None
columns_cache = None
# Function to process a sub grid in the GRB file
def process_sub_grid(current_sub_grid):
    current_values = current_sub_grid.values.ravel()  # Get the values from all the grid cells
    current_values = np.where(np.ma.getmask(current_values), np.nan, current_values)  # Replace any masked cells with NaN
    current_column = f"{current_sub_grid.name}: {current_sub_grid.level} {current_sub_grid.units}"  # Create column name
    return current_column, current_values
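# Illustrative example (hypothetical message, not real data): a 2 m temperature
# sub grid would come back roughly as
#   ("2 metre temperature: 2 K", array([272.9, 273.1, ...]))
# i.e. one column label per GRIB message and one flattened value per grid cell,
# with masked cells replaced by np.nan.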
# Function to process a whole GRB file
def process_grb_file(file, latlons_cache, columns_cache):
    start_time = datetime.now()  # Start the timer for this file
    local_data = defaultdict(lambda: defaultdict(list))  # Create a dictionary to store the data
    try:
        with pygrib.open(file) as current_grib:
            all_grids = current_grib.select()
            if not all_grids:
                # Log warning and return empty dictionary
                logging.warning(f"No grids found in file {file}. Skipping.")
                return {}

            # Cache latitudes and longitudes
            # (Note: reassigning these parameters only lasts for this call; the module-level
            # cache variables stay None, so the grid geometry is re-read for every file.)
            if latlons_cache is None:
                latlons_cache = all_grids[0].latlons()
            latitudes, longitudes = latlons_cache

            # Cache column names
            if columns_cache is None:
                columns_cache = {sub_grid: f"{sub_grid.name}: {sub_grid.level} {sub_grid.units}" for sub_grid in all_grids}

            # Create timestamp for this file (avoids repetitions)
            timestamp = f"{all_grids[0].month:02d}/{all_grids[0].day:02d}" \
                        f"/{all_grids[0].year} {all_grids[0].hour:02d}:00"

            # Process each sub grid in the grb file (each paper in the stack)
            for sub_grid in all_grids:
                # If the sub grid's name is unknown, skip it
                if "unknown" in sub_grid.name.lower():
                    logging.warning(f"Skipping sub-grid with name containing 'unknown': {sub_grid.name}")
                    continue

                # Process the column names, values, and latitudes/longitudes
                column_name, values = process_sub_grid(sub_grid)
                for lat, lon, val in zip(latitudes.ravel(), longitudes.ravel(), values):
                    outfile = f"{lat:.3f}_{lon:.3f}.csv"  # Rounds to three decimals now
                    local_data[outfile]['timestamp'].append(timestamp)
                    local_data[outfile][columns_cache[sub_grid]].append(val)

        # Convert each point's per-column lists into a list of row dictionaries
        for outfile, columns_dict in local_data.items():
            local_data[outfile] = [dict(row) for row in
                                   zip(*[[(col, val) for val in val_list] for col, val_list in columns_dict.items()])]

        # Log processing time and return the data
        logging.info(f"{timestamp} Processed in: {datetime.now() - start_time}")
        return local_data

    # Catch any exceptions and log the errors
    except Exception as e:
        logging.error(f"Error processing file {file}. Error: {e}")
        return {}
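# Shape of the returned data, sketched with made-up values: one entry per grid point,
# keyed by its output filename and holding this file's row, e.g.
#   {"39.500_-104.800.csv": [{"timestamp": "01/01/2023 00:00",
#                             "2 metre temperature: 2 K": 273.1, ...}]}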
# Function to clear buffer and write to disk
def write_buffer_to_disk(data_buffer, existing_point_files):
    for filename, rows in data_buffer.items():
        path = os.path.join(OUTPUT_DIR, filename)  # Create path to file
        df = pd.DataFrame(rows)  # Create a DataFrame
        # If the file exists, append; otherwise create it
        if filename in existing_point_files:
            df.to_csv(path, mode='a', header=False, index=False)
        else:
            df.to_csv(path, mode='w', index=False)
            existing_point_files.add(filename)  # Add the file to the set of existing point files once created
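# Note: appended rows reuse the header written when a point file was first created,
# so this assumes every buffer flush produces the same columns in the same order
# for a given point; a flush with a different column set would be misaligned.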
# Function to post-process each file and fill any missing spaces
def post_process_file(filepath):
    start_time = datetime.now()  # Start the timer for this file
    df = pd.read_csv(filepath)  # Read the file into a DataFrame
    df['timestamp'] = pd.to_datetime(
        df['timestamp'], format='%m/%d/%Y %H:%M')  # Parse the timestamp column as datetimes
    df.set_index('timestamp', inplace=True)  # Set the timestamp as the DatetimeIndex for the DataFrame
    df = df.resample('H').asfreq()  # Resample to hourly intervals and insert missing rows
    df.fillna("NaN", inplace=True)  # Fill all missing values with "NaN"
    df.replace("", "NaN", inplace=True)  # Replace all empty cells with "NaN"
    df.replace(r"^\s*$", "NaN", regex=True, inplace=True)  # Also replace purely whitespace cells with "NaN"
    df.to_csv(filepath)  # Write the processed DataFrame back to the CSV file
    elapsed_time = datetime.now() - start_time  # Calculate elapsed time
    logging.info(f"Post-processed {filepath} in: {elapsed_time}")  # Log elapsed time
# Turn off automatic garbage collection
gc.disable()
# Main
if __name__ == '__main__':
    print("GRIB2 Time Series Processor v9, by Marcello Novak, 2023")  # Obligatory narcissistic message :)
    print(f"Starting Main Processing for files in {DATA_DIR}...")  # Print starting message to terminal
    logging.info(f"Processing files in {DATA_DIR}...")  # Log starting message

    # Populate the existing_point_files set from the output directory
    existing_point_files.update([f for f in os.listdir(OUTPUT_DIR) if f.endswith('.csv')])
    files = glob.iglob(os.path.join(DATA_DIR, "*.grib2"))

    # Process files sequentially
    for i, file in enumerate(files, 1):
        local_data = process_grb_file(file, latlons_cache, columns_cache)
        gc.collect()  # Collect garbage after processing each file

        # Merge the local data with the data buffer
        for key, value in local_data.items():
            if key in data_buffer:
                data_buffer[key].extend(value)
            else:
                data_buffer[key] = value  # Create a new entry in the dictionary

        # Write to disk if the buffer is full
        if i % BUFFER_SIZE == 0:
            write_buffer_to_disk(data_buffer, existing_point_files)
            data_buffer.clear()
            gc.collect()  # Collect garbage

    # Write anything left in the buffer after processing all files
    if data_buffer:
        write_buffer_to_disk(data_buffer, existing_point_files)
        gc.collect()  # Collect garbage
    print("Main Processing complete.")  # Print completion message to terminal for the main processing

    print(f"Starting Post-Processing for files in {OUTPUT_DIR}...")  # Print starting message to terminal
    logging.info(f"Post-processing files in {OUTPUT_DIR}...")  # Log starting message

    # Post-process all the CSV files in the output directory
    output_files = glob.iglob(os.path.join(OUTPUT_DIR, "*.csv"))
    for i, outfile in enumerate(output_files, 1):
        post_process_file(outfile)  # Post-process the file
        gc.collect()  # Collect garbage
    print("Post-processing complete.")