"""
This module provides classes and functions for working with UATAQ sites.
"""
from collections import defaultdict
import datetime as dt
from datetime import timezone
import geopandas as gpd
import json
import pandas as pd
from typing import Sequence, Union, Literal
from lair.config import vprint
from lair.uataq import errors, filesystem, instruments
from lair.utils.clock import TimeRange
# Type alias for arguments that accept either the literal 'all' or one-or-more
# identifier strings (a single str, or a list/tuple/set of str).
_all_or_mult_strs = Union[Literal['all'], str, list[str], tuple[str, ...], set[str]]
class Site:
    """
    A class representing a site where atmospheric measurements are taken.

    Attributes
    ----------
    SID : str
        The site identifier.
    config : dict
        A dictionary containing configuration information for the site.
    instruments : InstrumentEnsemble
        An instance of the InstrumentEnsemble class representing the instruments at the site.
    groups : set of str
        The research groups that collect data at the site.
    loggers : set of str
        The loggers used by research groups that record data at a site.
    pollutants : set of str
        The pollutants measured at the site.
    pollutant_instruments : dict[str, list]
        Mapping of each pollutant to the instruments that measure it.

    Methods
    -------
    read_data(instruments='all', group=None, lvl=None, time_range=None, num_processes=1, file_pattern=None)
        Read data for each instrument for the specified level.
    get_obs(pollutants='all', format='wide', group=None, time_range=None, num_processes=1)
        Read observations for each pollutant, combining instruments by pollutants.
    get_recent_obs(recent=dt.timedelta(days=10), pollutants='all', format='wide', group=None)
        Get recent observations from site instruments.
    """

    def __init__(self, SID: str, config: dict, instruments: instruments.InstrumentEnsemble):
        """
        Initializes a Site object with the given site ID.

        Parameters
        ----------
        SID : str
            The site identifier.
        config : dict
            A dictionary containing configuration information for the site:

            .. code-block:: python

                {
                    name: str,
                    is_active: bool,
                    is_mobile: bool,
                    latitude: float,
                    longitude: float,
                    zagl: float,
                    loggers: dict,
                    instruments: {
                        instrument: {
                            loggers: dict
                            installation_date: str,
                            removal_date: str,
                        }
                    }
                }
        instruments : InstrumentEnsemble
            An instance of the InstrumentEnsemble class representing the instruments at the site.
        """
        self.SID = SID
        self.config = config
        self.instruments = instruments
        self.groups = instruments.groups
        self.loggers = instruments.loggers
        self.pollutants = instruments.pollutants

        # Build pollutant -> instruments lookup table so get_obs can find
        # every instrument that measures a given species.
        self.pollutant_instruments = defaultdict(list)
        for instrument in self.instruments:
            if hasattr(instrument, 'pollutants'):
                for pollutant in getattr(instrument, 'pollutants'):
                    self.pollutant_instruments[pollutant].append(instrument)

    def __repr__(self):
        cls = self.__class__.__name__
        config = json.dumps(self.config, indent=4)
        instruments = repr(self.instruments)
        return f'{cls}(SID="{self.SID}", config={config}, instruments={instruments})'

    def __str__(self):
        return f"{self.__class__.__name__}: {self.SID}"

    def read_data(self, instruments: _all_or_mult_strs = 'all',
                  group: str | None = None,
                  lvl: str | None = None,
                  time_range: TimeRange | TimeRange._input_types = None,
                  num_processes: int | Literal['max'] = 1,
                  file_pattern: str | None = None) -> dict[str, pd.DataFrame]:
        """
        Read data for the specified instruments and level.

        Parameters
        ----------
        instruments : str or list/tuple/set of str or 'all'
            The instrument(s) to read data from. If 'all', read data from all instruments.
            Default is 'all'.
        group : str, optional
            The research group to read data from. Default is None which uses the default group.
        lvl : str, optional
            The data level to read. Default is None which reads the highest level available.
        time_range : str | list[Union[str, dt.datetime, None]] | tuple[Union[str, dt.datetime, None], Union[str, dt.datetime, None]] | slice | None
            The time range to read data. Default is None which reads all available data.
        num_processes : int or 'max'
            The number of processes to use for reading data. Default is 1.
        file_pattern : str, optional
            The file pattern to use for filtering files. Default is None.

        Returns
        -------
        dict[str, pandas.DataFrame]
            A dictionary containing the data for each instrument.

        Raises
        ------
        InstrumentNotFoundError
            If a requested instrument is not installed at the site.
        ReaderError
            If no data is found for the specified instruments.
        """
        # Normalize the instrument selection to a list of lower-case names.
        if instruments == 'all':
            instruments = self.instruments.names
        elif isinstance(instruments, str):
            instruments = [instruments.lower()]
        else:
            # Any other iterable of names (list, tuple, set, ...). The previous
            # isinstance(..., Sequence) check silently skipped sets, leaving
            # their names un-lowercased even though the type alias allows them.
            instruments = [i.lower() for i in instruments]

        # Determine group to read data from
        group = filesystem.get_group(group)

        # Read data for each instrument and store in dictionary
        data = {}
        for name in instruments:
            if name not in self.instruments:
                raise errors.InstrumentNotFoundError(name, self.instruments)
            instrument = self.instruments[name]
            try:
                data[name] = instrument.read_data(group, lvl, time_range,
                                                  num_processes, file_pattern)
            except errors.ReaderError as e:
                # Best-effort: skip instruments without readable data,
                # but report the failure when verbose printing is enabled.
                vprint(f'Error reading {instrument} data from {group} groupspace: {e}')

        if not data:
            raise errors.ReaderError(f'No data found for {instruments} at {self.SID} in {group} groupspace.')

        return data

    def get_obs(self, pollutants: _all_or_mult_strs = 'all',
                format: Literal['wide'] | Literal['long'] = 'wide',
                group: str | None = None,
                time_range: TimeRange._input_types = None,
                num_processes: int | Literal['max'] = 1
                ) -> pd.DataFrame:
        """
        Get observations for each pollutant, combining instruments by pollutants.

        Parameters
        ----------
        pollutants : str or list of str, optional
            pollutants to read. If 'all', read all pollutants. Default is 'all'.
        format : str, optional
            Format of the data to return, 'wide' or 'long'. Default is 'wide'.
        group : str, optional
            Research group to read data from. Default is None which uses the default group.
        time_range : str | list[Union[str, dt.datetime, None]] | tuple[Union[str, dt.datetime, None], Union[str, dt.datetime, None]] | slice | None
            The time range to read data. Default is None which reads all available data.
        num_processes : int, optional
            Number of processes to use for reading data. Default is 1.

        Returns
        -------
        pandas.DataFrame
            Observations sorted and indexed by time. In 'wide' format, one
            column per observed variable matching the requested pollutants;
            in 'long' format, 'pollutant' and 'value' columns.

        Raises
        ------
        ValueError
            If a requested pollutant is not measured at the site, or
            ``format`` is not 'wide' or 'long'.
        """
        lvl = 'final'  # observations are always combined from finalized data

        # Normalize the pollutant selection to upper-case names.
        if pollutants == 'all':
            pollutants = self.pollutants
        elif isinstance(pollutants, str):
            pollutants = [pollutants.upper()]
        else:
            # Any other iterable of names (list, tuple, set, ...). The previous
            # isinstance(..., Sequence) check silently skipped sets.
            pollutants = [p.upper() for p in pollutants]

        if any(p not in self.pollutants for p in pollutants):
            raise ValueError(f"Invalid pollutant(s): '{set(pollutants) - set(self.pollutants)}'")

        # Get instruments for each pollutant
        instruments_to_read = {
            instrument.name for pollutant in pollutants
            for instrument in self.pollutant_instruments[pollutant]
        }

        # Read data
        data = self.read_data(instruments_to_read, group,
                              lvl, time_range, num_processes)

        # Reshape data
        vprint('Combining data by pollutant...')
        if format == 'wide':
            obs = pd.concat(data.values())
            obs = obs.filter(regex='|'.join(pollutants))  # filter columns by pollutants
            obs = obs.dropna(how='all')
        elif format == 'long':
            melted_dfs = []
            for df in data.values():
                df_reset = df.reset_index()
                melted_df = df_reset.melt(id_vars='Time_UTC', value_vars=df.columns,
                                          var_name='pollutant', value_name='value')
                melted_dfs.append(melted_df)
            obs = pd.concat(melted_dfs)

            # Filter columns by pollutants
            obs = obs[obs['pollutant'].str.contains('|'.join(pollutants))]

            obs.dropna(subset='value', inplace=True)
            obs.set_index('Time_UTC', inplace=True)
        else:
            raise ValueError(f"Invalid format '{format}'. Must be 'wide' or 'long'.")

        return obs.sort_index()

    def get_recent_obs(self, recent: str | dt.timedelta = dt.timedelta(days=10),
                       pollutants: _all_or_mult_strs = 'all',
                       format: Literal['wide'] | Literal['long'] = 'wide',
                       group: str | None = None) -> pd.DataFrame:
        '''
        Get recent observations from site instruments.

        Parameters
        ----------
        recent : str or datetime.timedelta, optional
            Time range to get recent observations. Default is 10 days.
        pollutants : str or list of str, optional
            Pollutants to read. If 'all', read all pollutants. Default is 'all'.
        format : str, optional
            Format of the data to return. Default is 'wide'.
        group : str, optional
            Research group to read data from. Defaults to None which uses the default group.

        Returns
        -------
        pandas.DataFrame
            A dataframe containing recent observations from site instruments.
        '''
        if isinstance(recent, str):
            recent = pd.to_timedelta(recent)

        # Naive UTC start time, matching the naive timestamps in the data.
        start_time = dt.datetime.now(timezone.utc).replace(tzinfo=None) - recent

        return self.get_obs(pollutants, format, group, [start_time, None])
class MobileSite(Site):
    """
    A class representing a mobile site where atmospheric measurements are taken.

    Parameters
    ----------
    SID : str
        The site identifier.
    config : dict
        A dictionary containing configuration information for the site:

        .. code-block:: python

            {
                ...
                is_mobile: True,
                instruments: {
                    instrument: {...}
                }
                ...
            }
    """

    _pilot_sites = ['trx01', 'trx02']

    @staticmethod
    def merge_gps(obs: pd.DataFrame, gps: pd.DataFrame,
                  on: str | None = None,
                  obs_on: str | None = None, gps_on: str | None = None
                  ) -> pd.DataFrame:
        """
        Merge observation data with location data from GPS.

        Parameters
        ----------
        obs : pd.DataFrame
            The observation data.
        gps : pd.DataFrame
            The GPS location data.
        on : str, optional
            The column name to merge on. Defaults to 'Time_UTC'.
        obs_on : str, optional
            The column name in the observation data to merge on.
            If not specified, it will use the value of ``on``.
        gps_on : str, optional
            The column name in the GPS data to merge on.
            If not specified, it will use the value of ``on``.

        Returns
        -------
        geopandas.GeoDataFrame
            The merged data with added location information as EPSG:4326
            points built from Longitude_deg/Latitude_deg.
        """
        def truncate(time):
            # Drop sub-second precision so obs & gps timestamps can match exactly
            return time.dt.floor('s')

        vprint('Merging obs data with location data from gps...')

        # Reset datetime index
        obs = obs.reset_index()
        gps = gps.reset_index()

        # Merge on Time_UTC by default unless specified
        if on is None:
            on = 'Time_UTC'
        obs_on = obs_on or on
        gps_on = gps_on or on

        # Convert to datetime
        obs[obs_on] = pd.to_datetime(obs[obs_on], errors='coerce')
        gps[gps_on] = pd.to_datetime(gps[gps_on], errors='coerce')

        # Drop rows with missing obs, time, or location
        obs.dropna(how='all', inplace=True)
        gps.dropna(subset=[gps_on, 'Latitude_deg', 'Longitude_deg'],
                   inplace=True)

        # Truncate time to seconds
        obs[obs_on] = truncate(obs[obs_on])
        gps[gps_on] = truncate(gps[gps_on])

        # Perform merge
        obs = obs.merge(gps, how='inner', left_on=obs_on, right_on=gps_on,
                        suffixes=('', '_gps'))

        # Set Time_UTC as index
        obs.set_index('Time_UTC', inplace=True)

        # Convert to geodataframe
        obs = gpd.GeoDataFrame(obs, crs='EPSG:4326',
                               geometry=gpd.points_from_xy(obs.Longitude_deg,
                                                           obs.Latitude_deg))

        return obs

    def get_obs(self, pollutants: _all_or_mult_strs = 'all',
                format: Literal['wide'] | Literal['long'] = 'wide',
                group: str | None = None,
                time_range: TimeRange._input_types = None,
                num_processes: int | Literal['max'] = 1
                ) -> pd.DataFrame:
        """
        Get mobile site observations for each pollutant, combining instruments by pollutants,
        and merging location data from GPS.

        Parameters
        ----------
        pollutants : str or list of str, optional
            pollutants to read. If 'all', read all pollutants. Default is 'all'.
        format : str, optional
            Format of the data to return. Default is 'wide'.
        group : str, optional
            Research group to read data from. Default is None which uses the default group.
        time_range : list of str, optional
            Time range to read data. Default is None.
        num_processes : int, optional
            Number of processes to use for reading data. Default is 1.

        Returns
        -------
        pandas.DataFrame
            A dataframe containing mobile site observations for each pollutant
            with location data merged.

        Raises
        ------
        ValueError
            If ``group`` is not 'lin' or 'horel'.
        """
        # Determine group to read data from
        group = filesystem.get_group(group)

        # Read data
        obs = super().get_obs(pollutants, format, group, time_range, num_processes)
        gps = self.read_data('gps', group, 'final', time_range, num_processes)['gps']

        # Merge gps data with obs data
        if group == 'lin':
            # Can't always trust the pi's time for lin-group mobile data
            # but Pi_Time connects the gps data to the obs data
            # so we'll merge on Pi_Time and use Time_UTC from gps as the time
            merge_on = 'Pi_Time'
            obs.index.name = 'Pi_Time'
        elif group == 'horel':
            merge_on = 'Time_UTC'
        else:  # FIXME just check for lin group?
            raise ValueError(f"Invalid group '{group}'. Must be 'lin' or 'horel'.")
        obs = MobileSite.merge_gps(obs, gps, on=merge_on)

        if group == 'lin':
            # Pi_Time was only needed as the merge key; Time_UTC (from gps)
            # is now the index.
            obs.drop(columns=['Pi_Time'], inplace=True)

        return obs

    @staticmethod
    def plot(obs, ax=None):
        """
        Plot mobile observations on a PlateCarree map.

        Parameters
        ----------
        obs : pandas.DataFrame or geopandas.GeoDataFrame
            Observations with Latitude_deg and Longitude_deg columns.
        ax : matplotlib axes, optional
            Axes to draw on. If None, a new figure with a
            cartopy PlateCarree projection is created.

        Returns
        -------
        matplotlib axes
            The axes the data were (to be) plotted on.
        """
        import cartopy.crs as ccrs
        import matplotlib.pyplot as plt

        # FIXME is this the best way to do this?
        obs['lon'] = obs.Longitude_deg.round(3)
        obs['lat'] = obs.Latitude_deg.round(3)
        # keep only most recent for each lat/lon

        # BUG FIX: the original condition was inverted (`if ax is not None`),
        # which clobbered a caller-supplied axes with a brand-new figure and
        # returned None-projection axes when no axes was given.
        if ax is None:
            fig, ax = plt.subplots(subplot_kw={'projection': ccrs.PlateCarree()})
            # ax.set_extent([SLV_bounds[0], SLV_bounds[2],
            #                SLV_bounds[1], SLV_bounds[3]], crs=ccrs.PlateCarree())

        return ax