The Problem
Applications exist where it is useful to know the spectral response of the image sensor in a commercial camera.
One approach to determining this is to present many different inputs (each described by its Power Spectral Density, PSD) to the sensor and record the R, G, and B sensor outputs.
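If each PSD is a vector of spectral power values and the sensor's (unknown) spectral response is a vector of per-wavelength weights, each recorded channel value is roughly the dot product of the two. A minimal sketch of that forward model, with made-up names and sizes:
import numpy as np
rng = np.random.default_rng(0)
psd_inputs = rng.random((5, 301))       # five hypothetical PSDs, 301 spectral points each
true_response = rng.random(301)         # hypothetical per-wavelength sensor response
channel_readings = psd_inputs @ true_response   # one scalar reading per PSD
print(channel_readings.shape)           # (5,)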
%matplotlib inline
Imports ..
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
from mpl_toolkits.mplot3d import axes3d
import pandas as pd
from rotate import rotanimate
import matplotlib.animation as animation
import subprocess
from IPython.display import Image
from matplotlib import cm
from IPython.display import Video
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import scale
plt.style.use('seaborn-whitegrid')
plt.rcParams["figure.figsize"] = [10,6]
from sklearn import decomposition
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
import seaborn as sns; sns.set() # styling
from functools import reduce
import functools
import operator
#### Defaults
sns.set_style("darkgrid", {"axes.facecolor": ".9"})
plt.rcParams.update({'font.size': 12})
Functions
def foldl(func, acc, xs):
return functools.reduce(func, xs, acc)
# tests
#print(foldl(operator.sub, 0, [1,2,3])) # -6
#print(foldl(operator.add, 'L', ['1','2','3'])) # 'L123'
def scanl_plus(data):
'''
returns the list of running cumulative sums of the list, starting with 0 (like Haskell's scanl (+) 0)
'''
return [0] + [sum(data[:(k+1)]) for (k,v) in enumerate(data)]
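# e.g. scanl_plus([1, 2, 3]) returns [0, 1, 3, 6] -- running cumulative sums starting at 0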
def celsius_to_fahr(temp):
return 9/5 * temp + 32
def gen_answers_from_alphas(inputs, new_alphas):
return (np.matmul(inputs,new_alphas))
# Reduce Example
#reduce(lambda a,b: a+b, [1,2,3,4,5], 0)
Read the Full No-Noise PSD Data File
df=pd.read_csv('nonoise.txt', sep=' ',header=None, index_col=False)
noNoise = df.values
nnInputs = noNoise[:,0:301] # columns 0-300: the 301 PSD features (column 301 not included)
nnAnswers = noNoise[:,301]  # column 301: the sensor response (the 'answer')
print (nnInputs.shape)
print (nnAnswers.shape)
print (noNoise[500:550,150])
(2696, 301) (2696,) [0.1545 0.47516 0.4987 0.63712 0.3862 0.06153452 0.1665 0.07505714 0.229 0.37650002 0.19121 0.4748479 0.65413 0.2585336 0.5848 0.1771 0.0425 0.7967412 0.13154 0.1288136 0.08016283 0.4070833 0.2669557 0.70973 0.57939999 0.0492 0.19030001 0.1884396 0.52755536 0.74401 0.04349386 0.3753 0.5782 0.1777 0.1003821 0.09730412 0.10045774 0.5406 0.0633 0.21193427 0.587882 0.26356 0.81445313 0.0543 0.41665209 0.3176 0.68080002 0.43380001 0.22709999 0.3236634 ]
Read the Solution (Alphas)
df=pd.read_csv('solution.txt', sep=' ',header=None, index_col=False)
alphas = df.values
alphasT = np.transpose(alphas)
print(alphas.shape)
print (alphasT.shape)
print(min(alphas))
print(max(alphas))
(301, 1) (1, 301) [0.0036] [1.00058952]
First Let's Plot the Alphas (The 'Solution')
plt.plot(alphas, 'o', color='blue',marker=".", markersize=1);
To reduce dimensionality, PCA was run on the 301-feature dataset. The first 20 principal components will be computed to reduce the fitness search space. First, a small toy example to exercise the scikit-learn PCA API:
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn import decomposition
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
pca = decomposition.PCA(n_components=3)
x = np.array([
[0.387,4878, 5.42],
[0.723,12104,5.25],
[1,12756,5.52],
[1.524,6787,3.94],
])
x_std = StandardScaler().fit_transform(x)
principle_comp = pca.fit_transform(x_std)
vr = pca.explained_variance_ratio_
comp = pca.components_
print(vr,"\n")
print(comp, "\n")
print(principle_comp)
[0.6234565 0.35862207 0.01792143] [[ 0.64926351 -0.24566452 -0.71979569] [-0.42018576 -0.90471598 -0.07023474] [-0.63395648 0.34804876 -0.69062381]] [[-0.94063993 1.62373172 -0.06406894] [-0.7509653 -0.63365168 0.35357757] [-0.6710958 -1.11766206 -0.29312477] [ 2.36270102 0.12758202 0.00361615]]
A prior PCA was computed in Haskell / R.
Here we read it in for eventual comparison to the PCA computed by scikit-learn (a comparison up to sign flips appears after the 20-component run below).
Haskell Version
#### From Haskell R
df=pd.read_csv('pcacolsPlusR.csv', sep=' ',header=None)
raw_cols = df.values
pcs = raw_cols[:,0:20]   # columns 0-19: the 20 principal components (column 20 not included)
raw_ans = raw_cols[:,20] # column 20: the answer
raw_cols
array([[-3.23416766e+00, 5.12070872e+00, -1.25477021e+00, ..., 3.32039147e-02, -1.08053296e-02, 3.09387871e+01], [-8.68216835e+00, -1.04887213e+01, 5.06402197e+00, ..., -7.27829689e-02, 3.48979274e-01, 1.28845941e+01], [ 3.22975410e+00, 7.81527557e-01, -1.16965427e-01, ..., 1.32063046e-02, -1.55150741e-02, 3.75778503e+01], ..., [ 1.56845456e+01, 1.48355749e+01, 1.46234028e+00, ..., 5.73328334e-03, 1.77270045e-02, 5.56608258e+01], [ 3.52727653e+00, -5.19561974e+00, -9.11529604e+00, ..., 2.67996338e-02, 3.56004508e-02, 4.79280577e+01], [ 6.37255399e+00, 1.89079334e+01, 2.58364165e+00, ..., 4.18560847e-02, 9.33230621e-04, 4.22117750e+01]])
# Restrict to 20 Cols of PCA to match Haskell
pca = decomposition.PCA(n_components=20)
x_std = StandardScaler().fit_transform(nnInputs)
principle_comps = pca.fit_transform(x_std)
vr = scanl_plus ( pca.explained_variance_ratio_ ) # make it cumulative
comp = pca.components_
print("Variance Ratios =>\n", vr,"\n")
#print(comp, "\n")
print("Principle Components =>\n",principle_comps)
Variance Ratios => [0, 0.6451305526885973, 0.878744185510165, 0.9695899303460758, 0.9819021285116163, 0.9900781178484303, 0.9944059631086534, 0.9970722530205757, 0.9981092875973712, 0.998814433589737, 0.9992615141113731, 0.9995618209466867, 0.9997226786785746, 0.9998232231028532, 0.9998876794241804, 0.9999267586743806, 0.9999483384695663, 0.9999609739943445, 0.9999691794925987, 0.9999752360910285, 0.9999798750098285] Principle Components => [[-3.23476764e+00 -5.12165867e+00 1.25500298e+00 ... 7.51627005e-02 -3.32100743e-02 -1.08073352e-02] [-8.68377900e+00 1.04906671e+01 -5.06496140e+00 ... -2.72652172e-01 7.27964753e-02 3.49043975e-01] [ 3.23035325e+00 -7.81672539e-01 1.16987125e-01 ... -1.66845463e-01 -1.32087549e-02 -1.55179481e-02] ... [ 1.56874553e+01 -1.48383271e+01 -1.46261156e+00 ... -6.12433882e-02 -5.73434736e-03 1.77302969e-02] [ 3.52793088e+00 5.19658359e+00 9.11698704e+00 ... -3.74485036e-02 -2.68046052e-02 3.56070526e-02] [ 6.37373617e+00 -1.89114411e+01 -2.58412094e+00 ... 1.44590025e-02 -4.18638495e-02 9.33404402e-04]]
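Since principal components are only defined up to a sign flip, an element-wise comparison of the Haskell/R columns with the scikit-learn ones will not match exactly; checking each column up to a sign change is one rough way to compare the two (a sketch, not part of the original pipeline):
# Compare each of the 20 columns, allowing an overall sign flip per column
for col in range(20):
    same = np.allclose(pcs[:, col], principle_comps[:, col], atol=1e-2)
    flipped = np.allclose(pcs[:, col], -principle_comps[:, col], atol=1e-2)
    print(col, "match (up to sign)" if (same or flipped) else "differs")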
pc_row1 = principle_comps[0,:]
pc_row2 = principle_comps[100,:]
pc_row3 = principle_comps[1000,:]
Let's see how well behaved the first 20 principal components are ...
This is important since this is what we are assuming is reasonably smooth. Here we plot three sample rows from the post-PCA dataset. This is the equivalent of plotting the PSD over all 301 points, but limited to the 20 PCA columns.
plt.rcParams.update({'font.size': 12})
plt.plot(pc_row1, '-', color='blue',marker=".", markersize=10);
plt.plot(pc_row2, '-', color='orange',marker=".", markersize=10);
plt.plot(pc_row3, '-', color='green',marker=".", markersize=10);
# Add title and axis names
plt.title('First 20 PCA Values for Three Test Cases')
plt.xlabel('PCA Columns')
plt.ylabel('Values')
plt.legend(["PC1", "PC2", "PC3"])
nada = 0
To prepare for scaling we have to stitch together the PCA columns and the 'answer'.
print(principle_comps.shape)
print(nnAnswers.shape)
m1 = np.array(list(map (lambda x : [x],nnAnswers))) # reshape the (2696,) answers to (2696, 1) so they can be appended as a column
print(m1.shape)
fCases = np.append (principle_comps,m1,axis=1)
print(fCases.shape)
(2696, 20) (2696,) (2696, 1) (2696, 21)
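As an aside, the same (2696, 21) array can be built more directly with np.column_stack, which treats a 1-D array as a column:
# Equivalent construction; should match fCases element for element
fCases_alt = np.column_stack((principle_comps, nnAnswers))
print(np.allclose(fCases, fCases_alt))   # expect True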
# We are scaling fCases from above ..
# i.e. 20 inputs and one 'Answer' for 2696 rows
from sklearn import preprocessing
scaler = preprocessing.StandardScaler().fit(fCases)
pcas_scaled = scaler.transform(fCases)
pcas_scaled
#print (pcas_scaled.mean(axis=0))
#from sklearn.preprocessing import scale
#ans_scaled = scale( answers_pca, axis=0, with_mean=True, with_std=True, copy=True )
#all_scaled = np.append(pcas_scaled, ans_scaled, axis=0)
array([[-0.23213255, -0.61077084, 0.23999888, ..., -0.77780765, -0.28921911, -0.09891512], [-0.62316307, 1.25103876, -0.96859137, ..., 1.70495419, 9.34089549, -0.992661 ], [ 0.23181576, -0.09321644, 0.02237188, ..., -0.30936006, -0.41528157, 0.22974179], ..., [ 1.12575905, -1.76950829, -0.27970064, ..., -0.1343032 , 0.47448706, 1.12491251], [ 0.25317045, 0.61970582, 1.74347528, ..., -0.62778622, 0.95289356, 0.74211336], [ 0.45739038, -2.2552375 , -0.49417104, ..., -0.98048628, 0.02497918, 0.45913732]])
Write the scaled data out so Haskell can compute fitness on it ..
np.savetxt("fcases_pca20.txt", pcas_scaled, delimiter=" ")
An example individual from the Haskell run ... [0.325880913228656,0.39667643306649025,0.7473014477142477,8.434155613051841e-2,-2.6434725189356578e-2,-1.4737156547028556e-2]
anInd = [
0.325880913228656,
0.39667643306649025,
0.7473014477142477,
8.434155613051841e-2,
-2.6434725189356578e-2,
-1.4737156547028556e-2]
Fitness Cases [FitnessCase [-0.23213254724324522,-0.6107708369235977,0.23999887959073862,0.8467276982575322,0.48775006287997447,-2.5080243207554913,-1.6827055558196122,-0.8305324734160002,-1.3677144725147194,0.8701923515463181,-3.4105364359106605,1.8618782853379916,-3.554742358123332,1.161281390129975,0.259121070757041,-1.494009766131712,-0.8479441055976709,1.5123999298371094,-0.7778076500051748,-0.2892190888978542] (-9.891512311482657e-2),FitnessCase [-0.6231630714884775,1.2510387625530222,-0.9685913744980068,-2.134637737482353,-1.1802306213456357,-0.3687864365511998,0.6374883143095859,-0.10991819345725727,0.3213625883996374,0.6128359489871866,-0.6762067819575514,1.7359594408021162,1.3894764274474027,-1.0282933311986495,1.5158998150040963,-2.258650865948252,-3.727967020197284,-5.486220238485005,1.7049540836202024,9.340896540033592] (-0.9926610004408698)]
fitnessCases = [
[-0.23213254724324522,-0.6107708369235977,0.23999887959073862,0.8467276982575322,0.48775006287997447,-2.5080243207554913,-1.6827055558196122,-0.8305324734160002,-1.3677144725147194,0.8701923515463181,-3.4105364359106605,1.8618782853379916,-3.554742358123332,1.161281390129975,0.259121070757041,-1.494009766131712,-0.8479441055976709,1.5123999298371094,-0.7778076500051748,-0.2892190888978542]
,[-0.6231630714884775,1.2510387625530222,-0.9685913744980068,-2.134637737482353,-1.1802306213456357,-0.3687864365511998,0.6374883143095859,-0.10991819345725727,0.3213625883996374,0.6128359489871866,-0.6762067819575514,1.7359594408021162,1.3894764274474027,-1.0282933311986495,1.5158998150040963,-2.258650865948252,-3.727967020197284,-5.486220238485005,1.7049540836202024,9.340896540033592]
]
out1 = -9.891512311482657e-2
out2 = -0.9926610004408698
Note that the length of the fitness case inputs is 20, and the range of x's is
$\ \ \ \ \ 0.0 < x < 3.0$
def calcValue (ind, anX):
    # Evaluate the polynomial whose coefficients are `ind` (a0, a1, ...) at the point `anX`.
    # The fold carries a (power, running_sum) pair: each coefficient b adds b * anX**power,
    # then the power is incremented -- a direct port of the original Haskell foldl.
    return ( foldl (lambda power_value, b:
        (power_value[0]+1, power_value[1] + b*anX**power_value[0]),
        (0.0,0.0), ind) )[1]
def polyVals (ecVals, theLen):
    # Evaluate the polynomial at theLen evenly spaced points in [0, 3).
    # Haskell original: list( map (\x -> calcValue ecVals x) [0.0,maxXValue/(fromIntegral len)..maxXValue])
    return list(map (lambda x : calcValue (ecVals, x), np.arange(0, 3, 3/theLen)))
def dot (theAs, theBs):
    # Ordinary dot product of two equal-length sequences.
    return ( sum (list (map (lambda a, b : a*b, theAs, theBs))))
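# Cross-check (aside): numpy's polynomial evaluator should agree with calcValue,
# since the individual's coefficients are in ascending-power order (a0, a1, ...).
from numpy.polynomial import polynomial as npoly
print(calcValue(anInd, 1.7), npoly.polyval(1.7, anInd))  # the two numbers should match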
pvs = polyVals(anInd,20)
plt.plot(pvs, '-', color='blue',marker=".", markersize=10);
plt.plot(fitnessCases[0], '-', color='orange',marker=".", markersize=10);
plt.plot(fitnessCases[1], '-', color='green',marker=".", markersize=10);
# Add title and axis names
plt.title('Polyvals vs Fitness')
plt.xlabel('Cols')
plt.ylabel('Values')
plt.legend(["PVS", "Fitn1", "Fitn2"])
nada = 0
err1 = dot (pvs,fitnessCases[0]) - out1
err2 = dot (pvs, fitnessCases[1]) - out2
theSum = err1*err1 + err2*err2
theAvg = theSum / 2
theAvg
370.9277067139941
Haskell Matches this Fitness!
Guessing first
a0=0
a1=200
a2=3
a3=4
a4=50
a5=10
f = lambda x : a0 + (a1-x)*(a2-x)*(a3-x)*(a4-x)*(a5-x)
y = list(map (f,np.arange(0,10,0.1)))
plt.plot(y, '-', color='orange',marker=".", markersize=10);
After playing with it a bit, it looks like the search space for a reasonable shape (not just a monotonically increasing or decreasing curve) is big .. Let's try to fix this by restricting the $a$'s using some constraints ..
Try to compute $a_0 \rightarrow a_4$ by giving two constraints, $$ y = 0 \ \ \text{at} \ \ x = -5 \ \ \text{and} \ \ x = 5 $$ and $$ \frac{dy}{dx} = 0 \ \ \text{at} \ \ x = -5 \ \ \text{and} \ \ x = 5 $$
So,
$$ \begin{align} &y = a_0 + a_1x + a_2x^2 + a_3x^3 + a_4x^4 \label{y}\\ &\frac{dy}{dx} = a_1 + 2a_2x + 3a_3x^2 + 4a_4x^3 \label{dydx} \end{align}$$
Take Eq $\eqref{y}$ and substitute 5 and -5 for x,
$$ \begin{align} &0 = a_0 + 5a_1 + 25a_2 + 125a_3 + 625a_4 \label{yplus}\\ &0 = a_0 - 5a_1 + 25a_2 - 125a_3 + 625a_4 \label{yminus} \end{align}$$
Adding Eq's $\eqref{yplus}$ and $\eqref{yminus}$, and also subtracting Eq $\eqref{yminus}$ from Eq $\eqref{yplus}$, yields
$$ \begin{align} &a_0 + 25a_2 + 625a_4 = 0 \label{a0a2a4}\\ &a_1 = -25a_3 \label{a1a3y} \end{align}$$
Let's do the same thing with the derivative equation $\eqref{dydx}$, substituting 5 and -5 for x,
$$ \begin{align} &0 = a_1 + 10a_2 + 75a_3 + 500a_4 \label{dydxplus}\\ &0 = a_1 - 10a_2 + 75a_3 - 500a_4 \label{dydxminus} \end{align}$$
Adding Eq's $\eqref{dydxplus}$ and $\eqref{dydxminus}$, and also subtracting Eq $\eqref{dydxminus}$ from Eq $\eqref{dydxplus}$, yields
$$ \begin{align} &a_1 = -75a_3 \label{a1a3dydx}\\ &a_2 = -50a_4 \label{a2a4} \end{align}$$
Collecting the parametric equations: Eq's $\eqref{a0a2a4}$, $\eqref{a1a3y}$, $\eqref{a1a3dydx}$, and $\eqref{a2a4}$ relate the five coefficients.
This is not completely solvable, but we can do a little work: pick $a_3$ and $a_4$ as free parameters and compute the rest from the relations above.
But wait .... there is a conflict if we look at Eq's $\eqref{a1a3y}$ and $\eqref{a1a3dydx}$ !! The $y = 0$ constraints require $a_1 = -25a_3$, while the $\frac{dy}{dx} = 0$ constraints require $a_1 = -75a_3$.
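A quick symbolic check of the two conflicting relations (an aside; sympy is not used anywhere else in this notebook):
import sympy as sp
a0, a1, a2, a3, a4, x = sp.symbols('a0 a1 a2 a3 a4 x')
y = a0 + a1*x + a2*x**2 + a3*x**3 + a4*x**4
dydx = sp.diff(y, x)
# Subtracting y(-5) from y(5) leaves only the odd-power terms
print(sp.solve(y.subs(x, 5) - y.subs(x, -5), a1))        # [-25*a3]
# Adding dy/dx at 5 and -5 leaves only the a1 and a3 terms
print(sp.solve(dydx.subs(x, 5) + dydx.subs(x, -5), a1))  # [-75*a3]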
Let's try both and plot them ...
# Guesses
a4=0.1
a3=50
# Calculated
a2=25*a4
a0=-25*a2
a1=-25*a3
f = lambda x : a0 + a1*x + a2*x**2 + a3*x**3 + a4*x**4
xs = np.arange(-6,6,0.05)
y = list(map (f,xs))
plt.plot(xs,y, '-', color='orange',marker=".", markersize=10);
a1=-75*a3
y2 = list(map(f,xs))
plt.plot(xs,y2, '-', color='green',marker=".", markersize=10);
plt.title("Polynomials with two a1\'s")
plt.xlabel('Cols')
plt.ylabel('Values')
plt.legend(["a1 =-25a3", "a1 = -75a3"])
print (a0,a1,a2,a3,a4)
-62.5 -3750 2.5 50 0.1
For example, $$P_{w1}f(1) + P_{w2}f(2) + P_{w3}f(3) + \cdots + P_{w301}f(301) = S_1 \ \ \ \ \ \textbf{Eq (2)}$$ $$\vdots$$
Where f is a polynomial,
$$ f(x) = a_0 + a_1x + a_2x^2 + a_3x^3 + \cdots + a_nx^n\ \ \ \ \ \textbf{Eq (3)}$$
And $P$ is the PSD input vector, while $S_1$ is the scalar red sensor response for the first PSD.
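In code, Eq (3) evaluated at each spectral index followed by the dot product of Eq (2) looks roughly like this (the coefficient values here are placeholders, not a fitted solution):
# Placeholder coefficients a0..a3, purely for illustration
coeffs = [0.5, 0.1, 0.004, 0.01]
f_vals = np.array([sum(c * w**k for k, c in enumerate(coeffs)) for w in range(1, 302)])  # f(1)..f(301)
S_1 = nnInputs[0] @ f_vals   # Eq (2) applied to the first PSD row
print(S_1)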
X= [1,2,3]
from sklearn.preprocessing import scale
X = scale( X, axis=0, with_mean=True, with_std=True, copy=True )
X
from sklearn.preprocessing import scale
X = scale(pcas_scaled[:,-1], axis=0, with_mean=True, with_std=True, copy=True )
X
OK, here we will just take the 20 columns of PCA with the unscaled output and try the polynomial idea. Later we can try scaling the output to zero mean and unit variance.
df=pd.read_csv('pcacolsPlusR.csv', sep=' ',header=None, index_col=False)
pca20= df.values
pca20Ins = pca20[:,0:20] # columns 0-19: the 20 principal components (column 20 not included)
pca20Ans = pca20[:,20]   # column 20: the answer
pca20SclAns = scale( pca20Ans, axis=0, with_mean=True, with_std=True, copy=True )
So we let Haskell and a basic EC algorithm work on a solution. The representation was 6 floating-point numbers, $a_0 \rightarrow a_5$, as shown in Eq (3). During the evolution a candidate solution is chosen, then each of the 301 points is passed through the polynomial, resulting in 301 new numbers. A dot product is then performed with the input data as shown in Eq (2), yielding $S_x$. This potential solution is compared with the actual answer to generate the fitness used for subsequent selection.
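A rough Python sketch of that fitness evaluation (the Haskell implementation is the reference; the names here are illustrative only):
def fitness(candidate, inputs, answers, max_x=3.0):
    # Evaluate the candidate polynomial at one x per input column,
    # dot every input row with those values (Eq (2)), and score with mean squared error.
    n_cols = inputs.shape[1]
    xs = np.arange(0, max_x, max_x / n_cols)
    poly_vals = np.array([calcValue(candidate, x) for x in xs])
    predicted = inputs @ poly_vals
    return np.mean((predicted - answers) ** 2)
# e.g. fitness(anInd, pca20Ins, pca20SclAns)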
For the first test, three PCA columns were used as fitness cases, with a polynomial length of 4. It trained well: the fitness started around $10^{13}$ and ended up at about 250 after roughly 1700 generations.
Its best solution was,
a0 = 0.8370480734416847
a1 = -2.5529616676687357
a2 = -7.162803406956089e-2
a3 = 1.5195077322214723
a4 = 0.3133276367356378
# $$
# \begin{align}
# a_0=&0.500337\\
# \ a_1=&0.1452207\\
# \ a_2=&4.02314e-3\\
# \ a_3=&1.371223e-2
# \end{align}
# $$
Let's plot it .. Note: Python Lambda, Map, and Filter
q2 = np.linspace(0,5,num=300)
# Using List Comprehension ..
m0 = [ (a0 + a1*x + a2*x**2 + a3*x**3) for x in q2 ]
# Using Map
#m1 = list (map (lambda x : a0 + a1*x + a2*x**2 + a3*x**3,q2))
m1 = list (map (lambda x : a0 + a1*x + a2*x**2 + a3*x**3 + a4*x**4,q2))
plt.title('Estimated Camera Transfer Function')
plt.xlabel('Relative Wavelength')
plt.ylabel('Amplitude')
plt.legend([""])
plt.plot(m1, 'o', color='blue',marker=".", markersize=1)
nada = 0
#m1[0:10]
First let's plot the real solution ..
np.exp(complex(3,1))
SAMPLE_RATE = 44100 # Hertz
DURATION = 5 # Seconds
def generate_sine_wave(freq, sample_rate, duration):
x = np.linspace(0, duration, sample_rate * duration, endpoint=False)
frequencies = x * freq
# 2pi because np.sin takes radians
y = np.sin((2 * np.pi) * frequencies)
return x, y
# Generate a 2 hertz sine wave that lasts for 5 seconds
x, y = generate_sine_wave(2, SAMPLE_RATE, DURATION)
plt.plot(x, y)
plt.show()
_, nice_tone = generate_sine_wave(400, SAMPLE_RATE, DURATION)
_, noise_tone = generate_sine_wave(4000, SAMPLE_RATE, DURATION)
noise_tone = noise_tone * 0.3
mixed_tone = nice_tone + noise_tone
normalized_tone = np.int16((mixed_tone / mixed_tone.max()) * 32767)
plt.plot(normalized_tone[:1000])
plt.show()
from scipy.io.wavfile import write
# Remember SAMPLE_RATE = 44100 Hz is our playback rate
write("mysinewave.wav", SAMPLE_RATE, normalized_tone)
from scipy.fft import rfft, rfftfreq
# Number of samples in normalized_tone
N = SAMPLE_RATE * DURATION
yf = rfft(normalized_tone)
xf = rfftfreq(N, 1 / SAMPLE_RATE)
plt.plot(xf, np.abs(yf))
plt.show()
# The maximum frequency is half the sample rate
points_per_freq = len(xf) / (SAMPLE_RATE / 2)
# Our target frequency is 4000 Hz
target_idx = int(points_per_freq * 4000)
yf[target_idx - 1 : target_idx + 2] = 0
plt.plot(xf, np.abs(yf))
plt.show()
Now the inverse FFT
from scipy.fft import irfft
new_sig = irfft(yf)
plt.plot(new_sig[:1000])
plt.show()
How can I find the point spread function (impulse response function) of a camera? Using a 2D-convolution technique
Maybe you could guess and use deconvlucy() until you get your fuzzballs as point-like as possible.
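deconvlucy() is MATLAB; the closest Python equivalent I know of is Richardson-Lucy deconvolution in scikit-image. A sketch with a guessed Gaussian PSF (the file name and PSF width are placeholders):
# Richardson-Lucy deconvolution with a guessed PSF (scikit-image)
from skimage import io, restoration
blurred = io.imread("star_field.png", as_gray=True)    # placeholder image of point-like sources
xg, yg = np.mgrid[-7:8, -7:8]
psf = np.exp(-(xg**2 + yg**2) / (2 * 2.0**2))           # guessed Gaussian PSF, sigma = 2
psf /= psf.sum()
sharpened = restoration.richardson_lucy(blurred, psf)   # iterate until the points look point-like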
df2=pd.read_csv('walowit_phone1_35_50_R.spd', sep='\t',header=None, index_col=False)
df2_vals = df2.values
df2_vals.shape
single = df2_vals[:,1]
sns.set_style("darkgrid", {"axes.facecolor": ".9"})
plt.plot(single, 'o', color='blue',marker=".", markersize=1);
from scipy.fft import rfft, rfftfreq
print(len(single))
DURATION=1
SAMPLE_RATE = 376
# Number of samples in normalized_tone
N = SAMPLE_RATE * DURATION
yf = rfft(single)
xf = rfftfreq(N, 1 / SAMPLE_RATE)
plt.plot(xf[0:25], np.abs(yf)[0:25])
plt.show()
from scipy.fft import irfft
new_sig = irfft(yf)
plt.plot(new_sig[:1000])
plt.show()