Source code for tsdat.io.writers

import copy
import warnings
import numpy as np
import pandas as pd
import xarray as xr
from typing import Any, Dict, Iterable, List, Optional, cast, Hashable
from pathlib import Path
from pydantic import BaseModel, Extra
from .base import FileWriter
from ..utils import get_filename


__all__ = [
    "NetCDFWriter",
    "SplitNetCDFWriter",
    "CSVWriter",
    "ParquetWriter",
    "ZarrWriter",
]


class NetCDFWriter(FileWriter):
    """------------------------------------------------------------------------------------
    Thin wrapper around xarray's `Dataset.to_netcdf()` function for saving a dataset to a
    netCDF file. Properties under the `to_netcdf_kwargs` parameter will be passed to
    `Dataset.to_netcdf()` as keyword arguments.

    File compression is used by default to save disk space. To disable compression set the
    `compression_level` parameter to `0`.
    ------------------------------------------------------------------------------------"""

    class Parameters(BaseModel, extra=Extra.forbid):
        compression_level: int = 1
        """The level of compression to use (0-9). Set to 0 to not use compression."""

        compression_engine: str = "zlib"
        """The compression engine to use."""

        to_netcdf_kwargs: Dict[str, Any] = {}
        """Keyword arguments passed directly to xr.Dataset.to_netcdf()."""

    parameters: Parameters = Parameters()

    file_extension: str = ".nc"

    def write(
        self, dataset: xr.Dataset, filepath: Optional[Path] = None, **kwargs: Any
    ) -> None:
        to_netcdf_kwargs = copy.deepcopy(self.parameters.to_netcdf_kwargs)
        encoding_dict: Dict[str, Dict[str, Any]] = {}
        to_netcdf_kwargs["encoding"] = encoding_dict

        for variable_name in cast(Iterable[str], dataset.variables):
            # Encoding options: https://unidata.github.io/netcdf4-python/#Dataset.createVariable
            # For some reason contiguous=True and chunksizes=None is incompatible with compression
            if hasattr(dataset[variable_name], "encoding"):
                if "contiguous" in dataset[variable_name].encoding:
                    dataset[variable_name].encoding.pop("contiguous")
                if "chunksizes" in dataset[variable_name].encoding:
                    dataset[variable_name].encoding.pop("chunksizes")

            # Prevent Xarray from setting 'nan' as the default _FillValue
            encoding_dict[variable_name] = dataset[variable_name].encoding.copy()  # type: ignore
            if (
                "_FillValue" not in encoding_dict[variable_name]
                and "_FillValue" not in dataset[variable_name].attrs
            ):
                encoding_dict[variable_name]["_FillValue"] = None

            if self.parameters.compression_level:
                encoding_dict[variable_name].update(
                    {
                        self.parameters.compression_engine: True,
                        "complevel": self.parameters.compression_level,
                    }
                )

        dataset.to_netcdf(filepath, **to_netcdf_kwargs)  # type: ignore
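
# Example (illustrative sketch, not part of the original module): writing a
# small in-memory dataset with NetCDFWriter. The dataset, variable name, and
# output path are invented for demonstration; keyword construction assumes the
# pydantic-based writer classes used throughout tsdat.
def _example_netcdf_writer() -> None:
    ds = xr.Dataset(
        {"temperature": ("time", np.linspace(10.0, 20.0, 24))},
        coords={"time": pd.date_range("2024-01-01", periods=24, freq="1D")},
    )
    # Disable compression by overriding the default Parameters.
    writer = NetCDFWriter(parameters=NetCDFWriter.Parameters(compression_level=0))
    writer.write(ds, filepath=Path("example.20240101.000000.nc"))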

class SplitNetCDFWriter(NetCDFWriter):
    """------------------------------------------------------------------------------------
    Extension of the `NetCDFWriter` that saves a dataset to multiple netCDF files, split
    along the time dimension. Files are split (sliced) by a time interval specified in two
    parts: `time_interval`, an integer value, and `time_unit`, a numpy `timedelta64` unit
    character (year: "Y", month: "M", day: "D", hour: "h", minute: "m", second: "s").

    Properties under the `to_netcdf_kwargs` parameter will be passed to
    `Dataset.to_netcdf()` as keyword arguments.

    File compression is used by default to save disk space. To disable compression set the
    `compression_level` parameter to `0`.
    ------------------------------------------------------------------------------------"""

    class Parameters(NetCDFWriter.Parameters):
        time_interval: int = 1
        """Time interval value."""

        time_unit: str = "D"
        """Time interval unit."""

    parameters: Parameters = Parameters()

    file_extension: str = ".nc"

    def write(
        self, dataset: xr.Dataset, filepath: Optional[Path] = None, **kwargs: Any
    ) -> None:
        to_netcdf_kwargs = copy.deepcopy(self.parameters.to_netcdf_kwargs)
        encoding_dict: Dict[str, Dict[str, Any]] = {}
        to_netcdf_kwargs["encoding"] = encoding_dict

        for variable_name in cast(Iterable[str], dataset.variables):
            # Prevent Xarray from setting 'nan' as the default _FillValue
            encoding_dict[variable_name] = dataset[variable_name].encoding  # type: ignore
            if (
                "_FillValue" not in encoding_dict[variable_name]
                and "_FillValue" not in dataset[variable_name].attrs
            ):
                encoding_dict[variable_name]["_FillValue"] = None

            if self.parameters.compression_level:
                encoding_dict[variable_name].update(
                    {
                        self.parameters.compression_engine: True,
                        "complevel": self.parameters.compression_level,
                    }
                )

            # Must remove original chunksize to split and save dataset
            if "chunksizes" in encoding_dict[variable_name]:
                del encoding_dict[variable_name]["chunksizes"]

        interval = self.parameters.time_interval
        unit = self.parameters.time_unit

        t1 = dataset.time[0]
        t2 = t1 + np.timedelta64(interval, unit)

        while t1 < dataset.time[-1]:
            ds_temp = dataset.sel(time=slice(t1, t2))
            new_filename = get_filename(ds_temp, self.file_extension)
            new_filepath = filepath.with_name(new_filename)  # type: ignore
            ds_temp.to_netcdf(new_filepath, **to_netcdf_kwargs)  # type: ignore

            t1 = t2
            t2 = t1 + np.timedelta64(interval, unit)
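
# Example (illustrative sketch, not part of the original module): splitting one
# day of 10-minute data into four 6-hour netCDF files. The `datastream` global
# attribute is included because tsdat's get_filename() derives each slice's
# filename from dataset attributes and the time coordinate; the attribute
# value, variable, and paths are invented for demonstration.
def _example_split_netcdf_writer() -> None:
    ds = xr.Dataset(
        {"wind_speed": ("time", np.random.default_rng(0).uniform(0.0, 15.0, 144))},
        coords={"time": pd.date_range("2024-01-01", periods=144, freq="10min")},
        attrs={"datastream": "abc.example.a1"},
    )
    writer = SplitNetCDFWriter(
        parameters=SplitNetCDFWriter.Parameters(time_interval=6, time_unit="h")
    )
    # `filepath` mainly supplies the output directory; each 6-hour slice gets
    # its own generated filename via filepath.with_name(...).
    writer.write(ds, filepath=Path("output/placeholder.nc"))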

class CSVWriter(FileWriter):
    """---------------------------------------------------------------------------------
    Converts an `xr.Dataset` object to a pandas `DataFrame` and saves the result to a
    csv file using `pd.DataFrame.to_csv()`. Properties under the `to_csv_kwargs`
    parameter are passed to `pd.DataFrame.to_csv()` as keyword arguments.
    ---------------------------------------------------------------------------------"""

    class Parameters(BaseModel, extra=Extra.forbid):
        dim_order: Optional[List[str]] = None
        """Optional dimension order passed to xr.Dataset.to_dataframe()."""

        to_csv_kwargs: Dict[str, Any] = {}
        """Keyword arguments passed directly to pd.DataFrame.to_csv()."""

    parameters: Parameters = Parameters()

    file_extension: str = ".csv"

    def write(
        self, dataset: xr.Dataset, filepath: Optional[Path] = None, **kwargs: Any
    ) -> None:
        # QUESTION: Is this format capable of "round-tripping"?
        # (i.e., ds != read(write(ds)) for csv format)
        d1: List[Hashable] = []
        d2: List[Hashable] = []
        for var in dataset:
            shp = dataset[var].shape
            if len(shp) <= 1:
                d1.append(var)
            elif len(shp) == 2:
                d2.append(var)
            else:
                warnings.warn(
                    "CSV writer cannot save variables with more than 2 dimensions."
                )

        name = filepath.stem  # type: ignore

        # Save header data
        header_filepath = filepath.with_suffix(".hdr.csv")  # type: ignore
        header = dataset.attrs
        with open(str(header_filepath), "w", newline="\n") as fp:
            for key in header:
                fp.write(f"{key},{header[key]}\n")

        # Save variable metadata
        metadata_filepath = filepath.with_suffix(".attrs.csv")  # type: ignore
        var_metadata: List[Dict[str, Any]] = []
        for var in dataset:
            attrs = dataset[var].attrs
            attrs.update({"name": var})
            var_metadata.append(attrs)
        df_metadata = pd.DataFrame(var_metadata)
        df_metadata = df_metadata.set_index("name")  # type: ignore
        df_metadata.to_csv(metadata_filepath)

        if d1:
            # Save 1D variables
            d2.extend(
                [v for v in dataset.coords if v != "time"]
            )  # add 2D coordinates to remove list
            ds_1d = dataset.drop_vars(d2)  # drop 2D variables
            df_1d = ds_1d.to_dataframe()
            df_1d.to_csv(filepath, **self.parameters.to_csv_kwargs)  # type: ignore

        if d2:
            # Save 2D variables
            dim2_filepath = filepath.with_suffix(".2D.csv")  # type: ignore
            ds_2d = dataset.drop_vars(d1)  # drop 1D variables
            df_2d = ds_2d.to_dataframe(self.parameters.dim_order)  # type: ignore
            df_2d.to_csv(dim2_filepath, **self.parameters.to_csv_kwargs)  # type: ignore
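
# Example (illustrative sketch, not part of the original module): CSVWriter
# fans out into sibling files: 1-D data in example.csv, global attributes in
# example.hdr.csv, per-variable attributes in example.attrs.csv, and any 2-D
# variables in example.2D.csv. Names and values are invented for demonstration.
def _example_csv_writer() -> None:
    ds = xr.Dataset(
        {"temperature": ("time", [10.0, 11.0, 12.5, 13.0])},
        coords={"time": pd.date_range("2024-01-01", periods=4, freq="1D")},
        attrs={"title": "CSV writer demo"},
    )
    CSVWriter().write(ds, filepath=Path("example.csv"))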

class ParquetWriter(FileWriter):
    """---------------------------------------------------------------------------------
    Writes the dataset to a parquet file.

    Converts an `xr.Dataset` object to a pandas `DataFrame` and saves the result to a
    parquet file using `pd.DataFrame.to_parquet()`. Properties under the
    `to_parquet_kwargs` parameter are passed to `pd.DataFrame.to_parquet()` as keyword
    arguments.
    ---------------------------------------------------------------------------------"""

    class Parameters(BaseModel, extra=Extra.forbid):
        dim_order: Optional[List[str]] = None
        """Optional dimension order passed to xr.Dataset.to_dataframe()."""

        to_parquet_kwargs: Dict[str, Any] = {}
        """Keyword arguments passed directly to pd.DataFrame.to_parquet()."""

    parameters: Parameters = Parameters()

    file_extension: str = ".parquet"

    def write(
        self, dataset: xr.Dataset, filepath: Optional[Path] = None, **kwargs: Any
    ) -> None:
        # QUESTION: Can we reliably write the dataset metadata to a separate file such
        # that it can always be retrieved? If not, should we declare this as a format
        # incapable of "round-tripping" (i.e., ds != read(write(ds)) for parquet format)?
        df = dataset.to_dataframe(self.parameters.dim_order)  # type: ignore
        df.to_parquet(filepath, **self.parameters.to_parquet_kwargs)  # type: ignore
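
# Example (illustrative sketch, not part of the original module): parquet
# output keeps the tabular data but, per the QUESTION above, global and
# variable attributes are not preserved. Requires a parquet engine such as
# pyarrow; names and values are invented for demonstration.
def _example_parquet_writer() -> None:
    ds = xr.Dataset(
        {"humidity": ("time", [40.0, 42.0, 41.5, 39.0])},
        coords={"time": pd.date_range("2024-01-01", periods=4, freq="1D")},
    )
    ParquetWriter().write(ds, filepath=Path("example.parquet"))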

class ZarrWriter(FileWriter):
    """---------------------------------------------------------------------------------
    Writes the dataset to a basic zarr archive.

    Advanced features such as specifying the chunk size or writing the zarr archive in
    AWS S3 will be implemented later.
    ---------------------------------------------------------------------------------"""

    class Parameters(BaseModel, extra=Extra.forbid):
        to_zarr_kwargs: Dict[str, Any] = {}
        """Keyword arguments passed directly to xr.Dataset.to_zarr()."""

    parameters: Parameters = Parameters()

    file_extension: str = ".zarr"

    def write(
        self, dataset: xr.Dataset, filepath: Optional[Path] = None, **kwargs: Any
    ) -> None:
        encoding_dict: Dict[str, Dict[str, Any]] = {}
        for variable_name in cast(Iterable[str], dataset.variables):
            # Prevent Xarray from setting 'nan' as the default _FillValue
            encoding_dict[variable_name] = dataset[variable_name].encoding  # type: ignore
            if (
                "_FillValue" not in encoding_dict[variable_name]
                and "_FillValue" not in dataset[variable_name].attrs
            ):
                encoding_dict[variable_name]["_FillValue"] = None

        dataset.to_zarr(filepath, encoding=encoding_dict, **self.parameters.to_zarr_kwargs)  # type: ignore
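
# Example (illustrative sketch, not part of the original module): unlike the
# file-based writers above, `filepath` here names a zarr store (a directory on
# disk). Requires the zarr package; the dataset is invented for demonstration.
def _example_zarr_writer() -> None:
    ds = xr.Dataset(
        {"pressure": ("time", [1013.0, 1012.5, 1011.0, 1012.0])},
        coords={"time": pd.date_range("2024-01-01", periods=4, freq="1D")},
    )
    ZarrWriter().write(ds, filepath=Path("example.zarr"))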