import numpy as np
from kneed import KneeLocator
from typing import Iterable, Optional
from scipy.optimize import curve_fit as _curve_fit
from .utils import *
[docs]
class CircleClust:
"""
Circular clustering with automatic detection of centroids as distribution peaks.
This class clusters circular data (e.g., times of day, hues) by:
- Binning data on a circle with periodic boundaries.
- Applying a fixed Hann smoothing window of 9 bins with circular padding.
- Screening bin counts via k = 1..max_screen_divisor where B = 9*k and
window width (in radians of the unit circle) is 2π/k.
- Selecting the binning that minimizes test RMS between histogram and
smoothed envelope, averaged over `max_screen_iter` random splits.
- Detecting circular local maxima on the smoothed histogram as cluster centers.
Parameters
----------
data : Iterable[float] | None
Optional data to fit immediately upon construction.
period : float, default 2π
Period of the input values; inputs are wrapped into [0, range).
Important to provide valid data range, for example:
- 1 if your data period is 1 (e.g. color hues in HLS or HSV space)
- 24 if your data is measured in hours (e.g. times of day)
- 360 if your data is measured in degrees
- 2π if your data is measured in radians (e.g. angles)
window : float | None
Manual override for smoothing window width (in radians on unit circle).
If provided, the screener will be skipped and nearest k≈2π/window used.
max_screen_divisor : int, default 32
Maximum divisor k in screening; B = 9*k.
max_screen_iter : int, default 2
Number of screening repetitions with different train/test splits; RMS is
averaged across repetitions before selecting k.
train_frac : float, default 0.7
Fraction of samples for training during screening.
random_seed : int, default 0
Base random seed for reproducible splits.
verbose : bool, default False
Enable informational prints via `verbose_print`.
"""
[docs]
def __init__(
self,
data: Optional[Iterable[float]] = None,
period: float = None,
window: float | None = None,
max_screen_divisor: int = 32,
max_screen_iter: int = 2,
train_frac: float = 0.7,
random_seed: int = 0,
verbose: bool = False,
debug: bool = False,
):
self.period = period
self.train_frac = float(train_frac)
self.random_seed = int(random_seed)
self.max_screen_divisor = int(max_screen_divisor)
self.max_screen_iter = int(max_screen_iter)
self.window = window
self.window_optimal = None
self.debug = bool(debug)
# If debug is on, force verbose on for richer context
self.verbose = True if self.debug else bool(verbose)
self.debug_dir = 'debug'
self.bins_per_window_ = 9
self.peak_idx_ = None
self.peak_std_ = None
self.centroid_ = None
self.centroid_std_ = None
self.h_all_ = None
self.s_all_ = None
if data is not None:
self.fit(data, period)
[docs]
def set_window(self, window: float):
"""
Sets manually specified smoothing window.
Parameters
----------
window : float
Smoothing window width (in radians on unit circle).
"""
w = float(window)
if w <= 0:
raise ValueError("Window must be positive.")
if self.period is not None and w >= self.period:
raise ValueError("Window must be less than period.")
self.window = w
def _set_period(self, period: float | None = None):
"""
Sets period for the input values.
Parameters
----------
period : float | None, default None
Period of the input values. If None, uses default 2π.
"""
if period is None or period <= 0:
if self.period is None:
warning_print("Valid period not provided, using default 2π.")
self.period = 2 * np.pi
else:
self.period = period
def _screen_smoothing_windows_rmsd(self, x_train: np.ndarray, x_test: np.ndarray, iteration: int):
"""
Screens bin counts for fixed smoothing window and computes RMS.
For each k in 1..max_screen_divisor, compute B=9*k, histogram and smooth
on train, and compute RMS(train) and RMS(test) against the (scaled) smooth
envelope. Returns arrays aligned to k values.
Parameters
----------
x_train : np.ndarray
Wrapped train split in [0, range).
x_test : np.ndarray
Wrapped test split in [0, range).
iteration : int
Iteration number (1..max_screen_iter).
Returns
-------
(rmsd_tr, rmsd_ts) : Tuple[np.ndarray, np.ndarray]
RMS arrays per each screened window.
"""
# Initialize array of N windows per period
nwindows = np.arange(1, self.max_screen_divisor + 1)
# Initialize arrays for RMS
rmsd_tr = np.zeros_like(nwindows, dtype=float)
rmsd_ts = np.zeros_like(nwindows, dtype=float)
for i, nwindow in enumerate(nwindows):
# Compute histogram bin edges
nbins = int(nwindow) * self.bins_per_window_
edges = np.linspace(0.0, self.period, nbins + 1, endpoint=True)
# Compute histogram and smooth on train
h_train, _ = np.histogram(x_train, bins=edges)
s_train = _periodic_smooth(h_train, self.bins_per_window_)
# Compute histogram and smooth on test
h_test, _ = np.histogram(x_test, bins=edges)
scale = (h_test.sum() / max(1, h_train.sum())) if h_train.sum() else 1.0
s_test = s_train * scale
# Compute masked RMSD
mask =( h_train > 0) | (h_test > 0)
tr_rmsd = _rmsd(h_train, s_train, mask=mask)
ts_rmsd = _rmsd(h_test, s_test, mask=mask)
rmsd_tr[i] = tr_rmsd
rmsd_ts[i] = ts_rmsd
if self.debug:
debug_plot_screen_iteration(h_train, s_train, h_test, s_test, iteration, i, outdir=self.debug_dir)
return rmsd_tr, rmsd_ts
def _find_optimal_window(self, data: Iterable[float], period: float):
"""
Finds optimal smoothing window by screening.
Parameters
----------
data : Iterable[float]
Input data to fit.
period : float
Period of the input values; inputs are wrapped into [0, period).
Important to provide valid data range, for example:
- 1 if your data period is 1 (e.g. color hues in HLS or HSV space)
- 24 if your data is measured in hours (e.g. times of day)
- 360 if your data is measured in degrees
- 2π if your data is measured in radians (e.g. angles)
Returns
-------
float
Optimal smoothing window width in radians on the unit circle.
"""
# Check if there is enough data
x = np.asarray(list(data), dtype=float)
if x.size <= 1:
warning_print("Not enough data points to fit optimal window.")
return self
# Initialize random number generator
rng = np.random.default_rng(self.random_seed)
# Initialize number of train and test samples
n_train = max(1, int(self.train_frac * x.size))
n_train = min(n_train, x.size - 1)
n_test = x.size - n_train
# Check if there is enough data
if n_train < 2 or n_test < 2:
warning_print("Not enough data points to fit optimal window.")
return self
# Initialize accumulator of train and test RMS
n_screened_windows = self.max_screen_divisor
nwindows = np.arange(1, n_screened_windows + 1)
rmsd_tr_acc = np.zeros(n_screened_windows, dtype=float)
rmsd_ts_acc = np.zeros(n_screened_windows, dtype=float)
for i in range(self.max_screen_iter):
# Shuffle data and split into train and test
idx = np.arange(x.size)
rng.shuffle(idx)
train_idx = idx[:n_train]
test_idx = idx[n_train:]
x_train = x[train_idx]
x_test = x[test_idx]
rmsd_tr, rmsd_ts = self._screen_smoothing_windows_rmsd(x_train, x_test, i)
rmsd_tr_acc += rmsd_tr
rmsd_ts_acc += rmsd_ts
# Normalize RMS across iterations
rmsd_tr_mean = rmsd_tr_acc / max(1, self.max_screen_iter)
rmsd_ts_mean = rmsd_ts_acc / max(1, self.max_screen_iter)
# Find optimal window at minimum of the test RMSD
i0 = int(np.argmin(rmsd_ts_mean))
confidence = 1.0
if i0 <= 1 or i0 >= n_screened_windows - 2:
confidence = 0.0
else:
r = np.concatenate([rmsd_ts_mean[i0-2:i0], rmsd_ts_mean[i0+1:i0+3]])
if rmsd_ts_mean[i0] >= 0.5 * np.mean(r):
confidence = 0.0
# Try knee (elbow) on decreasing convex curve of test RMS vs k (nwindows)
if confidence < 0.5:
try:
knee = KneeLocator(nwindows, rmsd_ts_mean, curve='convex', direction='decreasing')
if knee.knee is not None:
# map knee x-value (k) to nearest index
i0 = int(np.argmin(np.abs(nwindows - float(knee.knee))))
except Exception:
pass
# Avoid edges if possible (clamp to interior) to reduce over/under-smoothing
if n_screened_windows >= 3:
if i0 == 0:
i0 = 1
elif i0 == n_screened_windows - 1:
i0 = n_screened_windows - 2
self.window_optimal = self.period / nwindows[i0]
if i0 == 0 or i0 == (n_screened_windows - 1):
warning_print("Optimal smoothing window at edge of search range; screening may not have converged.")
if self.verbose:
print()
print("Selected optimal window via knee (fallback: min) on test RMS:")
print("idx | window | tr_rmsd | ts_rmsd")
for i in range(n_screened_windows):
label = "*" if i == i0 else ""
print(f"{int(nwindows[i]):3d} | {self.period / nwindows[i]:.4f} | {rmsd_tr_mean[i]:.4f} | {rmsd_ts_mean[i]:.4f} {label}")
print()
if self.debug:
debug_plot_window_selection(rmsd_tr_mean, rmsd_ts_mean, i0=i0, outdir=self.debug_dir)
return self.window_optimal
[docs]
def fit(self, data: Iterable[float], period: float | None = None, verbose: bool | None = None):
"""
Fits the model by selecting smoothing binning and finding centers.
Parameters
----------
data : Iterable[float]
Input data to fit.
period : float, default 2π
Period of the input values; inputs are wrapped into [0, period).
Important to provide valid data range, for example:
- 1 if your data period is 1 (e.g. color hues in HLS or HSV space)
- 24 if your data is measured in hours (e.g. times of day)
- 360 if your data is measured in degrees
- 2π if your data is measured in radians (e.g. angles)
verbose : bool | None, default None
Whether to print verbose output. If None, uses self.verbose.
Returns
-------
self : CircleClust
The fitted instance with detected peak_idx, peak_std, centroid and centroid_std arrays.
"""
# Set period
self._set_period(period)
# Set verbose
if verbose is not None:
self.verbose = verbose
# Check if data are within the period range
x = np.asarray(list(data), dtype=float)
if np.any(x < 0) or np.any(x >= self.period):
raise ValueError(f"Data must be within [0, {self.period}) or provide a valid data period.")
# Set window
window = None
if self.window is not None:
# Use manually set window if provided
window = float(self.window)
else:
# Automatically detect optimal window
self._find_optimal_window(data, period)
window = self.window_optimal
if window is None:
raise ValueError("Failed to detect optimal window.")
# Calculate histogram and binning
nwindows = max(1, int(np.round(self.period / window)))
nbins = 9 * nwindows
edges = np.linspace(0.0, self.period, nbins + 1, endpoint=True)
self.h_all_, _ = np.histogram(x, bins=edges)
self.s_all_ = _periodic_smooth(self.h_all_, self.bins_per_window_)
# Find and fill peak index and centroid coordinate arrays
self.peak_idx_ = _find_idxs_of_periodic_peaks(self.s_all_, self.bins_per_window_)
self.centroid_ = self.peak_idx_ * self.period / nbins
# Move to determine peak std and centroid std
# Compute periodic midpoints between adjacent peaks to define disjoint segments
nbins = len(self.h_all_)
self.peak_midpoints_ = _find_idxs_of_midpoints_between_peaks(self.peak_idx_, nbins)
# Fit Gaussian per peak to estimate std (width) using only its segment
means: list[float] = []
centroids: list[float] = []
stds: list[float] = []
amplitudes: list[float] = []
baselines: list[float] = []
for k, peak_id in enumerate(self.peak_idx_):
left_b = int(self.peak_midpoints_[k - 1])
right_b = int(self.peak_midpoints_[k])
# Build segment values from smoothed signal and compute center index within the segment
curve_segment = _slice_periodic_segment(self.s_all_, left_b, right_b)
std = 0.0
mean = np.mod(peak_id - left_b, nbins)
centroid = self.centroid_[k]
std = len(curve_segment) / 6
baseline = np.mean(curve_segment)
amplitude = np.max(curve_segment) - baseline
if len(curve_segment) >= 2:
popt = fit_gaussian_shifted(curve_segment)
popt_ = fit_gaussian_shifted(curve_segment, mu=mean)
if popt is not None and abs(popt[1] - mean) <= 1:
amplitude, mean, std, baseline = popt
elif popt_ is not None:
amplitude, mean, std, baseline = popt_
mean = np.mod(mean + left_b, nbins)
centroid = mean * self.period / nbins
means.append(int(mean))
centroids.append(float(centroid))
std = min(std, len(curve_segment) / 4)
stds.append(float(std))
amplitudes.append(float(amplitude))
baselines.append(float(baseline))
# Filter out peaks with too small amplitude (likely coming from noise)
means = np.asarray(means, dtype=int)
centroids = np.asarray(centroids, dtype=float)
stds = np.asarray(stds, dtype=float)
amplitudes = np.asarray(amplitudes, dtype=float)
baselines = np.asarray(baselines, dtype=float)
mask = amplitudes > baselines
self.peak_idx_ = means[mask]
self.centroid_ = centroids[mask]
self.peak_std_ = stds[mask]
self.centroid_std_ = self.peak_std_ * (self.period / nbins)
if self.verbose:
print()
print("Identified peaks:")
print("idx | mean | std")
for i in range(len(self.centroid_)):
print(f"{i:3d} | {self.centroid_[i]:8.3f} | {self.centroid_std_[i]:8.3f}")
print()
return self
[docs]
def predict(self, x: Iterable[float], width_scale: float = 1.0):
"""
Predicts cluster labels by nearest circular distance to centers.
Parameters
----------
x : Iterable[float]
Input values (wrapped to [0, range)). Units must match `range` used
during fitting (e.g., radians, minutes, or fraction of circle).
width_scale : float, default 1.0
Multiplies the peak width (std) to define the distance threshold for
assignment.
Returns
-------
np.ndarray
Integer labels in [0, n_centers-1], or -1 for outliers.
"""
# Check if model is fitted
if self.centroid_ is None or self.centroid_std_ is None:
raise RuntimeError("CircleClust is not fitted.")
centroid_array = np.asarray(self.centroid_, dtype=float)
std_array = np.asarray(self.centroid_std_, dtype=float)
# Convert inputs
x_ = np.asarray(x, dtype=float)
n = x_.shape[0]
# Initialize all points as outliers and return if no centroids found
labels = np.full(n, -1, dtype=int)
if centroid_array.size == 0:
return labels
# Compute circular distances to each centroid: shape (n_points, n_centroids)
diffs = np.abs(x_[:, None] - centroid_array[None, :])
diffs = np.minimum(diffs, self.period - diffs)
# Valid assignments are only those within each centroid's std * width_scale
valid = diffs <= std_array[None, :] * width_scale
# Replace invalid distances with +inf so argmin chooses only valid ones
dist_masked = np.where(valid, diffs, np.inf)
# Nearest valid centroid per point
nearest = np.argmin(dist_masked, axis=1)
nearest_dist = dist_masked[np.arange(n), nearest]
# Assign where there exists at least one valid centroid (finite distance)
assignable = np.isfinite(nearest_dist)
labels[assignable] = nearest[assignable]
return labels
[docs]
def get_peaks(self, as_list: bool = False):
"""
Returns detected peaks.
Parameters
----------
as_list : bool, default False
If True, returns a list of dicts [{'centroid': c, 'std': s}, ...].
If False, returns a dict with numpy arrays {'centroid': array, 'std': array}.
Returns
-------
dict | list
Peaks information as a dict of arrays or a list of per-peak dicts.
"""
if getattr(self, "centroid_", None) is None or getattr(self, "centroid_std_", None) is None:
raise RuntimeError("CircleClust is not fitted.")
centroids = np.asarray(self.centroid_, dtype=float)
stds = np.asarray(self.centroid_std_, dtype=float)
if as_list:
return [{"centroid": float(c), "std": float(s)} for c, s in zip(centroids, stds)]
return {"centroid": centroids, "std": stds}
[docs]
def get_centroids(self, as_list: bool = False):
"""
Returns detected centroids.
Parameters
----------
as_list : bool, default False
If True, returns a list of dicts [{'centroid': c, 'std': s}, ...].
If False, returns a dict with numpy arrays {'centroid': array, 'std': array}.
Returns
-------
dict | list
Centroids information as a dict of arrays or a list of per-centroid dicts.
"""
data = self.get_peaks(as_list=as_list)
return data
[docs]
def get_clusters(self, as_list: bool = False):
"""
Returns detected clusters.
Parameters
----------
as_list : bool, default False
If True, returns a list of dicts [{'centroid': c, 'std': s}, ...].
If False, returns a dict with numpy arrays {'centroid': array, 'std': array}.
Returns
-------
dict | list
Clusters information as a dict of arrays or a list of per-cluster dicts.
"""
data = self.get_peaks(as_list=as_list)
return data
[docs]
def show_peaks(self, color: Optional[str] = "#2980BA", title: str = "CircleClust peaks", output: Optional[str] = None):
"""
Shows peaks in the histogram.
Parameters
----------
color : Optional[str], default "#2980BA"
Color of the peaks.
title : str, default "CircleClust peaks"
Title of the plot.
output : Optional[str], default None
Output file name. If None, shows plot in a window.
"""
# Check if model is fitted
if getattr(self, "h_all_", None) is None or getattr(self, "s_all_", None) is None:
raise RuntimeError("Call fit() before show_peaks().")
# Get histogram and smoothed signal
h = np.asarray(self.h_all_, dtype=float)
s = np.asarray(self.s_all_, dtype=float)
n = h.size
x = np.arange(n)
# Plot histogram and smoothed signal
plt.figure(figsize=(12, 6))
plt.bar(x, h, color=color, alpha=0.3, width=0.9, align="center")
plt.plot(x, s, color=color, linewidth=2.0)
if getattr(self, "peak_idx_", None) is not None and self.centroid_.size:
px = self.centroid_ * n / self.period
ix = self.peak_idx_
py = s[ix] + 0.1 * h.max()
plt.scatter(px, py, marker='*', s=200, color=color, zorder=4)
# Add +/- std whiskers for each peak by filling a NaN array between i0..i1 (with wrap)
if getattr(self, "peak_std_", None) is not None and self.peak_std_.size:
lw = 2.0
wy = py - 0.05 * h.max()
y_cap = 0.01 * h.max()
for i, c in enumerate(px):
x0 = c - self.peak_std_[i]
x1 = c + self.peak_std_[i]
_draw_horizontal_whisker(x0, x1, wy[i], y_cap=y_cap, linewidth=lw, color=color)
if x0 < 0:
_draw_horizontal_whisker(x0 + n, x1 + n, wy[i], y_cap=y_cap, linewidth=lw, color=color)
if x1 > n:
_draw_horizontal_whisker(x0 - n, x1 - n, wy[i], y_cap=y_cap, linewidth=lw, color=color)
plt.xlim(-0.5, n - 0.5)
# Add xticks covering the full period at 0, 0.25, 0.5, 0.75, 1.0 * period
ticks_frac = np.array([0.0, 0.25, 0.5, 0.75, 1.0], dtype=float)
tick_pos = ticks_frac * n
tick_pos = np.minimum(tick_pos, n - 1) # keep within axis range
tick_labels = [f"{(f * self.period):.2f}" for f in ticks_frac]
plt.xticks(tick_pos, tick_labels, fontsize=12)
plt.yticks(fontsize=12)
plt.title(title, fontsize=20)
plt.tight_layout()
if output:
plt.savefig(output, dpi=72)
plt.close()
else:
plt.show()
return
[docs]
def show_centroids(self, output: Optional[str] = None):
"""
Shows centroids in the histogram.
Parameters
----------
output : Optional[str], default None
Output file name. If None, shows plot in a window.
"""
self.show_peaks(title="CircleClust centroids", output=output)
return
[docs]
def show_clusters(self, output: Optional[str] = None):
"""
Shows clusters in the histogram.
Parameters
----------
output : Optional[str], default None
Output file name. If None, shows plot in a window.
"""
self.show_peaks(title="CircleClust clusters", output=output)
return
def _draw_horizontal_whisker(
x_start: float,
x_end: float,
y: float,
y_cap: float = 0.2,
cap_start: bool = True,
cap_end: bool = True,
linewidth: float = 1.0,
color: str = "black",
**kwargs):
"""
Draws a horizontal whisker line on a given matplotlib Axes.
Parameters
----------
x_start : float
Start x-coordinate of whisker
x_end : float
End x-coordinate of whisker
y : float
Y-coordinate of the horizontal line
y_cap : float
Half-height of vertical caps (extends ±y_height/2 from y)
cap_start : bool
Whether to draw a vertical cap at x_start
cap_end : bool
Whether to draw a vertical cap at x_end
linewidth : float
Line width for all parts
color : str
Color of the line and caps
kwargs : dict
Additional kwargs passed to `ax.plot` (e.g., color)
"""
# Draw horizontal line
plt.plot([x_start, x_end], [y, y], linewidth=linewidth, color=color, **kwargs)
# Draw start cap
if cap_start:
plt.plot([x_start, x_start], [y - y_cap, y + y_cap], linewidth=linewidth, color=color, **kwargs)
# Draw end cap
if cap_end:
plt.plot([x_end, x_end], [y - y_cap, y + y_cap], linewidth=linewidth, color=color, **kwargs)
return
def _hann_window(n: int) -> np.ndarray:
"""
Generates a normalized Hann window of given length.
Parameters
----------
n : int
Length of the window.
Returns
-------
np.ndarray
Hann window of length n.
"""
window = np.hanning(n)
s = window.sum()
window = window / s if s != 0 else window
return window
def _periodic_smooth(x: np.ndarray, bins_per_window: int) -> np.ndarray:
"""
Smoothes periodically a 1D sequence with a Hann window of given bin width.
Parameters
----------
x : np.ndarray
Sequence to smooth (e.g., histogram counts).
bins_per_window : int
Odd window length in bins (e.g., 9). Must be >= 3.
Returns
-------
np.ndarray
Smoothed sequence of the same length as input.
"""
if bins_per_window is None:
bins_per_window = 9
win = int(bins_per_window)
if win < 3:
win = 3
if win % 2 == 0:
win += 1
pad = win // 2
w = _hann_window(win)
x_ = np.asarray(x, dtype=float)
x_padded = np.pad(x_, (pad, pad), mode='wrap')
smoothed = np.convolve(x_padded, w, mode='valid')
return smoothed
def _rmsd(x: np.ndarray, y: np.ndarray, mask: Optional[np.ndarray] = None) -> float:
"""
Computes root mean square deviation between two 1D arrays.
Parameters
----------
x : np.ndarray
First input array.
y : np.ndarray
Second input array.
Returns
-------
float
Root mean square deviation of the input arrays.
"""
x_ = np.asarray(x, dtype=float)
y_ = np.asarray(y, dtype=float)
if mask is not None and np.any(mask):
x_ = x_[mask]
y_ = y_[mask]
rmsd = float(np.sqrt(np.mean((x_ - y_) * (x_ - y_))))
return rmsd
def _find_idxs_of_periodic_peaks(x_smooth: np.ndarray,
bins_per_window: int | None = None,
epsilon: float = 1e-6) -> np.ndarray:
"""
Finds true local peaks in a sequence of binned and smoothed periodic data.
Algorithm
---------
1) Pad by half-window in wrap mode.
2) Find local maxima by comparing to rolls of ±1.
3) Merge flat maxima into plateaus and take their centers.
4) For each candidate, ensure no higher value exists within ±window around it.
5) Map back to original indices (mod n) and deduplicate.
Parameters
----------
x_smooth : np.ndarray
Already smoothed 1D sequence.
bins_per_window : int
Odd window length in bins (e.g., 9). Must be >= 3. Used to define binning of data.
epsilon : float, default 1e-6
Epsilon value for neighborhood check.
Returns
-------
np.ndarray
Unique indices of detected peaks in the original (un-padded) array. Sorted in ascending order.
"""
x_ = np.asarray(x_smooth, dtype=float)
n = x_.size
if n == 0:
return np.array([], dtype=int)
win = int(bins_per_window)
if win < 3:
win = 3
if win % 2 == 0:
win += 1
half = win // 2
x_padded = np.pad(x_, (half, half), mode='wrap')
left = np.roll(x_padded, 1)
right = np.roll(x_padded, -1)
is_plateau = (x_padded >= left) & (x_padded >= right) & (x_padded > epsilon)
# Use find_intervals on the boolean mask (converted to int) over padded domain
intervals = find_intervals(is_plateau.astype(int)) # shape (K,2), half-open [start,end)
peaks_idx = []
for i0, i1 in intervals:
# center index in [a,b): map to inclusive representation by b-1
c = (int(i0) + int(i1) - 1) // 2
c_val = x_padded[c]
# Neighborhood check within ±win
neighborhood = x_padded[i0 - half : i1 + half]
if c_val > epsilon and np.all(neighborhood <= c_val + epsilon):
peaks_idx.append(c)
# Map back to original indices (remove padding and wrap)
peaks_orig = [((idx - half) % n) for idx in peaks_idx]
if not peaks_orig:
return np.array([], dtype=int)
peaks_idxs = np.unique(np.asarray(peaks_orig, dtype=int))
# Periodic non-maximum suppression: keep strongest peaks separated by >= half-window
if peaks_idxs.size <= 1:
return peaks_idxs
peak_strengths = x_smooth[peaks_idxs]
idxs = np.argsort(peak_strengths)[::-1] # descending by strength
peaks_retained: list[int] = []
for idx in idxs:
candidate_id = int(peaks_idxs[idx])
if not peaks_retained:
peaks_retained.append(candidate_id)
continue
retain = True
for peak_id in peaks_retained:
d = abs(candidate_id - peak_id)
d = min(d, n - d) # circular distance in bins
if d < half:
retain = False
break
if retain:
peaks_retained.append(candidate_id)
peak_idxs = np.sort(np.array(peaks_retained, dtype=int))
return peak_idxs
def _find_idxs_of_midpoints_between_peaks(peak_idxs: np.ndarray, nbins: int) -> np.ndarray:
"""
Finds midpoints between consecutive peaks of periodic data.
Parameters
----------
peak_idxs : np.ndarray
Array of peak indices. Important: must be sorted in ascending order.
nbins : int
Number of bins in the binned periodic data.
Returns
-------
np.ndarray
Array of midpoint indices. Sorted in ascending order.
"""
# Check that peak_idxs is not empty
if peak_idxs.size == 0:
return np.array([], dtype=int)
# If only one peak, return midpoint on the opposite side of the circle
if peak_idxs.size == 1:
midpoint = peak_idxs[0] + nbins // 2
return np.array([midpoint % nbins], dtype=int)
# Check that peak_idxs is sorted in ascending order
if not np.all(np.diff(peak_idxs) >= 0):
raise ValueError("peak_idxs must be sorted in ascending order")
# Forward midpoints between consecutive peaks on circle
midpoints = []
for i in range(peak_idxs.size):
i0 = int(peak_idxs[i])
i1 = int(peak_idxs[(i + 1) % peak_idxs.size])
d = (i1 - i0) % nbins
# Handle periodic wrap-around
if d < 0:
d += nbins
midpoints.append(i0 + d // 2)
midpoints = np.asarray(midpoints, dtype=int)
return midpoints
def _slice_periodic_segment(x: np.ndarray, i0: int, i1: int) -> np.ndarray:
"""
Slices a periodic segment from an array.
Parameters
----------
x : np.ndarray
Input array.
i0 : int
Start index.
i1 : int
End index.
Returns
-------
np.ndarray
Sliced array.
"""
x_ = np.asarray(x)
n = x_.size
if n == 0:
return x_.copy()
i0 = int(i0) % n
i1 = int(i1) % n
if i0 < i1:
return x_[i0 : i1 + 1]
return np.concatenate([x_[i0:], x_[: i1 + 1]])