Source code for lair.air.soundings

"""
Upper air sounding data.
"""

try:
    from siphon.simplewebservice.wyoming import WyomingUpperAir
except ImportError:
    print('siphon not installed. Please install siphon to use the soundings module.')

from collections import deque
import datetime as dt
import os
import pandas as pd
import requests
from time import sleep
import xarray as xr

from lair.config import vprint, GROUP_DIR
from lair import units
from lair.constants import Rd, cp
from lair.air.meteorology import ideal_gas_law, hypsometric, poisson


#: Sounding data directory
SOUNDING_DIR = os.path.join(GROUP_DIR, 'soundings')


class Sounding:
    """
    Upper air sounding data.

    Attributes
    ----------
    path : str
        The path to the sounding data.
    filename : str
        The filename of the sounding data.
    station : str
        The station identifier.
    time : datetime
        The date and time of the sounding.
    data : pd.DataFrame
        The sounding data.

    Methods
    -------
    interpolate(start=1289, stop=5000, interval=10)
        Interpolate sounding data to regular height intervals.
    """

    _attrs = ['station', 'time', 'station_number', 'latitude', 'longitude',
              'elevation', 'pw']

    def __init__(self, path: str):
        """
        Initialize a Sounding object.

        Parses the station identifier and time from the filename
        and reads the data.

        Parameters
        ----------
        path : str
            The path to the sounding data.
        """
        self.path = path
        self.filename = os.path.basename(path)
        self.station = self.filename.split('_')[0]
        self.time = dt.datetime.strptime(self.filename.split('_')[1].split('.')[0],
                                         '%Y%m%d%H')

        self.data = pd.read_csv(path, parse_dates=['time'])
        self.data.drop(columns=['station', 'time'], inplace=True)

        for attr in self._attrs:
            if attr in self.data.columns:
                setattr(self, attr, self.data[attr].iloc[0])
                self.data.drop(columns=attr, inplace=True)

    def interpolate(self, start=1289, stop=5000, interval=10):
        """Interpolate sounding data to regular height intervals.

        Parameters
        ----------
        start : int
            The starting height.
        stop : int
            The stopping height.
        interval : int
            The height interval.

        Returns
        -------
        xr.Dataset
            The interpolated sounding data.
        """
        height = range(start, stop, interval)
        df = pd.DataFrame(index=height)

        data_height = self.data.dropna(subset='height').set_index('height')
        data_height['raw'] = True

        merged = pd.concat([df, data_height])
        merged.index.name = 'height'
        merged = merged.sort_values(['height', 'raw'])

        data = merged.interpolate(method='index')
        data = data.loc[data.raw.isna() & (data.index >= start) & (data.index < stop)]
        data.drop(columns='raw', inplace=True)

        ds = data.to_xarray().expand_dims(time=[self.time])
        ds['pw'] = (('time', ), [self.pw])

        attrs = {attr: getattr(self, attr) for attr in self._attrs
                 if hasattr(self, attr) and attr not in ['time', 'pw']}
        ds.attrs.update(attrs)
        ds.attrs['interpolation_interval'] = interval
        ds.attrs['notes'] = f'Interpolated to {start}-{stop}m at {interval}m intervals.'

        return ds

    def plot(self):
        # TODO
        raise NotImplementedError
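
# Example (a minimal sketch, not from the source): construct a Sounding from a
# previously downloaded CSV and interpolate it onto a regular height grid.
# The station and timestamp in the filename below are hypothetical.
#
# >>> path = os.path.join(SOUNDING_DIR, 'SLC', 'SLC_2020011512.csv')
# >>> snd = Sounding(path)
# >>> ds = snd.interpolate(start=1289, stop=5000, interval=10)
# >>> ds.temperature  # DataArray on dims ('time', 'height')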

def merge(soundings: list) -> pd.DataFrame:
    """
    Merge a list of sounding data.

    Parameters
    ----------
    soundings : list
        A list of Sounding objects.

    Returns
    -------
    pd.DataFrame
        The merged sounding data.
    """
    dfs = []
    for sounding in soundings:
        df = sounding.data.copy()
        df['time'] = sounding.time
        df['pw'] = sounding.pw
        dfs.append(df)

    return pd.concat(dfs)
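
# Example (hypothetical usage): stack several Sounding objects into one long
# DataFrame. `csv_paths` below stands in for a list of downloaded sounding files.
#
# >>> soundings = [Sounding(p) for p in csv_paths]
# >>> df = merge(soundings)
# >>> df.groupby('time').size()  # number of levels reported per launch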

def download_sounding(station, date, dst=None) -> str:
    """
    Download an upper air sounding from the Wyoming archive.

    Parameters
    ----------
    station : str
        The 4-letter station identifier.  # FIXME: 3-letter?
    date : datetime
        The date and time.
    dst : str
        The destination directory.

    Returns
    -------
    str
        The path to the downloaded sounding data.
    """
    if dst is None:
        dst = os.path.join(SOUNDING_DIR, station)
    os.makedirs(dst, exist_ok=True)

    path = os.path.join(dst, f'{station}_{date:%Y%m%d%H}.csv')

    if not os.path.exists(path):
        print(f'Downloading {station} on {date:%Y-%m-%d %H:%M}...')
        df = WyomingUpperAir.request_data(date, station)
        df.to_csv(path, index=False)
    else:
        print(f'{station} on {date:%Y-%m-%d %H:%M} already exists.')

    return path
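
# Example (a sketch; the station and date are placeholders): fetch a single
# 12 UTC sounding. Requires siphon, since the download goes through
# WyomingUpperAir.request_data.
#
# >>> path = download_sounding('SLC', dt.datetime(2020, 1, 15, 12))
# >>> print(path)  # .../soundings/SLC/SLC_2020011512.csv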

def download_soundings(station, start, end, dst=None, months=None):
    """
    Download upper air soundings from the Wyoming archive.

    Parameters
    ----------
    station : str
        The 4-letter station identifier.
    start : datetime
        The start date and time.
    end : datetime
        The end date and time.
    dst : str
        The destination directory.
    months : list
        The months to download.
    """
    print('Downloading soundings...')

    dates = pd.date_range(start, end, freq='12h')
    if months:
        dates = dates[dates.month.isin(months)]

    to_download = deque(dates)
    while len(to_download) > 0:
        date = to_download.popleft()
        try:
            download_sounding(station, date, dst)
        except IndexError as e:
            print(f'Error downloading {station} on {date:%Y-%m-%d %H:%M}: {e}')
            continue
        except ValueError as e:
            print(f'Error downloading {station} on {date:%Y-%m-%d %H:%M}: {e}')
            if 'No data available' in str(e):
                continue
            else:
                raise e
        except requests.exceptions.HTTPError as e:
            print(f'Error downloading {station} on {date:%Y-%m-%d %H:%M}: {e}')
            if 'Please try again later' in str(e):
                print('Trying again in 2 seconds...')
                to_download.appendleft(date)
                sleep(2)
            else:
                raise e
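
# Example (hypothetical parameters): download a winter of 00/12 UTC soundings,
# restricted to December-February. Server errors are retried; missing launches
# are skipped.
#
# >>> download_soundings('SLC', dt.datetime(2019, 12, 1), dt.datetime(2020, 3, 1),
# ...                    months=[12, 1, 2])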

def get_soundings(station='SLC', start=None, end=None, sounding_dir=None,
                  months=None, driver='xarray', **kwargs):
    """
    Get upper air soundings from the Wyoming archive.

    Parameters
    ----------
    station : str
        The 4-letter station identifier.
    start : datetime
        The start date and time.
    end : datetime
        The end date and time.
    sounding_dir : str
        The directory containing the sounding data.
    months : list
        The months to download.
    driver : str
        'xarray'/'nc' to return an interpolated xr.Dataset,
        'pandas'/'csv' to return a merged pd.DataFrame.

    Returns
    -------
    xr.Dataset or pd.DataFrame
        The sounding data.
    """
    if sounding_dir is None:
        sounding_dir = os.path.join(SOUNDING_DIR, station)

    files = os.listdir(sounding_dir)

    if len(files) == 0:
        print('No soundings found. Downloading...')
        if not all([start, end]):
            raise ValueError('start and end must be specified if no soundings are found.')
        download_soundings(station, start, end, sounding_dir, months)
        files = os.listdir(sounding_dir)  # refresh the listing after downloading

    soundings = []
    for file in files:
        # Skip files that don't match the date range
        date = file.split('_')[1].split('.')[0]
        date = dt.datetime.strptime(date, '%Y%m%d%H')
        if start:
            if date <= start:
                continue
        if end:
            if date > end:
                continue

        path = os.path.join(sounding_dir, file)
        try:
            soundings.append(Sounding(path))
        except Exception as e:
            print(f'Error reading file {path}: {e}')
            continue

    if driver in ['xarray', 'nc']:
        data = xr.concat([sounding.interpolate() for sounding in soundings],
                         dim='time').sortby('time')
    elif driver in ['pandas', 'csv']:
        data = merge(soundings)
    else:
        raise ValueError(f'Invalid driver: {driver}')

    return data
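
# Example (a sketch, assuming soundings have already been downloaded for 'SLC'):
# load everything on disk either as an interpolated Dataset or as a flat DataFrame.
#
# >>> ds = get_soundings(station='SLC', driver='xarray')
# >>> df = get_soundings(station='SLC', driver='pandas')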

def valleyheatdeficit(data: xr.Dataset, integration_height=2200) -> xr.DataArray:
    """
    Calculate the valley heat deficit.

    Whiteman, C. David, et al. “Relationship between Particulate Air Pollution
    and Meteorological Variables in Utah's Salt Lake Valley.” Atmospheric
    Environment, vol. 94, Sept. 2014, pp. 742-53.
    https://doi.org/10.1016/j.atmosenv.2014.06.012

    Parameters
    ----------
    data : xr.Dataset
        The sounding data.
    integration_height : int
        The height to integrate to [m].

    Returns
    -------
    xr.DataArray
        The valley heat deficit [MJ/m^2].
    """
    h0 = data.elevation

    # Subset to the heights between the surface and the integration height
    data = data.sel(height=slice(h0, integration_height))

    T = data.temperature.pint.quantify('degC').pint.to('degK')
    p = data.pressure.pint.quantify('hPa').pint.to('Pa')

    # Calculate potential temperature using Poisson's equation
    theta = poisson(T=T, p=p, p0=1e5 * units('Pa'))
    theta_h = theta.sel(height=integration_height, method='nearest')

    # Calculate virtual temperature to account for water vapor
    Tv = hypsometric(p1=p.isel(height=slice(0, -1)).values,
                     p2=p.isel(height=slice(1, None)).values,
                     deltaz=data.interpolation_interval * units('m'))
    layer_heights = T.height.values[:-1] + data.interpolation_interval / 2
    Tv = xr.DataArray(Tv, coords=[data.time, layer_heights],
                      dims=['time', 'height'])\
        .interp_like(T, method='linear')\
        .pint.quantify('degK')

    # Calculate the density using the ideal gas law
    rho = ideal_gas_law(solve_for='rho', p=p, T=Tv, R=Rd)

    # Set pint units - setting units to kg/m3 doesn't change the numbers.
    # pint-xarray hasn't implemented .to_base_units() yet;
    # when it does, we can change this to .pint.to_base_units()
    rho = rho.pint.to('kg/m^3')

    # Calculate the heat deficit by integrating using the trapezoid method
    heat_deficit = (cp * rho * (theta_h - theta)).dropna('height', how='all')\
        .integrate('height') * (1 * units('m'))  # J/m2

    return heat_deficit.pint.to('MJ/m^2').pint.dequantify()
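
# Example (hypothetical usage): compute the heat deficit time series from
# interpolated soundings. Requires pint/pint-xarray, as used above.
#
# >>> ds = get_soundings(station='SLC', driver='xarray')
# >>> vhd = valleyheatdeficit(ds, integration_height=2200)
# >>> vhd.plot()  # MJ/m^2 as a function of time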