cntmn5.py – treatment of Ni K-emission from Cu target X-ray tube
Implementation of deconvolutional treatment of parasite Ni K-emission peaks from Cu-target X-ray tube [Ida, et al., 2018a, 2018b].
Python code “cntmn5.py” is shown below.
Click (tap) [Python] icon on the right-top edge of the code area to copy the code to clipboard.
# -*- coding: utf-8 -*-
import numpy as np # import "numpy" module
from scipy import interpolate # import "interpolate" module from "scipi"
import sys # import "sys" module to use "exit()"
import os.path # import "os.path" module to use "os.path.exists()"
import configparser # import "configparser" module
import common5 as cmn # import "dct_common" (common module) as cmn
########################################################
# Decontamination (1/2), definition of horizontal scale
########################################################
def scale_hrz(degTwoTheta):
x = np.log(np.sin(np.pi * degTwoTheta / 360))
return x
######################################################
# Decontamination (2/2), definition of ordinate scale
######################################################
def scale_ord(degTwoTheta):
f = 2 * np.tan(np.pi * degTwoTheta / 360)
return f
#########################################################
# Definition of lower-truncated Lorentzian profile model
#########################################################
def f_Lor(x, x0, w, c):
# x: horizontal values (array)
# x0: location
# w: width
# c: cut-off position
nPoint = x.size
dx = x[1] - x[0]
iZero = max(0, np.int((c - x[0])/dx))
x_w = np.array(x)
x_w[iZero:nPoint] = (x[iZero:nPoint] - x0) / w
y = np.zeros(nPoint)
y[iZero:nPoint] = 2 / (np.pi * w * (1 + 4 * x_w[iZero:nPoint]**2))
return y
###########################################
# Parser for multi-peak profile parameters
###########################################
def DefFunc(config, section):
# config: configuration
# section: section
nPeak = config.getint(section,'nPeak')
nSubPeak = np.zeros(nPeak, dtype=int)
intns = np.zeros(nPeak)
ofset = np.zeros(nPeak)
type = []
posit = []
width = []
integ = []
cutof = []
for i in range(0, nPeak):
key_n = ('nSubPeak{:02d}').format(i)
nSubPeak[i] = config.getint(section, key_n)
key_o = ('o{:02d}').format(i)
ofset[i] = config.getfloat(section, key_o)
key_I = ('I{:02d}').format(i)
intns[i] = config.getfloat(section, key_I)
typeSub = []
positSub = np.zeros(nSubPeak[i])
widthSub = np.zeros(nSubPeak[i])
integSub = np.zeros(nSubPeak[i])
cutofSub = np.zeros(nSubPeak[i])
for j in range(0, nSubPeak[i]):
typeSub = typeSub + ['Lorentz']
key_x = ('x{:02d}_{:02d}').format(i, j)
positSub[j] = config.getfloat(section, key_x)
key_w = ('w{:02d}_{:02d}').format(i, j)
widthSub[j] = config.getfloat(section, key_w)
key_I = ('I{:02d}_{:02d}').format(i, j)
integSub[j] = config.getfloat(section, key_I)
key_c = ('c{:02d}_{:02d}').format(i, j)
cutofSub[j] = config.getfloat(section, key_c)
posit = posit + [positSub]
width = width + [widthSub]
type = type + [typeSub]
integ = integ + [integSub]
cutof = cutof + [cutofSub]
params = [nPeak, nSubPeak, intns, ofset, type, posit, width, integ, cutof]
return params
###########################################
# Contamination peak profile composed of
# lower-truncated Lorentzian peak profiles
###########################################
def FormFunc(i, params, x):
# i: index of contamination peak
# params = [nPeak, nSubPeak, intns, ofset, type, posit, width, integ, cutof]
# x: array or partial array of abscissa. Equidistant ascending order assumed.
nPeak = params[0] # Number of peaks (not used)
nSubPeak = params[1] # Number of subpeaks for each peak
intns = params[2] # total peak intensity
ofset = params[3] # angular offset (not used)
posit = params[5] # array of subpeak location
width = params[6] # array of subpeak width
integ = params[7] # array of subpeak relative intensity
cutof = params[8] # array of subpeak lower-cut position
nP = x.size
y = np.zeros(nP)
dx = x[1] - x[0]
for j in range(0, nSubPeak[i]):
nC = max(0, int((cutof[i][j] - x[0]) / dx))
# tricky? ... using the relation: x[n1:n2][0] == x[n1]
y[nC:nP] += intns[i]* integ[i][j] * f_Lor(x[nC:nP], posit[i][j], width[i][j], cutof[i][j])
return y
############################################
# Fourier transform of shift operation
# integrate[-inf,inf] δ(x-x0) exp(2πikx) dx
# = exp(2πikx0)
############################################
def fFtShift(shift, k):
ans = np.exp(2 * np.pi * 1j * k * shift)
return ans
##########################################
# Decontamination treatment, main body
##########################################
def treat(source, DPATH):
#******************
# Read "cntmn.cfg"
#******************
CFG_FILE = os.path.join(DPATH, 'cntmn.cfg')
cntmn_config = configparser.ConfigParser()
cntmn_config.read(CFG_FILE, encoding = 'utf-8')
# cntmn_config.readfp(codecs.open(CFG_FILE, "r", "utf8"))
#****************
# Read "dct.cfg"
#****************
CFG_FILE = os.path.join(DPATH, 'dct.cfg')
dct_config = configparser.ConfigParser()
dct_config.read(CFG_FILE, encoding = 'utf-8')
# dct_config.readfp(codecs.open(CFG_FILE, "r", "utf8"))
error_type = dct_config.get('Control','error_type')
# Parse deconvolution (CuKα1) control
paramsDeconv = DefFunc(cntmn_config, 'CuK_alpha1')
# Parse convolution (contamination) control
paramsConv = DefFunc(cntmn_config, 'contamination')
# Number of contamination peaks
nPeak = cntmn_config.getint('contamination', 'nPeak')
#***********************************************
# Main loop for deocontamination process
# Repeat for each indicated contamination peaks
#***********************************************
for iPeak in range(nPeak):
s_angle = source[0] # horizontal values of original data
s_intensity = source[1] # intensity values of original data
#==========================================
# Make copy of original data (important !)
#==========================================
s_intensity0 = s_intensity.copy() # original intensity data
#============================================
# Treatment (1/2)
# on ln(sin(theta)) scale
# treatment of reduction, shift and smearing
#============================================
source_x = scale_hrz(s_angle) # scale-transformed abscissa values
source_corr = scale_ord(s_angle)
source_corr = source_corr * cmn.g_corr(s_angle / 2) # intensity correction factor
source_y = s_intensity * source_corr # scale-transformed ordinate values
#=======================
# Error treatment (1/2)
#=======================
if (error_type == '1'):
s_error1 = source[2]
elif (error_type == '2'):
s_error2 = source[2]
elif (error_type == '3'):
s_error1 = source[2]
s_error2 = source[3]
if ((error_type == '1') or (error_type == '3')):
s_error1_0 = s_error1.copy()
source_d1 = s_error1 * source_corr # correct ordinate errors
d_error1 = np.array(s_error1)
if ((error_type == '2') or (error_type == '3')):
s_error2_0 = s_error2.copy()
source_d2 = s_error2 * source_corr # correct ordinate errors
d_error2 = np.array(s_error1)
#===================================================
# Set margin, total number of points, step interval
# Margin is set to 0.5 on ln(sin(theta)) scale.
#===================================================
index = cmn.indices(source_x, 0.5)
nSource = index[0]
i1 = index[1] # 1st index
i2 = index[2] # 2nd index
i3 = index[3] # 3rd index
nDest = index[4]
nDest_2 = int(nDest/2)
dx = (source_x[nSource-1] - source_x[0]) / i1 # step interval
# Prepare abscissa data for valid range
nValid = int((source_x[nSource-1] - source_x[0]) / dx)
chi0 = source_x[0]
chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
# Cubic spline interpolation
eta = np.array(chi)
f3 = interpolate.interp1d(source_x, source_y, kind="cubic")
eta = f3(chi)
# Fill values into marginal area
eta = np.resize(eta, nDest)
iLin = np.linspace(0, i3-i2, i3-i2+1)
etaL = source_y[nSource - 1] # last value (left side of marginal area)
etaR = source_y[0] # first value (right side of marginal area)
eta[i1:i2] = etaL # fill with last value
eta[i2:i3] = etaL + (etaR - etaL) * iLin[0:i3-i2] / (i3-i2)
# linear interpolation
eta[i3:nDest] = etaR # fill with first value
ft_eta = np.fft.fft(eta) # Fourier transform of intensity
dxi = 1.0 / (dx * nDest) # Interval of Fourier transformed data
xi = np.linspace(0, nDest * dxi, nDest, endpoint=False) # abscissa of Fourier transform
xi[nDest_2:nDest] = xi[nDest_2:nDest] - nDest * dxi
# Starting deconvolution-convolution
# Shift range (to start from zero index)
hx = nDest * dx # width of horizonatl
chi = np.linspace(0, hx, nDest, endpoint = False)
deconv = np.array(eta) # copy
conv = np.array(eta) # copy
# Calculate deconvolution (CuKα1) function
deconv[0:nDest_2] = FormFunc(0, paramsDeconv, chi[0:nDest_2])
deconv[nDest_2:nDest] = FormFunc(0, paramsDeconv, chi[nDest_2:nDest]-hx)
# Calculate convolution (contamination peak) profile.
conv[0:nDest_2] = FormFunc(iPeak, paramsConv, chi[0:nDest_2])
conv[nDest_2:nDest] = FormFunc(iPeak, paramsConv, chi[nDest_2:nDest]-hx)
# Fourier transforms of deconvolution and convolution functions
ft_deconv = np.fft.fft(deconv) # Fourier transform
ft_conv = np.fft.fft(conv) # Fourier transform
# Calculation of instrumental function
ft_inst = np.array(ft_deconv) # Make a copy
ft_inst = ft_deconv / ft_conv # Fourier transform of instrumental function
# Inverse Fourier transform & deconvolution of intensity data
ft_eta = ft_eta / ft_inst
zeta = np.real(np.fft.ifft(ft_eta)) # deconvolution-convolution treatment about intensity
# Retrieve original abscissa.
chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
# Cubic spline interpolation
f3 = interpolate.interp1d(chi, zeta, kind="cubic")
dest_y = f3(source_x)
# Rescale transform (from "logarithmic sine" to "diffraction angle")
d_intensity = dest_y / source_corr # Ordinate values
# Treatment about estimated errors...
if ((error_type == '1') or (error_type == '3')):
# Correction of error data
source_e = cmn.mod_error(source_x, source_d1, dx)
# Abscissa
chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
# Spline interpolation
f3 = interpolate.interp1d(source_x, source_e, kind="cubic")
eps = f3(chi)
epsL = source_e[nSource - 1]
epsR = source_e[0]
eps = np.resize(eps, nDest)
# Fill marginal data area.
eps[i1:i2] = epsL
eps[i2:i3] = epsL + (epsR - epsL) * iLin[0:i3-i2] / (i3-i2)
eps[i3:nDest] = epsR
inv_sq_eps = 1.0 / (eps * eps) # Inverse squared errors
ft_inv_sq_eps = np.fft.fft(inv_sq_eps) # Fourier transform of inverse squared errors
# Calculate squared instrumental function
inst = np.fft.ifft(ft_inst) # Instrumental function
sq_inst = inst * inst # Squared instrumental function
# Fourier transform of squared instrumental function
ft_sq_inst = np.fft.fft(sq_inst) # Fourier transform of squared instrumental function
# Transform of error data (2) (IFFT)
ft_inv_sq_eps = ft_inv_sq_eps * np.conj(ft_sq_inst)
delta = np.real(np.fft.ifft(ft_inv_sq_eps))
delta = 1 / np.sqrt(np.abs(delta)) # Propagation of estimated error (1)
# Set abscissa (may not be necessary?)
chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
# Spline interpolation
f3 = interpolate.interp1d(chi, delta, kind="cubic")
dest_e = f3(source_x) # Estimated error (model 1)
# Recorrect error data
dest_d = cmn.remod_error(source_x, dest_e, dx)
# Rescale transform (from "logarithmic sine" to "diffraction angle")
d_error1 = dest_d / source_corr # Ordinate error (model 1)
if ((error_type == '2') or (error_type == '3')):
# Correction of error data
source_e = cmn.mod_error(source_x, source_d2, dx)
# Abscissa
chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
f3 = interpolate.interp1d(source_x, source_e, kind="cubic")
eps = f3(chi)
epsL = source_e[nSource - 1]
epsR = source_e[0]
eps = np.resize(eps, nDest)
# Fill marginal data area
eps[i1:i2] = epsL
eps[i2:i3] = epsL + (epsR - epsL) * iLin[0:i3-i2] / (i3-i2)
eps[i3:nDest] = epsR
inv_inst = np.real(np.fft.ifft(1 / ft_inst))
# inverse instrumental function
sq_inv_inst = inv_inst * inv_inst
# squared inverse instrumental function
ft_sq_inv_inst = np.fft.fft(sq_inv_inst)
# Fourier transform of squared inverse instrumental function
sq_eps = eps * eps # Squared errors
ft_sq_eps = np.fft.fft(sq_eps)
# Fourier transform of squared errors
ft_sq_eps = ft_sq_eps * ft_sq_inv_inst
sq_eps = np.real(np.fft.ifft(ft_sq_eps))
delta = np.sqrt(np.abs(sq_eps))
# Abscissa
chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
# Spline interpolation
f3 = interpolate.interp1d(chi, delta, kind="cubic")
dest_e = f3(source_x) # Estimated error (model 2)
# Recorrect error data
dest_d = cmn.remod_error(source_x, dest_e, dx)
d_error2 = dest_d / source_corr # ordinate error (model 2)
if (error_type == '0'):
dest = [s_angle, d_intensity]
elif (error_type == '1'):
dest = [s_angle, d_intensity, d_error1]
elif (error_type == '2'):
dest = [s_angle, d_intensity, d_error2]
else:
dest = [s_angle, d_intensity, d_error1, d_error2]
#====================================
# Decontamination process (part 2/2)
# Shift on diffraction-angle scale
#====================================
source = dest
# Identity transformation
source_x = s_angle.copy() # abscissa value
source_corr = 1.0
source_y = d_intensity.copy() # ordinate value
#============================
# Error treatment (part 2/2)
#============================
if (error_type == '1'):
s_error1 = source[2]
source_d1 = s_error1 * source_corr # ordinate error
d_error1 = np.array(s_error1)
elif (error_type == '2'):
s_error2 = source[2]
source_d2 = s_error2 * source_corr # ordinate error
d_error2 = np.array(s_error1)
elif (error_type == '3'):
s_error1 = source[2]
s_error2 = source[3]
source_d1 = s_error1 * source_corr # ordinate error
source_d2 = s_error2 * source_corr # ordinate error
d_error1 = np.array(s_error1)
d_error2 = np.array(s_error2)
#================================================
# Determine marginal range, number of total data
# and step interval
#=================================================
index = cmn.indices(source_x, 0.5)
nSource = index[0]
i1 = index[1] # 1st index
i2 = index[2] # 2nd index
i3 = index[3] # 3rd index
nDest = index[4]
nDest_2 = int(nDest/2)
dx = (source_x[nSource-1] - source_x[0]) / i1 # step interval
#=========================================
# Prepare abscissa data for interpolation
# of valid data (part 2/2)
#=========================================
nValid = int((source_x[nSource-1] - source_x[0]) / dx)
chi0 = source_x[0]
chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
# Cubic spline interpolation
eta = np.array(chi)
f3 = interpolate.interp1d(source_x, source_y, kind="cubic")
eta = f3(chi)
# Fill margin area
chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
eta = np.resize(eta, nDest)
iLin = np.linspace(0, i3-i2, i3-i2+1)
etaL = source_y[nSource - 1]
etaR = source_y[0]
eta[i1:i2] = etaL
eta[i2:i3] = etaL + (etaR - etaL) * iLin[0:i3-i2] / (i3-i2)
eta[i3:nDest] = etaR
ft_eta = np.fft.fft(eta) # Fourier transform of intensity
dxi = 1.0 / (dx * nDest) # Fourier transform of abscissa
xi = np.linspace(0, nDest * dxi, nDest, endpoint=False)
# Abscissa of Fourier transform
xi[nDest_2:nDest] = xi[nDest_2:nDest] - nDest * dxi
#====================================
# Main process of part 2, shift only
#====================================
key_o = ('o{:02d}').format(iPeak)
ofset = cntmn_config.getfloat('contamination', key_o)
ft_deconv = np.array(ft_eta) # duplicate Fourier transformed intensity
ft_deconv = 1.0 # Fourier transform of Dirac delta function
ft_conv = np.array(ft_deconv) # duplicate
ft_conv = fFtShift(-ofset, xi) # Fourier transform of shift function
# Calculate insstrumental function
ft_inst = np.array(ft_deconv)
# duplicate Fourier transform of deconvolution
ft_inst = ft_deconv / ft_conv
# Fourier transform of instrumental function
# Deconvolution & inverse Fourier transform of intensity data
ft_eta = ft_eta / ft_inst
zeta = np.real(np.fft.ifft(ft_eta)) # deconvolution-cnvolution process
# Cubic spline interpolation
f3 = interpolate.interp1d(chi, zeta, kind="cubic")
dest_y = f3(source_x)
# Rescale data (from "logarithmic sine" to "diffraction angle")
d_intensity = dest_y / source_corr # ordinate value
# Treatment of estimated error (part 2, model 1)
if ((error_type == '1') or (error_type == '3')):
source_e = cmn.mod_error(source_x, source_d1, dx)
# Abscissa
chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
# Cubic spline interpolation
f3 = interpolate.interp1d(source_x, source_e, kind="cubic")
eps = f3(chi)
epsL = source_e[nSource - 1]
epsR = source_e[0]
eps = np.resize(eps, nDest)
# Fill marginal areas with values
eps[i1:i2] = epsL
eps[i2:i3] = epsL + (epsR - epsL) * iLin[0:i3-i2] / (i3-i2)
eps[i3:nDest] = epsR
inv_sq_eps = 1.0 / (eps * eps) # inverse squared error
ft_inv_sq_eps = np.fft.fft(inv_sq_eps)
# Fourier transform of inverse squared error
# Calculate squared instrumental function
inst = np.fft.ifft(ft_inst) # instrumental function
sq_inst = inst * inst # squared instrumental function
# Fourier transform of squared instrumental function
ft_sq_inst = np.fft.fft(sq_inst)
# Fourier transform of squared instrumental function
# Error tranformation (2) (IFFT)
ft_inv_sq_eps = ft_inv_sq_eps * np.conj(ft_sq_inst)
delta = np.real(np.fft.ifft(ft_inv_sq_eps))
delta = 1 / np.sqrt(np.abs(delta))
# propagated estimated error (1)
# Abscissa
chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
# Cubic spline interpolation
f3 = interpolate.interp1d(chi, delta, kind="cubic")
dest_e = f3(source_x)
# estimated error (model 1)
# Recorrect error data
dest_d = cmn.remod_error(source_x, dest_e, dx)
# Rescale data (from "logarithmic sine" to "diffraction angle")
d_error1 = dest_d / source_corr
# ordinate error (model 1)
# Treatment of error estimation (part 2/2, model 2)
if ((error_type == '2') or (error_type == '3')):
source_e = cmn.mod_error(source_x, source_d2, dx)
# Abscissa
chi = np.linspace(chi0, chi0 + nValid * dx, nValid, endpoint = False)
f3 = interpolate.interp1d(source_x, source_e, kind="cubic")
eps = f3(chi)
epsL = source_e[nSource - 1]
epsR = source_e[0]
eps = np.resize(eps, nDest)
# Fill margin area with values.
eps[i1:i2] = epsL
eps[i2:i3] = epsL + (epsR - epsL) * iLin[0:i3-i2] / (i3-i2)
eps[i3:nDest] = epsR
inv_inst = np.real(np.fft.ifft(1 / ft_inst))
# inverse instrumental function
sq_inv_inst = inv_inst * inv_inst # squared inverse instrumental function
ft_sq_inv_inst = np.fft.fft(sq_inv_inst) # squared inverse instrumental function
sq_eps = eps * eps # squared error
ft_sq_eps = np.fft.fft(sq_eps) # Fourier transform of squared error
ft_sq_eps = ft_sq_eps * ft_sq_inv_inst
sq_eps = np.real(np.fft.ifft(ft_sq_eps))
delta = np.sqrt(np.abs(sq_eps))
# Abscissa
chi = np.linspace(chi0, chi0 + nDest * dx, nDest, endpoint = False)
# Cubic spline interpolation
f3 = interpolate.interp1d(chi, delta, kind="cubic")
dest_e = f3(source_x) # estimated error (model 2)
# Recorrect error data
dest_d = cmn.remod_error(source_x, dest_e, dx)
d_error2 = dest_d / source_corr # ordinate error (model 2)
#========================================
# Recorrect intensity data (important !)
#========================================
s_intensity0 = s_intensity0 - d_intensity
#==================================
# Correct error data (important !)
#==================================
if ((error_type == '1') or (error_type == '3')):
s_error1_0 = np.sqrt(s_error1_0**2 + d_error1**2)
elif ((error_type == '1') or (error_type == '3')):
s_error2_0 = np.sqrt(s_error2_0**2 + d_error2**2)
#=================================
# Revise input data (important !)
#=================================
if (error_type == '0'):
source = [s_angle, s_intensity0]
elif (error_type == '1'):
source = [s_angle, s_intensity0, s_error1_0]
elif (error_type == '2'):
source = [s_angle, s_intensity0, s_error2_0]
else:
source = [s_angle, s_intensity0, s_error1_0, s_error2_0]
return source