cntmn5.py

cntmn5.py – treatment of Ni K-emission from Cu target X-ray tube

Implementation of deconvolutional treatment of parasite Ni K-emission peaks from Cu-target X-ray tube [Ida, et al., 2018a, 2018b].

Python code “cntmn5.py” is shown below.

Click (tap) [Python] icon on the right-top edge of the code area to copy the code to clipboard.

# -*- coding: utf-8 -*-
import numpy as np # import "numpy" module
from scipy import interpolate # import "interpolate" module from "scipi"
import sys # import "sys" module to use "exit()"
import os.path # import "os.path" module to use "os.path.exists()"
import configparser # import "configparser" module

import common5 as cmn # import "dct_common" (common module) as cmn

########################################################
# Decontamination (1/2), definition of horizontal scale
########################################################
def scale_hrz(degTwoTheta):
    x = np.log(np.sin(np.pi * degTwoTheta / 360))
    return x

######################################################
# Decontamination (2/2), definition of ordinate scale
######################################################
def scale_ord(degTwoTheta):
    f = 2 * np.tan(np.pi * degTwoTheta / 360)
    return f

#########################################################
# Definition of lower-truncated Lorentzian profile model
#########################################################
def f_Lor(x, x0, w, c):
    # x: horizontal values (array)
    # x0: location
    # w: width
    # c: cut-off position
    nPoint = x.size
    dx = x[1] - x[0]
    iZero = max(0, np.int((c - x[0])/dx))
    x_w = np.array(x)
    x_w[iZero:nPoint] = (x[iZero:nPoint] - x0) / w
    y = np.zeros(nPoint)
    y[iZero:nPoint] = 2 / (np.pi * w * (1 + 4 * x_w[iZero:nPoint]**2))
    return y

###########################################
# Parser for multi-peak profile parameters
###########################################
def DefFunc(config, section):
    # config: configuration
    # section: section
    nPeak = config.getint(section,'nPeak')
    nSubPeak = np.zeros(nPeak, dtype=int)
    intns = np.zeros(nPeak)
    ofset = np.zeros(nPeak)
    type = []
    posit = []
    width = []
    integ = []
    cutof = []
    for i in range(0, nPeak):
        key_n = ('nSubPeak{:02d}').format(i)
        nSubPeak[i] = config.getint(section, key_n)
        key_o = ('o{:02d}').format(i)
        ofset[i] = config.getfloat(section, key_o)
        key_I = ('I{:02d}').format(i)
        intns[i] = config.getfloat(section, key_I)
        typeSub = []
        positSub = np.zeros(nSubPeak[i])
        widthSub = np.zeros(nSubPeak[i])
        integSub = np.zeros(nSubPeak[i])
        cutofSub = np.zeros(nSubPeak[i])
        for j in range(0, nSubPeak[i]):
            typeSub = typeSub + ['Lorentz']
            key_x = ('x{:02d}_{:02d}').format(i, j)
            positSub[j] = config.getfloat(section, key_x)
            key_w = ('w{:02d}_{:02d}').format(i, j)
            widthSub[j] = config.getfloat(section, key_w)
            key_I = ('I{:02d}_{:02d}').format(i, j)
            integSub[j] = config.getfloat(section, key_I)
            key_c = ('c{:02d}_{:02d}').format(i, j)
            cutofSub[j] = config.getfloat(section, key_c)
        posit = posit + [positSub]
        width = width + [widthSub]
        type = type + [typeSub]
        integ = integ + [integSub]
        cutof = cutof + [cutofSub]
    params = [nPeak, nSubPeak, intns, ofset, type, posit, width, integ, cutof]

    return params

###########################################
# Contamination peak profile composed of 
# lower-truncated Lorentzian peak profiles
###########################################
def FormFunc(i, params, x):
# i: index of contamination peak
# params = [nPeak, nSubPeak, intns, ofset, type, posit, width, integ, cutof]
# x: array or partial array of abscissa. Equidistant ascending order assumed.
    nPeak = params[0]    # Number of peaks (not used)
    nSubPeak = params[1] # Number of subpeaks for each peak
    intns = params[2] # total peak intensity
    ofset = params[3] # angular offset (not used)
    posit = params[5] # array of subpeak location
    width = params[6] # array of subpeak width
    integ = params[7] # array of subpeak relative intensity
    cutof = params[8] # array of subpeak lower-cut position
    nP = x.size
    y = np.zeros(nP)
    dx = x[1] - x[0]
    for j in range(0, nSubPeak[i]):
        nC = max(0, int((cutof[i][j] - x[0]) / dx))
            # tricky? ... using the relation: x[n1:n2][0] == x[n1]
        y[nC:nP] += intns[i]* integ[i][j] * f_Lor(x[nC:nP], posit[i][j], width[i][j], cutof[i][j])
    return y

############################################
# Fourier transform of shift operation
# integrate[-inf,inf] δ(x-x0) exp(2πikx) dx
# = exp(2πikx0)
############################################
def fFtShift(shift, k):
    ans = np.exp(2 * np.pi * 1j * k * shift)
    return ans

##########################################
# Decontamination treatment, main body
##########################################
def treat(source, DPATH):

    #******************
    # Read "cntmn.cfg" 
    #******************
    CFG_FILE = os.path.join(DPATH, 'cntmn.cfg')
    cntmn_config = configparser.ConfigParser()
    cntmn_config.read(CFG_FILE, encoding = 'utf-8')
    # cntmn_config.readfp(codecs.open(CFG_FILE, "r", "utf8"))
    
    #****************
    # Read "dct.cfg"
    #****************
    CFG_FILE = os.path.join(DPATH, 'dct.cfg')
    dct_config = configparser.ConfigParser()
    dct_config.read(CFG_FILE, encoding = 'utf-8')
    # dct_config.readfp(codecs.open(CFG_FILE, "r", "utf8"))
    error_type = dct_config.get('Control','error_type')

    # Parse deconvolution (CuKα1) control
    paramsDeconv = DefFunc(cntmn_config, 'CuK_alpha1')
    # Parse convolution (contamination) control
    paramsConv = DefFunc(cntmn_config, 'contamination')
    # Number of contamination peaks
    nPeak = cntmn_config.getint('contamination', 'nPeak')

    #***********************************************
    # Main loop for deocontamination process
    # Repeat for each indicated contamination peaks
    #***********************************************
    for iPeak in range(nPeak):

        s_angle = source[0]    # horizontal values of original data
        s_intensity = source[1]    # intensity values of original data
        #==========================================
        # Make copy of original data (important !)
        #==========================================
        s_intensity0 = s_intensity.copy()    # original intensity data

        #============================================
        # Treatment (1/2)
        # on ln(sin(theta)) scale
        # treatment of reduction, shift and smearing
        #============================================
        source_x = scale_hrz(s_angle)    # scale-transformed abscissa values
        source_corr = scale_ord(s_angle)
        source_corr = source_corr * cmn.g_corr(s_angle / 2) # intensity correction factor
        source_y = s_intensity * source_corr # scale-transformed ordinate values

        #=======================
        # Error treatment (1/2)
        #=======================
        if (error_type == '1'):
            s_error1 = source[2]
        elif (error_type == '2'):
            s_error2 = source[2]
        elif (error_type == '3'):
            s_error1 = source[2]
            s_error2 = source[3]
        if ((error_type == '1') or (error_type == '3')):
            s_error1_0 = s_error1.copy()
            source_d1 = s_error1 * source_corr # correct ordinate errors
            d_error1 = np.array(s_error1)
        if ((error_type == '2') or (error_type == '3')):
            s_error2_0 = s_error2.copy()
            source_d2 = s_error2 * source_corr # correct ordinate errors
            d_error2 = np.array(s_error1)

        #===================================================
        # Set margin, total number of points, step interval
        # Margin is set to 0.5 on ln(sin(theta)) scale.
        #===================================================
        index = cmn.indices(source_x, 0.5)
        nSource = index[0]
        i1 = index[1] # 1st index
        i2 = index[2] # 2nd index
        i3 = index[3] # 3rd index
        nDest = index[4]
        nDest_2 = int(nDest/2)
        dx = (source_x[nSource-1] - source_x[0]) / i1 # step interval
        # Prepare abscissa data for valid range
        nValid = int((source_x[nSource-1] - source_x[0]) / dx)
        chi0 = source_x[0]
        chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
        # Cubic spline interpolation
        eta = np.array(chi)
        f3 = interpolate.interp1d(source_x, source_y, kind="cubic")
        eta = f3(chi)

        # Fill values into marginal area
        eta = np.resize(eta, nDest)
        iLin = np.linspace(0, i3-i2, i3-i2+1)
        etaL = source_y[nSource - 1] # last value (left side of marginal area)
        etaR = source_y[0] # first value (right side of marginal area)
        eta[i1:i2] = etaL # fill with last value
        eta[i2:i3] = etaL + (etaR - etaL) * iLin[0:i3-i2] / (i3-i2)
            # linear interpolation
        eta[i3:nDest] = etaR # fill with first value
        ft_eta = np.fft.fft(eta) # Fourier transform of intensity
        dxi = 1.0 / (dx * nDest) # Interval of Fourier transformed data
        xi = np.linspace(0, nDest * dxi, nDest, endpoint=False) # abscissa of Fourier transform
        xi[nDest_2:nDest] = xi[nDest_2:nDest] - nDest * dxi
        # Starting deconvolution-convolution
        # Shift range (to start from zero index)
        hx = nDest * dx # width of horizonatl
        chi = np.linspace(0, hx, nDest, endpoint = False)
        deconv = np.array(eta) # copy
        conv = np.array(eta) # copy
        # Calculate deconvolution (CuKα1) function
        deconv[0:nDest_2] = FormFunc(0, paramsDeconv, chi[0:nDest_2])
        deconv[nDest_2:nDest] = FormFunc(0, paramsDeconv, chi[nDest_2:nDest]-hx)
        # Calculate convolution (contamination peak) profile.
        conv[0:nDest_2] = FormFunc(iPeak, paramsConv, chi[0:nDest_2])
        conv[nDest_2:nDest] = FormFunc(iPeak, paramsConv, chi[nDest_2:nDest]-hx)
        # Fourier transforms of deconvolution and convolution functions
        ft_deconv = np.fft.fft(deconv)    # Fourier transform
        ft_conv = np.fft.fft(conv) # Fourier transform
        # Calculation of instrumental function
        ft_inst = np.array(ft_deconv) # Make a copy
        ft_inst = ft_deconv / ft_conv # Fourier transform of instrumental function
        # Inverse Fourier transform & deconvolution of intensity data
        ft_eta = ft_eta / ft_inst
        zeta = np.real(np.fft.ifft(ft_eta)) # deconvolution-convolution treatment about intensity

        # Retrieve original abscissa.
        chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
        # Cubic spline interpolation
        f3 = interpolate.interp1d(chi, zeta, kind="cubic")
        dest_y = f3(source_x)
        # Rescale transform (from "logarithmic sine" to "diffraction angle")
        d_intensity = dest_y / source_corr # Ordinate values

        # Treatment about estimated errors...
        if ((error_type == '1') or (error_type == '3')):
            # Correction of error data
            source_e = cmn.mod_error(source_x, source_d1, dx)
            # Abscissa
            chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
            # Spline interpolation
            f3 = interpolate.interp1d(source_x, source_e, kind="cubic")
            eps = f3(chi)
            epsL = source_e[nSource - 1]
            epsR = source_e[0]
            eps = np.resize(eps, nDest)
            # Fill marginal data area.
            eps[i1:i2] = epsL
            eps[i2:i3] = epsL + (epsR - epsL) * iLin[0:i3-i2] / (i3-i2)
            eps[i3:nDest] = epsR
            inv_sq_eps = 1.0 / (eps * eps) # Inverse squared errors
            ft_inv_sq_eps = np.fft.fft(inv_sq_eps) # Fourier transform of inverse squared errors
            # Calculate squared instrumental function
            inst = np.fft.ifft(ft_inst) # Instrumental function
            sq_inst = inst * inst # Squared instrumental function
            # Fourier transform of squared instrumental function
            ft_sq_inst = np.fft.fft(sq_inst) # Fourier transform of squared instrumental function
            # Transform of error data (2) (IFFT)
            ft_inv_sq_eps = ft_inv_sq_eps * np.conj(ft_sq_inst)
            delta = np.real(np.fft.ifft(ft_inv_sq_eps))
            delta = 1 / np.sqrt(np.abs(delta)) # Propagation of estimated error (1)
            # Set abscissa (may not be necessary?)
            chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
            # Spline interpolation
            f3 = interpolate.interp1d(chi, delta, kind="cubic")
            dest_e = f3(source_x) # Estimated error (model 1)
            # Recorrect error data
            dest_d = cmn.remod_error(source_x, dest_e, dx)
            # Rescale transform (from "logarithmic sine" to "diffraction angle")
            d_error1 = dest_d / source_corr # Ordinate error (model 1)

        if ((error_type == '2') or (error_type == '3')):
            # Correction of error data
            source_e = cmn.mod_error(source_x, source_d2, dx)
            # Abscissa
            chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
            f3 = interpolate.interp1d(source_x, source_e, kind="cubic")
            eps = f3(chi)
            epsL = source_e[nSource - 1]
            epsR = source_e[0]
            eps = np.resize(eps, nDest)
            # Fill marginal data area
            eps[i1:i2] = epsL
            eps[i2:i3] = epsL + (epsR - epsL) * iLin[0:i3-i2] / (i3-i2)
            eps[i3:nDest] = epsR
            inv_inst = np.real(np.fft.ifft(1 / ft_inst)) 
                # inverse instrumental function
            sq_inv_inst = inv_inst * inv_inst 
                # squared inverse instrumental function
            ft_sq_inv_inst = np.fft.fft(sq_inv_inst) 
                # Fourier transform of squared inverse instrumental function
            sq_eps = eps * eps # Squared errors
            ft_sq_eps = np.fft.fft(sq_eps) 
                # Fourier transform of squared errors
            ft_sq_eps = ft_sq_eps * ft_sq_inv_inst
            sq_eps = np.real(np.fft.ifft(ft_sq_eps))
            delta = np.sqrt(np.abs(sq_eps))
            # Abscissa
            chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
            # Spline interpolation
            f3 = interpolate.interp1d(chi, delta, kind="cubic")
            dest_e = f3(source_x) # Estimated error (model 2)
            # Recorrect error data
            dest_d = cmn.remod_error(source_x, dest_e, dx)
            d_error2 = dest_d / source_corr # ordinate error (model 2)

        if (error_type == '0'):
            dest = [s_angle, d_intensity]
        elif (error_type == '1'):
            dest = [s_angle, d_intensity, d_error1]
        elif (error_type == '2'):
            dest = [s_angle, d_intensity, d_error2]
        else:
            dest = [s_angle, d_intensity, d_error1, d_error2]

        #====================================
        # Decontamination process (part 2/2)
        # Shift on diffraction-angle scale
        #====================================
        source = dest
        # Identity transformation
        source_x = s_angle.copy() # abscissa value
        source_corr = 1.0
        source_y = d_intensity.copy() # ordinate value

        #============================
        # Error treatment (part 2/2)
        #============================
        if (error_type == '1'):
            s_error1 = source[2]
            source_d1 = s_error1 * source_corr # ordinate error
            d_error1 = np.array(s_error1)
        elif (error_type == '2'):
            s_error2 = source[2]
            source_d2 = s_error2 * source_corr # ordinate error
            d_error2 = np.array(s_error1)
        elif (error_type == '3'):
            s_error1 = source[2]
            s_error2 = source[3]
            source_d1 = s_error1 * source_corr # ordinate error
            source_d2 = s_error2 * source_corr # ordinate error
            d_error1 = np.array(s_error1)
            d_error2 = np.array(s_error2)

        #================================================
        # Determine marginal range, number of total data
        # and step interval
        #=================================================
        index = cmn.indices(source_x, 0.5)
        nSource = index[0]
        i1 = index[1] # 1st index
        i2 = index[2] # 2nd index
        i3 = index[3] # 3rd index
        nDest = index[4]
        nDest_2 = int(nDest/2)
        dx = (source_x[nSource-1] - source_x[0]) / i1 # step interval
        #=========================================
        # Prepare abscissa data for interpolation 
        # of valid data (part 2/2)
        #=========================================
        nValid = int((source_x[nSource-1] - source_x[0]) / dx)
        chi0 = source_x[0]
        chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
        # Cubic spline interpolation
        eta = np.array(chi)
        f3 = interpolate.interp1d(source_x, source_y, kind="cubic")
        eta = f3(chi)

        # Fill margin area
        chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
        eta = np.resize(eta, nDest)
        iLin = np.linspace(0, i3-i2, i3-i2+1)
        etaL = source_y[nSource - 1]
        etaR = source_y[0]
        eta[i1:i2] = etaL
        eta[i2:i3] = etaL + (etaR - etaL) * iLin[0:i3-i2] / (i3-i2)
        eta[i3:nDest] = etaR
        ft_eta = np.fft.fft(eta) # Fourier transform of intensity
        dxi = 1.0 / (dx * nDest) # Fourier transform of abscissa
        xi = np.linspace(0, nDest * dxi, nDest, endpoint=False) 
            # Abscissa of Fourier transform
        xi[nDest_2:nDest] = xi[nDest_2:nDest] - nDest * dxi
        #====================================
        # Main process of part 2, shift only
        #====================================
        key_o = ('o{:02d}').format(iPeak)
        ofset = cntmn_config.getfloat('contamination', key_o)
        ft_deconv = np.array(ft_eta) # duplicate Fourier transformed intensity
        ft_deconv = 1.0    # Fourier transform of Dirac delta function
        ft_conv = np.array(ft_deconv) # duplicate
        ft_conv = fFtShift(-ofset, xi) # Fourier transform of shift function
        # Calculate insstrumental function
        ft_inst = np.array(ft_deconv) 
            # duplicate Fourier transform of deconvolution
        ft_inst = ft_deconv / ft_conv 
            # Fourier transform of instrumental function
        # Deconvolution & inverse Fourier transform of intensity data
        ft_eta = ft_eta / ft_inst
        zeta = np.real(np.fft.ifft(ft_eta)) # deconvolution-cnvolution process
        # Cubic spline interpolation
        f3 = interpolate.interp1d(chi, zeta, kind="cubic")
        dest_y = f3(source_x)
        # Rescale data (from "logarithmic sine" to "diffraction angle")
        d_intensity = dest_y / source_corr # ordinate value

        # Treatment of estimated error (part 2, model 1)
        if ((error_type == '1') or (error_type == '3')):
            source_e = cmn.mod_error(source_x, source_d1, dx)
            # Abscissa
            chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
            # Cubic spline interpolation
            f3 = interpolate.interp1d(source_x, source_e, kind="cubic")
            eps = f3(chi)
            epsL = source_e[nSource - 1]
            epsR = source_e[0]
            eps = np.resize(eps, nDest)
            # Fill marginal areas with values
            eps[i1:i2] = epsL
            eps[i2:i3] = epsL + (epsR - epsL) * iLin[0:i3-i2] / (i3-i2)
            eps[i3:nDest] = epsR
            inv_sq_eps = 1.0 / (eps * eps) # inverse squared error
            ft_inv_sq_eps = np.fft.fft(inv_sq_eps)
                # Fourier transform of inverse squared error
            # Calculate squared instrumental function
            inst = np.fft.ifft(ft_inst) # instrumental function
            sq_inst = inst * inst # squared instrumental function
            # Fourier transform of squared instrumental function
            ft_sq_inst = np.fft.fft(sq_inst) 
                # Fourier transform of squared instrumental function
            # Error tranformation (2) (IFFT)
            ft_inv_sq_eps = ft_inv_sq_eps * np.conj(ft_sq_inst)
            delta = np.real(np.fft.ifft(ft_inv_sq_eps))
            delta = 1 / np.sqrt(np.abs(delta)) 
                # propagated estimated error (1)
            # Abscissa
            chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
            # Cubic spline interpolation
            f3 = interpolate.interp1d(chi, delta, kind="cubic")
            dest_e = f3(source_x) 
                # estimated error (model 1)
            # Recorrect error data
            dest_d = cmn.remod_error(source_x, dest_e, dx)
            # Rescale data (from "logarithmic sine" to "diffraction angle")
            d_error1 = dest_d / source_corr 
                # ordinate error (model 1)

        # Treatment of error estimation (part 2/2, model 2)
        if ((error_type == '2') or (error_type == '3')):
            source_e = cmn.mod_error(source_x, source_d2, dx)
            # Abscissa
            chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
            f3 = interpolate.interp1d(source_x, source_e, kind="cubic")
            eps = f3(chi)
            epsL = source_e[nSource - 1]
            epsR = source_e[0]
            eps = np.resize(eps, nDest)
            # Fill margin area with values.
            eps[i1:i2] = epsL
            eps[i2:i3] = epsL + (epsR - epsL) * iLin[0:i3-i2] / (i3-i2)
            eps[i3:nDest] = epsR
            inv_inst = np.real(np.fft.ifft(1 / ft_inst)) 
                # inverse instrumental function
            sq_inv_inst = inv_inst * inv_inst # squared inverse instrumental function
            ft_sq_inv_inst = np.fft.fft(sq_inv_inst) # squared inverse instrumental function
            sq_eps = eps * eps # squared error
            ft_sq_eps = np.fft.fft(sq_eps) # Fourier transform of squared error
            ft_sq_eps = ft_sq_eps * ft_sq_inv_inst
            sq_eps = np.real(np.fft.ifft(ft_sq_eps))
            delta = np.sqrt(np.abs(sq_eps))
            # Abscissa
            chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
            # Cubic spline interpolation
            f3 = interpolate.interp1d(chi, delta, kind="cubic")
            dest_e = f3(source_x) # estimated error (model 2)
            # Recorrect error data
            dest_d = cmn.remod_error(source_x, dest_e, dx)
            d_error2 = dest_d / source_corr # ordinate error (model 2)

        #========================================
        # Recorrect intensity data (important !)
        #========================================
        s_intensity0 = s_intensity0 - d_intensity
        #==================================
        # Correct error data (important !)
        #==================================
        if ((error_type == '1') or (error_type == '3')):
            s_error1_0 = np.sqrt(s_error1_0**2 + d_error1**2)
        elif ((error_type == '1') or (error_type == '3')):
            s_error2_0 = np.sqrt(s_error2_0**2 + d_error2**2)
        #=================================
        # Revise input data (important !)
        #=================================
        if (error_type == '0'):
            source = [s_angle, s_intensity0]
        elif (error_type == '1'):
            source = [s_angle, s_intensity0, s_error1_0]
        elif (error_type == '2'):
            source = [s_angle, s_intensity0, s_error2_0]
        else:
            source = [s_angle, s_intensity0, s_error1_0, s_error2_0]

    return source