Source code for polytex.stats.kde

import numpy as np
import scipy
from scipy.signal import argrelextrema
import matplotlib.pyplot as plt
from sklearn.neighbors import KernelDensity

from .bw_opt import opt_bandwidth
from ..thirdparty.bcolors import bcolors


[docs]def kdeScreen(variable, x_test, bw, kernels='gaussian', plot=False): """ This function estimates the probability density distribution of the input variable with the non-parametric kernel density estimation (KDE) method. The local maxima and minima of the probability density distribution are identified to decompose the input variable into a set of clusters. The former is used as the cluster centers and the latter is used as the cluster boundaries. Parameters ---------- variable : Numpy array A N x 1 dimension numpy array to apply the kernel density estimation. x_test : Numpy array Test data to get the density distribution. It has the same shape as the given variable. It should cover the whole range of the variable. bw : float The bandwidth of the kernel. kernel : string, optional The kernel to use. The default is 'gaussian'. The possible values are {'gaussian', 'tophat', 'epanechnikov', 'exponential', 'linear', 'cosine'}. plot : bool, optional Whether plot the probability density distribution. The default is False. Returns ------- clusters : dictionary The index of the cluster centers, cluster boundary and the probability density distribution (pdf). """ # check if the variable is 1D array if variable.ndim == 1: variable = variable.reshape(-1, 1) else: raise ValueError("The input variable should be a 1D array.") print("Kernel density estimation ...") model = KernelDensity(kernel=kernels, bandwidth=bw) model.fit(variable) log_dens = model.score_samples(x_test.reshape(-1, 1)) pdf_input = np.exp(model.score_samples(variable)) kde = plt.plot(x_test, np.exp(log_dens), c='cyan') _, pdf = kde[0].get_data() if plot: # x label plt.xlabel('Normalized distance (radial)') # y label plt.ylabel('Probability density') plt.show() # mask for the local maxima of density # argrelextrema: identify the relative extrema of `data`. cluster_bounds = np.insert(argrelextrema(pdf, np.less)[0], 0, 0) cluster_bounds = np.append(cluster_bounds, -1) clusters = {"cluster centers": argrelextrema(pdf, np.greater)[0], # Indices of local maxima "cluster boundary": cluster_bounds, "t test": x_test, "pdf": pdf, "t input": variable, "pdf input": pdf_input, } # pdf_input is the pdf of the input variable return clusters
[docs]def movingKDE(dataset, bw=0.002, windows=1, n_clusters=20, x_test=None): """ This function applies the kernel density estimation (KDE) method to the input dataset with a moving window. Namely, the dataset is divided into a set of windows and the KDE method is applied to each window. This allows to capture more details of geometry changes of a fiber tow. Parameters ---------- dataset : Numpy array A N x 2 dimension numpy array for kernel density estimation. The first colum should be the variable under analysis, the second is the label of cross-sections that the variable belongs to. bw : Numpy array or float, optional A range of bandwidth values for kde operation usually generated with np.arange(). The optimal bandwidth will be identified within this range and be used for kernel density estimation. If a number is given, the number will be used as the bandwidth for kernel estimation. windows : int, The number of windows (segmentations) for KDE analysis. The default is 1, namely, the whole dataset is used for KDE analysis and gives the same result as using the function kdeScreen() directly. n_clusters : int The target number of cluster_center. The default is 20. x_test : Numpy array Test data to get the density distribution. The default is None. Returns ------- kdeOutput : Numpy array A N x 3 dimension numpy array. The first column is the label of the window under analysis, the second is normlized distance, the third is the probability density. cluster_center : Numpy array A M x N dimension numpy array. M is the number of windows and N-1 is the number of cluster centers. The first column is the maximum index for each window, the following columns are the cluster centers. """ import pandas as pd kdeOutput = np.zeros([dataset.shape[0], 3]) cluster_center = np.zeros([windows - 1, n_clusters + 1]) anchor = 0 nslices = np.unique(dataset[:, -1]).size winLen = int((nslices / windows + 1)) for win in range(0, windows - 1): # Point cloud in a window mask = (dataset[:, -1] < (win + 1) * winLen) & \ (dataset[:, -1] >= win * winLen) variable = dataset[:, 0][mask].reshape(-1, 1) if x_test is None: # Generate test data to get the density distribution x_test = np.linspace(0, 1, variable.size)[:, np.newaxis] if type(bw).__module__ == "numpy": opt_bw = opt_bandwidth(variable, x_test, bw) elif str(int(bw * 1e20)).isdigit(): opt_bw = bw else: print("Please check if bandwidth is given correctly!!!") # Call function kdeScreen() to get the variable-density curve # the index for cluster_center. variable = variable.flatten() clusters = kdeScreen(variable, x_test, opt_bw) # Get the index for cluster_center cluster_center_idx = clusters["cluster centers"] ykde = clusters["pdf"] print("The variable size is: ", x_test.size) upperLimit = anchor + x_test.size if len(cluster_center_idx) >= n_clusters: print(bcolors.ok("Window: {}:".format(win))) print("√ The required number of points {} was reached at h = {}. \ \nThe number of actual cluster_center is [{}]".format( n_clusters, round(opt_bw, 4), len(cluster_center_idx))) print("start index {}; end index {}; number of variables {}".format( anchor, upperLimit, upperLimit - anchor)) print("The cluster centers are: ", cluster_center_idx) # sort the data from minimum to maximun and return the index maskSort = np.argsort(ykde[cluster_center_idx]) extrDisordered = cluster_center_idx[maskSort][len(cluster_center_idx) - n_clusters:] # kdeOutput print("test", anchor, upperLimit) print(x_test.shape, ykde.shape) kdeOutput[anchor:upperLimit, 0] = win kdeOutput[anchor:upperLimit, 1] = x_test.flatten() kdeOutput[anchor:upperLimit, 2] = ykde.flatten() # cluster_center cluster_center[win, 0] = upperLimit cluster_center[win, 1:] = extrDisordered[np.argsort(extrDisordered)] anchor = upperLimit else: print("Window: {}:".format(win)) print(bcolors.warning("--> Cannot reach the targeted {} points. \ There are [{}] points for h = {}. Please reduce bandwidth.\n" "--------------------".format( n_clusters, len(cluster_center_idx), round(opt_bw, 4)))) kdeOutput[anchor:upperLimit, 0] = win cluster_center[win, 0] = upperLimit anchor = upperLimit continue kdeOutput_col = ["window", "normalized distance", "probability density"] kdeOutput = pd.DataFrame(kdeOutput, columns=kdeOutput_col) # Returns a column mask to show if any zero values are present in the row of kdeOutput # If any zero values are present, the row is masked (False), otherwise the row is not # masked (True). The mask is used to remove the rows with zero values from the kdeOutput. # the first row is always as True since it is the starting point (0) of the contour. mask = np.all(kdeOutput.iloc[:, [1, 2]] != 0, axis=1) mask[0] = True # remove the rows with zero values from the kdeOutput kdeOutput = kdeOutput[mask] return kdeOutput, cluster_center
[docs]def kdePlot(xkde, ykde, cluster_center_idx): """ Parameters ---------- xkde : Numpy array The normalized distance. ykde : Numpy array The probability density distribution corresponding to the normalized distance. cluster_center_idx : Numpy array The index of the cluster centers. Returns ------- None. """ fig = plt.figure(1, figsize=(12, 9)) plt.close("all") plt.clf() plt.rcParams.update({'font.size': 16}) plt.scatter(xkde[cluster_center_idx], ykde[cluster_center_idx]) plt.plot(xkde, ykde) # Median cdf = scipy.integrate.cumtrapz(ykde, xkde, initial=0) nearest_05 = np.abs(cdf - cdf / 2).argmin() x_median, y_median = xkde[nearest_05], ykde[nearest_05] # Plot the median value as vertical line plt.vlines(x_median, 0, y_median, 'r') plt.legend() plt.xlabel('Normalized distance') plt.ylabel('Distribution density') # plt.savefig(str(windowIndex)+'.tiff') plt.show()