
Commit c73b136

Add get_virtual_reference(), in testing
1 parent db00537 commit c73b136

5 files changed: +150 -76 lines


earthaccess/__init__.py

Lines changed: 2 additions & 2 deletions
@@ -21,8 +21,7 @@
     search_services,
 )
 from .auth import Auth
-from .dmrpp_zarr import open_virtual_dataset, open_virtual_mfdataset
-from .kerchunk import consolidate_metadata
+from .zarr import open_virtual_dataset, open_virtual_mfdataset, consolidate_metadata, get_virtual_reference
 from .search import DataCollection, DataCollections, DataGranule, DataGranules
 from .services import DataServices
 from .store import Store
@@ -59,6 +58,7 @@
     "Store",
     # kerchunk
     "consolidate_metadata",
+    "get_virtual_reference",
     # virtualizarr
     "open_virtual_dataset",
     "open_virtual_mfdataset",

earthaccess/kerchunk.py

Lines changed: 0 additions & 74 deletions
This file was deleted.
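
Note that although earthaccess/kerchunk.py is deleted, consolidate_metadata now lives in earthaccess/zarr/kerchunk.py (below), and the top-level re-exports in __init__.py keep the existing public API intact while adding the new function. A minimal import smoke test, assuming a build of this branch:

import earthaccess

# consolidate_metadata moved from earthaccess.kerchunk to earthaccess.zarr.kerchunk,
# but the top-level name is unchanged; get_virtual_reference is the new addition.
from earthaccess import consolidate_metadata, get_virtual_reference

print(get_virtual_reference.__module__)  # expected: earthaccess.zarr.kerchunk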

earthaccess/zarr/__init__.py

Lines changed: 9 additions & 0 deletions
from .kerchunk import consolidate_metadata, get_virtual_reference
from .dmrpp_zarr import open_virtual_dataset, open_virtual_mfdataset


__all__ = [
    "consolidate_metadata",
    "open_virtual_dataset",
    "open_virtual_mfdataset",
    "get_virtual_reference",
]
earthaccess/dmrpp_zarr.py → earthaccess/zarr/dmrpp_zarr.py

File renamed without changes.

earthaccess/zarr/kerchunk.py

Lines changed: 139 additions & 0 deletions
from __future__ import annotations

import json
import logging
import zipfile
from pathlib import Path
from typing import Optional, Union
from uuid import uuid4

import fsspec
import fsspec.utils
import s3fs

import earthaccess

logger = logging.getLogger(__name__)


def _get_chunk_metadata(
    granule: earthaccess.DataGranule,
    fs: fsspec.AbstractFileSystem,
) -> list[dict]:
    from kerchunk.hdf import SingleHdf5ToZarr

    if not isinstance(granule, earthaccess.DataGranule) and isinstance(granule, dict):
        # WHY: dask serialization is doing something weird; it serializes the granule
        # as a plain dict, so we cast it back to a DataGranule to get the methods
        # for parsing the data links.
        # TODO: ask James what is going on
        granule = earthaccess.DataGranule(granule)

    metadata = []
    access = "direct" if isinstance(fs, s3fs.S3FileSystem) else "indirect"

    for url in granule.data_links(access=access):
        with fs.open(url) as inf:
            h5chunks = SingleHdf5ToZarr(inf, url)  # type: ignore
            m = h5chunks.translate()
            metadata.append(m)

    return metadata


def consolidate_metadata(
    granules: list[earthaccess.DataGranule],
    kerchunk_options: Optional[dict] = None,
    access: str = "direct",
    outfile: Optional[str] = None,
    storage_options: Optional[dict] = None,
) -> Union[str, dict]:
    try:
        import dask

        from kerchunk.combine import MultiZarrToZarr
    except ImportError as e:
        raise ImportError(
            "`earthaccess.consolidate_metadata` requires `dask` and `kerchunk` to be installed"
        ) from e

    if access == "direct":
        fs = earthaccess.get_s3_filesystem(provider=granules[0]["meta"]["provider-id"])
    else:
        fs = earthaccess.get_fsspec_https_session()

    # Get metadata for each granule in parallel with dask
    get_chunk_metadata = dask.delayed(_get_chunk_metadata)  # type: ignore

    chunks = dask.compute(*[get_chunk_metadata(g, fs) for g in granules])  # type: ignore
    chunks = sum(chunks, start=[])

    # Get combined metadata object
    mzz = MultiZarrToZarr(chunks, **(kerchunk_options or {}))

    if outfile is None:
        return mzz.translate()

    output = fsspec.utils.stringify_path(outfile)
    mzz.translate(outfile, storage_options=storage_options or {})
    return output


def get_virtual_reference(
    short_name: str = "",
    cloud_hosted: bool = True,
    format: str = "parquet",
) -> Union[fsspec.FSMap, None]:
    """Returns a virtual reference for a given collection.

    The reference file has to be created by the DAAC distributing the data.
    The reference mapper can be used directly in xarray as a Zarr store.
    """
    file_types = {
        "parquet": "parq.zip",
        "json": "json",
    }

    # Find collection-level metadata (UMM-C) on CMR:
    collections = earthaccess.search_datasets(short_name=short_name, cloud_hosted=cloud_hosted)

    links = collections[0]["umm"].get("RelatedUrls", [])

    # Look within UMM-C for links to virtual data set reference files.
    # I think both the json and parquet references should be under VIRTUAL COLLECTION.
    refs = [
        e["URL"]
        for e in links
        if "Subtype" in e
        and (("VIRTUAL COLLECTION" in e["Subtype"]) or ("DATA RECIPE" in e["Subtype"]))
    ]

    if refs:
        logger.info("A virtual data set reference file exists for this collection")
        # Currently it is assumed that the link URL ends with the expected
        # file-type suffix for the requested format:
        link = [link for link in refs if link.endswith(file_types[format])][0]
    else:
        logger.info(
            "No virtual data set reference file was found for this collection. "
            "There may be a reference file in a different format, or else you will "
            "have to open this data set using traditional netCDF/HDF methods."
        )
        return None

    # This assumes the reference points to s3 links; we'll have to refactor later.
    http_fs = earthaccess.get_fsspec_https_session()
    fs = earthaccess.get_s3_filesystem(provider=collections[0]["meta"]["provider-id"])
    if link.endswith(".json"):
        with http_fs.open(link) as f:
            ref_loc = json.load(f)
    else:
        with http_fs.open(link, "rb") as remote_zip:
            # Unzip the contents into a local temporary directory
            with zipfile.ZipFile(remote_zip, "r") as zip_ref:
                ref_id = uuid4()
                local_path = Path(f".references/{ref_id}")
                zip_ref.extractall(local_path)
                ref_loc = str([d for d in local_path.iterdir() if d.is_dir()][0])

    storage_opts = {
        "fo": ref_loc,
        "remote_protocol": "s3",
        "remote_options": fs.storage_options,
    }
    file_ref = fsspec.filesystem("reference", **storage_opts)
    return file_ref.get_mapper("")
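
The docstring says the returned mapper can be used directly in xarray as a Zarr store. A minimal usage sketch of that flow, assuming earthaccess credentials are configured and a collection whose DAAC actually publishes a JSON virtual reference in its UMM-C metadata ("SOME_SHORT_NAME" is a placeholder, not a real product):

import xarray as xr

import earthaccess

earthaccess.login()

# "SOME_SHORT_NAME" is a placeholder; substitute a collection short name whose
# UMM-C RelatedUrls include a VIRTUAL COLLECTION (or DATA RECIPE) reference link.
mapper = earthaccess.get_virtual_reference(short_name="SOME_SHORT_NAME", format="json")

if mapper is not None:
    # Kerchunk reference stores have no consolidated metadata, so disable that lookup.
    ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False})
    print(ds)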
