"""
NOAA greenhouse gas data.
"""
from abc import ABCMeta
import datetime as dt
from functools import cached_property
import os
import pandas as pd
from typing import Literal, Union
import xarray as xr
from lair.config import GROUP_DIR
from lair.utils.records import ftp_download, list_files, Cacher
#: CarbonTracker data directory: the ``carbontracker`` folder under ``GROUP_DIR``.
#: Used as the default root when ``CarbonTracker`` is created without an
#: explicit ``carbon_tracker_directory``.
CARBONTRACKER_DIR = os.path.join(GROUP_DIR, 'carbontracker')
#: NOAA GML data directory: the ``gml`` folder under ``GROUP_DIR``.
#: Used as the default root when ``Flask`` is created without an explicit
#: ``gml_dir``.
GML_DIR = os.path.join(GROUP_DIR, 'gml')
[docs]
class CarbonTracker(metaclass=ABCMeta):
"""
NOAA CarbonTracker
Attributes
----------
specie : Literal['ch4', 'co2']
The greenhouse gas specie.
version : str
The CarbonTracker version.
directory : str
The directory for the version.
cache : bool
Whether to cache the data.
Methods
-------
get_specie_from_version(version)
Get the specie from the version.
from_version(version, carbon_tracker_directory=None)
Create a CarbonTracker object from the version.
download(sub_dirs=['fluxes', 'molefractions'], pattern=None)
Download CarbonTracker data from the NOAA GML FTP server.
"""
specie: Literal['ch4', 'co2']
[docs]
def __init__(self, version: str, carbon_tracker_directory: str | None=None,
cache: bool=True):
"""
Initialize a CarbonTracker object.
Parameters
----------
version : str
The version of CarbonTracker data to download.
Visit https://gml.noaa.gov/aftp/products/carbontracker/ to see available versions.
carbon_tracker_dir : str, optional
The directory to download the data to, by default CARBONTRACKER_DIR.
cache : bool, optional
Whether to cache the data, by default True.
"""
self.version = version
carbon_tracker_directory = carbon_tracker_directory or CARBONTRACKER_DIR
self.directory = os.path.join(carbon_tracker_directory, self.specie, version)
self.cache = cache
def __repr__(self):
return f"{self.__class__.__name__}(version={self.version}, directory={self.directory})"
def __str__(self):
return f"{self.__class__.__name__}({self.version})"
[docs]
@staticmethod
def get_specie_from_version(version: str) -> Literal['ch4', 'co2']:
"""
Get the specie from the version.
Parameters
----------
version : str
The version of CarbonTracker data.
Returns
-------
Literal['ch4', 'co2']
The specie.
"""
return 'ch4' if 'ch4' in version.lower() else 'co2'
[docs]
@staticmethod
def from_version(version: str, carbon_tracker_directory: str | None=None):
"""
Create a CarbonTracker object from the version.
Parameters
----------
version : str
The version of CarbonTracker data to download.
Visit https://gml.noaa.gov/aftp/products/carbontracker/ to see available versions.
carbon_tracker_directory : str, optional
The directory to download the data to, by default CARBONTRACKER_DIR.
Returns
-------
CarbonTracker
The CarbonTracker object.
"""
specie = CarbonTracker.get_specie_from_version(version)
if specie == 'co2':
raise ValueError("CarbonTrackerCO2 not yet implemented")
return CarbonTrackerCO2(version, directory)
elif specie == 'ch4':
return CarbonTrackerCH4(version, carbon_tracker_directory)
else:
raise ValueError("Invalid specie")
[docs]
def download(self, sub_dirs: list[str]=['fluxes', 'molefractions'],
pattern: str=None):
"""
Download CarbonTracker data from the NOAA GML FTP server.
Parameters
----------
sub_dirs : list of str, optional
The subdirectories to download data from, by default ['fluxes', 'molefractions'].
If None, download the entire version data.
pattern : str, optional
The pattern to match against the files, by default None
"""
host = 'ftp.gml.noaa.gov'
parent = '/products/carbontracker'
# Build list of remote paths to download
path = f'{parent}/{self.specie}/{self.version}'
paths = [f'{path}/{sub_dir}' for sub_dir in sub_dirs]
# Download the data
ftp_download(host, paths, self.directory, prefix=path, pattern=pattern)
return None
class CarbonTrackerCH4(CarbonTracker):
    """
    NOAA CarbonTracker-CH4

    Attributes
    ----------
    molefractions : xr.Dataset
        The molefractions Dataset, opened lazily on first access.

    Methods
    -------
    calc_molefractions_pressure(molefractions)
        Calculate the pressure at each level in the molefractions Dataset.
    """

    specie = 'ch4'

    def __init__(self, version='CT-CH4-2023', carbon_tracker_directory=None, cache=True,
                 parallel_parse=True):
        """
        Initialize a CarbonTracker-CH4 object.

        Parameters
        ----------
        version : str, optional
            The CarbonTracker-CH4 version, by default 'CT-CH4-2023'.
        carbon_tracker_directory : str, optional
            Root data directory, forwarded to ``CarbonTracker.__init__``.
        cache : bool, optional
            Whether to cache the data, by default True.
        parallel_parse : bool, optional
            Passed as ``parallel`` when opening the molefractions files,
            by default True.
        """
        super().__init__(version, carbon_tracker_directory=carbon_tracker_directory,
                         cache=cache)
        self.parallel_parse = parallel_parse

    @staticmethod
    def _preprocess_molefractions(ds):
        # Build one datetime per row of `time_components`, use those as the
        # `time` coordinate, then drop the now-redundant component variable.
        stamps = [dt.datetime(*parts) for parts in ds['time_components'].values]
        return ds.assign_coords(time=stamps).drop_vars('time_components')

    @cached_property
    def molefractions(self) -> xr.Dataset:
        """Molefractions Dataset built from every ``*nc`` file under the
        version's ``molefractions`` directory. Cached property."""
        nc_dir = os.path.join(self.directory, 'molefractions')
        nc_files = list_files(nc_dir, '*nc', full_names=True, recursive=True)

        opener = xr.open_mfdataset
        if self.cache:
            # Wrap the opener so its result goes through the project Cacher.
            from lair.config import CACHE_DIR
            pickle_path = os.path.join(CACHE_DIR, 'carbontracker', self.specie,
                                       self.version, 'molefractions.pkl')
            opener = Cacher(xr.open_mfdataset, pickle_path)

        return opener(nc_files,
                      preprocess=CarbonTrackerCH4._preprocess_molefractions,
                      parallel=self.parallel_parse)

    @staticmethod
    def calc_molefractions_pressure(molefractions) -> xr.Dataset:
        """
        Calculate the pressure at each level in the molefractions Dataset.

        The result is stored on the passed Dataset as variable ``P`` (the
        input is modified) and the same Dataset is returned.

        Parameters
        ----------
        molefractions : xr.Dataset
            The molefractions Dataset.

        Returns
        -------
        xr.Dataset
            The molefractions Dataset with the pressure calculated.
        """
        level_pressure = (molefractions.at
                          + molefractions.bt * molefractions.surf_pressure)
        molefractions['P'] = level_pressure
        molefractions['P'] /= 100  # Convert to hPa
        molefractions['P'].attrs = dict(
            long_name='Pressure',
            units='hPa',
            comment='Calculated from hybrid sigma-pressure coefficients and surface pressure.',
        )
        return molefractions
[docs]
class Flask:
"""
NOAA GML Flask
Attributes
----------
specie : str
The greenhouse gas specie.
site : str
The site where the flask samples were collected.
platform : str, optional
The platform where the flask samples were collected, by default 'surface'.
lab_id : int, optional
The lab ID, by default 1.
measurement_group : str, optional
The measurement group, by default 'ccgg'.
frequency : str, optional
The frequency of the measurements, by default 'event'.
driver : str, optional
The driver to use to read the data, by default 'pandas'.
gml_dir : str, optional
The NOAA GML directory to download the data to, by default GML_DIR.
directory : str
The directory for the Flask data.
filename : str
The filename for the Flask data.
filepath : str
The filepath for the Flask data.
data : pd.DataFrame | xr.Dataset
The Flask data.
file_template : str
The template for the Flask filename.
driver_ext : dict
The driver extensions.
"""
file_template = '{specie}_{site}_{platform}-flask_{lab_id}_{measurement_group}_{frequency}.{ext}'
driver_ext = {
'pandas': 'txt',
'xarray': 'nc'
}
[docs]
def __init__(self, specie: str, site: str,
platform: Literal['surface', 'shipboard']='surface',
lab_id: int=1,
measurement_group: Literal['ccgg', 'sil']='ccgg',
frequency: Literal['event', 'month']='event',
driver: Literal['pandas', 'xarray']='pandas',
gml_dir: str|None=None):
"""
Initialize a Flask object.
Parameters
----------
specie : str
The greenhouse gas specie.
site : str
The site where the flask samples were collected.
platform : str, optional
The platform where the flask samples were collected, by default 'surface'.
lab_id : int, optional
The lab ID, by default 1.
measurement_group : str, optional
The measurement group, by default 'ccgg'.
frequency : str, optional
The frequency of the measurements, by default 'event'.
driver : str, optional
The driver to use to read the data, by default 'pandas'.
gml_dir : str, optional
The NOAA GML directory to download the data to, by default GML_DIR.
"""
self.specie = specie
self.site = site
self.platform = platform
self.lab_id = lab_id
self.measurement_group = measurement_group
self.frequency = frequency
self.driver = driver
self.ext = self.driver_ext[driver]
self.gml_dir = gml_dir or GML_DIR
self.directory = os.path.join(self.gml_dir, specie, 'flask')
self.filename = self.file_template.format(**self.__dict__)
self.filepath = os.path.join(self.directory, self.filename)
def __repr__(self):
return f'Flask(specie={self.specie}, site={self.site}, platform={self.platform}, lab_id={self.lab_id}, measurement_group={self.measurement_group}, frequency={self.frequency}, driver={self.driver})'
def __str__(self):
return f'NOAA GML Flask({self.specie}, {self.site})'
def download(self):
host = 'ftp.gml.noaa.gov'
path = f'/data/trace_gases/{self.specie}/flask/surface/{self.ext}/{self.filename}'
ftp_download(host, path, self.directory)
return self.filepath
@cached_property
def data(self):
if self.driver == 'pandas':
data = pd.read_csv(self.filepath, sep=' ', comment='#',
parse_dates=['datetime'])
data['datetime'] = data.datetime.dt.tz_localize(None)
data = data.dropna(subset=['datetime']).set_index('datetime').sort_index()
elif self.driver == 'xarray':
data = xr.open_dataset(self.filepath)
times = data.time.values
data = data.drop_vars('time').assign_coords(time=('obs', times))
else:
raise ValueError("Invalid driver")
return data
[docs]
@staticmethod
def apply_qaqc(data: Union[pd.DataFrame, xr.Dataset], driver: str='pandas'):
"""
Apply QA/QC to the Flask data.
Parameters
----------
data : pd.DataFrame | xr.Dataset
The Flask data.
driver : str, optional
The driver to use to read the data, by default 'pandas'.
Returns
-------
pd.DataFrame | xr.Dataset
The Flask data with QA/QC applied.
"""
if driver == 'pandas':
data = data[data.qcflag == '...']
elif driver == 'xarray':
data = data.where(data.qcflag == '...')
else:
raise ValueError("Invalid driver")
return data