Source code for polytex.kriging.curve2D

# !/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Implementation of 2D Curve Kriging.

Bin Yang  2021-9-2
"""
import numpy as np
import sympy as sym
import matplotlib.pyplot as plt


[docs]def addPoints(coordinate, threshold=0.03): """ Linearly interpolate the points between each of two points that further than the threshold. Parameters ---------- coordinate : numpy array The coordinates of the points. [normalized distance, X, Y, Z] threshold : float, optional The distance between two points. The default is 0.03. Returns ------- coordinate : numpy array The coordinates of the points after linear interpolation. [normalized distance, X, Y, Z] """ deltaD = np.diff(coordinate[:, 0]) # print(deltaD[:3]) deltaD[abs(deltaD) == 1] = 0 for iDelta in np.arange(deltaD.size): if deltaD[iDelta] < threshold: try: temp = np.vstack((temp, coordinate[iDelta, :])) except NameError: temp = coordinate[iDelta, :] else: dtemp = np.linspace(coordinate[iDelta, 0], coordinate[iDelta + 1, 0], int(deltaD[iDelta] / threshold) + 1, endpoint=True) xtemp = np.interp(dtemp, coordinate[[iDelta, iDelta + 1], 0], coordinate[[iDelta, iDelta + 1], 1]) ytemp = np.interp(dtemp, coordinate[[iDelta, iDelta + 1], 0], coordinate[[iDelta, iDelta + 1], 2]) ztemp = np.interp(dtemp, coordinate[[iDelta, iDelta + 1], 0], coordinate[[iDelta, iDelta + 1], 3]) try: temp = np.vstack((temp, np.vstack((dtemp.T, xtemp.T, ytemp.T, ztemp.T)).T)) except NameError: temp = np.vstack((dtemp.T, xtemp.T, ytemp.T, ztemp.T)).T coordinate = np.vstack((temp, coordinate[-1, :])) return coordinate
# Drift and Covariance
[docs]def func_select(drift_name, cov_name): """ This is the function for definition of drift function and covariance function in dictionary drif_funcs and cov_funcs. Parameters ---------- drift_name : string The name of the drift function. Possible values are: "const", "lin", and "quad" for "constant", "linear", "quadratic", respectively. cov_name : string The name of the covariance function. Possible values are: "lin", "cub", and "log" for "linear", "cubic", "logarithmic", respectively. Returns ------- drift_func : function The drift function. cov_func : function The covariance function. """ # Definitions of drift functions by dictionary import sympy as sym drift_funcs = { 'const': lambda x, a: [a[0]], 'lin': lambda x, a: [a[0], a[1] * x], 'quad': lambda x, a: [a[0], a[1] * x, a[2] * (x ** 2.0)] } # Definitions of covariance functions by dictionary cov_funcs = { 'lin': lambda h: h, 'cub': lambda h: h ** 3.0, # Natural logarithm, element-wise. Does not work in this version!!! 'log': lambda h: h ** 2.0 * sym.log(h) if h != 0 else 0, } # Int number of 'a_len', which is based on the drift function, will be used in building of kriging matrix. a_len = len(drift_funcs[drift_name](0, [1, 1, 1, 1, 1, 1])) return drift_funcs[drift_name], cov_funcs[cov_name], a_len
[docs]def solve(dataset, krig_len, mat_krig, inverse_type="inverse"): """ Solve the kriging equation: [Matrix_kriging] [b_a] = [u.. 0..] dataset: numpy array. The sample points. X-Y. krig_len: int. The length of the kriging vector. mat_krig: numpy array. The kriging matrix. inverse_type: String. The type of the inverse matrix. "inverse" or "pseudoinverse". "inverse" : inverse matrix "pseudoinverse" : generalized inverse/pseudoinverse Returns ------- b_a: numpy array. The kriging vector. mat_krig_inv: numpy array. The inverse matrix of the kriging matrix. """ # TODO: seperate the build of U vector and the solve of the kriging equation len_b = dataset.shape[0] u = np.zeros((krig_len, 1)) u[:len_b, 0] = dataset[:, 1] # Solution if inverse_type == "pseudoinverse": mat_krig_inv = np.linalg.pinv(mat_krig) # generalized inverse/pseudoinverse else: mat_krig_inv = np.linalg.inv(mat_krig) # inverse matrix vector_ba = mat_krig_inv.dot(u) return mat_krig_inv, vector_ba
[docs]def krig_expression(len_b, func_drift, func_cov, adef, dataset, vector_ba): """ return the Kriging function expression. Parameters ---------- len_b : int The length of the b vector (the coefficient of linear combination). func_drift : function The drift function. func_cov : function The covariance function. adef : numpy array The coefficients of the drift function. dataset : numpy array The sample points. X-Y. vector_ba : numpy array The kriging vector. Returns ------- expr : sympy expression The analytical expression of the function created by kriging. """ x = sym.symbols('x') # drift: func_drift(x, adef) a_drift = vector_ba[len_b:] drift = np.squeeze(a_drift).dot(func_drift(x, adef)) doc_krig = [] doc_krig = np.append(doc_krig, drift) for i in np.arange(len_b): fluctuation = ( func_cov( sym.Abs(x - dataset[:, 0][i]) ) ) * vector_ba[i, 0] # store all the terms including drift and generilised covariance doc_krig = np.append(doc_krig, fluctuation) expr = doc_krig.sum() return expr
[docs]def curve_krig_2D(dataset, name_drift, name_cov, nugget_effect=0): """ Parameters ---------- dataset: numpy array X-Y. name_drift : String Name of drift. name_cov : String Name of covariance. nugget_effect : Float Returns ------- expr : Expression The kriging expression. """ # Read the drift and covariance type. func_drift, func_cov, len_a = func_select(name_drift, name_cov) len_b = dataset.shape[0] # same as the number of sample points krig_len = len_a + len_b # len_a determined by the selected drift # empty Kriging matrix mat_krig = np.zeros([krig_len, krig_len]) nugget = np.zeros([krig_len, krig_len]) nugget[:len_b, :len_b] = np.identity(len_b) # define the last sevel columns of kriging matrix # depending on the type of drift function adef = [1, 1, 1, 1, 1, 1] a = func_drift(dataset[:, 0], adef) # Assembling the kriging matrix for i in np.arange(len_b): h = func_cov(abs(dataset[:, 0] - dataset[i, 0])) for j in np.arange(len_b): if i <= j: mat_krig[i, j] = h[j] for i in np.arange(len_a): mat_krig[:len_b, len_b + i] = a[i] mat_krig = mat_krig + mat_krig.T - np.diag(mat_krig.diagonal()) + nugget * nugget_effect ## print("the final mat_krig:\n", mat_krig) ## print('The value of determinant is {} '.format(np.linalg.det(mat_krig))) # solve kriging linear equation system to get the vector_ba mat_krig_inv, vector_ba = solve(dataset, krig_len, mat_krig, inverse_type="inverse") # get the kriging function expression expr = krig_expression(len_b, func_drift, func_cov, adef, dataset, vector_ba) return mat_krig, mat_krig_inv, vector_ba, expr, func_drift, func_cov
[docs]def lambda_weight(X, X_train, func_drift, func_cov, mat_krig): """ Calculate the weight of the lambda matrix Parameters ---------- X : numpy array The locations where function values are unknown and to be predicted. X_train : numpy array The training data. The locations where function values are known. func_drift : function The drift function. func_cov : function The covariance function. mat_krig : numpy array The kriging matrix. Returns ------- K_h : numpy array The kriging matrix. lambda_ : numpy array The weight of gloabl combination. Note that it is different from the vector_ba in dual Kriging formulation. """ len_a = len(func_drift(1, [1, 1, 1, 1, 1, 1])) len_b = len(X_train) K_h = np.zeros((len_a + len_b, X.shape[0])) for i, x_i in enumerate(X): K_h[:len_b, i] = func_cov(np.squeeze(abs(X_train - x_i))) K_h[len_b:, :] = np.array([func_drift(x, [1, 1, 1, 1, 1, 1, 1]) for x in np.squeeze(X)]).transpose() lambda_ = np.linalg.solve(mat_krig, K_h) return K_h, lambda_
[docs]def func_var(X, K_h, lambda_, nugget_effect=0): """ Calculate the variance of the prediction. Parameters ---------- X : numpy array The locations where function values are unknown and to be predicted. K_h : numpy array The kriging matrix. lambda_ : numpy array The weight of gloabl combination. Note that it is different from the vector_ba in dual Kriging formulation. nugget_effect : float The nugget effect. Default is 0. Returns ------- std_prediction : numpy array the standard deviation of the prediction """ var = np.zeros(X.shape[0]) for i in np.arange(X.shape[0]): var[i] = np.abs(nugget_effect - np.dot(K_h[:, i], lambda_[:, i])) std_prediction = np.sqrt(var) return std_prediction
[docs]def interpolate(dataset, name_drift, name_cov, nugget_effect=0, interp=' ', return_std=False): """ Parameters ---------- dataset : numpy array X-Y. name_drift : String Name of drift. name_cov : String Name of covariance. nugget_effect : Float smoothing strength control interp: Numpy array The points that need to be interpolated, 1D numpy array. If interp is not given, the x-coordinate of the sample points is used. Returns ------- expr : Expression The kriging expression. """ mat_krig, mat_krig_inv, vector_ba, expr, func_drift, func_cov = \ curve_krig_2D(dataset, name_drift, name_cov, nugget_effect) x = sym.symbols('x') if type(interp) is np.ndarray: X_predict = interp else: X_predict = dataset[:, 0] yinter = np.empty(X_predict.shape[0]) xfnp = sym.lambdify(x, expr, 'numpy') yinter = xfnp(np.array(X_predict)) yinter[np.abs(yinter) < 1e-15] = 0. if dataset[0, 0] == 0 and dataset[-1, 0] == 1: yinter[-1] = yinter[0] X_train = dataset[:, 0] if return_std: K_h, lambda_ = lambda_weight(X_predict, X_train, func_drift, func_cov, mat_krig) std_prediction = func_var(X_predict, K_h, lambda_, nugget_effect=nugget_effect) return yinter.flatten(), expr, std_prediction.flatten() return yinter.flatten(), expr
# ---------------------Derivative Kriging-------------- # Initially programed by: Yixun Sun # Modified by Bin Yang # -----------------------------------------------------
[docs]def solveB(M, U): """ Solve the linear equation system M * B = U TODO: check the comments Parameters ---------- M : numpy array The kriging matrix. U : numpy array The vector of the right hand side of the linear equation system. Returns ------- B : numpy array The solution of the linear equation system. """ B = np.linalg.solve(M, U) print('solution Matrix b writes:') print(B) return B
[docs]def h(x1, x2): """ The function h(x1, x2) = abs(x1 - x2). Parameters ---------- x1 : float The first point. x2 : float The second point. Returns ------- h : float """ return np.abs(x1 - x2)
[docs]def buildKrigFunc_deriv(x, xKnown, xKnown_deriv, B, deriveFuncs, covFuncs, covFuncs_deriv): funcKrig = 0 for i in range(len(xKnown)): funcKrig = covFuncs(h(x, xKnown[i])) * B[i] + funcKrig for i in range(len(xKnown), len(xKnown) + len(xKnown_deriv)): funcKrig = covFuncs_deriv(h(x, xKnown_deriv[i - len(xKnown)])) * B[i] * np.sign( xKnown_deriv[i - len(xKnown)] - x) + funcKrig for i in range(len(xKnown) + len(xKnown_deriv), len(B)): funcKrig = funcKrig + B[i] * deriveFuncs(x)[i - (len(xKnown) + len(xKnown_deriv))] return funcKrig
[docs]def buildM_deriv(x, x_deriv, name_drift, name_cov, covFuncs_deriv, covFuncs_deriv2, nugg): """ Build the matrix M for the derivative kriging system Parameters ---------- x : array x points xDeriv : array x points for derivative name_drift : function derivative functions name_cov : function covariance functions covFuncs_deriv : function derivative of covariance functions covFuncs_deriv2 : function second derivative of covariance functions nugg : float nugget effect (variance) """ # Initialization of matrix M xlen, xlen_deriv, driftLen = len(x), len(x_deriv), len(name_drift(x[0])) lenMatM = driftLen + xlen + xlen_deriv M = np.zeros((lenMatM, lenMatM)) """ Build the matrix M for the derivative kriging system """ for i in range(xlen): for j in range(xlen): if i == j: M[i, j] = 0 + nugg else: M[i, j] = name_cov(h(x[i], x[j])) for i in range(xlen): for j in range(xlen_deriv): # TODO: replace the np.sign with np.sign() M[i, j + xlen] = covFuncs_deriv(x[i] - x_deriv[j]) * np.sign(-x[i] + x_deriv[j]) M[j + xlen, i] = covFuncs_deriv(x[i] - x_deriv[j]) * np.sign(-x[i] + x_deriv[j]) for i in range(xlen_deriv): for j in range(xlen_deriv): # TODO: replace the np.sign with np.sign() M[i + xlen, j + xlen] = covFuncs_deriv2(x_deriv[i] - x_deriv[j]) * np.sign(-x_deriv[i] + x_deriv[j]) M[j + xlen, i + xlen] = covFuncs_deriv2(x_deriv[i] - x_deriv[j]) * np.sign(-x_deriv[i] + x_deriv[j]) for i in range(xlen): for j in range(driftLen): M[i, j + xlen + xlen_deriv] = name_drift(x[i])[j] M[j + xlen + xlen_deriv, i] = name_drift(x[i])[j] print('Matrix M writes:') print(M) print('Solve M*b=u to obtain Kriging function') return M
[docs]def buildU_deriv(y, y_deriv, deriveFuncs): ylen, ylen_deriv, deriveFuncLen = len(y), len(y_deriv), len(deriveFuncs(y[0])) lenMatU = deriveFuncLen + ylen + ylen_deriv U = np.zeros(lenMatU) U[:ylen] = y U[ylen:ylen + ylen_deriv] = y_deriv print('Matrix u writes:') print(U) return U
[docs]def bd_Deriv_kriging_func(x, y, xDeriv, yDeriv, choixDerive, choixCov, plot_x_pts, nugg): """ Derivative kriging function. Parameters ---------- x : array x points y : array y points xDeriv : array x points for derivative yDeriv : array the derivative of xDeriv points choixDerive : string the name of the derivative function. Possible values are 'cst', 'lin' or 'quad'. choixCov : string the name of the covariance function. Possible values are 'lin' or 'cub'. plot_x_pts: array number of points for plot nugg: float nugget effect (variance) Returns ------- kringFunctionStr: string String of the kriging function. x_var_sym: string string of the x variable """ # plot the original dataset using scatter plt.scatter(x, y, color='k', marker='x', alpha=0.7, s=12, label='') # ---------------Choice of drift ----------------------- deriveFuncs = {'cst': lambda x: [1], 'lin': lambda x: [1, x]} # -------------------Choice of covariance--------------- covFuncs = {'cub': lambda x: x ** 3., 'lin': lambda x: x} # ------------------Derivative of covariance------------- covFuncs_deriv = {'cub': lambda x: 3 * x ** 2., 'lin': lambda x: x ** 0} covFuncs_deriv2 = {'cub': lambda x: 6 * x ** 1., 'lin': lambda x: x * 0} # ---------------------Build matrix M--------------------- M = buildM_deriv(x, xDeriv, deriveFuncs[choixDerive], covFuncs[choixCov], covFuncs_deriv[choixCov], covFuncs_deriv2[choixCov], nugg) # ---------------------Build matrix u-------------- U1 = buildU_deriv(y, yDeriv, deriveFuncs[choixDerive]) # ----------------------solve b-------------------- B1 = solveB(M, U1) # ----------------build string function------------ lowerX, upperX = min(x), max(x) intervalX = (upperX - lowerX) / plot_x_pts x_krig = [i * intervalX for i in range(int(lowerX / intervalX), int((upperX) / intervalX) + 1)] y_krig = [buildKrigFunc_deriv(x_krig[i], x, xDeriv, B1, deriveFuncs[choixDerive], covFuncs[choixCov], covFuncs_deriv[choixCov]) for i in range(len(x_krig))] sum_ave = 0 for i in range(1, len(x_krig)): hh = x_krig[i] - x_krig[i - 1] a_b = y_krig[i] + y_krig[i - 1] sum_ave = sum_ave + 0.5 * hh * a_b sum_ave = sum_ave / h(min(x), max(x)) plt.plot(x_krig, y_krig, linestyle='--', lw=1, label='Nugget effect = ' + str(nugg)) plt.ylabel('y') plt.xlabel('x') plt.legend(loc='upper left', ncol=1) plt.show() return sum_ave
if __name__ == "__main__": dataset = np.array([[0, 0], [0.25, -1], [0.75, -1], [1, 0]]) name_drift = 'const' name_cov = 'lin' nugget_effect = 0 mat_krig, mat_krig_inv, vector_ba, expr = \ curve_krig_2D(dataset, name_drift, name_cov, nugget_effect=0.0) yinter = interpolate(dataset, name_drift, name_cov, nugget_effect=0, interp=' ') print("the kriging matrix:\n", mat_krig) print("the kriged y values:\n", yinter) print("vector_ba:\n", vector_ba) ''' the final mat_krig for checking purpose: [[0. 0.015625 0.421875 1. 1. ] [0.015625 0. 0.125 0.421875 1. ] [0.421875 0.125 0. 0.015625 1. ] [1. 0.421875 0.015625 0. 1. ] [1. 1. 1. 1. 0. ]] [[0. 0.25 0.75 1. 1. ] [0.25 0. 0.5 0.75 1. ] [0.75 0.5 0. 0.25 1. ] [1. 0.75 0.25 0. 1. ] [1. 1. 1. 1. 0. ]] '''