diff --git a/changelog/16.feature.md b/changelog/16.feature.md new file mode 100644 index 0000000..677a67d --- /dev/null +++ b/changelog/16.feature.md @@ -0,0 +1,12 @@ ++ include support for polynomial-decay (besides cosine-decay) ++ we added three new classes/functions to the API in the module `convergence` + + `PolynomialDecaySplineHelper` + + `PolynomialDecaySplineHelperDerivative` + + `get_polynomial_decay_harmonised_spline` ++ whereby the function `get_polynomial_decay_harmonised_spline` is mainly meant as user-interface + This function can be used as value for the `get_harmonise_spline` parameter in `harmonise`. + The function takes the same arguments as `get_cosine_decay_harmonised_spline` and additionally a + `power` argument. ++ The `power` argument can be passed to the function using + `functools.partial(get_polynomial_decay_harmonised_spline, power=2)` ++ An example use-case is added to the `getting-started-tutorial`. diff --git a/changelog/17.fix.md b/changelog/17.fix.md new file mode 100644 index 0000000..1c44cd8 --- /dev/null +++ b/changelog/17.fix.md @@ -0,0 +1 @@ ++ fix computation of cosine decay derivative (we forgot a constant value in the formula) diff --git a/docs/tutorials/tutorial.py b/docs/tutorials/tutorial.py index 198a6bd..1d47bb2 100644 --- a/docs/tutorials/tutorial.py +++ b/docs/tutorials/tutorial.py @@ -26,9 +26,14 @@ # %% # Imports +from functools import partial + import numpy as np from gradient_aware_harmonisation import harmonise +from gradient_aware_harmonisation.convergence import ( + get_polynomial_decay_harmonised_spline, +) from gradient_aware_harmonisation.plotting import plotting from gradient_aware_harmonisation.timeseries import Timeseries @@ -96,6 +101,27 @@ convergence_time, ) +# %% [markdown] +# ### Use polynomial-decay + +# %% +convergence_time = 8.0 + +harmonised_timeseries = harmonise( + target_timeseries=target_timeseries, + harmonisee_timeseries=harmonisee_timeseries, + harmonisation_time=harmonisation_time, + convergence_time=convergence_time, + get_harmonised_spline=partial(get_polynomial_decay_harmonised_spline, power=2.0), +) + +plotting( + harmonisee_timeseries, + target_timeseries, + harmonised_timeseries, + harmonisation_time, + convergence_time, +) # %% [markdown] # ## Toy Example 2: Use timeseries data # ### Read data diff --git a/src/gradient_aware_harmonisation/convergence.py b/src/gradient_aware_harmonisation/convergence.py index 8887163..8969313 100644 --- a/src/gradient_aware_harmonisation/convergence.py +++ b/src/gradient_aware_harmonisation/convergence.py @@ -212,7 +212,9 @@ def calc_gamma_rising_derivative( angle = ( np.pi * (x - self.initial_time) / (self.final_time - self.initial_time) ) - gamma_decaying_derivative = -0.5 * np.sin(angle) + + const_factor = -np.pi / (2 * (self.final_time - self.initial_time)) + gamma_decaying_derivative = const_factor * np.sin(angle) return gamma_decaying_derivative @@ -329,3 +331,336 @@ def get_cosine_decay_harmonised_spline( convergence_spline, ), ) + + +@define +class PolynomialDecaySplineHelper: + """ + Spline that supports being used as a polynomial-decay between splines + + Between `initial_time` and `final_time`, + we return values based on a polynomial-decay between 1 and 0 + if `self.apply_to_convergence` is `False`, + otherwise we return values based on a polynomial-increase between 0 and 1. + """ + + initial_time: Union[float, int] + """ + At and before this time, we return values from `initial` + """ + + final_time: Union[float, int] + """ + At and after this time, we return values from `final` + """ + + power: Union[float, int] + """ + Power of polynomial decay; with power >= 1 + """ + + apply_to_convergence: bool = False + """ + Is this helper being applied to the convergence spline? + + If `True`, we return 1 - the weights, rather than the weights. + """ + + # domain: ClassVar[list[float, float]] = [ + # np.finfo(np.float64).tiny, + # np.finfo(np.float64).max, + # ] + # """Domain of spline""" + + @overload + def __call__(self, x: int | float) -> int | float: ... + + @overload + def __call__(self, x: NP_FLOAT_OR_INT) -> NP_FLOAT_OR_INT: ... + + @overload + def __call__(self, x: NP_ARRAY_OF_FLOAT_OR_INT) -> NP_ARRAY_OF_FLOAT_OR_INT: ... + + def __call__( + self, x: int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT + ) -> int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT: + """ + Evaluate the spline at a given x-value + + Parameters + ---------- + x + x-value + + Returns + ------- + : + Value of the spline at `x` + """ + + def calc_gamma( + x: int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT, + ) -> int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT: + """Get polynomial-decay""" + # compute weight (here: gamma) according to polynomial-decay + gamma_decaying = ( + 1 - (x - self.initial_time) / (self.final_time - self.initial_time) + ) ** self.power + + return gamma_decaying + + if not isinstance(x, np.ndarray): + if x <= self.initial_time: + gamma: float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT = 1.0 + elif x >= self.final_time: + gamma = 0.0 + else: + gamma = calc_gamma(x) + + # The weighted sum for computing the harmonised AND converged + # function has the form: "gamma * harmonised + (1-gamma) * convergence". + # Depending on which product we want to compute (LHS or RHS of sum), + # we need gamma or 1-gamma, therefore we include the following condition + # in all our return statements. + if self.apply_to_convergence: + return 1.0 - gamma + + return gamma + + # apply decay function only to values that lie between harmonisation + # time and convergence-time + x_gte_final_time = np.where(x >= self.final_time) + x_decay = np.logical_and(x >= self.initial_time, x < self.final_time) + gamma = np.ones_like(x, dtype=np.floating) + gamma[x_gte_final_time] = 0.0 + gamma[x_decay] = calc_gamma(x[x_decay]) + + if self.apply_to_convergence: + return 1.0 - gamma + + return gamma + + def derivative(self) -> PolynomialDecaySplineHelperDerivative: + """ + Calculate the derivative of self + + Returns + ------- + : + Derivative of self + + """ + return PolynomialDecaySplineHelperDerivative( + initial_time=self.initial_time, + final_time=self.final_time, + power=self.power, + apply_to_convergence=self.apply_to_convergence, + ) + + def antiderivative(self) -> PolynomialDecaySplineHelperDerivative: + """ + Calculate the anti-derivative/integral of self + + Returns + ------- + : + Anti-derivative of self + """ + raise NotImplementedError + + +@define +class PolynomialDecaySplineHelperDerivative: + """ + Derivative of [PolynomialDecaySplineHelper][(m).] + """ + + initial_time: Union[float, int] + """ + Initial time of the polynomial-decay + """ + + final_time: Union[float, int] + """ + Final time of the polynomial-decay + """ + + power: Union[float, int] + """ + Power of polynomial decay; with power >= 1 + """ + + apply_to_convergence: bool = False + """ + Is this helper being applied to the convergence spline? + """ + + # domain: ClassVar[list[float, float]] = [ + # np.finfo(np.float64).tiny, + # np.finfo(np.float64).max, + # ] + # """Domain of spline""" + + @overload + def __call__(self, x: int | float) -> int | float: ... + + @overload + def __call__(self, x: NP_FLOAT_OR_INT) -> NP_FLOAT_OR_INT: ... + + @overload + def __call__(self, x: NP_ARRAY_OF_FLOAT_OR_INT) -> NP_ARRAY_OF_FLOAT_OR_INT: ... + + def __call__( + self, x: int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT + ) -> int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT: + """ + Evaluate the spline at a given x-value + + Parameters + ---------- + x + x-value + + Returns + ------- + : + Value of the spline at `x` + """ + + # compute weight (here: gamma) according to a polynomial-decay + def calc_gamma_rising_derivative( + x: int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT, + ) -> int | float | NP_FLOAT_OR_INT | NP_ARRAY_OF_FLOAT_OR_INT: + """Get polynomial-decay derivative""" + # compute derivative of gamma according to a polynomial-decay + numerator = ( + self.power + * (1 - (x - self.initial_time) / (self.final_time - self.initial_time)) + ** self.power + ) + denominator = x - self.final_time + + gamma_decaying_derivative = numerator / denominator + + return gamma_decaying_derivative + + if not isinstance(x, np.ndarray): + if x <= self.initial_time or x >= self.final_time: + return 0.0 + + gamma_rising_derivative = calc_gamma_rising_derivative(x) + + # The weighted sum for computing the harmonised AND converged + # function has the form: "gamma * harmonised + (1-gamma) * convergence". + # Depending on which product we want to compute (LHS or RHS of sum), + # we need gamma or 1-gamma, therefore we include the following condition + # in all our return statements. + if self.apply_to_convergence: + return -gamma_rising_derivative + + return gamma_rising_derivative + + # apply decay function only to values that lie between harmonisation + # time and convergence-time + x_decay = np.where(np.logical_and(x > self.initial_time, x < self.final_time)) + gamma_rising_derivative = np.zeros_like(x, dtype=np.floating) + gamma_rising_derivative[x_decay] = calc_gamma_rising_derivative(x[x_decay]) + + if self.apply_to_convergence: + return -gamma_rising_derivative + return gamma_rising_derivative + + def derivative(self) -> PolynomialDecaySplineHelperDerivative: + """ + Calculate the derivative of self + + Returns + ------- + : + Derivative of self + """ + raise NotImplementedError + + def antiderivative(self) -> PolynomialDecaySplineHelperDerivative: + """ + Calculate the anti-derivative/integral of self + + Returns + ------- + : + Anti-derivative of self + + Raises + ------ + NotImplementedError + """ + raise NotImplementedError + + +def get_polynomial_decay_harmonised_spline( + harmonisation_time: Union[int, float], + convergence_time: Union[int, float], + harmonised_spline_no_convergence: Spline, + convergence_spline: Spline, + power: Union[int, float], +) -> SumOfSplines: + """ + Generate the harmonised spline based on a polynomial-decay + + Parameters + ---------- + harmonisation_time + Harmonisation time + + This is the time at and before which + the solution should be equal to `harmonised_spline_no_convergence`. + + convergence_time + Convergence time + + This is the time at and after which + the solution should be equal to `convergence_spline`. + + harmonised_spline_no_convergence + Harmonised spline that does not consider convergence + + convergence_spline + The spline to which the result should converge + + power + The power of the polynomial-decay, whereby power >= 1. + With power = 1 being equivalent to linear decay. + + Returns + ------- + : + Harmonised spline + """ + # The harmonised spline is considered as the spline that match + # the target-spline at the harmonisation time (wrt to zero-and + # first order derivative). Then we use a decay function to let + # the harmonised spline converge to the convergence-spline. + # This decay function has the form of a weighted sum: + # weight * harmonised_spline + (1-weight) * convergence_spline + # With weights decaying from 1 to 0 whereby the decay trajectory + # is determined by the polynomial decay. + return SumOfSplines( + ProductOfSplines( + PolynomialDecaySplineHelper( + initial_time=harmonisation_time, + final_time=convergence_time, + power=power, + apply_to_convergence=False, + ), + harmonised_spline_no_convergence, + ), + ProductOfSplines( + PolynomialDecaySplineHelper( + initial_time=harmonisation_time, + final_time=convergence_time, + power=power, + apply_to_convergence=True, + ), + convergence_spline, + ), + ) diff --git a/tests/integration/test_convergence_integration.py b/tests/integration/test_convergence_integration.py index 9aabb09..171af70 100644 --- a/tests/integration/test_convergence_integration.py +++ b/tests/integration/test_convergence_integration.py @@ -98,22 +98,34 @@ def test_cosine_decay_spline_derivative(): + gamma = cosine_decay_derivative(x) for harm_time < time < conv_time """ + scipy = pytest.importorskip("scipy") + spline = CosineDecaySplineHelper( initial_time=2.0, final_time=10.0, ).derivative() + def cos_deriv(x, initial_time, final_time): + angle = np.pi * (x - initial_time) / (final_time - initial_time) + const_factor = -np.pi / (2 * (final_time - initial_time)) + return const_factor * np.sin(angle) + # Before the decay np.testing.assert_equal(spline(1.0), 0.0) # After the decay np.testing.assert_equal(spline(11.0), 0.0) # In the decay - np.testing.assert_equal(spline(6.0), -0.5) - np.testing.assert_equal(spline(3.0), -0.5 * np.sin(np.pi * 1.0 / 8.0)) + np.testing.assert_equal(spline(6.0), cos_deriv(6.0, 2.0, 10.0)) + np.testing.assert_equal(spline(3.0), cos_deriv(3.0, 2.0, 10.0)) + # ensure correct computation of derivative itself + integral, _ = scipy.integrate.quad(spline, 2.0, 10.0) + np.testing.assert_allclose(integral, -1.0) # Array input np.testing.assert_equal( - np.array([0.0, 0.0, -0.5 * np.sin(np.pi * 1.0 / 8.0), -0.5, 0.0, 0.0]), + np.array( + [0.0, 0.0, cos_deriv(3.0, 2.0, 10.0), cos_deriv(6.0, 2.0, 10.0), 0.0, 0.0] + ), spline(np.array([1.5, 2.0, 3.0, 6.0, 10.0, 11.1])), ) @@ -125,21 +137,33 @@ def test_cosine_decay_spline_derivative_apply_to_convergence(): + gamma = 0 harmonisation_time > time > convergence_time + gamma = -cosine_decay_derivative(x) for harm_time < x < conv_time """ + scipy = pytest.importorskip("scipy") + spline = CosineDecaySplineHelper( initial_time=2.0, final_time=10.0, apply_to_convergence=True ).derivative() + def cos_deriv(x, initial_time, final_time): + angle = np.pi * (x - initial_time) / (final_time - initial_time) + const_factor = -np.pi / (2 * (final_time - initial_time)) + return const_factor * np.sin(angle) + # Before the decay np.testing.assert_equal(spline(1.0), 0.0) # After the decay np.testing.assert_equal(spline(11.0), 0.0) # In the decay - np.testing.assert_equal(spline(6.0), 0.5) - np.testing.assert_equal(spline(3.0), 0.5 * np.sin(np.pi * 1.0 / 8.0)) + np.testing.assert_equal(spline(6.0), -cos_deriv(6.0, 2.0, 10.0)) + np.testing.assert_equal(spline(3.0), -cos_deriv(3.0, 2.0, 10.0)) + # ensure correct computation of derivative itself + integral, _ = scipy.integrate.quad(spline, 2.0, 10.0) + np.testing.assert_allclose(integral, 1.0) # Array input np.testing.assert_equal( - np.array([0.0, 0.0, 0.5 * np.sin(np.pi * 1.0 / 8.0), 0.5, 0.0, 0.0]), + np.array( + [0.0, 0.0, -cos_deriv(3.0, 2.0, 10.0), -cos_deriv(6.0, 2.0, 10.0), 0.0, 0.0] + ), spline(np.array([1.5, 2.0, 3.0, 6.0, 10.0, 11.1])), ) diff --git a/tests/unit/test_cosine_decay_derivative.py b/tests/unit/test_cosine_decay_derivative.py new file mode 100644 index 0000000..4bd7611 --- /dev/null +++ b/tests/unit/test_cosine_decay_derivative.py @@ -0,0 +1,27 @@ +import numpy as np +import pytest + +from gradient_aware_harmonisation.convergence import ( + CosineDecaySplineHelperDerivative, +) + +scipy = pytest.importorskip("scipy") + + +def test_CosineDecaySplineHelperDerivative(): + """ + Test correct computation of derivative + """ + initial_time = 2002 + final_time = 2020 + apply_to_convergence = False + + cos_helper_deriv = CosineDecaySplineHelperDerivative( + initial_time=initial_time, + final_time=final_time, + apply_to_convergence=apply_to_convergence, + ) + + integral, _ = scipy.integrate.quad(cos_helper_deriv, a=initial_time, b=final_time) + + np.testing.assert_allclose(integral, -1.0) diff --git a/tests/unit/test_polynomial_decay_derivative.py b/tests/unit/test_polynomial_decay_derivative.py new file mode 100644 index 0000000..2e8e984 --- /dev/null +++ b/tests/unit/test_polynomial_decay_derivative.py @@ -0,0 +1,28 @@ +import numpy as np +import pytest + +from gradient_aware_harmonisation.convergence import ( + PolynomialDecaySplineHelperDerivative, +) + +scipy = pytest.importorskip("scipy") + + +def test_PolynomialDecaySplineHelperDerivative(): + """ + Test correct computation of derivative + """ + initial_time = 2002 + final_time = 2020 + apply_to_convergence = False + + cos_helper_deriv = PolynomialDecaySplineHelperDerivative( + initial_time=initial_time, + final_time=final_time, + apply_to_convergence=apply_to_convergence, + power=2.0, + ) + + integral, _ = scipy.integrate.quad(cos_helper_deriv, a=initial_time, b=final_time) + + np.testing.assert_allclose(integral, -1.0)