# Source code for esmvalcore.io.intake_esgf

"""Access data using `intake-esgf <https://intake-esgf.readthedocs.io>`_.

.. note::

    It is highly recommended that you take a moment to
    :doc:`configure intake-esgf <intake_esgf:configure>` before using it
    with ESMValCore. Make sure to set ``local_cache`` to a path where
    it can store downloaded files and if (some) ESGF data is already
    available on your system, point ``esg_dataroot`` to it. If you are
    missing certain search results, you may want to choose a different
    index node for searching the ESGF.

Run the command ``esmvaltool config copy data-intake-esgf.yml`` to update
your :ref:`configuration <config-data-sources>` to use this module. This will
create a file with the following content in your configuration directory:

.. literalinclude:: ../configurations/data-intake-esgf.yml
   :language: yaml
   :caption: Contents of ``data-intake-esgf.yml``

"""

from __future__ import annotations

import copy
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any

import intake_esgf
import intake_esgf.exceptions
import isodate

from esmvalcore.dataset import _isglob, _ismatch
from esmvalcore.io.protocol import DataElement, DataSource
from esmvalcore.iris_helpers import dataset_to_iris
from esmvalcore.local import _parse_period

if TYPE_CHECKING:
    import iris.cube

    from esmvalcore.typing import Facets, FacetValue


__all__ = [
    "IntakeESGFDataSource",
    "IntakeESGFDataset",
]


class _CachingCatalog(intake_esgf.ESGFCatalog):
    """An ESGF catalog that caches :meth:`to_path_dict` results.

    Repeated calls to :meth:`to_path_dict` with the same arguments return
    the cached result instead of contacting the search indices again.
    """

    def __init__(self) -> None:
        super().__init__()
        # Maps a hashable representation of the `to_path_dict` arguments
        # to the corresponding result.
        self._result: dict[tuple, dict[str, list[str | Path]]] = {}

    @classmethod
    def from_catalog(
        cls,
        catalog: intake_esgf.ESGFCatalog,
    ) -> _CachingCatalog:
        """Create a CachingCatalog from an existing ESGFCatalog.

        Parameters
        ----------
        catalog:
            The catalog whose search state (indices, cache paths, search
            window, project, and result dataframe) is copied by reference.

        Returns
        -------
        :
            A new caching catalog sharing the given catalog's state.
        """
        cat = cls()
        cat.indices = catalog.indices
        cat.local_cache = catalog.local_cache
        cat.esg_dataroot = catalog.esg_dataroot
        cat.file_start = catalog.file_start
        cat.file_end = catalog.file_end
        cat.project = catalog.project
        cat.df = catalog.df
        return cat

    def to_path_dict(
        self,
        prefer_streaming: bool = False,
        globus_endpoint: str | None = None,
        globus_path: Path = Path("/"),
        minimal_keys: bool = True,
        ignore_facets: None | str | list[str] = None,
        separator: str = ".",
        quiet: bool = False,
    ) -> dict[str, list[str | Path]]:
        """Return the current search as a dictionary of paths to files.

        Results are cached per argument combination; `quiet` only affects
        progress output, so it is excluded from the cache key.
        """
        kwargs = {
            "prefer_streaming": prefer_streaming,
            "globus_endpoint": globus_endpoint,
            "globus_path": globus_path,
            "minimal_keys": minimal_keys,
            "ignore_facets": ignore_facets,
            "separator": separator,
            "quiet": quiet,
        }
        # Normalize list values (`ignore_facets` may be a list) to tuples so
        # the key is hashable; a raw list inside the key tuple would make
        # the dictionary lookup below raise TypeError.
        key = tuple(
            (k, tuple(v) if isinstance(v, list) else v)
            for k, v in kwargs.items()
            if k != "quiet"
        )
        if key not in self._result:
            self._result[key] = super().to_path_dict(**kwargs)
        return self._result[key]


@dataclass
class IntakeESGFDataset(DataElement):
    """A dataset that can be used to load data found using intake-esgf_."""

    name: str
    """A unique name identifying the data."""

    facets: Facets = field(repr=False)
    """Facets are key-value pairs that were used to find this data."""

    catalog: intake_esgf.ESGFCatalog = field(repr=False)
    """The intake-esgf catalog describing this data."""

    # Lazily populated by `to_iris`; read through the `attributes` property.
    _attributes: dict[str, Any] | None = field(
        init=False,
        repr=False,
        default=None,
    )

    def __hash__(self) -> int:
        """Return a number uniquely representing the data element."""
        # Name plus version is sufficient to distinguish data elements.
        return hash((self.name, self.facets.get("version")))

    def prepare(self) -> None:
        """Prepare the data for access."""
        # Resolve (and, if needed, download) all file paths up front so the
        # cached result can be reused later without touching the indices.
        self.catalog.to_path_dict(minimal_keys=False)
        for index in self.catalog.indices:
            # Set the sessions to None to avoid issues with pickling
            # requests_cache.CachedSession objects when max_parallel_tasks > 1.
            # After the prepare step, the sessions for interacting with the
            # search indices are not needed anymore as all file paths required
            # to load the data have been found. To make sure we do not
            # accidentally use the sessions later on, we set them to None
            # instead of e.g. requests.Session objects.
            #
            # This seems the safest/fastest solution as it avoids accessing the
            # sqlite database backing the cached_requests.CachedSession from
            # multiple processes on multiple machines.
            index.session = None

    @property
    def attributes(self) -> dict[str, Any]:
        """Attributes are key-value pairs describing the data."""
        if self._attributes is None:
            msg = (
                "Attributes have not been read yet. Call the `to_iris` method "
                "first to read the attributes from the file."
            )
            raise ValueError(msg)
        return self._attributes

    @attributes.setter
    def attributes(self, value: dict[str, Any]) -> None:
        self._attributes = value

    def to_iris(self) -> iris.cube.CubeList:
        """Load the data as Iris cubes.

        Returns
        -------
        :
            The loaded data.
        """
        files = self.catalog.to_path_dict(
            minimal_keys=False,
            quiet=True,
        )[self.name]
        dataset = self.catalog.to_dataset_dict(
            minimal_keys=False,
            add_measures=False,
            quiet=True,
        )[self.name]
        # Store the local paths in the attributes for easier debugging.
        dataset.attrs["source_file"] = ", ".join(str(f) for f in files)
        # Cache the attributes.
        self.attributes = copy.deepcopy(dataset.attrs)
        return dataset_to_iris(dataset)
@dataclass
class IntakeESGFDataSource(DataSource):
    """Data source that can be used to find data using intake-esgf."""

    name: str
    """A name identifying the data source."""

    project: str
    """The project that the data source provides data for."""

    priority: int
    """The priority of the data source. Lower values have priority."""

    facets: dict[str, str]
    """Mapping between the ESMValCore and ESGF facet names."""

    values: dict[str, dict[str, str]] = field(default_factory=dict)
    """Mapping between the ESMValCore and ESGF facet values."""

    debug_info: str = field(init=False, repr=False, default="")
    """A string containing debug information when no data is found."""

    catalog: intake_esgf.ESGFCatalog = field(
        init=False,
        repr=False,
        default_factory=intake_esgf.ESGFCatalog,
    )
    """The intake-esgf catalog used to find data."""

    def __post_init__(self):
        # Look up the intake-esgf project definition by lowercase name so
        # the catalog knows which facet schema to use.
        self.catalog.project = intake_esgf.projects.projects[
            self.project.lower()
        ]

    def find_data(self, **facets: FacetValue) -> list[IntakeESGFDataset]:
        """Find data.

        Parameters
        ----------
        **facets :
            Find data matching these facets.

        Returns
        -------
        :
            A list of data elements that have been found.
        """
        # Select searchable facets and normalize so all values are `list[str]`.
        normalized_facets = {
            facet: [str(values)] if isinstance(values, str | int) else values
            for facet, values in facets.items()
            if facet in self.facets
        }
        # Filter out glob patterns as these are not supported by intake-esgf.
        non_glob_facets = {
            facet: values
            for facet, values in normalized_facets.items()
            if not any(_isglob(v) for v in values)
        }
        # Translate "our" facets to ESGF facets and "our" values to ESGF values.
        query = {
            their_facet: [
                self.values.get(our_facet, {}).get(v, v)
                for v in non_glob_facets[our_facet]
            ]
            for our_facet, their_facet in self.facets.items()
            if our_facet in non_glob_facets
        }
        if (
            "timerange" in facets
            and not _isglob(facets["timerange"])  # type: ignore[operator]
        ):
            # Restrict the search window to the requested period; only the
            # date part of the ISO timestamps is used.
            start, end = _parse_period(facets["timerange"])
            query["file_start"] = isodate.date_isoformat(
                isodate.parse_date(start.split("T")[0]),
            )
            query["file_end"] = isodate.date_isoformat(
                isodate.parse_date(end.split("T")[0]),
            )

        # Search ESGF.
        try:
            self.catalog.search(**query, quiet=True)
        except intake_esgf.exceptions.NoSearchResults:
            # Record a reproducible search expression for debugging and
            # report "nothing found" instead of raising.
            self.debug_info = (
                "`intake_esgf.ESGFCatalog().search("
                + ", ".join(
                    [
                        f"{k}={v}" if isinstance(v, list) else f"{k}='{v}'"
                        for k, v in query.items()
                    ],
                )
                + ")` did not return any results."
            )
            return []

        # Return a list of datasets, with one IntakeESGFDataset per dataset_id.
        result: list[IntakeESGFDataset] = []
        # These are the keys in the dict[str, xarray.Dataset] returned by
        # `intake_esgf.ESGFCatalog.to_dataset_dict`. Taken from:
        # https://github.com/esgf2-us/intake-esgf/blob/c34124e54078e70ef271709a6d158edb22bcdb96/intake_esgf/catalog.py#L523-L528
        self.catalog.df["key"] = self.catalog.df.apply(
            lambda row: ".".join(
                [row[f] for f in self.catalog.project.master_id_facets()],
            ),
            axis=1,
        )
        # Invert the value translation table so ESGF values can be mapped
        # back to "our" values below.
        inverse_values = {
            our_facet: {
                their_value: our_value
                for our_value, their_value in self.values[our_facet].items()
            }
            for our_facet in self.values
        }
        for _, row in self.catalog.df.iterrows():
            dataset_id = row["key"]
            # Use a caching catalog to avoid searching the indices after
            # calling the ESGFFile.prepare method.
            cat = _CachingCatalog.from_catalog(self.catalog)
            # Subset the catalog to a single dataset.
            cat.df = cat.df[cat.df.key == dataset_id]
            # Ensure only the requested variable is included in the dataset.
            # https://github.com/esgf2-us/intake-esgf/blob/18437bff5ee75acaaceef63093101223b4692259/intake_esgf/catalog.py#L544-L552
            if "short_name" in normalized_facets:
                cat.last_search[self.facets["short_name"]] = [
                    self.values.get("short_name", {}).get(v, v)
                    for v in normalized_facets["short_name"]
                ]
            # Retrieve "our" facets associated with the dataset_id.
            dataset_facets = {"version": [f"v{row['version']}"]}
            for our_facet, esgf_facet in self.facets.items():
                if esgf_facet in row:
                    esgf_values = row[esgf_facet]
                    if isinstance(esgf_values, str):
                        esgf_values = [esgf_values]
                    our_values = [
                        inverse_values.get(our_facet, {}).get(v, v)
                        for v in esgf_values
                    ]
                    dataset_facets[our_facet] = our_values
            # Only return datasets that match the glob patterns.
            if all(
                any(
                    _ismatch(v, p)
                    for v in dataset_facets[f]
                    for p in normalized_facets[f]
                )
                for f in dataset_facets
                if f in normalized_facets
            ):
                dataset = IntakeESGFDataset(
                    name=dataset_id,
                    # Unwrap single-valued facet lists to plain strings.
                    facets={
                        k: v[0] if len(v) == 1 else v
                        for k, v in dataset_facets.items()
                    },  # type: ignore[arg-type]
                    catalog=cat,
                )
                result.append(dataset)
        return result