Source code for PS4Cast

#
#   PS4Cast.PY
#   module encoding the forecasting package.
#
#   date: 2017-12-21
#   author: GIUSEPPE PUGLISI
#
#   Copyright (C) 2017   Giuseppe Puglisi    giuspugl@sissa.it
#


import pylab as pl
import healpy as hp
import numpy as np
import h5py as h5
from pointsource_utilities import *
from Stats import *
import glob
import scipy as sp
from scipy import interpolate
from scipy import integrate
from scipy.interpolate import interp1d
import scipy.optimize as so
from scipy.optimize import minimize_scalar
import string
import astropy
from astropy import units as u, constants as C
import matplotlib
from PointSources import *

from matplotlib import rcParams
rcParams.update({'figure.autolayout': True})

[docs]class Experiment(object): """ Initialize a class encodes all the nominal specifics of an experiment, i.e. resolution, sensitivity, size of the observational area, frequency channels. **Parameters** - `ID`:{string} name of the experiment, needed for plotting and printing info. - `frequency`:{float or sequence} the frequency of channels (assumed GHz) - `nchannels`:{int} number of frequency channels - `fwhm`:{float or sequence} the the Full Width at Half Maximum resolution, units can be deg, arcmin, or arcsec, set the keyword argument ``units_beam``, (Default= ``arcmin``) - `sensitivity`:{float or sequence} the sensitivity at each channel. Default units are :math:`\mu K arcmin`, but one can change different units by the ``units_sensitivity`` keyword. - `fsky`:{float} percentage of the sky observed by the experiment. .. note:: If ``units_sensitivity=uKsqrts`` then another keyword must be set, the observational time. """
[docs] def get_pixelsize(self): """ Given the fwhm, it computes the optimal (Healpix) pixelization such that the pixel size is about 3 times smaller than the resolution. """ nsides=np.power(2, np.arange(5,17)) if self.nchans>1: pixsize=[] for fwhm in (self.fwhm): nside=nsides[np.argmin([ abs(fwhm.value /3 -hp.nside2resol(n, arcmin=True)) for n in nsides]) ] pixsize.append(hp.nside2resol( nside , arcmin=True)) else: nside=nsides[np.argmin([ abs(self.fwhm.value /3 -hp.nside2resol(n, arcmin=True)) for n in nsides]) ] pixsize=(hp.nside2resol( nside , arcmin=True)) (pixsize)*= u.arcmin return pixsize
[docs] def uKarcmin2Jy(self): """ compute the :math:`3\sigma` detection flux limit from the sensitivity in :math:`\mu K arcmin`. """ self.sigmanoise*= u.uK*u.arcmin pixelsize=self.get_pixelsize() K2Jy=(2.* C.k_B.cgs* (self.nu.to( u.Hz)/ C.c.cgs)**2 *self.omega_b.value ).to(u.Jy /u.uK ) return 3*( K2Jy*self.sigmanoise/pixelsize )
[docs] def uKsrts2Jy(self,**kwargs): """ Compute the :math:`3\sigma` detection flux limit from the sensitivity in :math:`\mu K \sqrt{s}` computed from the Noise Effective Temperature computed on the whole array of focal plane detectors. """ time =kwargs['timeobserv'].to(u.s) Nbeams=self.fsky / self.omega_b #nbeams within the observational patch noiseT=self.sigmanoise* u.uK /np.sqrt(time.value ) *np.sqrt(Nbeams) K2Jy=(2.* C.k_B.cgs* (self.nu.to( u.Hz)/ C.c.cgs)**2 *self.omega_b.value ).to(u.Jy /u.uK ) return 3* ( K2Jy * noiseT)
[docs] def Jyperbeam2Jy (self): """ Compute the :math:`3\sigma` detection flux limit from the brightness ensitivity. """ self.sigmanoise*= u.Jy / u.sr return 3*(self.sigmanoise * self.omega_b)
[docs] def print_info(self): c= bash_colors() print c.header("==="*30) print c.bold(self.ID + "\tSpecifics\n") print "Frequency\t......",self.nu print "Flux limit\t......",self.flux_lim print "Resolution\t......",self.fwhm print "# channels\t......",self.nchans print "Fraction of sky\t......",self.fsky/4./np.pi/u.sr print "Beam angle\t......",self.omega_b
def __init__(self, ID, sensitivity, frequency, fwhm, fsky,nchannels=1, units_sensitivity='uKarcmin',units_beam='arcmin', **kwargs): dic_units_beam={'deg':u.deg, 'arcmin':u.arcmin,'arcsec':u.arcsec} self.ID=ID self.sigmanoise=sensitivity self.nu=(frequency *u.gigahertz) self.fwhm=fwhm * dic_units_beam[units_beam] self.nchans=nchannels self.omega_b =beamsolidangle(self.fwhm ) self.fsky=fsky *4*np.pi *u.sr dic_converter={'uKarcmin':self.uKarcmin2Jy, 'uKsqrts':self.uKsrts2Jy, 'Jyperbeam':self.Jyperbeam2Jy, 'Jy':lambda :self.sigmanoise*u.Jy} self.flux_lim= dic_converter[units_sensitivity](**kwargs)
[docs]class Forecaster(object): """ Given the specifics of an experiment, it forecasts the contribution of detected and undetected point sources (only Radio Quasars are considered so far). It inheritates the attributes from the :class:`Experiment`, needed to be initialized, the level of point source detection. The forecasts start when the :class:`Forecaster` class is called. **Parameters** - `Experiment`:{:class:`Experiment`} - `sigmadetection`:{float} set the level above one can consider point sources as *detected*. - `ps4c_dir`:{string} path to the ``PS4C/data`` and ``PS4C/model`` folders """ def __init__(self, Experiment, sigmadetection=5., ps4c_dir='./'): self.ID=Experiment.ID self.Exp=Experiment self.nu=Experiment.nu self.fwhm=Experiment.fwhm self.omega_b = Experiment.omega_b self.fsky=Experiment.fsky self.flux_lim= sigmadetection*Experiment.flux_lim self.nchans=Experiment.nchans self.workingdir=ps4c_dir
[docs] def forecast_pi2scaling(self, **kwargs): """ Compute the best fit parameters of the linear fit :math:`< Pi^2 > = A x +B` """ #compute the frequency scaling of the <Pi2(freq)> = A*freq +B, to forecast at any frequency self.A,self.B,self.sigmaA,self.sigmaB=compute_polfrac_scaling(self.workingdir+'data/', **kwargs ) #
[docs] def forecast_nsources_in_the_patch(self): """ Given the size of the observational patch it computes how many sources should be detected above the detection flux limit. """ self.nsources ={} Integralcount = lambda integrand, Smin,Smax: sp.integrate.quad( integrand, Smin,Smax,limit=1000 ,epsrel=1.e-3) [0] if self.nchans >1: idsort=np.argsort(self.nu.value) ziplist=zip(self.nu[idsort].value, self.flux_lim[idsort]) else: ziplist=zip([self.nu.value], [self.flux_lim]) for nu, Smin in ziplist: if Smin<self.S[nu].min(): Smin=self.S[nu].min() totcounts=Integralcount(self.dNdS[nu], Smin.value, self.Smax.value) /u.sr self.nsources[nu]=int((totcounts )*self.fsky )
[docs] def get_npolsources_from_flux(self,minflux=None ): """ By default, it computes how many polarized sources should be observed above the detection limit otherwise the sources whose flux >``minflux``. """ Integralcount = lambda integrand, Smin,Smax: sp.integrate.quad( integrand, Smin,Smax,limit=1000 ,epsrel=1.e-3) [0] if minflux is None: self.npolsources = {} if self.nchans >1: idsort=np.argsort(self.nu.value) ziplist=zip(self.nu[idsort].value,self.flux_lim[idsort].value ) else: ziplist=zip([self.nu.value], [self.flux_lim].value) for nu, Pmin in ziplist: if Pmin<=self.P[nu].min(): Pmin=self.P[nu].min() totcounts=Integralcount(self.dNdP[nu], Pmin , self.P[nu].max() ) /u.sr self.npolsources[nu]=int((totcounts )*self.fsky ) return self.npolsources else: if self.nchans >1: idsort=np.argsort(self.nu.value) ziplist=zip(self.nu[idsort].value,[minflux]*self.nchans ) else: ziplist=zip([self.nu.value], [minflux]*self.nchans ) npolsources = {} for nu, Pmin in ziplist: totcounts=Integralcount(self.dNdP[nu], Pmin , self.P[nu].max() ) /u.sr npolsources[nu]=int((totcounts )*self.fsky ) return npolsources
[docs] def get_differential_number_counts(self): """ Given the frequency channels finds the total intensity number counts from predictions of De Zotti et al. 2005 model (prediction at several frequencies are provided in `./PS4C/model` directory) and computes the polarization number counts by deconvolving the intensity number counts with the probability distribution of fractional polarization. """ modeldir=self.workingdir+'model/' model_avail= glob.glob(modeldir+'number_counts_*') frequency_model_avail=[(k.split("ghz")[0].split("_")[-1]) for k in model_avail] self.dNdS={} self.S={} self.dNdP={} self.P={} datadir=self.workingdir+'data/' data_avail= glob.glob(datadir+'bestfit_params_*GHz*') bestfit_freqs=[i.split("GHz")[0].split("_")[-1] for i in data_avail] if self.nchans>1: for nu in self.nu: idxfreq=np.argmin(abs(nu.value - np.array(map(float,frequency_model_avail) ) ) ) model= read_number_counts(modeldir+'number_counts_'+frequency_model_avail[idxfreq]+'ghz.dat') S= np.power(10, model['#']) * u.Jy differential_counts=np.power(10, model['Total \n'] ) /np.sqrt(S**(5)) /u.Jy /u.sr self.dNdS[nu.value]=interp1d(S, differential_counts) self.S[nu.value ]= S self.Smin=S.min() idxparams=np.argmin(abs(nu.value - np.array(map(float,bestfit_freqs) ) ) ) A,mu,sigma=np.load(data_avail[idxparams])[0] self.P[nu.value],self.dNdP[nu.value]= compute_dNdP(A, mu,sigma, self.dNdS[nu.value], self.S[nu.value]) else: idxfreq=np.argmin(abs(self.nu.value - np.array(map(float,frequency_model_avail) ) ) ) model= read_number_counts(modeldir+'number_counts_'+frequency_model_avail[idxfreq]+'ghz.dat') S= np.power(10, model['#']) * u.Jy differential_counts=np.power(10, model['Total \n'] ) /np.sqrt(S**(5)) /u.Jy /u.sr self.dNdS[self.nu.value ]=interp1d(S, differential_counts) self.S[self.nu.value ]= S self.Smin=S.min() idxparams=np.argmin(abs(self.nu.value - np.array(map(float,bestfit_freqs) ) ) ) A,mu,sigma=np.load(data_avail[idxparams])[0] self.P[self.nu.value],self.dNdP[self.nu.value]= compute_dNdP(A, mu,sigma, self.dNdS[self.nu.value], self.S[self.nu.value])
[docs] def get_differential_number_counts_tucci(self): """ Given the frequency channels finds the total intensity number counts from predictions of Tucci et al. 2011 model and computes the polarization number counts as in :func:`get_differential_number_counts`. """ corrfact={15:1.1,20:1,30:1.6,44:1.4,70:1.1,100:1.,143:1.,217:.8 } self.dNdS={} self.S={} self.dNdP={} self.P={} datadir=self.workingdir+'data/' data_avail= glob.glob(datadir+'bestfit_params_*GHz*') bestfit_freqs=[i.split("GHz")[0].split("_")[-1] for i in data_avail] if self.nchans>1: for nu in self.nu: if nu.value <=20 or nu.value >=100: model_avail= glob.glob(self.workingdir+'model/ns_radio*') frequency_model_avail=[(k.split("GHz")[0].split("_")[-1]) for k in model_avail] idxfreq=np.argmin(abs(nu.value - np.array(map(float,frequency_model_avail) ) ) ) tucci= np.loadtxt(model_avail[idxfreq]) else: tucci=np.loadtxt(self.workingdir+'model/ns_radio_95GHz_C2Ex.dat') idf=np.argmin(abs(nu.value -np.array(corrfact.keys()))) correction= corrfact.values()[idf] S= tucci[:,0] * u.Jy differential_counts=tucci[:,1] *correction /u.Jy /u.sr self.dNdS[nu.value]=interp1d(S, differential_counts) self.S[nu.value ]= S self.Smin=S.min() idxparams=np.argmin(abs(nu.value - np.array(map(float,bestfit_freqs) ) ) ) A,mu,sigma=np.load(data_avail[idxparams])[0] self.P[nu.value],self.dNdP[nu.value]= compute_dNdP(A, mu,sigma, self.dNdS[nu.value], self.S[nu.value]) else: nu=self.nu if nu.value <=20 or nu.value >=100: model_avail= glob.glob(self.workingdir+'model/ns_radio*') frequency_model_avail=[(k.split("GHz")[0].split("_")[-1]) for k in model_avail] idxfreq=np.argmin(abs(nu.value - np.array(map(float,frequency_model_avail) ) ) ) tucci= np.loadtxt(model_avail[idxfreq]) else: tucci=np.loadtxt(self.workingdir+'model/ns_radio_95GHz_C2Ex.dat') idf=np.argmin(abs(nu.value -np.array(corrfact.keys()))) correction= corrfact.values()[idf] S= tucci[:,0] * u.Jy differential_counts=tucci[:,1] *correction /u.Jy /u.sr self.dNdS[nu.value]=interp1d(S, differential_counts) self.S[nu.value ]= S self.Smin=S.min() idxparams=np.argmin(abs(nu.value - np.array(map(float,bestfit_freqs) ) ) ) A,mu,sigma=np.load(data_avail[idxparams])[0] self.P[nu.value],self.dNdP[nu.value]= compute_dNdP(A, mu,sigma, self.dNdS[nu.value], self.S[nu.value])
def __call__(self,model='D05', **kwargs ): """ The forecast session. **Parameters** - `model`:{string} if ``model!=D05`` it runs the Tucci et al.2011 model for number counts. - kwargs: keywords used by functions in :module:`Stats` and :module:`IO`: ``verbose=False``, ``saveplot=string`` , ``bestfitparams2file=True``. """ #Read number counts from theoretical model if model=='D05': self.get_differential_number_counts() else: self.get_differential_number_counts_tucci() self.Smax=10. *u.Jy self.forecast_nsources_in_the_patch() self.get_npolsources_from_flux() self.flux_conf={} self.Pi, self.errPi, self.Pi2={},{},{} self.C_detdsources, self.C_confsources={},{} self.C_poldet, self.C_polconf={},{} if self.nchans >1 : ziplist= zip(self.omega_b,self.nu.value , self.flux_lim ) else: ziplist=zip([self.omega_b],[self.nu.value] , [self.flux_lim] ) for beam, nu, flux_lim in ziplist: #np.ndenumerate(np.sort(self.S.keys())) : K, Gamma = fit_numbercounts_w_powerlaw(self.S[nu], self.dNdS[nu], self.Smax, **kwargs) self.flux_conf[nu]= ( compute_confusion_limit(K,Gamma, beam)) Pi=linfunc(nu*u.GHz, self.A, + self.B ) errPi=linfunc(nu*u.GHz, self.A+self.sigmaA, self.B+ self.sigmaB ) - Pi Pi2=((Pi+errPi)*1e-2)**2 self.Pi2[nu]=Pi2 self.errPi[nu]=errPi self.Pi[nu]=Pi # compute contribution on TT, EE, BB c= bash_colors() try: self.C_detdsources[nu] = estimate_power_spectrum_contribution(self.dNdS[nu],nu*u.GHz ,beam, self.Smin.value,flux_lim.value ) except ValueError: print c.warning("detection limit too low (below the model prediction), unable to compute contribution of undetected sources") self.C_detdsources[nu] = 0. try: self.C_confsources[nu] = estimate_power_spectrum_contribution(self.dNdS[nu],nu*u.GHz , beam, self.Smin.value,self.flux_conf[nu].value) except ValueError: print c.warning("confusion limit too low (below the model prediction), unable to compute contribution of confused sources") self.C_confsources[nu] = 0. self.C_poldet[nu]=1./2. * self.Pi2[nu] * self.C_detdsources[nu]
[docs] def print_info(self): self.Exp.print_info() c= bash_colors() print c.header("///"*30) print c.bold("Forecasted quantities\n") print "Frequency \t#sources[S,P]\t Confusion\t <Pi> \t <Pi^2>x1e3 \tD^TT(lensing)\tD^BB(lensing)\n" for i in (np.sort(self.C_detdsources.keys())): print "{0}\t{4}\t{5}\t{1.value:g}{1.unit:FITS}\t{2:.2f}\t{3:.2f}\t".format(i*u.GHz, self.flux_conf[i].to(u.mJy), self.Pi[i],self.Pi2[i]*1e3, self.nsources[i], self.npolsources[i])," {0.value:g} {0.unit:FITS}\t {1.value:g} {1.unit:FITS}".format(self.C_detdsources[i]* 1e6 /2./np.pi/u.sr ,self.C_poldet[i]* 1e6 /2./np.pi/u.sr) print c.header("==="*30)
[docs] def plot_powerspectra(self,spectra_to_plot='all', FG='compsep', **kwargs ): """ Plot CMB and undetected Point sources Power spectra. **Parameters** - `spectra_to_plot`:{string} a string among: - `all`: T, E and B spectra will be shown - `TandB`: T and B spectra - `Tonly`: T spectrum - `Bonly`: B spectrum - `FG`:{string} show the Galactic foreground levels, a string among: - `compsep`: the 95perc of residuals after component separation - `total`: total amplitude - `none`: will not be shown - `kwargs`: keywords for matplotlib plots. """ dic_to_plot={'all':['TT', 'EE', 'BB'], 'TandB':['TT', 'BB'], 'Tonly':['TT'],'Bonly':['BB'] } dic_spectra={'all':range(3), 'TandB':[0,2], 'Tonly':[0], 'Bonly':[2]} dic_subplot={'all':(3,1), 'TandB':(2,1), 'Tonly':(1,1), 'Bonly':(1,1)} FGfactor={'compsep':0.01, 'total':1, 'none':0} ell2=lambda x: x*(x+1) /2./np.pi lens=np.array(hp.read_cl(self.workingdir+'data/lensedCls.fits')) l=(np.arange(len(lens[-1]))) tens=np.array(hp.read_cl(self.workingdir+'data/r_0.05_tensCls.fits')) subplot=dic_subplot[spectra_to_plot] parameters= self.nu.value norm = matplotlib.colors.Normalize( vmin=np.min(parameters), vmax=np.max(parameters)) c_m = matplotlib.cm.viridis s_m = matplotlib.cm.ScalarMappable(cmap=c_m, norm=norm) s_m.set_array([]) if self.nchans>1: fwhm=min(self.fwhm) idsort=np.argsort(self.nu.value) ziplist=zip(self.nu[idsort].value, parameters) else: fwhm=self.fwhm ziplist=zip([self.nu.value], [parameters]) pl.title(self.ID, fontsize=12)#+' '+str(nu)+' GHz') J=0 for I in dic_spectra[spectra_to_plot]: pl.subplot(subplot[0],subplot[1],J+1) for nu, p in ziplist: ellmax=np.pi/(fwhm).to(u.rad).value ell=np.arange(1,ellmax) pl.loglog(l,ell2(l)* (lens[I] +tens[I]),'k' ) if I==2: pl.loglog(l,ell2(l)*lens[I],'k-.') pl.loglog(l,ell2(l)*tens[I],'k-.') pl.fill_between( ell, FGfactor[FG] * forecast_Galaxy (ell,nu*u.GHz, self.fsky).value, FGfactor[FG]*forecast_Galaxy (ell,nu*u.GHz, self.fsky).value/10., alpha=0.3, color=s_m.to_rgba(p) ) if I==0: pl.loglog(ell,ell2(ell)*self.C_confsources[nu] ,':', lw=2, color=s_m.to_rgba(p)) pl.loglog(ell,ell2(ell)*self.C_detdsources[nu],'--', lw=2, color=s_m.to_rgba(p)) else: pl.loglog(ell,ell2(ell)*self.C_poldet[nu],'--', lw=2, color=s_m.to_rgba(p)) J+=1 pl.yticks(fontsize=14) pl.ylabel(r'$\mathcal{D}_\ell\, [\mu K ^2]$', fontsize=19) pl.tight_layout() if I ==2 or dic_spectra[spectra_to_plot]=='Tonly' : pl.xticks(fontsize=14) pl.xlabel(r'$\ell$', fontsize=17) else: pl.xticks(visible=False ) if 'ylim' in kwargs: pl.ylim(kwargs['ylim']) if 'xlim' in kwargs: pl.xlim(kwargs['xlim']) else: pl.xlim([10,ellmax ]) if 'savefig' in kwargs: pl.savefig(kwargs['savefig']) pl.show() pl.close('all')
[docs] def plot_integral_numbercounts(self, **kwargs): """ Plot integral number counts, i.e. number of sources above the detection flux per square degree units. """ parameters= self.nu.value # norm is a class which, when called, can normalize data into the # [0.0, 1.0] interval. norm = matplotlib.colors.Normalize( vmin=np.min(parameters), vmax=np.max(parameters)) # choose a colormap c_m = matplotlib.cm.viridis # create a ScalarMappable and initialize a data structure s_m = matplotlib.cm.ScalarMappable(cmap=c_m, norm=norm) s_m.set_array([]) Integralcount = lambda integrand, Smin,Smax: sp.integrate.quad( integrand, Smin,Smax,limit=1000 ,epsrel=1.e-3) [0] if self.nchans>1: idsort=np.argsort(self.nu) ziplist=zip(self.nu[idsort].value, parameters) #ziplist=zip(self.S.keys()[idsort], parameters) else: ziplist=zip([self.nu.value ], [parameters]) for nu ,p in ziplist: fluxes= self.S[nu][1::3] Ncounts=[Integralcount(self.dNdS[nu],Smin.value , self.Smax.value)/u.sr.to((u.deg)**2) for Smin in fluxes ] pl.loglog(fluxes, Ncounts, label=str(int(nu))+' GHz', color=s_m.to_rgba(p)) pl.legend(loc='best', fontsize=12) pl.xlabel(r'$S_{cut}$ [ Jy ]',fontsize=12) pl.ylabel(r'$N(>S) [ \, deg^{-2}\, ]$',fontsize=12) pl.yticks(fontsize=13) pl.xticks(fontsize=13) if 'ylim' in kwargs: pl.ylim(kwargs['ylim']) if 'xlim' in kwargs: pl.xlim(kwargs['xlim']) if 'savefig' in kwargs: pl.savefig(kwargs['savefig']) pl.tight_layout() pl.grid(True) pl.show()