Source code for tsdat.io.writers

# TODO: Implement advanced ZarrWriter features (e.g., chunk size, writing to AWS S3)

import copy
import xarray as xr
from typing import Any, Dict, Iterable, List, Optional, cast
from pathlib import Path
from pydantic import BaseModel, Extra
from .base import FileWriter

__all__ = [
    "NetCDFWriter",
    "CSVWriter",
    "ParquetWriter",
    "ZarrWriter",
]


class NetCDFWriter(FileWriter):
    """---------------------------------------------------------------------------------
    Thin wrapper around xarray's `Dataset.to_netcdf()` function for saving a dataset to
    a netCDF file. Properties under the `to_netcdf_kwargs` parameter will be passed to
    `Dataset.to_netcdf()` as keyword arguments.

    File compression is used by default to save disk space. To disable compression set
    the `compression_level` parameter to `0`.
    ---------------------------------------------------------------------------------"""
    class Parameters(BaseModel, extra=Extra.forbid):
        compression_level: int = 1
        """The level of compression to use (0-9). Set to 0 to not use compression."""

        compression_engine: str = "zlib"
        """The compression engine to use."""

        to_netcdf_kwargs: Dict[str, Any] = {}
        """Keyword arguments passed directly to `xr.Dataset.to_netcdf()`."""

    parameters: Parameters = Parameters()

    file_extension: str = ".nc"
    def write(
        self, dataset: xr.Dataset, filepath: Optional[Path] = None, **kwargs: Any
    ) -> None:
        to_netcdf_kwargs = copy.deepcopy(self.parameters.to_netcdf_kwargs)
        encoding_dict: Dict[str, Dict[str, Any]] = {}
        to_netcdf_kwargs["encoding"] = encoding_dict

        for variable_name in cast(Iterable[str], dataset.variables):
            # Prevent Xarray from setting 'nan' as the default _FillValue
            encoding_dict[variable_name] = dataset[variable_name].encoding  # type: ignore
            if (
                "_FillValue" not in encoding_dict[variable_name]
                and "_FillValue" not in dataset[variable_name].attrs
            ):
                encoding_dict[variable_name]["_FillValue"] = None

            if self.parameters.compression_level:
                encoding_dict[variable_name].update(
                    {
                        self.parameters.compression_engine: True,
                        "complevel": self.parameters.compression_level,
                    }
                )

        dataset.to_netcdf(filepath, **to_netcdf_kwargs)  # type: ignore
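
# Example usage (a minimal sketch, not part of the tsdat source): saving a small
# in-memory dataset with NetCDFWriter. The dataset contents and output path are
# hypothetical, and the pydantic-style keyword constructor is assumed.
#
#     import xarray as xr
#     from pathlib import Path
#
#     ds = xr.Dataset(
#         {"temperature": ("time", [10.2, 10.4, 10.1])},
#         coords={"time": [0, 1, 2]},
#     )
#     writer = NetCDFWriter()  # compresses with zlib at level 1 by default
#     writer.write(ds, filepath=Path("example.nc"))
#
#     # To disable compression, override the default parameters:
#     writer = NetCDFWriter(parameters=NetCDFWriter.Parameters(compression_level=0))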
class CSVWriter(FileWriter):
    """---------------------------------------------------------------------------------
    Converts an `xr.Dataset` object to a pandas `DataFrame` and saves the result to a
    csv file using `pd.DataFrame.to_csv()`. Properties under the `to_csv_kwargs`
    parameter are passed to `pd.DataFrame.to_csv()` as keyword arguments.
    ---------------------------------------------------------------------------------"""
    class Parameters(BaseModel, extra=Extra.forbid):
        dim_order: Optional[List[str]] = None
        """Optional dimension order passed to `xr.Dataset.to_dataframe()`."""

        to_csv_kwargs: Dict[str, Any] = {}
        """Keyword arguments passed directly to `pd.DataFrame.to_csv()`."""

    parameters: Parameters = Parameters()

    file_extension: str = ".csv"
    def write(
        self, dataset: xr.Dataset, filepath: Optional[Path] = None, **kwargs: Any
    ) -> None:
        # QUESTION: Can we reliably write the dataset metadata to a separate file such
        # that it can always be retrieved? If not, should we declare this as a format
        # incapable of "round-tripping" (i.e., ds != read(write(ds)) for csv format)?
        df = dataset.to_dataframe(self.parameters.dim_order)  # type: ignore
        df.to_csv(filepath, **self.parameters.to_csv_kwargs)  # type: ignore
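
# Example usage (a minimal sketch, not part of the tsdat source): forwarding
# options through `to_csv_kwargs` to `pd.DataFrame.to_csv()`. The dataset `ds`
# and path are hypothetical, and the pydantic-style constructor is assumed.
#
#     writer = CSVWriter(
#         parameters=CSVWriter.Parameters(to_csv_kwargs={"index": True})
#     )
#     writer.write(ds, filepath=Path("example.csv"))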
class ParquetWriter(FileWriter):
    """---------------------------------------------------------------------------------
    Writes the dataset to a parquet file.

    Converts an `xr.Dataset` object to a pandas `DataFrame` and saves the result to a
    parquet file using `pd.DataFrame.to_parquet()`. Properties under the
    `to_parquet_kwargs` parameter are passed to `pd.DataFrame.to_parquet()` as keyword
    arguments.
    ---------------------------------------------------------------------------------"""
    class Parameters(BaseModel, extra=Extra.forbid):
        dim_order: Optional[List[str]] = None
        """Optional dimension order passed to `xr.Dataset.to_dataframe()`."""

        to_parquet_kwargs: Dict[str, Any] = {}
        """Keyword arguments passed directly to `pd.DataFrame.to_parquet()`."""

    parameters: Parameters = Parameters()

    file_extension: str = ".parquet"
    def write(
        self, dataset: xr.Dataset, filepath: Optional[Path] = None, **kwargs: Any
    ) -> None:
        # QUESTION: Can we reliably write the dataset metadata to a separate file such
        # that it can always be retrieved? If not, should we declare this as a format
        # incapable of "round-tripping" (i.e., ds != read(write(ds)) for parquet format)?
        df = dataset.to_dataframe(self.parameters.dim_order)  # type: ignore
        df.to_parquet(filepath, **self.parameters.to_parquet_kwargs)  # type: ignore
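
# Example usage (a minimal sketch, not part of the tsdat source): choosing a
# compression codec via `to_parquet_kwargs`, which is forwarded to
# `pd.DataFrame.to_parquet()`. Requires a parquet engine such as pyarrow; the
# dataset `ds` and path are hypothetical.
#
#     writer = ParquetWriter(
#         parameters=ParquetWriter.Parameters(
#             to_parquet_kwargs={"compression": "snappy"}
#         )
#     )
#     writer.write(ds, filepath=Path("example.parquet"))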
class ZarrWriter(FileWriter):
    """---------------------------------------------------------------------------------
    Writes the dataset to a basic zarr archive.

    Advanced features such as specifying the chunk size or writing the zarr archive
    directly to AWS S3 will be implemented later.
    ---------------------------------------------------------------------------------"""
    class Parameters(BaseModel, extra=Extra.forbid):
        to_zarr_kwargs: Dict[str, Any] = {}
        """Keyword arguments passed directly to `xr.Dataset.to_zarr()`."""

    parameters: Parameters = Parameters()

    file_extension: str = ".zarr"
    def write(
        self, dataset: xr.Dataset, filepath: Optional[Path] = None, **kwargs: Any
    ) -> None:
        encoding_dict: Dict[str, Dict[str, Any]] = {}

        for variable_name in cast(Iterable[str], dataset.variables):
            # Prevent Xarray from setting 'nan' as the default _FillValue
            encoding_dict[variable_name] = dataset[variable_name].encoding  # type: ignore
            if (
                "_FillValue" not in encoding_dict[variable_name]
                and "_FillValue" not in dataset[variable_name].attrs
            ):
                encoding_dict[variable_name]["_FillValue"] = None

        dataset.to_zarr(filepath, encoding=encoding_dict, **self.parameters.to_zarr_kwargs)  # type: ignore
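
# Example usage (a minimal sketch, not part of the tsdat source). A zarr
# archive is a directory rather than a single file, so `filepath` names a
# directory here. Requires the zarr package; the dataset `ds` and path are
# hypothetical.
#
#     writer = ZarrWriter()
#     writer.write(ds, filepath=Path("example.zarr"))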