|
from __future__ import annotations |
|
|
|
from typing import TYPE_CHECKING |
|
|
|
import numpy as np |
|
|
|
from pandas._libs import lib |
|
from pandas._libs.algos import unique_deltas |
|
from pandas._libs.tslibs import ( |
|
Timestamp, |
|
get_unit_from_dtype, |
|
periods_per_day, |
|
tz_convert_from_utc, |
|
) |
|
from pandas._libs.tslibs.ccalendar import ( |
|
DAYS, |
|
MONTH_ALIASES, |
|
MONTH_NUMBERS, |
|
MONTHS, |
|
int_to_weekday, |
|
) |
|
from pandas._libs.tslibs.dtypes import ( |
|
OFFSET_TO_PERIOD_FREQSTR, |
|
freq_to_period_freqstr, |
|
) |
|
from pandas._libs.tslibs.fields import ( |
|
build_field_sarray, |
|
month_position_check, |
|
) |
|
from pandas._libs.tslibs.offsets import ( |
|
DateOffset, |
|
Day, |
|
to_offset, |
|
) |
|
from pandas._libs.tslibs.parsing import get_rule_month |
|
from pandas.util._decorators import cache_readonly |
|
|
|
from pandas.core.dtypes.common import is_numeric_dtype |
|
from pandas.core.dtypes.dtypes import ( |
|
DatetimeTZDtype, |
|
PeriodDtype, |
|
) |
|
from pandas.core.dtypes.generic import ( |
|
ABCIndex, |
|
ABCSeries, |
|
) |
|
|
|
from pandas.core.algorithms import unique |
|
|
|
if TYPE_CHECKING: |
|
from pandas._typing import npt |
|
|
|
from pandas import ( |
|
DatetimeIndex, |
|
Series, |
|
TimedeltaIndex, |
|
) |
|
from pandas.core.arrays.datetimelike import DatetimeLikeArrayMixin |
|
|
|
|
|
|
|
# Offsets whose month-anchored variants ("QS-JAN", ...) alias to the same
# period frequency as the bare prefix.
_need_suffix = ["QS", "BQE", "BQS", "YS", "BYE", "BYS"]

OFFSET_TO_PERIOD_FREQSTR.update(
    {
        f"{_prefix}-{_m}": OFFSET_TO_PERIOD_FREQSTR[_prefix]
        for _prefix in _need_suffix
        for _m in MONTHS
    }
)

# Month-anchored annual/quarterly aliases map to themselves.
OFFSET_TO_PERIOD_FREQSTR.update(
    {f"{_p}-{_m}": f"{_p}-{_m}" for _p in ("Y", "Q") for _m in MONTHS}
)

# Day-anchored weekly aliases map to themselves.
OFFSET_TO_PERIOD_FREQSTR.update({f"W-{_d}": f"W-{_d}" for _d in DAYS})
|
|
|
|
|
def get_period_alias(offset_str: str) -> str | None:
    """
    Alias to closest period strings BQ->Q etc.
    """
    # dict.get already returns None for unknown offsets.
    return OFFSET_TO_PERIOD_FREQSTR.get(offset_str)
|
|
|
|
|
|
|
|
|
|
|
|
|
def infer_freq(
    index: DatetimeIndex | TimedeltaIndex | Series | DatetimeLikeArrayMixin,
) -> str | None:
    """
    Infer the most likely frequency given the input index.

    Parameters
    ----------
    index : DatetimeIndex, TimedeltaIndex, Series or array-like
        If passed a Series will use the values of the series (NOT THE INDEX).

    Returns
    -------
    str or None
        None if no discernible frequency.

    Raises
    ------
    TypeError
        If the index is not datetime-like.
    ValueError
        If there are fewer than three values.

    Examples
    --------
    >>> idx = pd.date_range(start='2020/12/01', end='2020/12/30', periods=30)
    >>> pd.infer_freq(idx)
    'D'
    """
    from pandas.core.api import DatetimeIndex

    if isinstance(index, ABCSeries):
        # Work on the Series' values, never its index.
        values = index._values
        convertible = (
            lib.is_np_dtype(values.dtype, "mM")
            or isinstance(values.dtype, DatetimeTZDtype)
            or values.dtype == object
        )
        if not convertible:
            raise TypeError(
                "cannot infer freq from a non-convertible dtype "
                f"on a Series of {index.dtype}"
            )
        index = values

    inferer: _FrequencyInferer

    if hasattr(index, "dtype"):
        if isinstance(index.dtype, PeriodDtype):
            raise TypeError(
                "PeriodIndex given. Check the `freq` attribute "
                "instead of using infer_freq."
            )
        if lib.is_np_dtype(index.dtype, "m"):
            # Timedelta-valued input: dispatch to the timedelta inferer,
            # which skips calendar-based (annual/quarterly/...) rules.
            inferer = _TimedeltaFrequencyInferer(index)
            return inferer.get_freq()
        if is_numeric_dtype(index.dtype):
            raise TypeError(
                f"cannot infer freq from a non-convertible index of dtype {index.dtype}"
            )

    # Anything else is coerced to a DatetimeIndex before inference.
    if not isinstance(index, DatetimeIndex):
        index = DatetimeIndex(index)

    inferer = _FrequencyInferer(index)
    return inferer.get_freq()
|
|
|
|
|
class _FrequencyInferer:
    """
    Infer a frequency string from the int64 epoch values backing a
    datetime-like index.

    Not sure if I can avoid the state machine here
    """

    def __init__(self, index) -> None:
        # index: datetime-like Index or array; must expose ``asi8``.
        self.index = index
        self.i8values = index.asi8

        # An Index keeps its backing numpy array on ``_data._ndarray``; a
        # bare datetime-like array exposes ``_ndarray`` directly.  Either
        # way we need the dtype's resolution (creso) for the period math.
        if isinstance(index, ABCIndex):
            self._creso = get_unit_from_dtype(
                index._data._ndarray.dtype
            )
        else:
            self._creso = get_unit_from_dtype(index._ndarray.dtype)

        # tz-aware values are stored in UTC; convert to local wall time so
        # the calendar-based checks below (fields, weekday) see local times.
        if hasattr(index, "tz"):
            if index.tz is not None:
                self.i8values = tz_convert_from_utc(
                    self.i8values, index.tz, reso=self._creso
                )

        if len(index) < 3:
            raise ValueError("Need at least 3 dates to infer frequency")

        # Either direction counts as monotonic; get_freq returns None when
        # the index is not monotonic at all.
        self.is_monotonic = (
            self.index._is_monotonic_increasing or self.index._is_monotonic_decreasing
        )

    @cache_readonly
    def deltas(self) -> npt.NDArray[np.int64]:
        # Unique first-differences of the (possibly tz-adjusted) values.
        return unique_deltas(self.i8values)

    @cache_readonly
    def deltas_asi8(self) -> npt.NDArray[np.int64]:
        # NB: computed from the raw UTC-based ``index.asi8``, deliberately
        # NOT from ``self.i8values`` (which may have been tz-adjusted in
        # __init__).
        return unique_deltas(self.index.asi8)

    @cache_readonly
    def is_unique(self) -> bool:
        # True when the index is evenly spaced (a single unique delta).
        return len(self.deltas) == 1

    @cache_readonly
    def is_unique_asi8(self) -> bool:
        # Evenly spaced in raw UTC terms.
        return len(self.deltas_asi8) == 1

    def get_freq(self) -> str | None:
        """
        Find the appropriate frequency string to describe the inferred
        frequency of self.i8values

        Returns
        -------
        str or None
        """
        if not self.is_monotonic or not self.index._is_unique:
            return None

        delta = self.deltas[0]
        ppd = periods_per_day(self._creso)
        # The smallest delta is a whole number of days -> daily-or-coarser
        # calendar rules.
        if delta and _is_multiple(delta, ppd):
            return self._infer_daily_rule()

        # Business-hour pattern: presumably 1h intraday steps plus 17h
        # overnight and/or 65h weekend gaps -- TODO confirm gap values.
        if self.hour_deltas in ([1, 17], [1, 65], [1, 17, 65]):
            return "bh"

        # Sub-daily rules require even spacing of the raw UTC values.
        if not self.is_unique_asi8:
            return None

        delta = self.deltas_asi8[0]
        pph = ppd // 24  # periods per hour
        ppm = pph // 60  # periods per minute
        pps = ppm // 60  # periods per second
        if _is_multiple(delta, pph):
            return _maybe_add_count("h", delta / pph)
        elif _is_multiple(delta, ppm):
            return _maybe_add_count("min", delta / ppm)
        elif _is_multiple(delta, pps):
            return _maybe_add_count("s", delta / pps)
        elif _is_multiple(delta, (pps // 1000)):
            return _maybe_add_count("ms", delta / (pps // 1000))
        elif _is_multiple(delta, (pps // 1_000_000)):
            return _maybe_add_count("us", delta / (pps // 1_000_000))
        else:
            return _maybe_add_count("ns", delta)

    @cache_readonly
    def day_deltas(self) -> list[float]:
        # Deltas expressed in days; true division, so entries are floats
        # (they compare equal to ints when whole, as _is_business_daily
        # relies on).
        ppd = periods_per_day(self._creso)
        return [x / ppd for x in self.deltas]

    @cache_readonly
    def hour_deltas(self) -> list[float]:
        # Deltas expressed in hours; true division, so entries are floats.
        pph = periods_per_day(self._creso) // 24
        return [x / pph for x in self.deltas]

    @cache_readonly
    def fields(self) -> np.ndarray:
        # Structured array of calendar fields for each value.
        return build_field_sarray(self.i8values, reso=self._creso)

    @cache_readonly
    def rep_stamp(self) -> Timestamp:
        # Representative (first) timestamp; used to anchor month/weekday
        # aliases such as "Y-<MONTH>" and "W-<WEEKDAY>".
        return Timestamp(self.i8values[0], unit=self.index.unit)

    def month_position_check(self) -> str | None:
        # "cs"/"bs"/"ce"/"be" (calendar/business x start/end) or None.
        return month_position_check(self.fields, self.index.dayofweek)

    @cache_readonly
    def mdiffs(self) -> npt.NDArray[np.int64]:
        # Unique deltas in absolute month number (year * 12 + month).
        nmonths = self.fields["Y"] * 12 + self.fields["M"]
        return unique_deltas(nmonths.astype("i8"))

    @cache_readonly
    def ydiffs(self) -> npt.NDArray[np.int64]:
        # Unique deltas of the year field.
        return unique_deltas(self.fields["Y"].astype("i8"))

    def _infer_daily_rule(self) -> str | None:
        # Try rules from coarsest to finest: annual, quarterly, monthly,
        # plain daily/weekly, business-daily, week-of-month.
        annual_rule = self._get_annual_rule()
        if annual_rule:
            nyears = self.ydiffs[0]
            month = MONTH_ALIASES[self.rep_stamp.month]
            alias = f"{annual_rule}-{month}"
            return _maybe_add_count(alias, nyears)

        quarterly_rule = self._get_quarterly_rule()
        if quarterly_rule:
            nquarters = self.mdiffs[0] / 3
            # Maps rep_stamp.month % 3 to the quarter's anchor month number
            # for the alias -- presumably 12/11/10 pick the DEC/NOV/OCT
            # anchored quarter cycles; TODO confirm mapping intent.
            mod_dict = {0: 12, 2: 11, 1: 10}
            month = MONTH_ALIASES[mod_dict[self.rep_stamp.month % 3]]
            alias = f"{quarterly_rule}-{month}"
            return _maybe_add_count(alias, nquarters)

        monthly_rule = self._get_monthly_rule()
        if monthly_rule:
            return _maybe_add_count(monthly_rule, self.mdiffs[0])

        if self.is_unique:
            return self._get_daily_rule()

        if self._is_business_daily():
            return "B"

        wom_rule = self._get_wom_rule()
        if wom_rule:
            return wom_rule

        return None

    def _get_daily_rule(self) -> str | None:
        ppd = periods_per_day(self._creso)
        days = self.deltas[0] / ppd
        if days % 7 == 0:
            # Whole weeks -> weekly rule anchored on the first weekday.
            wd = int_to_weekday[self.rep_stamp.weekday()]
            alias = f"W-{wd}"
            return _maybe_add_count(alias, days / 7)
        else:
            return _maybe_add_count("D", days)

    def _get_annual_rule(self) -> str | None:
        # Annual requires a single year-delta and a single month value.
        if len(self.ydiffs) > 1:
            return None

        if len(unique(self.fields["M"])) > 1:
            return None

        pos_check = self.month_position_check()

        if pos_check is None:
            return None
        else:
            return {"cs": "YS", "bs": "BYS", "ce": "YE", "be": "BYE"}.get(pos_check)

    def _get_quarterly_rule(self) -> str | None:
        # Quarterly requires a single month-delta that is a multiple of 3.
        if len(self.mdiffs) > 1:
            return None

        if not self.mdiffs[0] % 3 == 0:
            return None

        pos_check = self.month_position_check()

        if pos_check is None:
            return None
        else:
            return {"cs": "QS", "bs": "BQS", "ce": "QE", "be": "BQE"}.get(pos_check)

    def _get_monthly_rule(self) -> str | None:
        # Monthly requires a single month-delta.
        if len(self.mdiffs) > 1:
            return None
        pos_check = self.month_position_check()

        if pos_check is None:
            return None
        else:
            return {"cs": "MS", "bs": "BMS", "ce": "ME", "be": "BME"}.get(pos_check)

    def _is_business_daily(self) -> bool:
        # Business-daily spacing shows exactly two gap sizes: 1 day
        # (between weekdays) and 3 days (over a weekend).
        if self.day_deltas != [1, 3]:
            return False

        # Reconstruct each subsequent point's weekday from the first one
        # and the cumulative day-shifts, then require: Mondays (0) follow
        # a 3-day gap, Tue-Fri (1..4) follow a 1-day gap.
        first_weekday = self.index[0].weekday()
        shifts = np.diff(self.i8values)
        ppd = periods_per_day(self._creso)
        shifts = np.floor_divide(shifts, ppd)
        weekdays = np.mod(first_weekday + np.cumsum(shifts), 7)

        return bool(
            np.all(
                ((weekdays == 0) & (shifts == 3))
                | ((weekdays > 0) & (weekdays <= 4) & (shifts == 1))
            )
        )

    def _get_wom_rule(self) -> str | None:
        # Week-of-month requires every value to share one weekday ...
        weekdays = unique(self.index.weekday)
        if len(weekdays) > 1:
            return None

        # ... and one week-of-month position (0-indexed from day-of-month).
        week_of_months = unique((self.index.day - 1) // 7)

        # Only the first four week positions are considered; day 29+ (a
        # possible 5th occurrence) is dropped before the uniqueness check.
        week_of_months = week_of_months[week_of_months < 4]
        if len(week_of_months) == 0 or len(week_of_months) > 1:
            return None

        # The alias is 1-indexed ("WOM-3FRI" = third Friday).
        week = week_of_months[0] + 1
        wd = int_to_weekday[weekdays[0]]

        return f"WOM-{week}{wd}"
|
|
|
|
|
class _TimedeltaFrequencyInferer(_FrequencyInferer):
    """Frequency inferer for timedelta-valued input (no calendar rules)."""

    def _infer_daily_rule(self):
        # Timedeltas carry no calendar structure, so only evenly-spaced
        # input can yield a daily/weekly rule.
        if not self.is_unique:
            return None
        return self._get_daily_rule()
|
|
|
|
|
def _is_multiple(us, mult: int) -> bool: |
|
return us % mult == 0 |
|
|
|
|
|
def _maybe_add_count(base: str, count: float) -> str: |
|
if count != 1: |
|
assert count == int(count) |
|
count = int(count) |
|
return f"{count}{base}" |
|
else: |
|
return base |
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_subperiod(source, target) -> bool:
    """
    Returns True if downsampling is possible between source and target
    frequencies

    Parameters
    ----------
    source : str or DateOffset
        Frequency converting from
    target : str or DateOffset
        Frequency converting to

    Returns
    -------
    bool
    """
    if source is None or target is None:
        return False
    source = _maybe_coerce_freq(source)
    target = _maybe_coerce_freq(target)

    # Frequencies finer than a day, accepted by every coarser target.
    sub_daily = {"h", "min", "s", "ms", "us", "ns"}

    if _is_annual(target):
        if _is_quarterly(source):
            # Quarterly fits into annual only when anchor months line up.
            return _quarter_months_conform(
                get_rule_month(source), get_rule_month(target)
            )
        return source in {"D", "C", "B", "M"} | sub_daily
    if _is_quarterly(target):
        return source in {"D", "C", "B", "M"} | sub_daily
    if _is_monthly(target):
        return source in {"D", "C", "B"} | sub_daily
    if _is_weekly(target):
        return source in {target, "D", "C", "B"} | sub_daily
    if target == "B":
        return source in {"B"} | sub_daily
    if target == "C":
        return source in {"C"} | sub_daily
    if target == "D":
        return source in {"D"} | sub_daily

    # Sub-daily targets accept themselves and anything strictly finer.
    finer = ["h", "min", "s", "ms", "us", "ns"]
    if target in finer:
        return source in finer[finer.index(target):]
    return False
|
|
|
|
|
def is_superperiod(source, target) -> bool:
    """
    Returns True if upsampling is possible between source and target
    frequencies

    Parameters
    ----------
    source : str or DateOffset
        Frequency converting from
    target : str or DateOffset
        Frequency converting to

    Returns
    -------
    bool
    """
    if source is None or target is None:
        return False
    source = _maybe_coerce_freq(source)
    target = _maybe_coerce_freq(target)

    # Frequencies finer than a day, reachable from every coarser source.
    sub_daily = {"h", "min", "s", "ms", "us", "ns"}

    if _is_annual(source):
        if _is_annual(target):
            # Annual-to-annual requires matching anchor months.
            return get_rule_month(source) == get_rule_month(target)
        if _is_quarterly(target):
            # Annual-to-quarterly requires conforming anchor months.
            return _quarter_months_conform(
                get_rule_month(source), get_rule_month(target)
            )
        return target in {"D", "C", "B", "M"} | sub_daily
    if _is_quarterly(source):
        return target in {"D", "C", "B", "M"} | sub_daily
    if _is_monthly(source):
        return target in {"D", "C", "B"} | sub_daily
    if _is_weekly(source):
        return target in {source, "D", "C", "B"} | sub_daily
    if source in {"B", "C", "D"}:
        # All three daily flavors can be upsampled to any daily flavor
        # or to anything sub-daily.
        return target in {"D", "C", "B"} | sub_daily

    # Sub-daily sources reach themselves and anything strictly finer.
    finer = ["h", "min", "s", "ms", "us", "ns"]
    if source in finer:
        return target in finer[finer.index(source):]
    return False
|
|
|
|
|
def _maybe_coerce_freq(code) -> str:
    """we might need to coerce a code to a rule_code
    and uppercase it

    Parameters
    ----------
    source : str or DateOffset
        Frequency converting from

    Returns
    -------
    str
    """
    assert code is not None
    if isinstance(code, DateOffset):
        code = freq_to_period_freqstr(1, code.name)
    # Sub-daily aliases are case-sensitive and stay lowercase; everything
    # else is normalized to upper case.
    lowercase_codes = {"h", "min", "s", "ms", "us", "ns"}
    return code if code in lowercase_codes else code.upper()
|
|
|
|
|
def _quarter_months_conform(source: str, target: str) -> bool:
    """True when the two anchor months fall in the same quarterly cycle."""
    # Equal residues mod 3 <=> the difference is a multiple of 3.
    return (MONTH_NUMBERS[source] - MONTH_NUMBERS[target]) % 3 == 0
|
|
|
|
|
def _is_annual(rule: str) -> bool: |
|
rule = rule.upper() |
|
return rule == "Y" or rule.startswith("Y-") |
|
|
|
|
|
def _is_quarterly(rule: str) -> bool: |
|
rule = rule.upper() |
|
return rule == "Q" or rule.startswith(("Q-", "BQ")) |
|
|
|
|
|
def _is_monthly(rule: str) -> bool: |
|
rule = rule.upper() |
|
return rule in ("M", "BM") |
|
|
|
|
|
def _is_weekly(rule: str) -> bool: |
|
rule = rule.upper() |
|
return rule == "W" or rule.startswith("W-") |
|
|
|
|
|
# Public surface of this module (re-exported names included).
__all__ = [
    "Day",
    "get_period_alias",
    "infer_freq",
    "is_subperiod",
    "is_superperiod",
    "to_offset",
]
|
|