Source code for sensortoolkit.lib_utils._flatten_datasets

# -*- coding: utf-8 -*-
"""
This module contains a method for combining the datasets of AirSensor and
ReferenceMonitor objects, corresponding to instruments that have been collocated
at an ambient monitoring site, into a single pandas DataFrame object for each
sampling or averaging interval present in the sensor and reference datasets.
The resulting DataFrames can (optionally) be saved as comma-separated value
files.

================================================================================

@Author:
  | Samuel Frederick, NSSC Contractor (ORAU)
  | U.S. EPA / ORD / CEMM / AMCD / SFSB


Created:
  Mon Jan 31 11:06:57 2022
Last Updated:
  Mon Jan 31 11:06:57 2022
"""
import os
import pandas as pd
import numpy as np
from sensortoolkit.datetime_utils import get_todays_date
from sensortoolkit.lib_utils import flatten_list

def flatten_datasets(AirSensor, ReferenceMonitor, verbose=True, include_units=True, write_to_file=False):
    """Merge sensor and reference datasets into one DataFrame per interval.

    For every interval key found in ``AirSensor.data`` or in any classifier
    of ``ReferenceMonitor.data``, the per-unit sensor DataFrames are joined
    side by side (columns suffixed with ``_<sensor_key>``) and the matching
    reference DataFrames are joined on (columns suffixed with ``_Ref``).
    Reference data at the ``'1-minute'`` interval is kept in a separate
    frame rather than joined to the sensor data.

    NOTE(review): the nesting of this function was reconstructed from a
    whitespace-collapsed listing. In particular, the placement of the
    metadata write and of ``flat_df.to_csv`` should be confirmed against
    the packaged module.

    Args:
        AirSensor (sensortoolkit.AirSensor):
            Object whose ``data`` attribute maps interval -> sensor key ->
            DataFrame. ``bdate``, ``edate``, ``project_path``, ``name``,
            ``make`` and ``model`` are read when writing files.
        ReferenceMonitor (sensortoolkit.ReferenceMonitor):
            Object whose ``data`` attribute maps classifier -> interval ->
            DataFrame. ``site_id`` is read when writing files.
        verbose (bool, optional):
            If False, the columns collected by ``verbose_columns`` are
            stripped from the flattened data via ``remove_verbose``.
            Defaults to True.
        include_units (bool, optional):
            Forwarded to ``verbose_columns`` and ``remove_verbose``.
            Defaults to True.
        write_to_file (bool, optional):
            If True, write the flattened datasets (and, when ``verbose`` is
            False, a metadata file on the ``'1-hour'`` interval) as .csv
            files under ``<project_path>/data/eval_stats/<name>``. Defaults
            to False.

    Returns:
        flat_dict (dict):
            Maps each interval to its flattened DataFrame (empty when the
            sensor has no data at that interval).

    """
    # Resolve the lookup table relative to this module's directory. Joining
    # '..' onto __file__ itself only works where paths are normalised
    # lexically; POSIX refuses to traverse through a regular file.
    module_dir = os.path.dirname(os.path.abspath(__file__))
    state_fips_codes = pd.read_csv(os.path.join(module_dir,
                                                'us_fips_codes.csv'))

    site_info_cols = ['Agency', 'Site_Name', 'Site_AQS', 'Site_Lat',
                      'Site_Lon', 'Data_Source', 'Data_Acquisition_Date_Time']

    ref_intervals = list(set(flatten_list(
        [list(ReferenceMonitor.data[key].keys())
         for key in ReferenceMonitor.data])))

    flat_dict = {}
    meta_dfs = {}
    min_ref_df = pd.DataFrame()
    intervals = list(set().union(list(AirSensor.data.keys()), ref_intervals))
    for interval in intervals:
        flat_df = pd.DataFrame()
        print(f'Flattening {interval} datasets')

        if interval in AirSensor.data:
            for sensor_key in AirSensor.data[interval]:
                sensor_df = AirSensor.data[interval][sensor_key]
                sensor_df = sensor_df.add_suffix(f'_{sensor_key}')
                flat_df = flat_df.join(sensor_df, how='outer')

        for classifier in ReferenceMonitor.data:
            if interval not in ReferenceMonitor.data[classifier]:
                continue

            ref_df = ReferenceMonitor.data[classifier][interval]
            site_info = ref_df[site_info_cols]
            ref_df = ref_df.drop(columns=site_info_cols)
            ref_df = ref_df.add_suffix('_Ref')

            # Tag every reference parameter with the source of its data.
            param_ref_cols = [param.replace('_Value', '')
                              for param in ref_df.columns
                              if '_Value' in param]
            source = site_info.Data_Source.dropna().unique()[0]
            for param in param_ref_cols:
                ref_df[f'{param}_Data_Source'] = source

            if interval != '1-minute':
                if not flat_df.empty:
                    flat_df = flat_df.join(ref_df, how='outer')
            else:
                # 1-minute reference data is kept apart from sensor data.
                min_ref_df = min_ref_df.join(ref_df, how='outer')

        verbose_cols = []
        ref_verbose_cols = []
        if not min_ref_df.empty:
            ref_verbose_cols, min_ref_df = verbose_columns(
                min_ref_df, ref_verbose_cols, include_units, verbose,
                interval)

        if not flat_df.empty:
            if (interval in ref_intervals) and interval != '1-minute':
                flat_df = flat_df.join(site_info)
                # Iterating a DataFrame yields its column labels.
                verbose_cols.extend(site_info)

            verbose_cols, flat_df = verbose_columns(
                flat_df, verbose_cols, include_units, verbose, interval)

        if (not verbose) and (not flat_df.empty):
            flat_df, meta_df = remove_verbose(flat_df, verbose_cols,
                                              include_units)
            if not min_ref_df.empty:
                min_ref_df, _ = remove_verbose(min_ref_df, ref_verbose_cols,
                                               include_units)

        if write_to_file:
            print('..writing flattened dataset to .csv')
            today = get_todays_date()
            interv = interval.replace('-', '_')
            bdate = AirSensor.bdate.strftime('%y%m%d')
            edate = AirSensor.edate.strftime('%y%m%d')
            out_dir = os.path.join(AirSensor.project_path, 'data',
                                   'eval_stats', AirSensor.name)

            state_abbrev = ''
            if ReferenceMonitor.site_id != 'Unspecified Site ID':
                # The first two characters of the site ID are read as the
                # state FIPS code.
                state_fips = int(ReferenceMonitor.site_id[0:2])
                fips_match = state_fips_codes.FIPS_Code == state_fips
                state_abbrev = state_fips_codes[fips_match][
                    'Abbreviation'].unique()[0]
            state_abbrev = f'_{state_abbrev}_'

            if (not verbose) and (not flat_df.empty):
                meta_dfs[interval] = meta_df

            if not min_ref_df.empty:
                file_name = (f'Ref{state_abbrev}{interv}_bdate{bdate}'
                             f'_edate{edate}_{today}.csv')
                file_path = os.path.join(out_dir, file_name)
                drop_cols = [col for col in verbose_cols
                             if col in min_ref_df]
                min_ref_df = min_ref_df.drop(columns=drop_cols)
                min_ref_df.to_csv(file_path, float_format='%.2f')
                min_ref_df = pd.DataFrame()

            # Sub-hourly intervals are labelled as recorded resolution.
            if (pd.to_timedelta(interv.replace('_', ' '))
                    < pd.to_timedelta('1 h')):
                interv = 'RecRes'
            sensor_name = (f'{AirSensor.make.replace(" ", "-")}_'
                           f'{AirSensor.model.replace(" ", "-")}')
            file_name = (f'{sensor_name}{state_abbrev}{interv}_bdate{bdate}'
                         f'_edate{edate}_{today}.csv')
            file_path = os.path.join(out_dir, file_name)

            if not flat_df.empty:
                for sensor_key in AirSensor.data[interval]:
                    for param in ['PM25', 'O3', 'PM10', 'NO2', 'CO', 'SO2']:
                        sensor_col = f'{param}_Value_{sensor_key}'
                        ref_col = f'{param}_Value_Ref'
                        if (sensor_col in flat_df.columns
                                and ref_col in flat_df.columns):
                            diff_col = f'{param}_Diff_{sensor_key}'
                            flat_df[f'{param}_Ratio_{sensor_key}'] = (
                                flat_df[sensor_col] / flat_df[ref_col])
                            flat_df[diff_col] = (
                                flat_df[sensor_col] - flat_df[ref_col])
                            flat_df[f'{param}_AbsDiff_{sensor_key}'] = abs(
                                flat_df[diff_col])

                flat_df.to_csv(file_path, float_format='%.2f')

        flat_dict[interval] = flat_df

        # `today` and `state_abbrev` only exist when files are being
        # written, so the metadata file is gated on write_to_file to avoid
        # a NameError when only the in-memory result is requested.
        if write_to_file and (not verbose) and (interval == '1-hour'):
            meta_df = pd.DataFrame()
            for meta_data in meta_dfs.values():
                meta_df = meta_df.combine_first(meta_data)

            meta_file_name = (f'{AirSensor.name}{state_abbrev}'
                              f'metadata_{today}.csv')
            file_path = os.path.join(AirSensor.project_path, 'data',
                                     'eval_stats', AirSensor.name,
                                     meta_file_name)
            meta_df.to_csv(file_path)

    return flat_dict
def verbose_columns(df, verbose_cols, include_units, verbose, interval):
    """Collect the names of 'verbose' columns in a flattened dataset.

    Each column label of ``df`` is inspected in order:

    * Labels containing ``'_Unit_'`` are dropped from ``df`` when
      ``include_units`` is false, appended to ``verbose_cols`` when
      ``include_units`` is true and ``verbose`` is false, and left alone
      when both are true.
    * Labels containing ``'_Data_Source'`` are appended to ``verbose_cols``.
    * Labels containing any of the reference method/parameter code
      fragments are appended to ``verbose_cols`` (once per matching
      fragment).

    Args:
        df (pandas.DataFrame): Dataset whose column labels are scanned.
        verbose_cols (list): List that is extended in place with the
            matching column labels.
        include_units (bool): Whether unit columns are kept.
        verbose (bool): Whether verbose columns stay in the dataset.
        interval (str): Not used by this function.

    Returns:
        tuple: ``(verbose_cols, df)`` -- the same list object that was
        passed in, and the DataFrame with any unit columns removed.

    """
    ref_fragments = ('Param_Code_Ref', 'Method_Ref', 'Method_Code_Ref',
                     'Method_POC_Ref')

    for label in df.columns:
        if '_Unit_' in label:
            if not include_units:
                df = df.drop(columns=[label])
            elif not verbose:
                verbose_cols.append(label)

        if '_Data_Source' in label:
            verbose_cols.append(label)

        verbose_cols.extend(label for fragment in ref_fragments
                            if fragment in label)

    return verbose_cols, df
def remove_verbose(df, verbose_cols, include_units):
    """Split verbose columns off a dataset into a one-row metadata frame.

    For each label in ``verbose_cols`` that has at least one non-null
    entry in ``df``, the first unique non-null value is stored in the
    metadata frame. All columns named in ``verbose_cols`` are then dropped
    from the returned copy of ``df``.

    Args:
        df (pandas.DataFrame): Dataset containing the verbose columns.
        verbose_cols (list): Labels of the columns to remove.
        include_units (bool): Not used by this function.

    Returns:
        tuple: ``(df, meta_df)`` -- the dataset without the verbose
        columns, and a DataFrame with index ``[0]`` holding one value per
        retained verbose column.

    """
    first_values = {}
    for label in verbose_cols:
        column = df[label]
        if column.isna().all():
            # Nothing to record for columns that are entirely null.
            continue
        first_values[label] = column.dropna().unique()[0]

    meta_df = pd.DataFrame(first_values, index=[0])
    trimmed = df.drop(columns=verbose_cols)
    return trimmed, meta_df