#
# PS4Cast.PY
# module encoding the forecasting package.
#
# date: 2017-12-21
# author: GIUSEPPE PUGLISI
#
# Copyright (C) 2017 Giuseppe Puglisi giuspugl@sissa.it
#
import pylab as pl
import healpy as hp
import numpy as np
import h5py as h5
from pointsource_utilities import *
from Stats import *
import glob
import scipy as sp
from scipy import interpolate
from scipy import integrate
from scipy.interpolate import interp1d
import scipy.optimize as so
from scipy.optimize import minimize_scalar
import string
import astropy
from astropy import units as u, constants as C
import matplotlib
from PointSources import *
from matplotlib import rcParams
rcParams.update({'figure.autolayout': True})
[docs]class Experiment(object):
"""
Initialize a class encodes all the nominal specifics of an experiment, i.e. resolution, sensitivity, size of the observational area,
frequency channels.
**Parameters**
- `ID`:{string}
name of the experiment, needed for plotting and printing info.
- `frequency`:{float or sequence}
the frequency of channels (assumed GHz)
- `nchannels`:{int}
number of frequency channels
- `fwhm`:{float or sequence}
the the Full Width at Half Maximum resolution, units can be deg, arcmin, or arcsec, set the keyword argument ``units_beam``,
(Default= ``arcmin``)
- `sensitivity`:{float or sequence}
the sensitivity at each channel. Default units are :math:`\mu K arcmin`, but one can change different units by the ``units_sensitivity``
keyword.
- `fsky`:{float}
percentage of the sky observed by the experiment.
.. note::
If ``units_sensitivity=uKsqrts`` then another keyword must be set, the observational time.
"""
[docs] def get_pixelsize(self):
"""
Given the fwhm, it computes the optimal (Healpix) pixelization such that the pixel size is about 3 times smaller than the resolution.
"""
nsides=np.power(2, np.arange(5,17))
if self.nchans>1:
pixsize=[]
for fwhm in (self.fwhm):
nside=nsides[np.argmin([ abs(fwhm.value /3 -hp.nside2resol(n, arcmin=True)) for n in nsides]) ]
pixsize.append(hp.nside2resol( nside , arcmin=True))
else:
nside=nsides[np.argmin([ abs(self.fwhm.value /3 -hp.nside2resol(n, arcmin=True)) for n in nsides]) ]
pixsize=(hp.nside2resol( nside , arcmin=True))
(pixsize)*= u.arcmin
return pixsize
[docs] def uKarcmin2Jy(self):
"""
compute the :math:`3\sigma` detection flux limit from the sensitivity in :math:`\mu K arcmin`.
"""
self.sigmanoise*= u.uK*u.arcmin
pixelsize=self.get_pixelsize()
K2Jy=(2.* C.k_B.cgs* (self.nu.to( u.Hz)/ C.c.cgs)**2 *self.omega_b.value ).to(u.Jy /u.uK )
return 3*( K2Jy*self.sigmanoise/pixelsize )
[docs] def uKsrts2Jy(self,**kwargs):
"""
Compute the :math:`3\sigma` detection flux limit from the sensitivity in :math:`\mu K \sqrt{s}` computed from the
Noise Effective Temperature computed on the whole array of focal plane detectors.
"""
time =kwargs['timeobserv'].to(u.s)
Nbeams=self.fsky / self.omega_b #nbeams within the observational patch
noiseT=self.sigmanoise* u.uK /np.sqrt(time.value ) *np.sqrt(Nbeams)
K2Jy=(2.* C.k_B.cgs* (self.nu.to( u.Hz)/ C.c.cgs)**2 *self.omega_b.value ).to(u.Jy /u.uK )
return 3* ( K2Jy * noiseT)
[docs] def Jyperbeam2Jy (self):
"""
Compute the :math:`3\sigma` detection flux limit from the brightness ensitivity.
"""
self.sigmanoise*= u.Jy / u.sr
return 3*(self.sigmanoise * self.omega_b)
[docs] def print_info(self):
c= bash_colors()
print c.header("==="*30)
print c.bold(self.ID + "\tSpecifics\n")
print "Frequency\t......",self.nu
print "Flux limit\t......",self.flux_lim
print "Resolution\t......",self.fwhm
print "# channels\t......",self.nchans
print "Fraction of sky\t......",self.fsky/4./np.pi/u.sr
print "Beam angle\t......",self.omega_b
def __init__(self, ID, sensitivity, frequency, fwhm, fsky,nchannels=1,
units_sensitivity='uKarcmin',units_beam='arcmin', **kwargs):
dic_units_beam={'deg':u.deg, 'arcmin':u.arcmin,'arcsec':u.arcsec}
self.ID=ID
self.sigmanoise=sensitivity
self.nu=(frequency *u.gigahertz)
self.fwhm=fwhm * dic_units_beam[units_beam]
self.nchans=nchannels
self.omega_b =beamsolidangle(self.fwhm )
self.fsky=fsky *4*np.pi *u.sr
dic_converter={'uKarcmin':self.uKarcmin2Jy, 'uKsqrts':self.uKsrts2Jy,
'Jyperbeam':self.Jyperbeam2Jy, 'Jy':lambda :self.sigmanoise*u.Jy}
self.flux_lim= dic_converter[units_sensitivity](**kwargs)
[docs]class Forecaster(object):
"""
Given the specifics of an experiment, it forecasts the contribution of detected and undetected point sources (only Radio Quasars are considered so far). It inheritates the attributes from the :class:`Experiment`, needed to be initialized, the level
of point source detection. The forecasts start when the :class:`Forecaster` class is called.
**Parameters**
- `Experiment`:{:class:`Experiment`}
- `sigmadetection`:{float}
set the level above one can consider point sources as *detected*.
- `ps4c_dir`:{string}
path to the ``PS4C/data`` and ``PS4C/model`` folders
"""
def __init__(self, Experiment, sigmadetection=5., ps4c_dir='./'):
self.ID=Experiment.ID
self.Exp=Experiment
self.nu=Experiment.nu
self.fwhm=Experiment.fwhm
self.omega_b = Experiment.omega_b
self.fsky=Experiment.fsky
self.flux_lim= sigmadetection*Experiment.flux_lim
self.nchans=Experiment.nchans
self.workingdir=ps4c_dir
[docs] def forecast_pi2scaling(self, **kwargs):
"""
Compute the best fit parameters of the linear fit :math:`< Pi^2 > = A x +B`
"""
#compute the frequency scaling of the <Pi2(freq)> = A*freq +B, to forecast at any frequency
self.A,self.B,self.sigmaA,self.sigmaB=compute_polfrac_scaling(self.workingdir+'data/', **kwargs )
#
[docs] def forecast_nsources_in_the_patch(self):
"""
Given the size of the observational patch it computes how many sources should be detected above the detection flux limit.
"""
self.nsources ={}
Integralcount = lambda integrand, Smin,Smax: sp.integrate.quad( integrand, Smin,Smax,limit=1000 ,epsrel=1.e-3) [0]
if self.nchans >1:
idsort=np.argsort(self.nu.value)
ziplist=zip(self.nu[idsort].value, self.flux_lim[idsort])
else:
ziplist=zip([self.nu.value], [self.flux_lim])
for nu, Smin in ziplist:
if Smin<self.S[nu].min():
Smin=self.S[nu].min()
totcounts=Integralcount(self.dNdS[nu], Smin.value, self.Smax.value) /u.sr
self.nsources[nu]=int((totcounts )*self.fsky )
[docs] def get_npolsources_from_flux(self,minflux=None ):
"""
By default, it computes how many polarized sources should be observed above the detection limit
otherwise the sources whose flux >``minflux``.
"""
Integralcount = lambda integrand, Smin,Smax: sp.integrate.quad( integrand, Smin,Smax,limit=1000 ,epsrel=1.e-3) [0]
if minflux is None:
self.npolsources = {}
if self.nchans >1:
idsort=np.argsort(self.nu.value)
ziplist=zip(self.nu[idsort].value,self.flux_lim[idsort].value )
else:
ziplist=zip([self.nu.value], [self.flux_lim].value)
for nu, Pmin in ziplist:
if Pmin<=self.P[nu].min():
Pmin=self.P[nu].min()
totcounts=Integralcount(self.dNdP[nu], Pmin , self.P[nu].max() ) /u.sr
self.npolsources[nu]=int((totcounts )*self.fsky )
return self.npolsources
else:
if self.nchans >1:
idsort=np.argsort(self.nu.value)
ziplist=zip(self.nu[idsort].value,[minflux]*self.nchans )
else:
ziplist=zip([self.nu.value], [minflux]*self.nchans )
npolsources = {}
for nu, Pmin in ziplist:
totcounts=Integralcount(self.dNdP[nu], Pmin , self.P[nu].max() ) /u.sr
npolsources[nu]=int((totcounts )*self.fsky )
return npolsources
[docs] def get_differential_number_counts(self):
"""
Given the frequency channels finds the total intensity number counts from predictions of De Zotti et al. 2005 model (prediction at
several frequencies are provided in `./PS4C/model` directory) and computes the polarization number counts by deconvolving
the intensity number counts with the probability distribution of fractional polarization.
"""
modeldir=self.workingdir+'model/'
model_avail= glob.glob(modeldir+'number_counts_*')
frequency_model_avail=[(k.split("ghz")[0].split("_")[-1]) for k in model_avail]
self.dNdS={}
self.S={}
self.dNdP={}
self.P={}
datadir=self.workingdir+'data/'
data_avail= glob.glob(datadir+'bestfit_params_*GHz*')
bestfit_freqs=[i.split("GHz")[0].split("_")[-1] for i in data_avail]
if self.nchans>1:
for nu in self.nu:
idxfreq=np.argmin(abs(nu.value - np.array(map(float,frequency_model_avail) ) ) )
model= read_number_counts(modeldir+'number_counts_'+frequency_model_avail[idxfreq]+'ghz.dat')
S= np.power(10, model['#']) * u.Jy
differential_counts=np.power(10, model['Total \n'] ) /np.sqrt(S**(5)) /u.Jy /u.sr
self.dNdS[nu.value]=interp1d(S, differential_counts)
self.S[nu.value ]= S
self.Smin=S.min()
idxparams=np.argmin(abs(nu.value - np.array(map(float,bestfit_freqs) ) ) )
A,mu,sigma=np.load(data_avail[idxparams])[0]
self.P[nu.value],self.dNdP[nu.value]= compute_dNdP(A, mu,sigma, self.dNdS[nu.value], self.S[nu.value])
else:
idxfreq=np.argmin(abs(self.nu.value - np.array(map(float,frequency_model_avail) ) ) )
model= read_number_counts(modeldir+'number_counts_'+frequency_model_avail[idxfreq]+'ghz.dat')
S= np.power(10, model['#']) * u.Jy
differential_counts=np.power(10, model['Total \n'] ) /np.sqrt(S**(5)) /u.Jy /u.sr
self.dNdS[self.nu.value ]=interp1d(S, differential_counts)
self.S[self.nu.value ]= S
self.Smin=S.min()
idxparams=np.argmin(abs(self.nu.value - np.array(map(float,bestfit_freqs) ) ) )
A,mu,sigma=np.load(data_avail[idxparams])[0]
self.P[self.nu.value],self.dNdP[self.nu.value]= compute_dNdP(A, mu,sigma, self.dNdS[self.nu.value], self.S[self.nu.value])
[docs] def get_differential_number_counts_tucci(self):
"""
Given the frequency channels finds the total intensity number counts from predictions of Tucci et al. 2011 model and computes the
polarization number counts as in :func:`get_differential_number_counts`.
"""
corrfact={15:1.1,20:1,30:1.6,44:1.4,70:1.1,100:1.,143:1.,217:.8 }
self.dNdS={}
self.S={}
self.dNdP={}
self.P={}
datadir=self.workingdir+'data/'
data_avail= glob.glob(datadir+'bestfit_params_*GHz*')
bestfit_freqs=[i.split("GHz")[0].split("_")[-1] for i in data_avail]
if self.nchans>1:
for nu in self.nu:
if nu.value <=20 or nu.value >=100:
model_avail= glob.glob(self.workingdir+'model/ns_radio*')
frequency_model_avail=[(k.split("GHz")[0].split("_")[-1]) for k in model_avail]
idxfreq=np.argmin(abs(nu.value - np.array(map(float,frequency_model_avail) ) ) )
tucci= np.loadtxt(model_avail[idxfreq])
else:
tucci=np.loadtxt(self.workingdir+'model/ns_radio_95GHz_C2Ex.dat')
idf=np.argmin(abs(nu.value -np.array(corrfact.keys())))
correction= corrfact.values()[idf]
S= tucci[:,0] * u.Jy
differential_counts=tucci[:,1] *correction /u.Jy /u.sr
self.dNdS[nu.value]=interp1d(S, differential_counts)
self.S[nu.value ]= S
self.Smin=S.min()
idxparams=np.argmin(abs(nu.value - np.array(map(float,bestfit_freqs) ) ) )
A,mu,sigma=np.load(data_avail[idxparams])[0]
self.P[nu.value],self.dNdP[nu.value]= compute_dNdP(A, mu,sigma, self.dNdS[nu.value], self.S[nu.value])
else:
nu=self.nu
if nu.value <=20 or nu.value >=100:
model_avail= glob.glob(self.workingdir+'model/ns_radio*')
frequency_model_avail=[(k.split("GHz")[0].split("_")[-1]) for k in model_avail]
idxfreq=np.argmin(abs(nu.value - np.array(map(float,frequency_model_avail) ) ) )
tucci= np.loadtxt(model_avail[idxfreq])
else:
tucci=np.loadtxt(self.workingdir+'model/ns_radio_95GHz_C2Ex.dat')
idf=np.argmin(abs(nu.value -np.array(corrfact.keys())))
correction= corrfact.values()[idf]
S= tucci[:,0] * u.Jy
differential_counts=tucci[:,1] *correction /u.Jy /u.sr
self.dNdS[nu.value]=interp1d(S, differential_counts)
self.S[nu.value ]= S
self.Smin=S.min()
idxparams=np.argmin(abs(nu.value - np.array(map(float,bestfit_freqs) ) ) )
A,mu,sigma=np.load(data_avail[idxparams])[0]
self.P[nu.value],self.dNdP[nu.value]= compute_dNdP(A, mu,sigma, self.dNdS[nu.value], self.S[nu.value])
def __call__(self,model='D05', **kwargs ):
"""
The forecast session.
**Parameters**
- `model`:{string}
if ``model!=D05`` it runs the Tucci et al.2011 model for number counts.
- kwargs:
keywords used by functions in :module:`Stats` and :module:`IO`: ``verbose=False``, ``saveplot=string`` , ``bestfitparams2file=True``.
"""
#Read number counts from theoretical model
if model=='D05':
self.get_differential_number_counts()
else:
self.get_differential_number_counts_tucci()
self.Smax=10. *u.Jy
self.forecast_nsources_in_the_patch()
self.get_npolsources_from_flux()
self.flux_conf={}
self.Pi, self.errPi, self.Pi2={},{},{}
self.C_detdsources, self.C_confsources={},{}
self.C_poldet, self.C_polconf={},{}
if self.nchans >1 :
ziplist= zip(self.omega_b,self.nu.value , self.flux_lim )
else:
ziplist=zip([self.omega_b],[self.nu.value] , [self.flux_lim] )
for beam, nu, flux_lim in ziplist: #np.ndenumerate(np.sort(self.S.keys())) :
K, Gamma = fit_numbercounts_w_powerlaw(self.S[nu], self.dNdS[nu], self.Smax, **kwargs)
self.flux_conf[nu]= ( compute_confusion_limit(K,Gamma, beam))
Pi=linfunc(nu*u.GHz, self.A, + self.B )
errPi=linfunc(nu*u.GHz, self.A+self.sigmaA, self.B+ self.sigmaB ) - Pi
Pi2=((Pi+errPi)*1e-2)**2
self.Pi2[nu]=Pi2
self.errPi[nu]=errPi
self.Pi[nu]=Pi
# compute contribution on TT, EE, BB
c= bash_colors()
try:
self.C_detdsources[nu] = estimate_power_spectrum_contribution(self.dNdS[nu],nu*u.GHz ,beam,
self.Smin.value,flux_lim.value )
except ValueError:
print c.warning("detection limit too low (below the model prediction), unable to compute contribution of undetected sources")
self.C_detdsources[nu] = 0.
try:
self.C_confsources[nu] = estimate_power_spectrum_contribution(self.dNdS[nu],nu*u.GHz , beam,
self.Smin.value,self.flux_conf[nu].value)
except ValueError:
print c.warning("confusion limit too low (below the model prediction), unable to compute contribution of confused sources")
self.C_confsources[nu] = 0.
self.C_poldet[nu]=1./2. * self.Pi2[nu] * self.C_detdsources[nu]
[docs] def print_info(self):
self.Exp.print_info()
c= bash_colors()
print c.header("///"*30)
print c.bold("Forecasted quantities\n")
print "Frequency \t#sources[S,P]\t Confusion\t <Pi> \t <Pi^2>x1e3 \tD^TT(lensing)\tD^BB(lensing)\n"
for i in (np.sort(self.C_detdsources.keys())):
print "{0}\t{4}\t{5}\t{1.value:g}{1.unit:FITS}\t{2:.2f}\t{3:.2f}\t".format(i*u.GHz, self.flux_conf[i].to(u.mJy), self.Pi[i],self.Pi2[i]*1e3, self.nsources[i], self.npolsources[i])," {0.value:g} {0.unit:FITS}\t {1.value:g} {1.unit:FITS}".format(self.C_detdsources[i]* 1e6 /2./np.pi/u.sr ,self.C_poldet[i]* 1e6 /2./np.pi/u.sr)
print c.header("==="*30)
[docs] def plot_powerspectra(self,spectra_to_plot='all', FG='compsep', **kwargs ):
"""
Plot CMB and undetected Point sources Power spectra.
**Parameters**
- `spectra_to_plot`:{string}
a string among:
- `all`: T, E and B spectra will be shown
- `TandB`: T and B spectra
- `Tonly`: T spectrum
- `Bonly`: B spectrum
- `FG`:{string}
show the Galactic foreground levels, a string among:
- `compsep`: the 95perc of residuals after component separation
- `total`: total amplitude
- `none`: will not be shown
- `kwargs`:
keywords for matplotlib plots.
"""
dic_to_plot={'all':['TT', 'EE', 'BB'], 'TandB':['TT', 'BB'], 'Tonly':['TT'],'Bonly':['BB'] }
dic_spectra={'all':range(3), 'TandB':[0,2], 'Tonly':[0], 'Bonly':[2]}
dic_subplot={'all':(3,1), 'TandB':(2,1), 'Tonly':(1,1), 'Bonly':(1,1)}
FGfactor={'compsep':0.01, 'total':1, 'none':0}
ell2=lambda x: x*(x+1) /2./np.pi
lens=np.array(hp.read_cl(self.workingdir+'data/lensedCls.fits'))
l=(np.arange(len(lens[-1])))
tens=np.array(hp.read_cl(self.workingdir+'data/r_0.05_tensCls.fits'))
subplot=dic_subplot[spectra_to_plot]
parameters= self.nu.value
norm = matplotlib.colors.Normalize(
vmin=np.min(parameters),
vmax=np.max(parameters))
c_m = matplotlib.cm.viridis
s_m = matplotlib.cm.ScalarMappable(cmap=c_m, norm=norm)
s_m.set_array([])
if self.nchans>1:
fwhm=min(self.fwhm)
idsort=np.argsort(self.nu.value)
ziplist=zip(self.nu[idsort].value, parameters)
else:
fwhm=self.fwhm
ziplist=zip([self.nu.value], [parameters])
pl.title(self.ID, fontsize=12)#+' '+str(nu)+' GHz')
J=0
for I in dic_spectra[spectra_to_plot]:
pl.subplot(subplot[0],subplot[1],J+1)
for nu, p in ziplist:
ellmax=np.pi/(fwhm).to(u.rad).value
ell=np.arange(1,ellmax)
pl.loglog(l,ell2(l)* (lens[I] +tens[I]),'k' )
if I==2:
pl.loglog(l,ell2(l)*lens[I],'k-.')
pl.loglog(l,ell2(l)*tens[I],'k-.')
pl.fill_between( ell, FGfactor[FG] * forecast_Galaxy (ell,nu*u.GHz, self.fsky).value,
FGfactor[FG]*forecast_Galaxy (ell,nu*u.GHz, self.fsky).value/10.,
alpha=0.3, color=s_m.to_rgba(p) )
if I==0:
pl.loglog(ell,ell2(ell)*self.C_confsources[nu] ,':', lw=2, color=s_m.to_rgba(p))
pl.loglog(ell,ell2(ell)*self.C_detdsources[nu],'--', lw=2, color=s_m.to_rgba(p))
else:
pl.loglog(ell,ell2(ell)*self.C_poldet[nu],'--', lw=2, color=s_m.to_rgba(p))
J+=1
pl.yticks(fontsize=14)
pl.ylabel(r'$\mathcal{D}_\ell\, [\mu K ^2]$', fontsize=19)
pl.tight_layout()
if I ==2 or dic_spectra[spectra_to_plot]=='Tonly' :
pl.xticks(fontsize=14)
pl.xlabel(r'$\ell$', fontsize=17)
else:
pl.xticks(visible=False )
if 'ylim' in kwargs:
pl.ylim(kwargs['ylim'])
if 'xlim' in kwargs:
pl.xlim(kwargs['xlim'])
else:
pl.xlim([10,ellmax ])
if 'savefig' in kwargs:
pl.savefig(kwargs['savefig'])
pl.show()
pl.close('all')
[docs] def plot_integral_numbercounts(self, **kwargs):
"""
Plot integral number counts, i.e. number of sources above the detection flux per square degree units.
"""
parameters= self.nu.value
# norm is a class which, when called, can normalize data into the
# [0.0, 1.0] interval.
norm = matplotlib.colors.Normalize(
vmin=np.min(parameters),
vmax=np.max(parameters))
# choose a colormap
c_m = matplotlib.cm.viridis
# create a ScalarMappable and initialize a data structure
s_m = matplotlib.cm.ScalarMappable(cmap=c_m, norm=norm)
s_m.set_array([])
Integralcount = lambda integrand, Smin,Smax: sp.integrate.quad( integrand, Smin,Smax,limit=1000 ,epsrel=1.e-3) [0]
if self.nchans>1:
idsort=np.argsort(self.nu)
ziplist=zip(self.nu[idsort].value, parameters)
#ziplist=zip(self.S.keys()[idsort], parameters)
else:
ziplist=zip([self.nu.value ], [parameters])
for nu ,p in ziplist:
fluxes= self.S[nu][1::3]
Ncounts=[Integralcount(self.dNdS[nu],Smin.value , self.Smax.value)/u.sr.to((u.deg)**2) for Smin in fluxes ]
pl.loglog(fluxes, Ncounts, label=str(int(nu))+' GHz', color=s_m.to_rgba(p))
pl.legend(loc='best', fontsize=12)
pl.xlabel(r'$S_{cut}$ [ Jy ]',fontsize=12)
pl.ylabel(r'$N(>S) [ \, deg^{-2}\, ]$',fontsize=12)
pl.yticks(fontsize=13)
pl.xticks(fontsize=13)
if 'ylim' in kwargs:
pl.ylim(kwargs['ylim'])
if 'xlim' in kwargs:
pl.xlim(kwargs['xlim'])
if 'savefig' in kwargs:
pl.savefig(kwargs['savefig'])
pl.tight_layout()
pl.grid(True)
pl.show()