Source code for wrf_nlcd_lulc_converter.utils

"""
Utility functions for WRF NLCD LULC converter.
"""

import numpy as np
import xarray as xr
import netCDF4
import shutil
import os
import subprocess
import random
import string


[docs] def generate_landusef_from_lu_index(lu_index, nclass=None, dtype=np.float32): """ Generate a LANDUSEF-style array from a LU_INDEX array. Parameters: lu_index (np.ndarray): 2D array of land use class indices (ny, nx). nclass (int): Number of land use classes. dtype (np.dtype): Data type for the output array. Returns: np.ndarray: Array of shape (1, nclass, ny, nx) with one-hot encoding for each class. """ # Ensure lu_index is 2D (ny, nx) if lu_index.ndim == 3 and lu_index.shape[0] == 1: lu_index = lu_index[0] elif lu_index.ndim > 2: raise ValueError(f"Unexpected lu_index shape: {lu_index.shape}") ny, nx = lu_index.shape landuse_generated = np.zeros((1, nclass, ny, nx), dtype=dtype) for class_idx in range(nclass): # Class numbers are assumed to be 1-based (i.e., class 1 is index 0) landuse_generated[0, class_idx, :, :] = (lu_index == (class_idx + 1)).astype(dtype) return landuse_generated
[docs] def crop_nlcd_with_gdalwarp(raw_NLCD_map, year, nlcd_domain, nlcd_processed_tmp_folder='~/tmp/nlcd_processed/', output_NLCD_crop_filename=None, force_recalculate=False): """ Crop the NLCD GeoTIFF using gdalwarp for a given year and domain. Parameters: raw_NLCD_map (str): Path to the raw NLCD GeoTIFF file. year (int or str): Year of the NLCD data (used in output filename). nlcd_domain (dict): Dictionary with keys 'lon_min', 'lon_max', 'lat_min', 'lat_max'. nlcd_processed_tmp_folder (str): Temporary folder for processed files. output_NLCD_crop_filename (str): Output filename for cropped file. force_recalculate (bool): Force recalculation even if file exists. Returns: str: Path to the cropped NLCD GeoTIFF. """ # Expand user in output folder path and ensure directory exists nlcd_processed_tmp_folder_expanded = os.path.expanduser(nlcd_processed_tmp_folder) os.makedirs(nlcd_processed_tmp_folder_expanded, exist_ok=True) if output_NLCD_crop_filename is None: rand_prefix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) output_NLCD_crop_filename = os.path.join(nlcd_processed_tmp_folder_expanded, f'{rand_prefix}_NLCD_{year}_crop.tif') else: output_NLCD_crop_filename = os.path.join(nlcd_processed_tmp_folder_expanded, output_NLCD_crop_filename) if os.path.exists(output_NLCD_crop_filename) and not force_recalculate: print(f"Cropped NLCD file already exists: {output_NLCD_crop_filename}. Use force_recalculate=True to overwrite.") return output_NLCD_crop_filename te_args = [ str(nlcd_domain['lon_min']), str(nlcd_domain['lat_min']), str(nlcd_domain['lon_max']), str(nlcd_domain['lat_max']) ] gdalwarp_cmd = [ "gdalwarp", "-t_srs", "EPSG:4326", "-te" ] + te_args + [ "-te_srs", "EPSG:4326", raw_NLCD_map, output_NLCD_crop_filename ] print("Running gdalwarp command:", " ".join(gdalwarp_cmd)) subprocess.run(gdalwarp_cmd, check=True) return output_NLCD_crop_filename
[docs] def update_lu_index_and_landusef_in_netcdf(ncfile, new_lu_index, new_landusef): """ Update LU_INDEX and LANDUSEF variables in a NetCDF file. Parameters: ncfile (str): Path to the NetCDF file. new_lu_index (np.ndarray): New LU_INDEX data. new_landusef (np.ndarray): New LANDUSEF data. """ with netCDF4.Dataset(ncfile, mode='r+') as ds: # Update LU_INDEX lu_index_var = ds.variables['LU_INDEX'] if new_lu_index.shape != lu_index_var.shape: if lu_index_var.shape[0] == 1 and new_lu_index.ndim == 2: lu_index_to_write = new_lu_index[np.newaxis, :, :] else: lu_index_to_write = np.reshape(new_lu_index, lu_index_var.shape) else: lu_index_to_write = new_lu_index lu_index_var[:] = lu_index_to_write # Update LANDUSEF landusef_var = ds.variables['LANDUSEF'] if new_landusef.shape != landusef_var.shape: if landusef_var.shape[0] == 1 and new_landusef.shape[0] == 1 and new_landusef.shape[1:] == landusef_var.shape[1:]: landusef_to_write = new_landusef else: landusef_to_write = np.reshape(new_landusef, landusef_var.shape) else: landusef_to_write = new_landusef landusef_var[:] = landusef_to_write
[docs] def extract_wrf_domain_info(wrf_file): """ Extract domain information from a WRF geo_em file. Parameters: wrf_file (str): Path to the WRF geo_em file. Returns: dict: Dictionary containing domain information. """ with xr.open_dataset(wrf_file) as ds: geog_latitudes = ds['XLAT_M'].values.squeeze() geog_longitudes = ds['XLONG_M'].values.squeeze() geog_latitudes_1d = geog_latitudes[:, 0] geog_longitudes_1d = geog_longitudes[0, :] geog01_lulc = ds['LU_INDEX'].values.squeeze() geog_roi_domain = { 'lon_min': geog_longitudes_1d[0], 'lon_max': geog_longitudes_1d[-1], 'lat_min': geog_latitudes_1d[0], 'lat_max': geog_latitudes_1d[-1] } return { 'latitudes': geog_latitudes_1d, 'longitudes': geog_longitudes_1d, 'lu_index': geog01_lulc, 'domain': geog_roi_domain }
[docs] def calculate_nlcd_crop_domain(wrf_domain, margin=1.0): """ Calculate the NLCD crop domain based on WRF domain with margin. Parameters: wrf_domain (dict): WRF domain dictionary. margin (float): Margin to add around the domain in degrees. Returns: dict: NLCD crop domain dictionary. """ nlcd_crop_domain = { 'lon_min': wrf_domain['lon_min'] - margin, 'lon_max': wrf_domain['lon_max'] + margin, 'lat_min': wrf_domain['lat_min'] - margin, 'lat_max': wrf_domain['lat_max'] + margin } return nlcd_crop_domain
[docs] def validate_file_paths(*file_paths): """ Validate that all file paths exist. Parameters: *file_paths: Variable number of file paths to validate. Raises: FileNotFoundError: If any file path doesn't exist. """ for file_path in file_paths: if not os.path.exists(file_path): raise FileNotFoundError(f"File not found: {file_path}")
[docs] def create_output_directory(output_path): """ Create output directory if it doesn't exist. Parameters: output_path (str): Path to the output directory or file. """ output_dir = os.path.dirname(output_path) if output_dir and not os.path.exists(output_dir): os.makedirs(output_dir, exist_ok=True)