From 1c53aae2a8dc33a6f8b7f76f75b49e5336b55b0a Mon Sep 17 00:00:00 2001 From: Kaden McKeen Date: Thu, 18 Jan 2024 17:08:11 -0500 Subject: [PATCH 1/3] Created date parsing and reconciliation systems for robust date handling. --- cyclops/data/df/dates/__init__.py | 1 + cyclops/data/df/dates/common.py | 52 ++ cyclops/data/df/dates/dates.py | 661 +++++++++++++++++++++++ cyclops/data/df/dates/groupby.py | 45 ++ cyclops/data/df/dates/index.py | 83 +++ cyclops/data/df/dates/join.py | 45 ++ cyclops/data/df/dates/pairs.py | 129 +++++ cyclops/data/df/dates/pandas.py | 160 ++++++ cyclops/data/df/dates/reconcile_dates.py | 561 +++++++++++++++++++ cyclops/data/df/dates/type.py | 221 ++++++++ 10 files changed, 1958 insertions(+) create mode 100644 cyclops/data/df/dates/__init__.py create mode 100644 cyclops/data/df/dates/common.py create mode 100644 cyclops/data/df/dates/dates.py create mode 100644 cyclops/data/df/dates/groupby.py create mode 100644 cyclops/data/df/dates/index.py create mode 100644 cyclops/data/df/dates/join.py create mode 100644 cyclops/data/df/dates/pairs.py create mode 100644 cyclops/data/df/dates/pandas.py create mode 100644 cyclops/data/df/dates/reconcile_dates.py create mode 100644 cyclops/data/df/dates/type.py diff --git a/cyclops/data/df/dates/__init__.py b/cyclops/data/df/dates/__init__.py new file mode 100644 index 000000000..4297c5136 --- /dev/null +++ b/cyclops/data/df/dates/__init__.py @@ -0,0 +1 @@ +"""Processors for date handling.""" diff --git a/cyclops/data/df/dates/common.py b/cyclops/data/df/dates/common.py new file mode 100644 index 000000000..7766f6f7d --- /dev/null +++ b/cyclops/data/df/dates/common.py @@ -0,0 +1,52 @@ +from typing import Any, Dict, List, Optional, Set, Union + +import numpy as np + + +def to_list(obj: Any) -> List[Any]: + """Convert some object to a list of object(s) unless already one. + + Parameters + ---------- + obj : any + The object to convert to a list. + + Returns + ------- + list + The processed object. + + """ + if isinstance(obj, list): + return obj + + if isinstance(obj, (np.ndarray, set, dict)): + return list(obj) + + return [obj] + + +def to_list_optional( + obj: Optional[Any], none_to_empty: bool = False +) -> Union[List[Any], None]: + """Convert some object to a list of object(s) unless already None or a list. + + Parameters + ---------- + obj : any + The object to convert to a list. + none_to_empty: bool, default = False + If true, return a None obj as an empty list. Otherwise, return as None. + + Returns + ------- + list or None + The processed object. 
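+
+    Examples
+    --------
+    A minimal doctest-style sketch of the intended behaviour:
+
+    >>> to_list_optional(None, none_to_empty=True)
+    []
+    >>> to_list_optional("a")
+    ['a']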
+ + """ + if obj is None: + if none_to_empty: + return [] + return None + + return to_list(obj) diff --git a/cyclops/data/df/dates/dates.py b/cyclops/data/df/dates/dates.py new file mode 100644 index 000000000..10235bf84 --- /dev/null +++ b/cyclops/data/df/dates/dates.py @@ -0,0 +1,661 @@ +from typing import List, Optional, Union +import warnings + +import datetime +from datetime import timedelta + +from dateutil import parser as du_parser +from dateutil.parser import ParserError + +# import datefinder + +import numpy as np +import pandas as pd + +from fecg.utils.pandas.pandas import check_cols +from fecg.utils.pandas.type import is_datetime_series, is_str_series + +# Datetime component names +DATE_COMPONENTS = ["year", "month", "day"] +TIME_COMPONENTS = ["hour", "minute", "second", "microsecond"] +DT_COMPONENTS = DATE_COMPONENTS + TIME_COMPONENTS + +# Parsing results for pd.to_datetime (PD_DT) and the dateutil parser (DU_DT) +PD_DT = "pd" +DU_DT = "du" +DU_TO_PD_DT = f"{DU_DT}_to_{PD_DT}" + + +def datetime_to_unix(series: pd.Series) -> pd.Series: + """ + Convert a datetime series to UNIX timestamps. + + Parameters + ---------- + series : pandas.Series + Datetime series. + + Returns + ------- + pd.Series + Series containing UNIX timestamps corresponding to the datetime values. + """ + is_datetime_series(series, raise_err=True) + + return series.astype(int) / 10**9 + + +def unix_to_datetime(series: pd.Series) -> pd.Series: + """ + Convert a series of UNIX timestamps to datetime. + + Parameters + ---------- + series : pandas.Series + Series containing UNIX timestamps. + + Returns + ------- + pd.Series + Series containing datetime values corresponding to the UNIX timestamps. + """ + return series.astype(int).astype("datetime64[s]") + + +def round_date(dates: pd.Series) -> pd.Series: + """ + Round datetimes to the nearest day. + + Parameters + ---------- + dates : pd.Series + Datetime series. + + Returns + ------- + pd.Series + Series rounded to the nearest day. + """ + is_datetime_series(dates, raise_err=True) + + return dates.dt.round('1d') + + +def has_time( + dates: pd.Series, + raise_err_on_time: bool = False, +) -> pd.Series: + """ + Checks whether any datetimes have a time component. + + Parameters + ---------- + dates : pd.Series + Datetime series. + raise_err : bool, default False + If True, raise an error if any date has a time component. + + Raises + ------ + ValueError + If any date has a time component and `raise_err` is True. + + Returns + ------- + bool + Whether any dates have a time component. + """ + # Round datetime values + rounded = round_date(dates) + + # If the same when rounded, then no time, if different, then has time + # Since NaN isn't equal to NaN, specifically check to make sure not null + has_time = (dates != rounded) & ~dates.isna() + + # Check if any dates have times and raise_err is True + if raise_err_on_time and has_time.any(): + raise ValueError("Dates cannot have a time component.") + + return has_time + + +# DEPRECIATED IN CONTRAST TO `analyze_dates`??? +def invalid_date(dates: pd.Series, **to_datetime_kwargs) -> pd.Series: + """ + Given a Series of dates, return a boolean Series of whether the dates are invalid. + + Parameters + ---------- + dates : pandas.Series + A string series containing (possibly invalid) dates. + **to_datetime_kwargs + Additional arguments for pandas.to_datetime. + + Returns + ------- + pandas.Series + Series with boolean values indicating whether each date is invalid. 
+ + Raises + ------ + ValueError + When "errors" is specified in `to_datetime_kwargs` + """ + is_str_series(dates, raise_err=True) + + if "errors" in to_datetime_kwargs: + raise ValueError("Cannot specify 'errors' in to_datetime_kwargs.") + + return pd.isna(pd.to_datetime(dates, errors='coerce', **to_datetime_kwargs)) + + +def filter_date_deltas( + dates: pd.DataFrame, + delta_cutoff: Union[str, timedelta] = None, + left_delta_cutoff: Union[str, timedelta] = None, + right_delta_cutoff: Union[str, timedelta] = None, +) -> pd.DataFrame: + """ + Filter DataFrame based on date delta conditions. + + Parameters + ---------- + dates : pandas.DataFrame + DataFrame containing 'delta' column. + delta_cutoff : timedelta, optional + Maximum delta value allowed. + left_delta_cutoff : timedelta, optional + Minimum delta value allowed. + right_delta_cutoff : timedelta, optional + Maximum delta value allowed. + + Returns + ------- + pandas.DataFrame + Filtered DataFrame based on delta conditions. + + Raises + ------ + ValueError + When delta_cutoff specified along with left_delta_cutoff or right_delta_cutoff. + """ + if delta_cutoff is not None: + if left_delta_cutoff is not None or right_delta_cutoff is not None: + raise ValueError( + "Cannot specify left_delta_cutoff or right_delta_cutoff when " + "delta_cutoff is specified." + ) + + return dates[abs(dates['delta']) <= pd.to_timedelta(delta_cutoff)] + + if left_delta_cutoff is not None: + dates = dates[dates['delta'] >= pd.to_timedelta(left_delta_cutoff)] + + if right_delta_cutoff is not None: + dates = dates[dates['delta'] <= pd.to_timedelta(right_delta_cutoff)] + + return dates + + +class DatePairHandler: + """ + Handler to create and manipulate pairs based on dates and IDs. + + Attributes + ---------- + data_x : pandas.DataFrame + DataFrame containing data x. Should have the index `id` and a `date` column. + data_y : pandas.DataFrame + DataFrame containing data y. Should have the index `id` and a `date` column. + date_pairs : pandas.DataFrame + DataFrame containing date pair results. + _paired_data : pandas.DataFrame, optional + The paired data coming from the data_x and data_y columns. Computed and stored + based on `date_pairs` when the `paired_data` method is first called. 
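+
+    Examples
+    --------
+    A hedged usage sketch; ``labs`` and ``scans`` are hypothetical DataFrames,
+    each indexed by ``id`` and carrying a ``date`` column:
+
+    >>> handler = DatePairHandler(labs, scans, delta_cutoff="3d")
+    >>> pairs = handler.date_pairs
+    >>> paired = handler.paired_data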
+ """ + def __init__( + self, + data_x: pd.DataFrame, + data_y: pd.DataFrame, + delta_cutoff: Union[str, timedelta] = None, + left_delta_cutoff: Union[str, timedelta] = None, + right_delta_cutoff: Union[str, timedelta] = None, + keep_closest_to: Optional[str] = None, + ): + assert data_x.index.name == "id" + assert data_y.index.name == "id" + assert "idx_x" not in data_x.columns + assert "idx_y" not in data_y.columns + assert "date" in data_x.columns + assert "date" in data_y.columns + + data_x["idx_x"] = np.arange(len(data_x)) + data_y["idx_y"] = np.arange(len(data_y)) + + date_pairs = data_x[["date", "idx_x"]].merge(data_y[["date", "idx_y"]], on='id', how='inner') + + if keep_closest_to is not None: + assert keep_closest_to in ["date_x", "date_y"] + + date_pairs["delta"] = date_pairs["date_x"] - date_pairs["date_y"] + date_pairs["abs_delta"] = abs(date_pairs["delta"]) + + date_pairs = filter_date_deltas( + date_pairs, + delta_cutoff=delta_cutoff, + left_delta_cutoff=left_delta_cutoff, + right_delta_cutoff=right_delta_cutoff, + ) + + if keep_closest_to is not None: + date_pairs = date_pairs.reset_index() + min_deltas = date_pairs.groupby(["id", keep_closest_to]).agg({ + "abs_delta": "min", + }).reset_index() + date_pairs = date_pairs.merge( + min_deltas, + on=["id", keep_closest_to, "abs_delta"], + how='inner', + ) + + self.data_x = data_x + self.data_y = data_y + self.date_pairs = date_pairs + self._paired_data = None + + @property + def paired_data(self) -> pd.DataFrame: + """ + Get paired data based on the date pairs. + + Returns + ------- + pandas.DataFrame + Paired data based on the date pairs. + """ + if self._paired_data is None: + self._paired_data = pd.concat([ + self.data_x.set_index("idx_x").loc[self.date_pairs["idx_x"]].reset_index(), + self.data_y.set_index("idx_y").loc[self.date_pairs["idx_y"]].reset_index(), + ], axis=1) + + return self._paired_data + + +def du_parse_date(date: str, **parse_kwargs) -> Union[datetime.datetime, float]: + """ + Parse a date string using dateutil's parser. + + Parameters + ---------- + date : str + Date string to be parsed. + **parse_kwargs + Keyword arguments to pass to the parser. + + Returns + ------- + datetime.datetime or float + Parsed datetime object or np.nan on failure. + """ + try: + return du_parser.parse(date, **parse_kwargs) + + # ParserError = failed to parse + # TypeError = wrong type, e.g., nan or int + except (ParserError, TypeError): + return np.nan + + +def extract_du_components( + du_series: pd.Series, + components: Optional[List[str]] = None, +) -> pd.DataFrame: + """ + Extract datetime components from dates parsed from dateutil (du). + + Useful for Series full of datetimes that cannot be converted using + `pandas.to_datetime` without possibly losing dates to errors like + `OutOfBoundsDatetime`. + + Parameters + ---------- + du_series : pd.Series + Series of datetimes parsed using dateutil. + components : list of str, optional + Components to extract from the datetime. If None, uses `DT_COMPONENTS`. + + Returns + ------- + pd.DataFrame + DataFrame containing the extracted datetime components. 
+ """ + def extract_components(datetime, components): + if pd.isna(datetime): + return np.full(len(components), np.nan) + return np.array([getattr(datetime, comp) for comp in components]) + + components = components or DT_COMPONENTS + component_data = pd.DataFrame( + np.stack(du_series.apply(extract_components, args=(components,)).values), + columns=components, + index=du_series.index, + ) + return component_data.astype("Int64") + + +def datetime_components( + texts: pd.Series, + components: Optional[List[str]] = None, +) -> pd.DataFrame: + """ + Extract separate datetime components (NaN when missing) using dateutil. + + Useful because functionalities like `pandas.to_datetime` will return + NaT if a full date is not present (e.g., missing a year). + + Parameters + ---------- + texts : pd.Series + Series of datetime strings. + components : list of str, optional + Components to extract from the datetime. If None, uses `DT_COMPONENTS`. + + Returns + ------- + pd.DataFrame + DataFrame containing the extracted datetime components and the parsed date. + """ + # Extract dates with different values across all components + du = texts.apply(du_parse_date) + du.rename(DU_DT, inplace=True) + + du2 = texts.apply(du_parse_date, default=datetime.datetime(1, 2, 2, 2, 2, 2, 2)) + du2.rename("du2", inplace=True) + + # Where they are equal is not default, where they aren't is default (i.e., missing) + components = components or DT_COMPONENTS + equal = pd.concat([ + extract_du_components(du, components=components), + extract_du_components(du2, components=components).add_suffix('_2'), + ], axis=1) + + for i, comp in enumerate(components): + # If a value is missing (different for different default components), + # then replace it with NaN + equal[comp][equal[comp] != equal[f'{comp}_2']] = np.nan + + return pd.concat([du, equal[components]], axis=1) + + +def analyzed_dates_differ( + analyzed: pd.DataFrame, + warn: bool = False, + raise_err: bool = False, +) -> pd.Series: + """ + Check where the analyzed `dateutil` and `pd.to_datetime` dates differ. + + Parameters + ---------- + analyzed : pd.DataFrame + A result of `analyze_dates`. + warn : bool, default False + Whether to warn the user when the dates differ. + raise_err : bool, default False + Whether to raise an error when the dates differ. + + Returns + ------- + pd.Series + Boolean series indicating where the dates from `pd.to_datetime` and + `dateutil` do not match. + + Raises + ------ + ValueError + Raised if `raise_err` is True and there are non-matching dates between + `pd.to_datetime` and `dateutil`. + """ + check_cols(analyzed, [PD_DT, DU_DT], raise_err_on_missing=True) + + # If the dates parsed from pd and du aren't the same date (and didn't + # both fail to parse), then flag that something funky might be going on + matching = (analyzed[PD_DT] == analyzed[DU_DT]) | \ + (analyzed[[PD_DT, DU_DT]].isna().sum(axis=1) == 2) + + if not matching.all(): + msg = ( + "`pd.to_datetime` and `dateutil` produced different results. " + "Consider manual inspection." + ) + + if raise_err: + raise ValueError(msg) + + if warn: + warnings.warn(msg) + + return ~matching + + +def analyzed_dates_failed_to_convert( + analyzed: pd.DataFrame, + warn: bool = False, + raise_err: bool = False, +) -> pd.Series: + """ + Check whether any `dateutil` dates which failed to convert using `pd.to_datetime`. + + One common failure is due to a `pandas.errors.OutOfBoundsDatetime`. + + Parameters + ---------- + analyzed : pd.DataFrame + A result of `analyze_dates`. 
+ warn : bool, default False + Whether to warn the user if there are failures. + raise_err : bool, default False + Whether to raise an error if there are failures. + + Returns + ------- + pd.Series + Boolean series indicating where the `dateutil` dates failed to convert. + + Raises + ------ + ValueError + Raised if `raise_err` is True and there are `dateutil` dates failed to convert. + """ + check_cols(analyzed, [DU_DT, DU_TO_PD_DT], raise_err_on_missing=True) + + # If du date is not null but the converted date is, then it failed to convert + failed = analyzed[DU_DT].notnull() & analyzed[DU_TO_PD_DT].isna() + + if failed.any(): + msg = ( + "Failed to convert `dateutil` dates using `pd.to_datetime`. " + "Consider manual inspection." + ) + + if raise_err: + raise ValueError(msg) + + if warn: + warnings.warn(msg) + + return failed + + +def analyze_dates( + texts: pd.Series, + components: Optional[List[str]] = None, + warn: bool = True, +) -> pd.DataFrame: + """ + Analyze a series of dates and extract datetime components. + + Parameters + ---------- + texts : pd.Series + Series of datetime strings to be analyzed. + components : list of str, optional + Components to extract from the datetime. If None, uses `DT_COMPONENTS`. + warn : bool, default True + Whether to analyze the dates and warn the user about various anomalies. + + Returns + ------- + pd.DataFrame + DataFrame containing the analyzed dates and extracted components. + """ + is_str_series(texts, raise_err=True) + + texts.rename("text", inplace=True) + dates = texts.to_frame() + + dates[PD_DT] = pd.to_datetime(dates["text"], infer_datetime_format=True, errors="coerce") + + components = components or DT_COMPONENTS + dates = pd.concat([ + dates, + datetime_components(dates["text"], components=components), + ], axis=1) + + # Drop a component column if the whole column is NaN - it is likely never specified + dates.drop( + [comp for comp in components if dates[comp].isna().all()], + axis=1, + inplace=True, + ) + + dates[DU_TO_PD_DT] = pd.to_datetime( + dates[DU_DT], + infer_datetime_format=True, + errors="coerce", + ) + + if warn: + analyzed_dates_differ(dates, warn=True) + analyzed_dates_failed_to_convert(dates, warn=True) + + return dates + + +def components_to_datetime( + comps: pd.DataFrame, + default_time: Optional[datetime.time] = None, +) -> pd.Series: + """ + Converts a DataFrame of datetime components into a datetime series. + + Useful for combining separate date and time texts. + + Parameters + ---------- + comps: pandas.DataFrame + DataFrame of component columns. Must have `DATE_COMPONENTS` columns and may + have any in `DT_COMPONENTS`. + default_time : datetime.time, optional + Default time for filling null time components. Defaults to midnight (all 0). + + Returns + ------- + pd.Series + A datetime series. Null time components will be filled with the components in + `default_time`. Null date components will result in a null result. + + Notes + ----- + Consider using `default_time=datetime.time(12)` (noon) to approximate the datetime + with the least error. If nothing is specified, it defaults to midnight, which is + a bad default for many events, e.g., few medical procedures take place at night. 
+ + Examples + -------- + >>> # Convert components to datetime, using noon as the default time + >>> dts = components_to_datetime(comps, default_time=datetime.time(12)) + """ + # Check component columns + check_cols(comps, DATE_COMPONENTS, raise_err_on_missing=True) + check_cols(comps, DT_COMPONENTS, raise_err_on_unexpected=True) + avail_time_comps = set(comps.columns).intersection(set(TIME_COMPONENTS)) + + if not (comps.dtypes.unique().astype(str) == 'Int64').all(): + raise ValueError("Components must have type 'Int64'.") + + # Handle default times + default_time = default_time or datetime.time(0) + TIME_COMPONENTS + for time_comp in TIME_COMPONENTS: + time_comp_value = getattr(default_time, time_comp) + + # If the column already exists, fill any nulls with the default value + if time_comp in avail_time_comps: + comps[time_comp].fillna(time_comp_value, inplace=True) + # If not, then create the column using the default value + else: + comps[time_comp] = time_comp_value + comps[time_comp] = comps[time_comp].astype("Int64") + + # Convert the components (now filled with time defaults) into datetimes + cmp = comps.copy() + index = cmp.index + cmp.reset_index(drop=True, inplace=True) + + # Convert only the datetimes which are not missing date components, + # the rest will be filled with NaN during reindexing + res = pd.to_datetime(cmp[~cmp.isna().any(axis=1)].astype(int)).reindex(cmp.index) + res.index = index + + return res + + +def combine_date_and_time_components( + date_comps: pd.DataFrame, + time_comps: pd.DataFrame, +) -> pd.DataFrame: + """ + Combine date components from one DataFrame and time components from another. + + Parameters + ---------- + date_comps : pandas.DataFrame + DataFrame containing relevant date components. Non-relevant columns dropped. + time_comps : pandas.DataFrame + DataFrame containing relevant time components. Non-relevant columns dropped. + + Returns + ------- + pd.DataFrame + A DataFrame with the date components from `date_comps` and time components from + `time_comps`. + + Examples + -------- + >>> date_comps = analyze_dates(meta["AcquisitionDate"]) + >>> time_comps = analyze_dates(meta["AcquisitionTime"]) + >>> comps = combine_date_and_time_components( + >>> date_comps, + >>> time_comps, + >>> default_time=datetime.time(12), + >>> ) + >>> dts = components_to_datetime(datetime) + """ + if not date_comps.index.equals(date_comps.index): + raise ValueError( + "Indexes of `date_comps` and `time_comps` must be the same." + ) + + unexpected_cols_date, _, _ = check_cols(date_comps, DATE_COMPONENTS) + date_comps = date_comps.drop(unexpected_cols_date, axis=1) + + unexpected_cols_time, _, _ = check_cols(time_comps, TIME_COMPONENTS) + time_comps = time_comps.drop(unexpected_cols_time, axis=1) + + return pd.concat([date_comps, time_comps], axis=1) + + +#def find_dates(text): +# matches = datefinder.find_dates(text, source=True, index=True) diff --git a/cyclops/data/df/dates/groupby.py b/cyclops/data/df/dates/groupby.py new file mode 100644 index 000000000..6be041fa5 --- /dev/null +++ b/cyclops/data/df/dates/groupby.py @@ -0,0 +1,45 @@ +import pandas as pd + + +def agg_mode(series: pd.Series) -> list: + """ + Get the mode(s) of a series by using `.agg(agg_mode)`. + + Parameters + ---------- + series : pd.Series + Series. + + Returns + ------- + list + List containing the mode(s) of the input series. 
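+
+    Examples
+    --------
+    A minimal sketch of the expected output:
+
+    >>> agg_mode(pd.Series([1, 1, 2]))
+    [1]
+    >>> agg_mode(pd.Series([1, 1, 2, 2]))
+    [1, 2]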
+ """ + return pd.Series.mode(series).to_list() + + +def groupby_agg_mode( + grouped: pd.core.groupby.generic.SeriesGroupBy, + single_modes_only: bool = False, +) -> pd.Series: + """ + Compute the mode(s) for each group of a grouped series. + + Parameters + ---------- + grouped : pd.core.groupby.generic.SeriesGroupBy + Grouped series. + single_modes_only : bool, default False + If True, only groups with a singular mode are kept. + + Returns + ------- + pd.Series + A pandas Series containing the mode(s) for each group. + """ + + result = grouped.agg(agg_mode).explode() + if single_modes_only: + duplicate_indices = result.index[result.index.duplicated(keep=False)] + result = result.drop(duplicate_indices) + return result diff --git a/cyclops/data/df/dates/index.py b/cyclops/data/df/dates/index.py new file mode 100644 index 000000000..c2f0818eb --- /dev/null +++ b/cyclops/data/df/dates/index.py @@ -0,0 +1,83 @@ +import pandas as pd + + +def index_structure_equal( + idx1: pd.Index, + idx2: pd.Index, + raise_err: bool = False, +) -> bool: + """ + Check whether two indexes have the same structure. Values aren't considered. + + Parameters + ---------- + idx1 : pandas.Index + The first index to compare. + idx2 : pandas.Index + The second index to compare. + raise_err : bool, default False + If True, raises an error if indexes do not have the same structure. + + Returns + ------- + bool + True if the indexes have the same structure, otherwise False. + """ + if type(idx1) != type(idx2): + if raise_err: + raise ValueError("Index dtypes do not match.") + + return False + + if idx1.names != idx2.names: + if raise_err: + raise ValueError("Index names do not match.") + + return False + + if idx1.nlevels != idx2.nlevels: + if raise_err: + raise ValueError("Number of index levels do not match.") + + return False + + return True + + +def is_multiindex( + idx: pd.Index, + raise_err: bool = False, + raise_err_multi: bool = False, +) -> bool: + """ + Check whether a given index is a MultiIndex. + + Parameters + ---------- + idx : pd.Index + Index to check. + raise_err : bool, default False + If True, raise a ValueError when idx is not a MultiIndex. + raise_err_multi : bool, default False + If True, raise a ValueError when idx is a MultiIndex. + + Raises + ------ + ValueError + Raised when `idx` is not a MultiIndex and `raise_err` is True. + Raised when `idx` is a MultiIndex and `raise_err_multi` is True. + + Returns + ------- + bool + True if idx is a MultiIndex, False otherwise. + """ + multiindex = isinstance(idx, pd.MultiIndex) + + if not multiindex and raise_err: + raise ValueError("Index must be a MultiIndex.") + + if multiindex and raise_err_multi: + raise ValueError("Index cannot be a MultiIndex.") + + return multiindex diff --git a/cyclops/data/df/dates/join.py b/cyclops/data/df/dates/join.py new file mode 100644 index 000000000..4ac7b4da8 --- /dev/null +++ b/cyclops/data/df/dates/join.py @@ -0,0 +1,45 @@ +from typing import Hashable, Optional, Sequence, Union + +import pandas as pd + +from fecg.utils.pandas.pandas import COLS_TYPE + + +def reset_index_merge( + left: Union[pd.DataFrame, pd.Series], + right: Union[pd.DataFrame, pd.Series], + index_col: Optional[COLS_TYPE] = None, + **merge_kwargs, +) -> pd.DataFrame: + """ + Merges two dataframes after resetting their indexes. + + Parameters + ---------- + left : pandas.DataFrame or pandas.Series + The left object to merge. + right : pandas.DataFrame or pandas.Series + The right object to merge. 
+ index_col : hashable or sequence of hashable, optional + Column(s) to set as index for the merged result. + **merge_kwargs + Additional keyword arguments to pass to pandas merge function. + + Returns + ------- + pd.DataFrame + The merged dataframe. + """ + + # Reset index for both dataframes + left_reset = left.reset_index() + right_reset = right.reset_index() + + # Merge the dataframes + merged = pd.merge(left_reset, right_reset, **merge_kwargs) + + # If index_col is provided, set it for the merged dataframe + if index_col: + merged.set_index(index_col, inplace=True) + + return merged diff --git a/cyclops/data/df/dates/pairs.py b/cyclops/data/df/dates/pairs.py new file mode 100644 index 000000000..f01cb0012 --- /dev/null +++ b/cyclops/data/df/dates/pairs.py @@ -0,0 +1,129 @@ +from typing import Tuple, Union + +import numpy as np +import pandas as pd + +import networkx as nx + +from fecg.utils.pandas.type import to_frame_if_series + + +def get_pairs( + data: Union[pd.Series, pd.DataFrame], + self_match: bool = False, + combinations: bool = True, +) -> pd.DataFrame: + """ + Perform a self-cross to generate pairs. + + Parameters + ---------- + data : pandas.Series or pandas.DataFrame + Values used to create the pairs. + self_match : bool, default False + If False, rows which paired with themselves are excluded. + combinations : bool, default True + If True, remove one of two permutations, leaving only pair combinations. + + Returns + ------- + pandas.DataFrame + A DataFrame of pairs. + + Notes + ----- + Often, we are only interested in combinations of pairs, not permutations. For + example, if evaluating the pairs using a commutative function, where argument order + does not affect the result, we would want to take only the pair combinations. + """ + pairs = to_frame_if_series(data).merge(data, how='cross') + + if combinations or not self_match: + length = len(data) + idx0 = np.repeat(np.arange(length), length) + idx1 = np.tile(np.arange(length), length) + + if combinations: + if self_match: + pairs = pairs[idx0 <= idx1] + else: + pairs = pairs[idx0 < idx1] + else: + pairs = pairs[idx0 != idx1] + + return pairs + + +def split_pairs(pairs: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: + """ + Split x and y pair columns into two separate DataFrames. + + Parameters + ---------- + pairs : pandas.DataFrame + A DataFrame of pairs. + + Returns + ------- + pandas.DataFrame + A DataFrame of pairs which had the "_x" columns. Suffix now removed. + pandas.DataFrame + A DataFrame of pairs which had the "_y" columns. Suffix now removed. + """ + half_len = (len(pairs.columns)//2) + + pairs_x = pairs.iloc[:, :half_len] + pairs_y = pairs.iloc[:, half_len:] + + cols = pairs.columns[:half_len].str.slice(stop=-2) + + pairs_x.columns = cols + pairs_y.columns = cols + + return pairs_x, pairs_y + + +def pairs_to_groups(pairs: pd.DataFrame) -> pd.DataFrame: + """ + Convert pairs of values in a DataFrame to groups of connected values. + + Given a DataFrame with two columns representing pairs of values, this function + constructs a graph where each value is a node and each pair is an edge. It then + finds the connected components of this graph, returning each component as a group + in a DataFrame. + + Parameters + ---------- + pairs : pandas.DataFrame + A DataFrame with two columns, each containing values. Each row represents a + pair of connected values. + + Raises + ------ + ValueError + If the input DataFrame does not have exactly two columns. 
+ + Returns + ------- + pandas.DataFrame + A DataFrame with columns `value` and `group`. Each row represents a value and + its associated group ID. + """ + if pairs.shape[1] != 2: + raise ValueError("The DataFrame must have exactly two columns.") + + # Create an empty graph + graph = nx.Graph() + + # Add edges to the graph based on the DataFrame rows + for _, row in pairs.iterrows(): + graph.add_edge(row[pairs.columns[0]], row[pairs.columns[1]]) + + # Find connected components + components = pd.Series(nx.connected_components(graph)) + + # Convert connected components into a groups series + groups = components.explode() + groups = pd.Series(groups.index, index=groups.values, name="group") + + return groups diff --git a/cyclops/data/df/dates/pandas.py b/cyclops/data/df/dates/pandas.py new file mode 100644 index 000000000..8aea57570 --- /dev/null +++ b/cyclops/data/df/dates/pandas.py @@ -0,0 +1,160 @@ +from typing import ( + Any, + Dict, + Hashable, + List, + Sequence, + Set, + Tuple, + Union, +) + +from functools import reduce + +import pandas as pd + +from fecg.utils.common import to_list +from fecg.utils.pandas.type import ( + is_bool_series, + is_int_series, + is_series, +) + +COLS_TYPE = Union[Hashable, Sequence[Hashable]] + + +def check_cols( + data: pd.DataFrame, + cols: COLS_TYPE, + raise_err_on_unexpected: bool = False, + raise_err_on_existing: bool = False, + raise_err_on_missing: bool = False, +) -> Tuple[Set[Hashable], Set[Hashable], Set[Hashable]]: + """ + Check DataFrame columns for expected columns and handle errors. + + Parameters + ---------- + data : pd.DataFrame + The input DataFrame to check columns against. + cols : hashable or list of Hashable + The column(s) to check for in the DataFrame. + raise_err_on_unexpected : bool, default False + Raise an error if unexpected columns are found. + raise_err_on_existing : bool, default False + Raise an error if any of the specified columns already exist. + raise_err_on_missing : bool, default False + Raise an error if any of the specified columns are missing. + + Returns + ------- + Tuple[Set[Hashable], Set[Hashable], Set[Hashable]] + A tuple containing sets of unexpected, existing, and missing columns. + """ + cols = set(to_list(cols)) + data_cols = set(data.columns) + + unexpected = data_cols - cols + if raise_err_on_unexpected and len(unexpected) > 0: + raise ValueError(f"Unexpected columns: {', '.join(unexpected)}") + + existing = data_cols.intersection(cols) + if raise_err_on_existing and len(existing) > 0: + raise ValueError(f"Existing columns: {', '.join(existing)}") + + missing = cols - data_cols + if raise_err_on_missing and len(missing) > 0: + raise ValueError(f"Missing columns: {', '.join(missing)}") + + return unexpected, existing, missing + + +def and_conditions(conditions: List[pd.Series]) -> pd.Series: + """ + Perform element-wise logical AND operation on a list of boolean Series. + + Parameters + ---------- + conditions : list of pd.Series + A list of boolean Pandas Series. + + Raises + ------ + ValueError + If the conditions are not Pandas boolean series. + + Returns + ------- + pd.Series + A new Pandas Series resulting from the element-wise logical AND operation. + """ + for condition in conditions: + is_bool_series(condition, raise_err=True) + + return reduce(lambda x, y: x & y, conditions) + + +def or_conditions(conditions: List[pd.Series]) -> pd.Series: + """ + Perform element-wise logical OR operation on a list of boolean Series. 
+ + Parameters + ---------- + conditions : list of pd.Series + A list of boolean Pandas Series. + + Raises + ------ + ValueError + If the conditions are not Pandas boolean series. + + Returns + ------- + pd.Series + A new Pandas Series resulting from the element-wise logical OR operation. + """ + for condition in conditions: + is_bool_series(condition, raise_err=True) + + return reduce(lambda x, y: x | y, conditions) + + +def combine_nonoverlapping(datas: List[Union[pd.DataFrame, pd.Series]]) -> pd.DataFrame: + """ + Combine non-overlapping DataFrames/Series into a single DataFrame/Series. + + The objects in `datas` should be all DataFrames or all Series, not a combination. + + For any given value location, it can be non-null in exactly 0 or 1 of the + DataFrames. The combined DataFrame will contains all of these values. + + Parameters + ---------- + datas : list of pandas.DataFrame + A list of DataFrames to be combined. + + Returns + ------- + pandas.DataFrame + The combined DataFrame. + + Raises + ------ + ValueError + If unauthorized overlap is found between DataFrames. + """ + # Get masks where the DataFrames are NaN + datas_na = [data.isna() for data in datas] + + # Check that there is no unauthorized overlap + datas_not_na = [(~data_na).astype(int) for data_na in datas_na] + datas_not_na_sum = reduce(lambda x, y: x + y, datas_not_na) + if not (datas_not_na_sum <= 1).all().all(): + raise ValueError("Unauthorized overlap found between DataFrames. Cannot combine.") + + # Combine the DataFrames + combined = datas[0].copy() + for data in datas[1:]: + combined = combined.combine_first(data) + + return combined diff --git a/cyclops/data/df/dates/reconcile_dates.py b/cyclops/data/df/dates/reconcile_dates.py new file mode 100644 index 000000000..e44aeab62 --- /dev/null +++ b/cyclops/data/df/dates/reconcile_dates.py @@ -0,0 +1,561 @@ +from typing import Dict, Hashable, List, Optional +import warnings +from copy import deepcopy +from dataclasses import dataclass, field + +import datetime +from datetime import timedelta + +import numpy as np +import pandas as pd + +from sklearn.cluster import DBSCAN + +from fecg.utils.common import to_list_optional +from fecg.utils.dates.dates import datetime_to_unix, has_time +from fecg.utils.pairs import ( + get_pairs, + pairs_to_groups, + split_pairs, +) +from fecg.utils.pandas.groupby import groupby_agg_mode +from fecg.utils.pandas.join import reset_index_merge +from fecg.utils.pandas.index import ( + index_structure_equal, + is_multiindex, +) +from fecg.utils.pandas.pandas import check_cols, combine_nonoverlapping, or_conditions +from fecg.utils.pandas.type import is_datetime_series + + +def cluster_date_group(dates, dbscan): + dbscan.fit(dates.values.reshape(-1, 1)) + + return pd.Series(dbscan.labels_) + + +def cluster_dates(dates, dbscan: DBSCAN): + # Convert to Unix for clustering + unix = datetime_to_unix(dates) + + # Create clusters for each group + clusters = unix.groupby(level=0).apply(cluster_date_group, dbscan) + + clusters.index = clusters.index.droplevel(1) + clusters = clusters.replace({-1: np.nan}).astype("Int64") + + return clusters + + +def get_date_clusters(dates, max_neighbourhood_delta: datetime.timedelta): + check_cols(dates, ["date", "approx"], raise_err_on_missing=True) + + dbscan = DBSCAN( + eps=max_neighbourhood_delta.total_seconds(), + min_samples=2, + ) + clusters = cluster_dates(dates["date"], dbscan) + clusters.rename("cluster", inplace=True) + + # Combine into the original data + clusters = pd.concat([dates, clusters], axis=1) + 
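+    # At this point each row keeps its original "date" and "approx" values plus
+    # an Int64 "cluster" label assigned per group (NaN where DBSCAN treated the
+    # date as noise, i.e. label -1).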
+ return clusters + + +def cluster_analysis(unres_hard, clusters): + index_col = clusters.index.names + + # Get the max cluster size for each group + cluster_size = clusters.reset_index().groupby(index_col + ["cluster"]).size() + cluster_size.rename("cluster_size", inplace=True) + + max_sizes = cluster_size.groupby(level=0).agg("max") + + clusters_of_max_size = reset_index_merge( + cluster_size, + max_sizes, + on=index_col + ["cluster_size"], + how="inner", + index_col=index_col, + )["cluster"] + clusters_of_max_size + clusters_of_max_size = clusters_of_max_size.to_frame() + clusters_of_max_size["is_max_size"] = True + + # The below averaging methods only make sense if there is a single max cluster, + # so ignore groups with several clusters of same size + clusters_of_max_size_vcs = clusters_of_max_size.index.value_counts() + + clusters_of_max_size = clusters_of_max_size[~clusters_of_max_size.index.isin( + clusters_of_max_size_vcs.index[clusters_of_max_size_vcs > 1] + )] + + # Get the is_max_size column into clusters + clusters = reset_index_merge( + clusters, + clusters_of_max_size, + how="left", + on=index_col + ["cluster"], + index_col=index_col, + ) + clusters["is_max_size"].fillna(False, inplace=True) + + # Get only the dates in the largest cluster + clusters_largest = clusters[clusters["is_max_size"]] + + # Get the hard dates in the largest clusters + clusters_largest_hard = clusters_largest[~clusters_largest["approx"]] + +# # === Resolve: largest_cluster_hard_mode +# single_modes = groupby_agg_mode( +# unres_hard["date"].groupby(level=0), +# single_modes_only=True, +# ) + +# largest_hard_is_mode = clusters_largest_hard.index.isin(single_modes.index) +# largest_cluster_hard_mode = clusters_largest_hard[largest_hard_is_mode]["date"] + +# # Continue without the resolved ones +# clusters_largest_hard = clusters_largest_hard[~largest_hard_is_mode] + + # === Resolve: largest_cluster_hard_mean === + # Take the average of these largest cluster hard dates + largest_cluster_hard_mean = clusters_largest_hard.reset_index( + ).groupby(index_col + ["cluster"])["date"].agg("mean") + largest_cluster_hard_mean.index = largest_cluster_hard_mean.index.droplevel(1) + + # === Resolve: largest_cluster_approx_mean === + # Now consider the largest clusters which have only approximate values + all_approx = clusters_largest.groupby(level=0)["approx"].all() + + clusters_largest_approx = clusters_largest[ + clusters_largest.index.isin(all_approx.index[all_approx]) + ].copy() + + largest_cluster_approx_mean = clusters_largest_approx.groupby( + index_col + ["cluster"], + )["date"].agg("mean") + largest_cluster_approx_mean.index = largest_cluster_approx_mean.index.droplevel(1) + + return clusters_largest, largest_cluster_hard_mean, largest_cluster_approx_mean + + +def analyze_typos(dates_hard): + index_col = dates_hard.index.names + + # Get all unique hard dates for each group + dates_hard_unique = dates_hard["date"].reset_index().value_counts( + ).reset_index().drop(0, axis=1).set_index(index_col)["date"] + + # Ignore any groups which only have one unique hard date + dates_hard_unique_vcs = dates_hard_unique.index.value_counts() + dates_hard_unique_vcs = dates_hard_unique_vcs[dates_hard_unique_vcs > 1] + dates_hard_unique_vcs.rename("n_unique", inplace=True) + + dates_hard_unique = dates_hard_unique.loc[dates_hard_unique_vcs.index] + + def date_to_char(dates): + chars = dates.astype(str).str.split('', expand=True) + chars.drop(columns=[0, 5, 8, 11], inplace=True) + chars.rename({ + 1: 'y1', + 2: 'y2', + 3: 'y3', 
+ 4: 'y4', + 6: 'm1', + 7: 'm2', + 9: 'd1', + 10: 'd2', + }, axis=1, inplace=True) + chars = chars.astype('uint8') + + return chars + + # Convert the dates into characters + chars = date_to_char(dates_hard_unique) + + # Compute hard date character combinations + pairs = chars.groupby(level=0).apply(get_pairs) + pairs.index = pairs.index.droplevel(1) + pairs.index.names = index_col + + pairs_x, pairs_y = split_pairs(pairs) + + # Calculate equal characters + pairs_eq = pairs_x == pairs_y + pairs_eq = pairs_eq.add_suffix("_eq") + pairs_eq["n_diff"] = 8 - pairs_eq.sum(axis=1) + + # Calculate adjacent characters, e.g., 5 vs 6 or 2 vs 1 + # Convert from uint8 to int to avoid rounding issues + pairs_adj = (pairs_x.astype(int) - pairs_y.astype(int)).abs() == 1 + pairs_adj = pairs_adj.add_suffix("_adj") + pairs_adj["n_adj"] = pairs_adj.sum(axis=1) + + # Collect information about the typo pairs + pairs = pd.concat([pairs_eq, pairs_adj], axis=1) + + # Incorporate date info + # Recover the dates from the characters + date_x = pairs_x.astype(str).agg(''.join, axis=1) + date_x = date_x.str.slice(stop=4) + \ + "-" + date_x.str.slice(start=4, stop=6) + \ + "-" + date_x.str.slice(start=6) + + date_y = pairs_y.astype(str).agg(''.join, axis=1) + date_y = date_y.str.slice(stop=4) + \ + "-" + date_y.str.slice(start=4, stop=6) + \ + "-" + date_y.str.slice(start=6) + pairs["date_x"] = pd.to_datetime(date_x) + pairs["date_y"] = pd.to_datetime(date_y) + pairs["year"] = pairs["date_x"].dt.year == pairs["date_y"].dt.year + pairs["month"] = pairs["date_x"].dt.month == pairs["date_y"].dt.month + pairs["day"] = pairs["date_x"].dt.day == pairs["date_y"].dt.day + + # Check if gotten the day/month transposed + pairs["dm_transpose"] = (pairs["date_x"].dt.day == pairs["date_y"].dt.month) & (pairs["date_x"].dt.month == pairs["date_y"].dt.day) + + # Logic for determining whether a typo or not + certain_conds = [ + # Only one different character + (pairs["n_diff"] == 1), + + # Two different characters with at least one adjacent + ((pairs["n_diff"] == 2) & (pairs["n_adj"] >= 1)), + + # Day and month are transposed, but correct year + (pairs["dm_transpose"] & pairs["year"]), + ] + pairs["typo_certain"] = or_conditions(certain_conds) + + pairs["typo_possible"] = pairs["n_diff"] <= 3 + + # Create typo groups from pairs of possible typos + typo_pairs = pairs[pairs["typo_certain"] | pairs["typo_possible"]] + + typo_groups = typo_pairs[["date_x", "date_y"]].astype(str).groupby(level=0).apply( + pairs_to_groups + ).reset_index().set_index(index_col + ["group"])["level_1"] + typo_groups.rename("date", inplace=True) + + # Convert typos to characters + typo_group_chars = date_to_char(typo_groups) + + def mode_scalar_or_list(series): + mode = pd.Series.mode(series) + + if len(mode) > 1: + return mode.to_list() + + return mode + + # Compile the most popular character options seen in each typo group + typo_value_options = typo_group_chars.groupby(level=[0, 1]).agg( + dict(zip(typo_group_chars.columns, [mode_scalar_or_list]*len(typo_group_chars.columns))) + ) + + """ + LEFT TO DO: + Compile a "date_possible" object + - Any completely filled typo_value_options (no lists) are essentially solved + - For day/month transpositions, those would be two possible dates [1914-11-03, 1914-03-11] + Still need to check out letter transpositions - 1956-10-02 vs 1956-10-20 + Perhaps do a mean for the one day/ten day/one month cols? The user can specify what's allowed? 
+ - Trade off between accuracy and just having nulls instead - date accuracy importance is use case specific + + As we go down the line of columns, disagreements become less and less important + That means we could take a mean of two disagreeing days, but not years, or + thousands of years + """ + + return pairs, typo_pairs, typo_groups, typo_value_options + + +@dataclass +class DateReconcilerResults: + index_col: List[Hashable] + resolved: pd.DataFrame + dates: pd.DataFrame + dates_hard: pd.DataFrame + dates_approx: pd.DataFrame + groups: pd.DataFrame + unres: pd.DataFrame + unres_hard: pd.DataFrame + unres_approx: pd.DataFrame + unres_groups: pd.DataFrame + clusters_largest: pd.DataFrame + pairs: pd.DataFrame + typo_pairs: pd.DataFrame + typo_groups: pd.Series + typo_value_options: pd.DataFrame + + +class DateReconciler: + """ + + Notes + ----- + === Resolutions === + - one_entry: Group contains one entry - select this date + - one_date: Contains multiple entries, but one unique date value - select this date + - one_multi_hard: Group which contains multiple of the same hard dates, but not + multiple sets of them, e.g., two instances of 1988-03-09 and two of 1974-06-20. + Works since it's unlikely for a typo or system error to produce the same date. + - hard_single_mode: Groups containing one hard date mode. + ### - largest_cluster_hard_mode: If after clustering, only one cluster of max size is + ### found, then take the mode of the hard dates, provided there is just one mode. + - largest_cluster_hard_mean: From the previous case, if more than one mode, then + take the average all of the hard dates in that cluster. + - largest_cluster_approx_mean: Same scenario as above, except the largest cluster + had no hard dates, so instead take the average of the approx dates. + + === Hard vs approximate dates === + One important distinction is whether a date is approximate (approx) or not: + - Approx: Computed, rounded, etc. - close to the real date, but maybe not equal + (e.g., only the year was given, or computing DOB from age and event time) + - Hard: System-defined or hand-inputted dates - these should be the true date, + with the exception of system errors and typos + + Delta distances are computed for both hard and approx dates, but Levenshtein + distance is only computed for hard dates. + + Approx dates take on supporting roles, e.g., is a given hard date near to many + supporting approx dates, or can be used as a backup with no hard dates available. + """ + def __init__( + self, + sources: Dict[Hashable, pd.Series], + date_score_fn: callable, + approx_sources: Optional[List[Hashable]] = None, + approx_near_thresh: Optional[timedelta] = None, + once_per_source: bool = True, + ): + """ + sources : dict + Dictionary of datetime Series, where the key indicates the source. + date_score_fn : callable + A function which accepts a returns float between 0 and 1, where this value + represents the score (feasibility) of the date. + approx_sources : list of hashable, optional + Sources where the dates have been approximated - rounded, calculated, etc. + approx_near_thresh: datetime.timedelta, optional + Threshold for considering approximated sources to be the same. Must be + specified if there are any approximate sources. + once_per_source : bool, default True + Consider a unique index/date pair only once per source. 
Helpful for + ensuring that sources with more/repeated entries don't hold more weight + """ + # Handle approximate date sources + if approx_sources is not None and approx_near_thresh is None: + raise ValueError( + "Must specify `approx_near_thresh` if `approx_sources` specified." + ) + approx_sources = to_list_optional(approx_sources, none_to_empty=True) + + if not set(approx_sources).issubset(set(sources.keys())): + raise ValueError( + "`approx_sources` must be a subset of the `sources` keys." + ) + + self.dates = self._preproc_sources(sources, approx_sources, once_per_source) + self.date_score_fn = date_score_fn + + self.approx_sources = approx_sources + self.approx_near_thresh = approx_near_thresh + + + def _preproc_sources(self, sources, approx_sources, once_per_source): + # Preprocess the sources/dates + dates = [] + prev_source = None + + for source, date in deepcopy(sources).items(): + try: + # Confirm datetime dtype + is_datetime_series(date, raise_err=True) + + # Raise an error if having a multiindex + is_multiindex( + sources[list(sources.keys())[0]].index, + raise_err_multi=True, + ) + + # Confirm identical index structures + if prev_source is not None: + index_structure_equal( + date.index, + sources[prev_source].index, + raise_err=True, + ) + + # No dates can have times - it messes things up + has_time(date, raise_err_on_time=True) + + except Exception as exc: + raise ValueError(f"Issue with series - source {source}.") from exc + + date.dropna(inplace=True) + date.rename("date", inplace=True) + + if once_per_source: + index_col = date.index.names + date = date.reset_index().drop_duplicates( + keep="first", + ).set_index(index_col)["date"] + + date = date.to_frame() + date["source"] = source + date["approx"] = source in approx_sources + + dates.append(date) + prev_source = source + + dates = pd.concat(dates) + dates = dates[~dates.index.isna()] + dates.sort_index(inplace=True) + + if not (dates["date"].dt.time == datetime.time(0)).all(): + warnings.warn( + "Dates with times are not supported. Converting to date only." + ) + + return dates + + + def _combined_resolved(self, groups, groups_resolved): + resolved = [] + for reason, dates in groups_resolved.items(): + dates = dates.to_frame() + dates["reason"] = reason + dates = dates.reindex(groups.index) + resolved.append(dates) + + return combine_nonoverlapping(resolved) + + + def __call__(self): + dates = self.dates.copy() + + index_col = list(dates.index.names) + + dates["date_str"] = dates["date"].astype(str) + dates["date_score"] = dates["date"].apply(self.date_score_fn) + + # Split into approximate and hard dates + dates_approx = dates[dates["approx"]].drop("approx", axis=1) + dates_hard = dates[~dates["approx"]].drop("approx", axis=1) + + groups = dates.groupby(dates.index).size().rename("size").to_frame() + groups["one_entry"] = groups["size"] == 1 + groups["n_approx"] = dates_approx.groupby(dates_approx.index).size() + groups["n_approx"].fillna(0, inplace=True) + + # Groups are resolved on a case-by-case basis. Once resolved, they can be + # ignored to avoid wasted computation. The unresolved (unres) dates/groups + # will continue to be analyzed. 
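+        # The resolution rules below run roughly from cheapest and most
+        # confident to most heuristic; each resolve() call removes the matched
+        # groups from these unresolved copies before the next rule runs.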
+ unres = dates.copy() + unres_hard = dates_hard.copy() + unres_approx = dates_approx.copy() + unres_groups = groups.copy() + + # Find and analyze typos in the hard dates + pairs, typo_pairs, typo_groups, typo_value_options = analyze_typos(dates_hard) + + # Having extracted the typo information, drop any impossible dates (score = 0) + # which might later confuse the analysis + unres = unres[unres["date_score"] != 0] + unres_hard = unres_hard[unres_hard["date_score"] != 0] + unres_approx = unres_approx[unres_approx["date_score"] != 0] + + groups_resolved = {} + def resolve(resolved, reason): + nonlocal groups_resolved, unres, unres_hard, unres_approx, unres_groups + + groups_resolved[reason] = resolved + + unres = unres[ + ~unres.index.isin(resolved.index) + ] + unres_hard = unres_hard[ + ~unres_hard.index.isin(resolved.index) + ] + unres_approx = unres_approx[ + ~unres_approx.index.isin(resolved.index) + ] + unres_groups = unres_groups[ + ~unres_groups.index.isin(resolved.index) + ] + + # === Resolve: one_entry === + one_entry = unres[ + unres.index.isin(unres_groups.index[unres_groups["size"] == 1]) + ]["date"] + resolve(one_entry, "one_entry") + + # === Resolve: one_date === + vcs = unres["date"].reset_index().value_counts() + vcs.rename("count", inplace=True) + + # Iff a given row has a count equal to its group size, then only one unique date + instance_compare = vcs.reset_index().join(groups, how="left", on="research_id") + instance_compare.set_index(index_col, inplace=True) + one_date_cond = instance_compare["count"] == instance_compare["size"] + one_date = instance_compare[one_date_cond]["date"] + resolve(one_date, "one_date") + + # === Resolve: one_multi_hard === + # For each group, determine the hard dates which appear more than once + vcs_hard = unres_hard["date"].reset_index().value_counts() + vcs_hard_multi = vcs_hard[vcs_hard > 1] + + # Get the groups which only have a single set of these same hard dates + # Otherwise, it may be ambiguous as to which set is the right one + is_multi_one = vcs_hard_multi.index.droplevel(1).value_counts() + is_multi_one = is_multi_one[is_multi_one == 1] + + one_multi_hard = vcs_hard_multi.reset_index().set_index(index_col)["date"] + one_multi_hard = one_multi_hard.loc[is_multi_one.index] + + resolve(one_multi_hard, "one_multi_hard") + + # === Resolve: hard_single_mode === + hard_single_mode = groupby_agg_mode( + unres_hard["date"].groupby(level=0), + single_modes_only=True, + ) + resolve(hard_single_mode, "hard_single_mode") + + + # === Cluster resolutions === + clusters = get_date_clusters( + unres[["date", "approx"]], + self.approx_near_thresh, + ) + + clusters_largest, largest_cluster_hard_mean, largest_cluster_approx_mean = \ + cluster_analysis(unres_hard, clusters) + + resolve(largest_cluster_hard_mean, "largest_cluster_hard_mean") + resolve(largest_cluster_approx_mean, "largest_cluster_approx_mean") + + # Combine all of the resolved data collected into a single DataFrame + resolved = self._combined_resolved(groups, groups_resolved) + + return DateReconcilerResults( + index_col=index_col, + resolved=resolved, + dates=dates, + dates_hard=dates_hard, + dates_approx=dates_approx, + groups=groups, + unres=unres, + unres_hard=unres_hard, + unres_approx=unres_approx, + unres_groups=unres_groups, + clusters_largest=clusters_largest, + pairs=pairs, + typo_pairs=typo_pairs, + typo_groups=typo_groups, + typo_value_options=typo_value_options, + ) diff --git a/cyclops/data/df/dates/type.py b/cyclops/data/df/dates/type.py new file mode 100644 index 
000000000..5e77fefff --- /dev/null +++ b/cyclops/data/df/dates/type.py @@ -0,0 +1,221 @@ +from typing import Any, Union + +import numpy as np + +import pandas as pd +from pandas.api.types import ( + is_bool_dtype, + is_datetime64_any_dtype, + is_float_dtype, + is_integer_dtype, + is_string_dtype, +) + + +def is_series(data: Any, raise_err: bool = False) -> bool: + """ + Check if the input is a Pandas Series. + + Parameters + ---------- + data : any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not a Series. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas Series. + + Returns + ------- + bool + True if the input is a Pandas Series, False otherwise. + """ + if isinstance(data, pd.Series): + return True + + if raise_err: + raise ValueError("Data must be a Pandas series.") + + return False + + +def is_bool_series(data: Any, raise_err: bool = False) -> bool: + """ + Check if the input is a Pandas boolean series. + + Parameters + ---------- + data : any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not a boolean Series. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas boolean series. + + Returns + ------- + bool + True if the input is a Pandas boolean series, False otherwise. + """ + if not is_series(data, raise_err=raise_err): + return False + + if is_bool_dtype(data): + return True + + if raise_err: + raise ValueError("Pandas series must have a boolean type.") + + return False + + +def is_int_series( + data: Any, + raise_err: bool = False, + raise_err_with_nullable: bool = False, +) -> bool: + """ + Check if the input is a Pandas integer series. + + Parameters + ---------- + data : any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not an integer Series. + raise_err_with_nullable: bool, default False + Whether to raise an error informing that, if the data is not an integer Series, + consider a nullable integer data type. Takes precedence over raise_err. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas integer series. + + Returns + ------- + bool + True if the input is a Pandas integer series, False otherwise. + """ + if not is_series(data, raise_err=raise_err): + return False + + if is_integer_dtype(data): + return True + + if raise_err_with_nullable: + raise ValueError( + "Pandas series must have an integer type. Consider applying " + "`series.astype('Int64')`, where Int64 is a nullable integer data type " + "which enables the use of null values with an integer dtype." + ) + + if raise_err: + raise ValueError("Pandas series must have an integer type.") + + return False + + +def is_float_series(data: Any, raise_err: bool = False) -> bool: + """ + Check if the input is a Pandas float series. + + Parameters + ---------- + data : any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not a float Series. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas float series. + + Returns + ------- + bool + True if the input is a Pandas float series, False otherwise. 
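+
+    Examples
+    --------
+    A minimal illustration:
+
+    >>> is_float_series(pd.Series([1.0, 2.5]))
+    True
+    >>> is_float_series(pd.Series([1, 2]))
+    False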
+ """ + if not is_series(data, raise_err=raise_err): + return False + + if is_float_dtype(data): + return True + + if raise_err: + raise ValueError("Pandas series must have a float type.") + + return False + + +def is_str_series(data: Any, raise_err: bool = False) -> bool: + """ + Check if the input is a Pandas string series. + + Parameters + ---------- + data : any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not a string Series. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas string series. + + Returns + ------- + bool + True if the input is a Pandas string series, False otherwise. + """ + if not is_series(data, raise_err=raise_err): + return False + + if is_string_dtype(data): + return True + + if raise_err: + raise ValueError("Pandas series must have a string type.") + + return False + + +def is_datetime_series(data: Any, raise_err: bool = False) -> bool: + """ + Check if the input is a Pandas datetime series. + + Parameters + ---------- + data : any + The input data to check. + raise_err : bool, default False + Whether to raise an error if the data is not a datetime Series. + + Raises + ------ + ValueError + If `raise_err` is True and the input data is not a Pandas datetime series. + + Returns + ------- + bool + True if the input is a Pandas datetime series, False otherwise. + """ + if not is_series(data, raise_err=raise_err): + return False + + if is_datetime64_any_dtype(data): + return True + + if raise_err: + raise ValueError("Pandas series must have a datetime type.") + + return False From 0676d9c45b50a7d73e330a7f556de0f91206137c Mon Sep 17 00:00:00 2001 From: Franklin <41602287+fcogidi@users.noreply.github.com> Date: Fri, 19 Jan 2024 10:40:08 -0500 Subject: [PATCH 2/3] Initial refactor of date handling module --- cyclops/data/df/dates/__init__.py | 16 + cyclops/data/df/dates/common.py | 52 --- cyclops/data/df/dates/dates.py | 224 ++++++------ cyclops/data/df/dates/groupby.py | 45 --- cyclops/data/df/dates/index.py | 83 ----- cyclops/data/df/dates/join.py | 45 --- cyclops/data/df/dates/pandas.py | 160 --------- cyclops/data/df/dates/reconcile_dates.py | 197 ++++++----- cyclops/data/df/{dates => }/pairs.py | 28 +- .../{dates/type.py => series_validation.py} | 37 +- cyclops/data/df/utils.py | 318 ++++++++++++++++++ 11 files changed, 600 insertions(+), 605 deletions(-) delete mode 100644 cyclops/data/df/dates/common.py delete mode 100644 cyclops/data/df/dates/groupby.py delete mode 100644 cyclops/data/df/dates/index.py delete mode 100644 cyclops/data/df/dates/join.py delete mode 100644 cyclops/data/df/dates/pandas.py rename cyclops/data/df/{dates => }/pairs.py (84%) rename cyclops/data/df/{dates/type.py => series_validation.py} (90%) create mode 100644 cyclops/data/df/utils.py diff --git a/cyclops/data/df/dates/__init__.py b/cyclops/data/df/dates/__init__.py index 4297c5136..1354a58c3 100644 --- a/cyclops/data/df/dates/__init__.py +++ b/cyclops/data/df/dates/__init__.py @@ -1 +1,17 @@ """Processors for date handling.""" +from cyclops.data.df.dates.dates import ( + DatePairHandler, + analyze_dates, + analyzed_dates_differ, + analyzed_dates_failed_to_convert, + combine_date_and_time_components, + components_to_datetime, + datetime_components, + datetime_to_unix, + dateutil_parse_date, + extract_dateutil_components, + filter_date_deltas, + has_time, + round_date, + unix_to_datetime, +) diff --git a/cyclops/data/df/dates/common.py b/cyclops/data/df/dates/common.py deleted file mode 
100644 index 7766f6f7d..000000000 --- a/cyclops/data/df/dates/common.py +++ /dev/null @@ -1,52 +0,0 @@ -from typing import Any, Dict, List, Optional, Set, Union - -import numpy as np - - -def to_list(obj: Any) -> List[Any]: - """Convert some object to a list of object(s) unless already one. - - Parameters - ---------- - obj : any - The object to convert to a list. - - Returns - ------- - list - The processed object. - - """ - if isinstance(obj, list): - return obj - - if isinstance(obj, (np.ndarray, set, dict)): - return list(obj) - - return [obj] - - -def to_list_optional( - obj: Optional[Any], none_to_empty: bool = False -) -> Union[List[Any], None]: - """Convert some object to a list of object(s) unless already None or a list. - - Parameters - ---------- - obj : any - The object to convert to a list. - none_to_empty: bool, default = False - If true, return a None obj as an empty list. Otherwise, return as None. - - Returns - ------- - list or None - The processed object. - - """ - if obj is None: - if none_to_empty: - return [] - return None - - return to_list(obj) diff --git a/cyclops/data/df/dates/dates.py b/cyclops/data/df/dates/dates.py index 10235bf84..d6f53ddf7 100644 --- a/cyclops/data/df/dates/dates.py +++ b/cyclops/data/df/dates/dates.py @@ -1,19 +1,17 @@ -from typing import List, Optional, Union -import warnings - +"""Utilities for working with dates in pandas DataFrames.""" import datetime +import warnings from datetime import timedelta +from typing import Any, List, Optional, Union +import numpy as np +import pandas as pd from dateutil import parser as du_parser from dateutil.parser import ParserError -# import datefinder - -import numpy as np -import pandas as pd +from cyclops.data.df.series_validation import is_datetime_series, is_str_series +from cyclops.data.df.utils import check_cols -from fecg.utils.pandas.pandas import check_cols -from fecg.utils.pandas.type import is_datetime_series, is_str_series # Datetime component names DATE_COMPONENTS = ["year", "month", "day"] @@ -27,8 +25,7 @@ def datetime_to_unix(series: pd.Series) -> pd.Series: - """ - Convert a datetime series to UNIX timestamps. + """Convert a datetime series to UNIX timestamps. Parameters ---------- @@ -46,8 +43,7 @@ def datetime_to_unix(series: pd.Series) -> pd.Series: def unix_to_datetime(series: pd.Series) -> pd.Series: - """ - Convert a series of UNIX timestamps to datetime. + """Convert a series of UNIX timestamps to datetime. Parameters ---------- @@ -63,8 +59,7 @@ def unix_to_datetime(series: pd.Series) -> pd.Series: def round_date(dates: pd.Series) -> pd.Series: - """ - Round datetimes to the nearest day. + """Round datetimes to the nearest day. Parameters ---------- @@ -78,15 +73,14 @@ def round_date(dates: pd.Series) -> pd.Series: """ is_datetime_series(dates, raise_err=True) - return dates.dt.round('1d') + return dates.dt.round("1d") def has_time( dates: pd.Series, raise_err_on_time: bool = False, ) -> pd.Series: - """ - Checks whether any datetimes have a time component. + """Check whether any datetimes have a time component. Parameters ---------- @@ -95,15 +89,15 @@ def has_time( raise_err : bool, default False If True, raise an error if any date has a time component. - Raises - ------ - ValueError - If any date has a time component and `raise_err` is True. - Returns ------- bool Whether any dates have a time component. + + Raises + ------ + ValueError + If any date has a time component and `raise_err` is True. 
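As a rough illustration of `round_date` and `has_time` above (imports assume the package layout used in this patch):

>>> import pandas as pd
>>> from cyclops.data.df.dates import has_time
>>> dates = pd.to_datetime(pd.Series(["2024-01-18", "2024-01-18 17:08"]))
>>> has_time(dates)                            # [False, True]
>>> has_time(dates, raise_err_on_time=True)    # raises ValueError because of the second entry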
""" # Round datetime values rounded = round_date(dates) @@ -120,9 +114,8 @@ def has_time( # DEPRECIATED IN CONTRAST TO `analyze_dates`??? -def invalid_date(dates: pd.Series, **to_datetime_kwargs) -> pd.Series: - """ - Given a Series of dates, return a boolean Series of whether the dates are invalid. +def invalid_date(dates: pd.Series, **to_datetime_kwargs: Any) -> pd.Series: + """Return a boolean Series of whether a given series of dates are invalid. Parameters ---------- @@ -146,14 +139,14 @@ def invalid_date(dates: pd.Series, **to_datetime_kwargs) -> pd.Series: if "errors" in to_datetime_kwargs: raise ValueError("Cannot specify 'errors' in to_datetime_kwargs.") - return pd.isna(pd.to_datetime(dates, errors='coerce', **to_datetime_kwargs)) + return pd.isna(pd.to_datetime(dates, errors="coerce", **to_datetime_kwargs)) def filter_date_deltas( dates: pd.DataFrame, - delta_cutoff: Union[str, timedelta] = None, - left_delta_cutoff: Union[str, timedelta] = None, - right_delta_cutoff: Union[str, timedelta] = None, + delta_cutoff: Optional[Union[str, timedelta]] = None, + left_delta_cutoff: Optional[Union[str, timedelta]] = None, + right_delta_cutoff: Optional[Union[str, timedelta]] = None, ) -> pd.DataFrame: """ Filter DataFrame based on date delta conditions. @@ -162,11 +155,11 @@ def filter_date_deltas( ---------- dates : pandas.DataFrame DataFrame containing 'delta' column. - delta_cutoff : timedelta, optional + delta_cutoff : timedelta, optional, default=None Maximum delta value allowed. - left_delta_cutoff : timedelta, optional + left_delta_cutoff : timedelta, optional, default=None Minimum delta value allowed. - right_delta_cutoff : timedelta, optional + right_delta_cutoff : timedelta, optional, default=None Maximum delta value allowed. Returns @@ -177,29 +170,29 @@ def filter_date_deltas( Raises ------ ValueError - When delta_cutoff specified along with left_delta_cutoff or right_delta_cutoff. + When `delta_cutoff` specified along with `left_delta_cutoff` or + `right_delta_cutoff`. """ if delta_cutoff is not None: if left_delta_cutoff is not None or right_delta_cutoff is not None: raise ValueError( "Cannot specify left_delta_cutoff or right_delta_cutoff when " - "delta_cutoff is specified." + "delta_cutoff is specified.", ) - return dates[abs(dates['delta']) <= pd.to_timedelta(delta_cutoff)] + return dates[abs(dates["delta"]) <= pd.to_timedelta(delta_cutoff)] if left_delta_cutoff is not None: - dates = dates[dates['delta'] >= pd.to_timedelta(left_delta_cutoff)] + dates = dates[dates["delta"] >= pd.to_timedelta(left_delta_cutoff)] if right_delta_cutoff is not None: - dates = dates[dates['delta'] <= pd.to_timedelta(right_delta_cutoff)] + dates = dates[dates["delta"] <= pd.to_timedelta(right_delta_cutoff)] return dates class DatePairHandler: - """ - Handler to create and manipulate pairs based on dates and IDs. + """Handler to create and manipulate pairs based on dates and IDs. Attributes ---------- @@ -213,15 +206,17 @@ class DatePairHandler: The paired data coming from the data_x and data_y columns. Computed and stored based on `date_pairs` when the `paired_data` method is first called. 
""" + def __init__( self, data_x: pd.DataFrame, data_y: pd.DataFrame, - delta_cutoff: Union[str, timedelta] = None, - left_delta_cutoff: Union[str, timedelta] = None, - right_delta_cutoff: Union[str, timedelta] = None, + delta_cutoff: Optional[Union[str, timedelta]] = None, + left_delta_cutoff: Optional[Union[str, timedelta]] = None, + right_delta_cutoff: Optional[Union[str, timedelta]] = None, keep_closest_to: Optional[str] = None, - ): + ) -> None: + """Initialize an instance of `DatePairHandler`.""" assert data_x.index.name == "id" assert data_y.index.name == "id" assert "idx_x" not in data_x.columns @@ -232,7 +227,11 @@ def __init__( data_x["idx_x"] = np.arange(len(data_x)) data_y["idx_y"] = np.arange(len(data_y)) - date_pairs = data_x[["date", "idx_x"]].merge(data_y[["date", "idx_y"]], on='id', how='inner') + date_pairs = data_x[["date", "idx_x"]].merge( + data_y[["date", "idx_y"]], + on="id", + how="inner", + ) if keep_closest_to is not None: assert keep_closest_to in ["date_x", "date_y"] @@ -249,13 +248,19 @@ def __init__( if keep_closest_to is not None: date_pairs = date_pairs.reset_index() - min_deltas = date_pairs.groupby(["id", keep_closest_to]).agg({ - "abs_delta": "min", - }).reset_index() + min_deltas = ( + date_pairs.groupby(["id", keep_closest_to]) + .agg( + { + "abs_delta": "min", + }, + ) + .reset_index() + ) date_pairs = date_pairs.merge( min_deltas, on=["id", keep_closest_to, "abs_delta"], - how='inner', + how="inner", ) self.data_x = data_x @@ -265,8 +270,7 @@ def __init__( @property def paired_data(self) -> pd.DataFrame: - """ - Get paired data based on the date pairs. + """Get paired data based on the date pairs. Returns ------- @@ -274,17 +278,26 @@ def paired_data(self) -> pd.DataFrame: Paired data based on the date pairs. """ if self._paired_data is None: - self._paired_data = pd.concat([ - self.data_x.set_index("idx_x").loc[self.date_pairs["idx_x"]].reset_index(), - self.data_y.set_index("idx_y").loc[self.date_pairs["idx_y"]].reset_index(), - ], axis=1) + self._paired_data = pd.concat( + [ + self.data_x.set_index("idx_x") + .loc[self.date_pairs["idx_x"]] + .reset_index(), + self.data_y.set_index("idx_y") + .loc[self.date_pairs["idx_y"]] + .reset_index(), + ], + axis=1, + ) return self._paired_data -def du_parse_date(date: str, **parse_kwargs) -> Union[datetime.datetime, float]: - """ - Parse a date string using dateutil's parser. +def dateutil_parse_date( + date: str, + **parse_kwargs: Any, +) -> Union[datetime.datetime, float]: + """Parse a date string using dateutil's parser. Parameters ---------- @@ -307,15 +320,14 @@ def du_parse_date(date: str, **parse_kwargs) -> Union[datetime.datetime, float]: return np.nan -def extract_du_components( +def extract_dateutil_components( du_series: pd.Series, components: Optional[List[str]] = None, ) -> pd.DataFrame: - """ - Extract datetime components from dates parsed from dateutil (du). + """Extract datetime components from dates parsed from `dateutil` (du). - Useful for Series full of datetimes that cannot be converted using - `pandas.to_datetime` without possibly losing dates to errors like + Useful for Series full of datetimes that cannot be converted using + `pandas.to_datetime` without possibly losing dates to errors like `OutOfBoundsDatetime`. Parameters @@ -330,7 +342,11 @@ def extract_du_components( pd.DataFrame DataFrame containing the extracted datetime components. 
""" - def extract_components(datetime, components): + + def extract_components( + datetime: datetime.datetime, + components: List[str], + ) -> np.ndarray: if pd.isna(datetime): return np.full(len(components), np.nan) return np.array([getattr(datetime, comp) for comp in components]) @@ -348,8 +364,7 @@ def datetime_components( texts: pd.Series, components: Optional[List[str]] = None, ) -> pd.DataFrame: - """ - Extract separate datetime components (NaN when missing) using dateutil. + """Extract separate datetime components (NaN when missing) using dateutil. Useful because functionalities like `pandas.to_datetime` will return NaT if a full date is not present (e.g., missing a year). @@ -367,23 +382,29 @@ def datetime_components( DataFrame containing the extracted datetime components and the parsed date. """ # Extract dates with different values across all components - du = texts.apply(du_parse_date) + du = texts.apply(dateutil_parse_date) du.rename(DU_DT, inplace=True) - du2 = texts.apply(du_parse_date, default=datetime.datetime(1, 2, 2, 2, 2, 2, 2)) + du2 = texts.apply( + dateutil_parse_date, + default=datetime.datetime(1, 2, 2, 2, 2, 2, 2), + ) du2.rename("du2", inplace=True) # Where they are equal is not default, where they aren't is default (i.e., missing) components = components or DT_COMPONENTS - equal = pd.concat([ - extract_du_components(du, components=components), - extract_du_components(du2, components=components).add_suffix('_2'), - ], axis=1) + equal = pd.concat( + [ + extract_dateutil_components(du, components=components), + extract_dateutil_components(du2, components=components).add_suffix("_2"), + ], + axis=1, + ) - for i, comp in enumerate(components): + for _, comp in enumerate(components): # If a value is missing (different for different default components), # then replace it with NaN - equal[comp][equal[comp] != equal[f'{comp}_2']] = np.nan + equal[comp][equal[comp] != equal[f"{comp}_2"]] = np.nan return pd.concat([du, equal[components]], axis=1) @@ -393,8 +414,7 @@ def analyzed_dates_differ( warn: bool = False, raise_err: bool = False, ) -> pd.Series: - """ - Check where the analyzed `dateutil` and `pd.to_datetime` dates differ. + """Check where the analyzed `dateutil` and `pd.to_datetime` dates differ. Parameters ---------- @@ -408,7 +428,7 @@ def analyzed_dates_differ( Returns ------- pd.Series - Boolean series indicating where the dates from `pd.to_datetime` and + Boolean series indicating where the dates from `pd.to_datetime` and `dateutil` do not match. Raises @@ -421,8 +441,9 @@ def analyzed_dates_differ( # If the dates parsed from pd and du aren't the same date (and didn't # both fail to parse), then flag that something funky might be going on - matching = (analyzed[PD_DT] == analyzed[DU_DT]) | \ - (analyzed[[PD_DT, DU_DT]].isna().sum(axis=1) == 2) + matching = (analyzed[PD_DT] == analyzed[DU_DT]) | ( + analyzed[[PD_DT, DU_DT]].isna().sum(axis=1) == 2 + ) if not matching.all(): msg = ( @@ -434,7 +455,7 @@ def analyzed_dates_differ( raise ValueError(msg) if warn: - warnings.warn(msg) + warnings.warn(msg, UserWarning, stacklevel=2) return ~matching @@ -444,8 +465,7 @@ def analyzed_dates_failed_to_convert( warn: bool = False, raise_err: bool = False, ) -> pd.Series: - """ - Check whether any `dateutil` dates which failed to convert using `pd.to_datetime`. + """Check if any `dateutil` dates failed to convert using `pd.to_datetime`. One common failure is due to a `pandas.errors.OutOfBoundsDatetime`. 
@@ -483,7 +503,7 @@ def analyzed_dates_failed_to_convert( raise ValueError(msg) if warn: - warnings.warn(msg) + warnings.warn(msg, UserWarning, stacklevel=2) return failed @@ -493,8 +513,7 @@ def analyze_dates( components: Optional[List[str]] = None, warn: bool = True, ) -> pd.DataFrame: - """ - Analyze a series of dates and extract datetime components. + """Analyze a series of dates and extract datetime components. Parameters ---------- @@ -515,13 +534,20 @@ def analyze_dates( texts.rename("text", inplace=True) dates = texts.to_frame() - dates[PD_DT] = pd.to_datetime(dates["text"], infer_datetime_format=True, errors="coerce") + dates[PD_DT] = pd.to_datetime( + dates["text"], + infer_datetime_format=True, + errors="coerce", + ) components = components or DT_COMPONENTS - dates = pd.concat([ - dates, - datetime_components(dates["text"], components=components), - ], axis=1) + dates = pd.concat( + [ + dates, + datetime_components(dates["text"], components=components), + ], + axis=1, + ) # Drop a component column if the whole column is NaN - it is likely never specified dates.drop( @@ -547,8 +573,7 @@ def components_to_datetime( comps: pd.DataFrame, default_time: Optional[datetime.time] = None, ) -> pd.Series: - """ - Converts a DataFrame of datetime components into a datetime series. + """Convert a DataFrame of datetime components into a datetime series. Useful for combining separate date and time texts. @@ -582,12 +607,12 @@ def components_to_datetime( check_cols(comps, DT_COMPONENTS, raise_err_on_unexpected=True) avail_time_comps = set(comps.columns).intersection(set(TIME_COMPONENTS)) - if not (comps.dtypes.unique().astype(str) == 'Int64').all(): + if not (comps.dtypes.unique().astype(str) == "Int64").all(): raise ValueError("Components must have type 'Int64'.") # Handle default times default_time = default_time or datetime.time(0) - TIME_COMPONENTS + for time_comp in TIME_COMPONENTS: time_comp_value = getattr(default_time, time_comp) @@ -616,8 +641,7 @@ def combine_date_and_time_components( date_comps: pd.DataFrame, time_comps: pd.DataFrame, ) -> pd.DataFrame: - """ - Combine date components from one DataFrame and time components from another. + """Combine date components from one DataFrame and time components from another. Parameters ---------- @@ -634,6 +658,10 @@ def combine_date_and_time_components( Examples -------- + >>> from cyclops.data.df.dates import ( + ... analyze_dates, + ... combine_date_and_time_components, + ... ) >>> date_comps = analyze_dates(meta["AcquisitionDate"]) >>> time_comps = analyze_dates(meta["AcquisitionTime"]) >>> comps = combine_date_and_time_components( @@ -645,7 +673,7 @@ def combine_date_and_time_components( """ if not date_comps.index.equals(date_comps.index): raise ValueError( - "Indexes of `date_comps` and `time_comps` must be the same." + "Indexes of `date_comps` and `time_comps` must be the same.", ) unexpected_cols_date, _, _ = check_cols(date_comps, DATE_COMPONENTS) @@ -657,5 +685,5 @@ def combine_date_and_time_components( return pd.concat([date_comps, time_comps], axis=1) -#def find_dates(text): +# def find_dates(text): # matches = datefinder.find_dates(text, source=True, index=True) diff --git a/cyclops/data/df/dates/groupby.py b/cyclops/data/df/dates/groupby.py deleted file mode 100644 index 6be041fa5..000000000 --- a/cyclops/data/df/dates/groupby.py +++ /dev/null @@ -1,45 +0,0 @@ -import pandas as pd - - -def agg_mode(series: pd.Series) -> list: - """ - Get the mode(s) of a series by using `.agg(agg_mode)`. 
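Looping back to `components_to_datetime` above: a minimal sketch under the assumption that the elided body fills missing time components from `default_time`, as its parameters suggest; note the components must use the nullable `Int64` dtype:

>>> import datetime
>>> import pandas as pd
>>> from cyclops.data.df.dates import components_to_datetime
>>> comps = pd.DataFrame({"year": [2021], "month": [3], "day": [1]}).astype("Int64")
>>> components_to_datetime(comps, default_time=datetime.time(12, 30))   # expected: 2021-03-01 12:30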
- - Parameters - ---------- - series : pd.Series - Series. - - Returns - ------- - list - List containing the mode(s) of the input series. - """ - return pd.Series.mode(series).to_list() - - -def groupby_agg_mode( - grouped: pd.core.groupby.generic.SeriesGroupBy, - single_modes_only: bool = False, -) -> pd.Series: - """ - Compute the mode(s) for each group of a grouped series. - - Parameters - ---------- - grouped : pd.core.groupby.generic.SeriesGroupBy - Grouped series. - single_modes_only : bool, default False - If True, only groups with a singular mode are kept. - - Returns - ------- - pd.Series - A pandas Series containing the mode(s) for each group. - """ - - result = grouped.agg(agg_mode).explode() - if single_modes_only: - duplicate_indices = result.index[result.index.duplicated(keep=False)] - result = result.drop(duplicate_indices) - return result diff --git a/cyclops/data/df/dates/index.py b/cyclops/data/df/dates/index.py deleted file mode 100644 index c2f0818eb..000000000 --- a/cyclops/data/df/dates/index.py +++ /dev/null @@ -1,83 +0,0 @@ -import pandas as pd - - -def index_structure_equal( - idx1: pd.Index, - idx2: pd.Index, - raise_err: bool = False, -) -> bool: - """ - Check whether two indexes have the same structure. Values aren't considered. - - Parameters - ---------- - idx1 : pandas.Index - The first index to compare. - idx2 : pandas.Index - The second index to compare. - raise_err : bool, default False - If True, raises an error if indexes do not have the same structure. - - Returns - ------- - bool - True if the indexes have the same structure, otherwise False. - """ - if type(idx1) != type(idx2): - if raise_err: - raise ValueError("Index dtypes do not match.") - - return False - - if idx1.names != idx2.names: - if raise_err: - raise ValueError("Index names do not match.") - - return False - - if idx1.nlevels != idx2.nlevels: - if raise_err: - raise ValueError("Number of index levels do not match.") - - return False - - return True - - -def is_multiindex( - idx: pd.Index, - raise_err: bool = False, - raise_err_multi: bool = False, -) -> bool: - """ - Check whether a given index is a MultiIndex. - - Parameters - ---------- - idx : pd.Index - Index to check. - raise_err : bool, default False - If True, raise a ValueError when idx is not a MultiIndex. - raise_err_multi : bool, default False - If True, raise a ValueError when idx is a MultiIndex. - - Raises - ------ - ValueError - Raised when `idx` is not a MultiIndex and `raise_err` is True. - Raised when `idx` is a MultiIndex and `raise_err_multi` is True. - - Returns - ------- - bool - True if idx is a MultiIndex, False otherwise. - """ - multiindex = isinstance(idx, pd.MultiIndex) - - if not multiindex and raise_err: - raise ValueError("Index must be a MultiIndex.") - - if multiindex and raise_err_multi: - raise ValueError("Index cannot be a MultiIndex.") - - return multiindex diff --git a/cyclops/data/df/dates/join.py b/cyclops/data/df/dates/join.py deleted file mode 100644 index 4ac7b4da8..000000000 --- a/cyclops/data/df/dates/join.py +++ /dev/null @@ -1,45 +0,0 @@ -from typing import Hashable, Optional, Sequence, Union - -import pandas as pd - -from fecg.utils.pandas.pandas import COLS_TYPE - - -def reset_index_merge( - left: Union[pd.DataFrame, pd.Series], - right: Union[pd.DataFrame, pd.Series], - index_col: Optional[COLS_TYPE] = None, - **merge_kwargs, -) -> pd.DataFrame: - """ - Merges two dataframes after resetting their indexes. 
- - Parameters - ---------- - left : pandas.DataFrame or pandas.Series - The left object to merge. - right : pandas.DataFrame or pandas.Series - The right object to merge. - index_col : hashable or sequence of hashable, optional - Column(s) to set as index for the merged result. - **merge_kwargs - Additional keyword arguments to pass to pandas merge function. - - Returns - ------- - pd.DataFrame - The merged dataframe. - """ - - # Reset index for both dataframes - left_reset = left.reset_index() - right_reset = right.reset_index() - - # Merge the dataframes - merged = pd.merge(left_reset, right_reset, **merge_kwargs) - - # If index_col is provided, set it for the merged dataframe - if index_col: - merged.set_index(index_col, inplace=True) - - return merged diff --git a/cyclops/data/df/dates/pandas.py b/cyclops/data/df/dates/pandas.py deleted file mode 100644 index 8aea57570..000000000 --- a/cyclops/data/df/dates/pandas.py +++ /dev/null @@ -1,160 +0,0 @@ -from typing import ( - Any, - Dict, - Hashable, - List, - Sequence, - Set, - Tuple, - Union, -) - -from functools import reduce - -import pandas as pd - -from fecg.utils.common import to_list -from fecg.utils.pandas.type import ( - is_bool_series, - is_int_series, - is_series, -) - -COLS_TYPE = Union[Hashable, Sequence[Hashable]] - - -def check_cols( - data: pd.DataFrame, - cols: COLS_TYPE, - raise_err_on_unexpected: bool = False, - raise_err_on_existing: bool = False, - raise_err_on_missing: bool = False, -) -> Tuple[Set[Hashable], Set[Hashable], Set[Hashable]]: - """ - Check DataFrame columns for expected columns and handle errors. - - Parameters - ---------- - data : pd.DataFrame - The input DataFrame to check columns against. - cols : hashable or list of Hashable - The column(s) to check for in the DataFrame. - raise_err_on_unexpected : bool, default False - Raise an error if unexpected columns are found. - raise_err_on_existing : bool, default False - Raise an error if any of the specified columns already exist. - raise_err_on_missing : bool, default False - Raise an error if any of the specified columns are missing. - - Returns - ------- - Tuple[Set[Hashable], Set[Hashable], Set[Hashable]] - A tuple containing sets of unexpected, existing, and missing columns. - """ - cols = set(to_list(cols)) - data_cols = set(data.columns) - - unexpected = data_cols - cols - if raise_err_on_unexpected and len(unexpected) > 0: - raise ValueError(f"Unexpected columns: {', '.join(unexpected)}") - - existing = data_cols.intersection(cols) - if raise_err_on_existing and len(existing) > 0: - raise ValueError(f"Existing columns: {', '.join(existing)}") - - missing = cols - data_cols - if raise_err_on_missing and len(missing) > 0: - raise ValueError(f"Missing columns: {', '.join(missing)}") - - return unexpected, existing, missing - - -def and_conditions(conditions: List[pd.Series]) -> pd.Series: - """ - Perform element-wise logical AND operation on a list of boolean Series. - - Parameters - ---------- - conditions : list of pd.Series - A list of boolean Pandas Series. - - Raises - ------ - ValueError - If the conditions are not Pandas boolean series. - - Returns - ------- - pd.Series - A new Pandas Series resulting from the element-wise logical AND operation. - """ - for condition in conditions: - is_bool_series(condition, raise_err=True) - - return reduce(lambda x, y: x & y, conditions) - - -def or_conditions(conditions: List[pd.Series]) -> pd.Series: - """ - Perform element-wise logical OR operation on a list of boolean Series. 
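These boolean-condition helpers are deleted here but recreated in `cyclops/data/df/utils.py` later in this patch; a minimal sketch of their behaviour:

>>> import pandas as pd
>>> from cyclops.data.df.utils import and_conditions, or_conditions
>>> a = pd.Series([True, False, True])
>>> b = pd.Series([True, True, False])
>>> and_conditions([a, b])   # [True, False, False]
>>> or_conditions([a, b])    # [True, True, True]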
- - Parameters - ---------- - conditions : list of pd.Series - A list of boolean Pandas Series. - - Raises - ------ - ValueError - If the conditions are not Pandas boolean series. - - Returns - ------- - pd.Series - A new Pandas Series resulting from the element-wise logical OR operation. - """ - for condition in conditions: - is_bool_series(condition, raise_err=True) - - return reduce(lambda x, y: x | y, conditions) - - -def combine_nonoverlapping(datas: List[Union[pd.DataFrame, pd.Series]]) -> pd.DataFrame: - """ - Combine non-overlapping DataFrames/Series into a single DataFrame/Series. - - The objects in `datas` should be all DataFrames or all Series, not a combination. - - For any given value location, it can be non-null in exactly 0 or 1 of the - DataFrames. The combined DataFrame will contains all of these values. - - Parameters - ---------- - datas : list of pandas.DataFrame - A list of DataFrames to be combined. - - Returns - ------- - pandas.DataFrame - The combined DataFrame. - - Raises - ------ - ValueError - If unauthorized overlap is found between DataFrames. - """ - # Get masks where the DataFrames are NaN - datas_na = [data.isna() for data in datas] - - # Check that there is no unauthorized overlap - datas_not_na = [(~data_na).astype(int) for data_na in datas_na] - datas_not_na_sum = reduce(lambda x, y: x + y, datas_not_na) - if not (datas_not_na_sum <= 1).all().all(): - raise ValueError("Unauthorized overlap found between DataFrames. Cannot combine.") - - # Combine the DataFrames - combined = datas[0].copy() - for data in datas[1:]: - combined = combined.combine_first(data) - - return combined diff --git a/cyclops/data/df/dates/reconcile_dates.py b/cyclops/data/df/dates/reconcile_dates.py index e44aeab62..e2caf0ca9 100644 --- a/cyclops/data/df/dates/reconcile_dates.py +++ b/cyclops/data/df/dates/reconcile_dates.py @@ -1,31 +1,32 @@ -from typing import Dict, Hashable, List, Optional +"""Reconcile issues with dates in a DataFrame.""" +import datetime import warnings from copy import deepcopy -from dataclasses import dataclass, field - -import datetime +from dataclasses import dataclass from datetime import timedelta +from typing import Dict, Hashable, List, Optional import numpy as np import pandas as pd - from sklearn.cluster import DBSCAN -from fecg.utils.common import to_list_optional -from fecg.utils.dates.dates import datetime_to_unix, has_time -from fecg.utils.pairs import ( +from cyclops.data.df.dates.dates import datetime_to_unix, has_time +from cyclops.data.df.pairs import ( get_pairs, pairs_to_groups, split_pairs, ) -from fecg.utils.pandas.groupby import groupby_agg_mode -from fecg.utils.pandas.join import reset_index_merge -from fecg.utils.pandas.index import ( +from cyclops.data.df.series_validation import is_datetime_series +from cyclops.data.df.utils import ( + check_cols, + combine_nonoverlapping, + groupby_agg_mode, index_structure_equal, is_multiindex, + or_conditions, + reset_index_merge, ) -from fecg.utils.pandas.pandas import check_cols, combine_nonoverlapping, or_conditions -from fecg.utils.pandas.type import is_datetime_series +from cyclops.utils.common import to_list_optional def cluster_date_group(dates, dbscan): @@ -87,9 +88,11 @@ def cluster_analysis(unres_hard, clusters): # so ignore groups with several clusters of same size clusters_of_max_size_vcs = clusters_of_max_size.index.value_counts() - clusters_of_max_size = clusters_of_max_size[~clusters_of_max_size.index.isin( - clusters_of_max_size_vcs.index[clusters_of_max_size_vcs > 1] - )] + 
clusters_of_max_size = clusters_of_max_size[ + ~clusters_of_max_size.index.isin( + clusters_of_max_size_vcs.index[clusters_of_max_size_vcs > 1], + ) + ] # Get the is_max_size column into clusters clusters = reset_index_merge( @@ -107,22 +110,25 @@ def cluster_analysis(unres_hard, clusters): # Get the hard dates in the largest clusters clusters_largest_hard = clusters_largest[~clusters_largest["approx"]] -# # === Resolve: largest_cluster_hard_mode -# single_modes = groupby_agg_mode( -# unres_hard["date"].groupby(level=0), -# single_modes_only=True, -# ) + # # === Resolve: largest_cluster_hard_mode + # single_modes = groupby_agg_mode( + # unres_hard["date"].groupby(level=0), + # single_modes_only=True, + # ) -# largest_hard_is_mode = clusters_largest_hard.index.isin(single_modes.index) -# largest_cluster_hard_mode = clusters_largest_hard[largest_hard_is_mode]["date"] + # largest_hard_is_mode = clusters_largest_hard.index.isin(single_modes.index) + # largest_cluster_hard_mode = clusters_largest_hard[largest_hard_is_mode]["date"] -# # Continue without the resolved ones -# clusters_largest_hard = clusters_largest_hard[~largest_hard_is_mode] + # # Continue without the resolved ones + # clusters_largest_hard = clusters_largest_hard[~largest_hard_is_mode] # === Resolve: largest_cluster_hard_mean === # Take the average of these largest cluster hard dates - largest_cluster_hard_mean = clusters_largest_hard.reset_index( - ).groupby(index_col + ["cluster"])["date"].agg("mean") + largest_cluster_hard_mean = ( + clusters_largest_hard.reset_index() + .groupby(index_col + ["cluster"])["date"] + .agg("mean") + ) largest_cluster_hard_mean.index = largest_cluster_hard_mean.index.droplevel(1) # === Resolve: largest_cluster_approx_mean === @@ -145,8 +151,14 @@ def analyze_typos(dates_hard): index_col = dates_hard.index.names # Get all unique hard dates for each group - dates_hard_unique = dates_hard["date"].reset_index().value_counts( - ).reset_index().drop(0, axis=1).set_index(index_col)["date"] + dates_hard_unique = ( + dates_hard["date"] + .reset_index() + .value_counts() + .reset_index() + .drop(0, axis=1) + .set_index(index_col)["date"] + ) # Ignore any groups which only have one unique hard date dates_hard_unique_vcs = dates_hard_unique.index.value_counts() @@ -156,19 +168,23 @@ def analyze_typos(dates_hard): dates_hard_unique = dates_hard_unique.loc[dates_hard_unique_vcs.index] def date_to_char(dates): - chars = dates.astype(str).str.split('', expand=True) + chars = dates.astype(str).str.split("", expand=True) chars.drop(columns=[0, 5, 8, 11], inplace=True) - chars.rename({ - 1: 'y1', - 2: 'y2', - 3: 'y3', - 4: 'y4', - 6: 'm1', - 7: 'm2', - 9: 'd1', - 10: 'd2', - }, axis=1, inplace=True) - chars = chars.astype('uint8') + chars.rename( + { + 1: "y1", + 2: "y2", + 3: "y3", + 4: "y4", + 6: "m1", + 7: "m2", + 9: "d1", + 10: "d2", + }, + axis=1, + inplace=True, + ) + chars = chars.astype("uint8") return chars @@ -198,15 +214,23 @@ def date_to_char(dates): # Incorporate date info # Recover the dates from the characters - date_x = pairs_x.astype(str).agg(''.join, axis=1) - date_x = date_x.str.slice(stop=4) + \ - "-" + date_x.str.slice(start=4, stop=6) + \ - "-" + date_x.str.slice(start=6) - - date_y = pairs_y.astype(str).agg(''.join, axis=1) - date_y = date_y.str.slice(stop=4) + \ - "-" + date_y.str.slice(start=4, stop=6) + \ - "-" + date_y.str.slice(start=6) + date_x = pairs_x.astype(str).agg("".join, axis=1) + date_x = ( + date_x.str.slice(stop=4) + + "-" + + date_x.str.slice(start=4, stop=6) + + "-" + + 
date_x.str.slice(start=6) + ) + + date_y = pairs_y.astype(str).agg("".join, axis=1) + date_y = ( + date_y.str.slice(stop=4) + + "-" + + date_y.str.slice(start=4, stop=6) + + "-" + + date_y.str.slice(start=6) + ) pairs["date_x"] = pd.to_datetime(date_x) pairs["date_y"] = pd.to_datetime(date_y) pairs["year"] = pairs["date_x"].dt.year == pairs["date_y"].dt.year @@ -214,16 +238,16 @@ def date_to_char(dates): pairs["day"] = pairs["date_x"].dt.day == pairs["date_y"].dt.day # Check if gotten the day/month transposed - pairs["dm_transpose"] = (pairs["date_x"].dt.day == pairs["date_y"].dt.month) & (pairs["date_x"].dt.month == pairs["date_y"].dt.day) + pairs["dm_transpose"] = (pairs["date_x"].dt.day == pairs["date_y"].dt.month) & ( + pairs["date_x"].dt.month == pairs["date_y"].dt.day + ) # Logic for determining whether a typo or not certain_conds = [ # Only one different character (pairs["n_diff"] == 1), - # Two different characters with at least one adjacent ((pairs["n_diff"] == 2) & (pairs["n_adj"] >= 1)), - # Day and month are transposed, but correct year (pairs["dm_transpose"] & pairs["year"]), ] @@ -234,9 +258,16 @@ def date_to_char(dates): # Create typo groups from pairs of possible typos typo_pairs = pairs[pairs["typo_certain"] | pairs["typo_possible"]] - typo_groups = typo_pairs[["date_x", "date_y"]].astype(str).groupby(level=0).apply( - pairs_to_groups - ).reset_index().set_index(index_col + ["group"])["level_1"] + typo_groups = ( + typo_pairs[["date_x", "date_y"]] + .astype(str) + .groupby(level=0) + .apply( + pairs_to_groups, + ) + .reset_index() + .set_index(index_col + ["group"])["level_1"] + ) typo_groups.rename("date", inplace=True) # Convert typos to characters @@ -252,7 +283,12 @@ def mode_scalar_or_list(series): # Compile the most popular character options seen in each typo group typo_value_options = typo_group_chars.groupby(level=[0, 1]).agg( - dict(zip(typo_group_chars.columns, [mode_scalar_or_list]*len(typo_group_chars.columns))) + dict( + zip( + typo_group_chars.columns, + [mode_scalar_or_list] * len(typo_group_chars.columns), + ), + ), ) """ @@ -265,7 +301,7 @@ def mode_scalar_or_list(series): - Trade off between accuracy and just having nulls instead - date accuracy importance is use case specific As we go down the line of columns, disagreements become less and less important - That means we could take a mean of two disagreeing days, but not years, or + That means we could take a mean of two disagreeing days, but not years, or thousands of years """ @@ -323,6 +359,7 @@ class DateReconciler: Approx dates take on supporting roles, e.g., is a given hard date near to many supporting approx dates, or can be used as a backup with no hard dates available. """ + def __init__( self, sources: Dict[Hashable, pd.Series], @@ -349,13 +386,13 @@ def __init__( # Handle approximate date sources if approx_sources is not None and approx_near_thresh is None: raise ValueError( - "Must specify `approx_near_thresh` if `approx_sources` specified." + "Must specify `approx_near_thresh` if `approx_sources` specified.", ) approx_sources = to_list_optional(approx_sources, none_to_empty=True) if not set(approx_sources).issubset(set(sources.keys())): raise ValueError( - "`approx_sources` must be a subset of the `sources` keys." 
+ "`approx_sources` must be a subset of the `sources` keys.", ) self.dates = self._preproc_sources(sources, approx_sources, once_per_source) @@ -364,7 +401,6 @@ def __init__( self.approx_sources = approx_sources self.approx_near_thresh = approx_near_thresh - def _preproc_sources(self, sources, approx_sources, once_per_source): # Preprocess the sources/dates dates = [] @@ -400,9 +436,13 @@ def _preproc_sources(self, sources, approx_sources, once_per_source): if once_per_source: index_col = date.index.names - date = date.reset_index().drop_duplicates( - keep="first", - ).set_index(index_col)["date"] + date = ( + date.reset_index() + .drop_duplicates( + keep="first", + ) + .set_index(index_col)["date"] + ) date = date.to_frame() date["source"] = source @@ -417,12 +457,11 @@ def _preproc_sources(self, sources, approx_sources, once_per_source): if not (dates["date"].dt.time == datetime.time(0)).all(): warnings.warn( - "Dates with times are not supported. Converting to date only." + "Dates with times are not supported. Converting to date only.", ) return dates - def _combined_resolved(self, groups, groups_resolved): resolved = [] for reason, dates in groups_resolved.items(): @@ -433,7 +472,6 @@ def _combined_resolved(self, groups, groups_resolved): return combine_nonoverlapping(resolved) - def __call__(self): dates = self.dates.copy() @@ -469,23 +507,16 @@ def __call__(self): unres_approx = unres_approx[unres_approx["date_score"] != 0] groups_resolved = {} + def resolve(resolved, reason): nonlocal groups_resolved, unres, unres_hard, unres_approx, unres_groups groups_resolved[reason] = resolved - unres = unres[ - ~unres.index.isin(resolved.index) - ] - unres_hard = unres_hard[ - ~unres_hard.index.isin(resolved.index) - ] - unres_approx = unres_approx[ - ~unres_approx.index.isin(resolved.index) - ] - unres_groups = unres_groups[ - ~unres_groups.index.isin(resolved.index) - ] + unres = unres[~unres.index.isin(resolved.index)] + unres_hard = unres_hard[~unres_hard.index.isin(resolved.index)] + unres_approx = unres_approx[~unres_approx.index.isin(resolved.index)] + unres_groups = unres_groups[~unres_groups.index.isin(resolved.index)] # === Resolve: one_entry === one_entry = unres[ @@ -526,15 +557,17 @@ def resolve(resolved, reason): ) resolve(hard_single_mode, "hard_single_mode") - # === Cluster resolutions === clusters = get_date_clusters( unres[["date", "approx"]], self.approx_near_thresh, ) - clusters_largest, largest_cluster_hard_mean, largest_cluster_approx_mean = \ - cluster_analysis(unres_hard, clusters) + ( + clusters_largest, + largest_cluster_hard_mean, + largest_cluster_approx_mean, + ) = cluster_analysis(unres_hard, clusters) resolve(largest_cluster_hard_mean, "largest_cluster_hard_mean") resolve(largest_cluster_approx_mean, "largest_cluster_approx_mean") diff --git a/cyclops/data/df/dates/pairs.py b/cyclops/data/df/pairs.py similarity index 84% rename from cyclops/data/df/dates/pairs.py rename to cyclops/data/df/pairs.py index f01cb0012..20da62034 100644 --- a/cyclops/data/df/dates/pairs.py +++ b/cyclops/data/df/pairs.py @@ -1,11 +1,11 @@ +"""Functions for working with pairs of values in DataFrames.""" from typing import Tuple, Union +import networkx as nx import numpy as np import pandas as pd -import networkx as nx - -from fecg.utils.pandas.type import to_frame_if_series +from cyclops.data.df.series_validation import to_frame_if_series def get_pairs( @@ -13,8 +13,7 @@ def get_pairs( self_match: bool = False, combinations: bool = True, ) -> pd.DataFrame: - """ - Perform a self-cross to 
generate pairs. + """Perform a self-cross to generate pairs. Parameters ---------- @@ -36,7 +35,7 @@ def get_pairs( example, if evaluating the pairs using a commutative function, where argument order does not affect the result, we would want to take only the pair combinations. """ - pairs = to_frame_if_series(data).merge(data, how='cross') + pairs = to_frame_if_series(data).merge(data, how="cross") if combinations or not self_match: length = len(data) @@ -44,10 +43,7 @@ def get_pairs( idx1 = np.tile(np.arange(length), length) if combinations: - if self_match: - pairs = pairs[idx0 <= idx1] - else: - pairs = pairs[idx0 < idx1] + pairs = pairs[idx0 <= idx1] if self_match else pairs[idx0 < idx1] else: pairs = pairs[idx0 != idx1] @@ -55,8 +51,7 @@ def get_pairs( def split_pairs(pairs: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: - """ - Split x and y pair columns into two separate DataFrames. + """Split x and y pair columns into two separate DataFrames. Parameters ---------- @@ -70,7 +65,7 @@ def split_pairs(pairs: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: pandas.DataFrame A DataFrame of pairs which had the "_y" columns. Suffix now removed. """ - half_len = (len(pairs.columns)//2) + half_len = len(pairs.columns) // 2 pairs_x = pairs.iloc[:, :half_len] pairs_y = pairs.iloc[:, half_len:] @@ -84,8 +79,7 @@ def split_pairs(pairs: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]: def pairs_to_groups(pairs: pd.DataFrame) -> pd.DataFrame: - """ - Convert pairs of values in a DataFrame to groups of connected values. + """Convert pairs of values in a DataFrame to groups of connected values. Given a DataFrame with two columns representing pairs of values, this function constructs a graph where each value is a node and each pair is an edge. It then @@ -124,6 +118,4 @@ def pairs_to_groups(pairs: pd.DataFrame) -> pd.DataFrame: # Convert connected components into a groups series groups = components.explode() - groups = pd.Series(groups.index, index=groups.values, name="group") - - return groups + return pd.Series(groups.index, index=groups.values, name="group") diff --git a/cyclops/data/df/dates/type.py b/cyclops/data/df/series_validation.py similarity index 90% rename from cyclops/data/df/dates/type.py rename to cyclops/data/df/series_validation.py index 5e77fefff..a159467b2 100644 --- a/cyclops/data/df/dates/type.py +++ b/cyclops/data/df/series_validation.py @@ -1,6 +1,5 @@ -from typing import Any, Union - -import numpy as np +"""Functions for validating Pandas Series.""" +from typing import Any import pandas as pd from pandas.api.types import ( @@ -13,12 +12,11 @@ def is_series(data: Any, raise_err: bool = False) -> bool: - """ - Check if the input is a Pandas Series. + """Check if the input is a Pandas Series. Parameters ---------- - data : any + data : Any The input data to check. raise_err : bool, default False Whether to raise an error if the data is not a Series. @@ -43,12 +41,11 @@ def is_series(data: Any, raise_err: bool = False) -> bool: def is_bool_series(data: Any, raise_err: bool = False) -> bool: - """ - Check if the input is a Pandas boolean series. + """Check if the input is a Pandas boolean series. Parameters ---------- - data : any + data : Any The input data to check. raise_err : bool, default False Whether to raise an error if the data is not a boolean Series. @@ -80,12 +77,11 @@ def is_int_series( raise_err: bool = False, raise_err_with_nullable: bool = False, ) -> bool: - """ - Check if the input is a Pandas integer series. 
+ """Check if the input is a Pandas integer series. Parameters ---------- - data : any + data : Any The input data to check. raise_err : bool, default False Whether to raise an error if the data is not an integer Series. @@ -113,7 +109,7 @@ def is_int_series( raise ValueError( "Pandas series must have an integer type. Consider applying " "`series.astype('Int64')`, where Int64 is a nullable integer data type " - "which enables the use of null values with an integer dtype." + "which enables the use of null values with an integer dtype.", ) if raise_err: @@ -123,12 +119,11 @@ def is_int_series( def is_float_series(data: Any, raise_err: bool = False) -> bool: - """ - Check if the input is a Pandas float series. + """Check if the input is a Pandas float series. Parameters ---------- - data : any + data : Any The input data to check. raise_err : bool, default False Whether to raise an error if the data is not a float Series. @@ -156,12 +151,11 @@ def is_float_series(data: Any, raise_err: bool = False) -> bool: def is_str_series(data: Any, raise_err: bool = False) -> bool: - """ - Check if the input is a Pandas string series. + """Check if the input is a Pandas string series. Parameters ---------- - data : any + data : Any The input data to check. raise_err : bool, default False Whether to raise an error if the data is not a string Series. @@ -189,12 +183,11 @@ def is_str_series(data: Any, raise_err: bool = False) -> bool: def is_datetime_series(data: Any, raise_err: bool = False) -> bool: - """ - Check if the input is a Pandas datetime series. + """Check if the input is a Pandas datetime series. Parameters ---------- - data : any + data : Any The input data to check. raise_err : bool, default False Whether to raise an error if the data is not a datetime Series. diff --git a/cyclops/data/df/utils.py b/cyclops/data/df/utils.py new file mode 100644 index 000000000..fdcdb97e0 --- /dev/null +++ b/cyclops/data/df/utils.py @@ -0,0 +1,318 @@ +"""Utility functions for working with Pandas DataFrames.""" +from functools import reduce +from typing import ( + Any, + Hashable, + List, + Optional, + Sequence, + Set, + Tuple, + Union, +) + +import pandas as pd + +from cyclops.data.df.series_validation import is_bool_series +from cyclops.utils.common import to_list + + +COLS_TYPE = Union[Hashable, Sequence[Hashable]] + + +def check_cols( + data: pd.DataFrame, + cols: COLS_TYPE, + raise_err_on_unexpected: bool = False, + raise_err_on_existing: bool = False, + raise_err_on_missing: bool = False, +) -> Tuple[Set[Hashable], Set[Hashable], Set[Hashable]]: + """Check DataFrame columns for expected columns and handle errors. + + Parameters + ---------- + data : pd.DataFrame + The input DataFrame to check columns against. + cols : hashable or list of Hashable + The column(s) to check for in the DataFrame. + raise_err_on_unexpected : bool, default False + Raise an error if unexpected columns are found. + raise_err_on_existing : bool, default False + Raise an error if any of the specified columns already exist. + raise_err_on_missing : bool, default False + Raise an error if any of the specified columns are missing. + + Returns + ------- + Tuple[Set[Hashable], Set[Hashable], Set[Hashable]] + A tuple containing sets of unexpected, existing, and missing columns. 
+ """ + columns = set(to_list(cols)) + data_cols = set(data.columns) + + unexpected = data_cols - columns + if raise_err_on_unexpected and len(unexpected) > 0: + raise ValueError(f"Unexpected columns: {', '.join(unexpected)}") + + existing = data_cols.intersection(columns) + if raise_err_on_existing and len(existing) > 0: + raise ValueError(f"Existing columns: {', '.join(existing)}") + + missing = columns - data_cols + if raise_err_on_missing and len(missing) > 0: + raise ValueError(f"Missing columns: {', '.join(missing)}") + + return unexpected, existing, missing + + +def and_conditions(conditions: List[pd.Series]) -> pd.Series: + """ + Perform element-wise logical AND operation on a list of boolean Series. + + Parameters + ---------- + conditions : list of pd.Series + A list of boolean Pandas Series. + + Raises + ------ + ValueError + If the conditions are not Pandas boolean series. + + Returns + ------- + pd.Series + A new Pandas Series resulting from the element-wise logical AND operation. + """ + for condition in conditions: + is_bool_series(condition, raise_err=True) + + return reduce(lambda x, y: x & y, conditions) + + +def or_conditions(conditions: List[pd.Series]) -> pd.Series: + """ + Perform element-wise logical OR operation on a list of boolean Series. + + Parameters + ---------- + conditions : list of pd.Series + A list of boolean Pandas Series. + + Raises + ------ + ValueError + If the conditions are not Pandas boolean series. + + Returns + ------- + pd.Series + A new Pandas Series resulting from the element-wise logical OR operation. + """ + for condition in conditions: + is_bool_series(condition, raise_err=True) + + return reduce(lambda x, y: x | y, conditions) + + +def combine_nonoverlapping(datas: List[Union[pd.DataFrame, pd.Series]]) -> pd.DataFrame: + """Combine non-overlapping DataFrames/Series into a single DataFrame/Series. + + The objects in `datas` should be all DataFrames or all Series, not a combination. + + For any given value location, it can be non-null in exactly 0 or 1 of the + DataFrames. The combined DataFrame will contains all of these values. + + Parameters + ---------- + datas : list of pandas.DataFrame or pandas.Series + A list of DataFrames/Series to be combined. + + Returns + ------- + pandas.DataFrame + The combined DataFrame. + + Raises + ------ + ValueError + If unauthorized overlap is found between DataFrames. + """ + # Get masks where the DataFrames are NaN + datas_na = [data.isna() for data in datas] + + # Check that there is no unauthorized overlap + datas_not_na = [(~data_na).astype(int) for data_na in datas_na] + datas_not_na_sum = reduce(lambda x, y: x + y, datas_not_na) + if not (datas_not_na_sum <= 1).all().all(): + raise ValueError( + "Unauthorized overlap found between DataFrames. Cannot combine.", + ) + + # Combine the DataFrames + combined = datas[0].copy() + for data in datas[1:]: + combined = combined.combine_first(data) + + return combined + + +def reset_index_merge( + left: Union[pd.DataFrame, pd.Series], + right: Union[pd.DataFrame, pd.Series], + index_col: Optional[COLS_TYPE] = None, + **merge_kwargs: Any, +) -> pd.DataFrame: + """Merge two dataframes after resetting their indexes. + + Parameters + ---------- + left : pandas.DataFrame or pandas.Series + The left object to merge. + right : pandas.DataFrame or pandas.Series + The right object to merge. + index_col : hashable or sequence of hashable, optional + Column(s) to set as index for the merged result. 
+ **merge_kwargs + Additional keyword arguments to pass to pandas merge function. + + Returns + ------- + pd.DataFrame + The merged dataframe. + """ + # Reset index for both dataframes + left_reset = left.reset_index() + right_reset = right.reset_index() + + # Merge the dataframes + merged = pd.merge(left_reset, right_reset, **merge_kwargs) + + # If index_col is provided, set it for the merged dataframe + if index_col: + merged.set_index(index_col, inplace=True) + + return merged + + +def index_structure_equal( + idx1: pd.Index, + idx2: pd.Index, + raise_err: bool = False, +) -> bool: + """Check whether two indexes have the same structure. + + Values aren't considered. + + Parameters + ---------- + idx1 : pandas.Index + The first index to compare. + idx2 : pandas.Index + The second index to compare. + raise_err : bool, default False + If True, raises an error if indexes do not have the same structure. + + Returns + ------- + bool + True if the indexes have the same structure, otherwise False. + """ + if type(idx1) != type(idx2): + if raise_err: + raise ValueError("Index dtypes do not match.") + + return False + + if idx1.names != idx2.names: + if raise_err: + raise ValueError("Index names do not match.") + + return False + + if idx1.nlevels != idx2.nlevels: + if raise_err: + raise ValueError("Number of index levels do not match.") + + return False + + return True + + +def is_multiindex( + idx: pd.Index, + raise_err: bool = False, + raise_err_multi: bool = False, +) -> bool: + """Check whether a given index is a MultiIndex. + + Parameters + ---------- + idx : pd.Index + Index to check. + raise_err : bool, default False + If True, raise a ValueError when idx is not a MultiIndex. + raise_err_multi : bool, default False + If True, raise a ValueError when idx is a MultiIndex. + + Raises + ------ + ValueError + Raised when `idx` is not a MultiIndex and `raise_err` is True. + Raised when `idx` is a MultiIndex and `raise_err_multi` is True. + + Returns + ------- + bool + True if idx is a MultiIndex, False otherwise. + """ + multiindex = isinstance(idx, pd.MultiIndex) + + if not multiindex and raise_err: + raise ValueError("Index must be a MultiIndex.") + + if multiindex and raise_err_multi: + raise ValueError("Index cannot be a MultiIndex.") + + return multiindex + + +def agg_mode(series: pd.Series) -> list[Any]: + """Get the mode(s) of a series by using `.agg(agg_mode)`. + + Parameters + ---------- + series : pd.Series + Series. + + Returns + ------- + list + List containing the mode(s) of the input series. + """ + return pd.Series.mode(series).to_list() # type: ignore[no-any-return] + + +def groupby_agg_mode( + grouped: pd.core.groupby.generic.SeriesGroupBy, + single_modes_only: bool = False, +) -> pd.Series: + """Compute the mode(s) for each group of a grouped series. + + Parameters + ---------- + grouped : pd.core.groupby.generic.SeriesGroupBy + Grouped series. + single_modes_only : bool, default False + If True, only groups with a singular mode are kept. + + Returns + ------- + pd.Series + A pandas Series containing the mode(s) for each group. 
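A minimal sketch of `agg_mode` and `groupby_agg_mode`; groups with tied modes keep every mode unless `single_modes_only` is set:

>>> import pandas as pd
>>> from cyclops.data.df.utils import groupby_agg_mode
>>> s = pd.Series([1, 1, 2, 5, 6], index=["a", "a", "a", "b", "b"])
>>> grouped = s.groupby(level=0)
>>> groupby_agg_mode(grouped)                          # a -> 1, b -> 5 and 6 (tied modes)
>>> groupby_agg_mode(grouped, single_modes_only=True)  # only a -> 1 survives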
+ """ + result = grouped.agg(agg_mode).explode() + if single_modes_only: + duplicate_indices = result.index[result.index.duplicated(keep=False)] + result = result.drop(duplicate_indices) + return result From cabfbc3c2f952961aaf764ae1592003bf5caf25f Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 24 May 2024 16:56:47 +0000 Subject: [PATCH 3/3] [pre-commit.ci] Add auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- cyclops/data/df/dates/__init__.py | 1 + cyclops/data/df/dates/dates.py | 1 + cyclops/data/df/dates/reconcile_dates.py | 1 + cyclops/data/df/pairs.py | 1 + cyclops/data/df/series_validation.py | 1 + cyclops/data/df/utils.py | 1 + 6 files changed, 6 insertions(+) diff --git a/cyclops/data/df/dates/__init__.py b/cyclops/data/df/dates/__init__.py index 1354a58c3..647e68eb4 100644 --- a/cyclops/data/df/dates/__init__.py +++ b/cyclops/data/df/dates/__init__.py @@ -1,4 +1,5 @@ """Processors for date handling.""" + from cyclops.data.df.dates.dates import ( DatePairHandler, analyze_dates, diff --git a/cyclops/data/df/dates/dates.py b/cyclops/data/df/dates/dates.py index d6f53ddf7..5ff2e1877 100644 --- a/cyclops/data/df/dates/dates.py +++ b/cyclops/data/df/dates/dates.py @@ -1,4 +1,5 @@ """Utilities for working with dates in pandas DataFrames.""" + import datetime import warnings from datetime import timedelta diff --git a/cyclops/data/df/dates/reconcile_dates.py b/cyclops/data/df/dates/reconcile_dates.py index e2caf0ca9..8b32456a6 100644 --- a/cyclops/data/df/dates/reconcile_dates.py +++ b/cyclops/data/df/dates/reconcile_dates.py @@ -1,4 +1,5 @@ """Reconcile issues with dates in a DataFrame.""" + import datetime import warnings from copy import deepcopy diff --git a/cyclops/data/df/pairs.py b/cyclops/data/df/pairs.py index 20da62034..fbcb59240 100644 --- a/cyclops/data/df/pairs.py +++ b/cyclops/data/df/pairs.py @@ -1,4 +1,5 @@ """Functions for working with pairs of values in DataFrames.""" + from typing import Tuple, Union import networkx as nx diff --git a/cyclops/data/df/series_validation.py b/cyclops/data/df/series_validation.py index a159467b2..cb30c3bd6 100644 --- a/cyclops/data/df/series_validation.py +++ b/cyclops/data/df/series_validation.py @@ -1,4 +1,5 @@ """Functions for validating Pandas Series.""" + from typing import Any import pandas as pd diff --git a/cyclops/data/df/utils.py b/cyclops/data/df/utils.py index fdcdb97e0..bdf967128 100644 --- a/cyclops/data/df/utils.py +++ b/cyclops/data/df/utils.py @@ -1,4 +1,5 @@ """Utility functions for working with Pandas DataFrames.""" + from functools import reduce from typing import ( Any,