#compute the expected number of massive galaxies beyond a mass threshold and in a pencil-like volume
#by Zhiqi Huang, huangzhq25@mail.sysu.edu.cn
import numpy as np
from scipy.stats import skewnorm, norm
from scipy.integrate import odeint
from scipy.special import erfc, gammaln
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt
import camb

##configuration
delta_c_collapse = 1.686
halo_filling_factor = 1.

##default cosmological parameters
Planck_Omega_m = 0.3158
Planck_Omega_b = 0.049389
Planck_hlittle = 0.6732
Planck_sigma_8 = 0.812
Planck_n_s = 0.96605

#here I use an approximate method for cosmic variance, since cosmic variance is much smaller than Poisson noise
cosmic_variance_fudge_factor = 1.42  #simulation result / linear halo bias approximation


#linear growth equation, using y = [D, dD/dt] and time variable t
def linear_growth_equation(y, t, cosmo):
    a = cosmo.aoft(t)
    H = cosmo.Hoft(t)
    return np.array([ y[1], 1.5*cosmo.Omega_m/a**3*y[0] - 2*H*y[1] ])


#linear growth equation, using y = [G, dG/dlna] (where G = D/a) and time variable lna
def linear_growth_equation_var(y, lna, cosmo):
    a = np.exp(lna)
    Hsq = cosmo.Hsqofa(a)
    dotH = cosmo.dotHofa(a)
    return np.array([ y[1], -(4.+dotH/Hsq)*y[1] - (3.+dotH/Hsq-1.5/Hsq/a**3*cosmo.Omega_m)*y[0] ])


#input P(sample #0, #1, #2, ... is in the bin); return P(total number of samples in the bin = 0, 1, 2, ...)
def prob_combine(p_is_in):
    n = len(p_is_in)
    if(n == 1):
        return [ 1. - p_is_in[0], p_is_in[0] ]
    p_tot_in = np.empty(n+1)  #P(total number of samples in the bin = 0, 1, 2, ...)
    tmp = prob_combine(p_is_in[0:n-1])
    p_tot_in[0] = tmp[0] * (1.-p_is_in[n-1])
    p_tot_in[n] = tmp[n-1] * p_is_in[n-1]
    for i in range(1, n):
        p_tot_in[i] = tmp[i-1]*p_is_in[n-1] + tmp[i]*(1.-p_is_in[n-1])
    return p_tot_in


#window function |W(kR)|^2 for a 3D tophat ball
def tophat_window_sq(kR):
    if(kR < 0.03):  #Taylor expansion for small kR
        x = kR**2
        return 1. + x * (-0.2 + x*(3./175. + x*(-4./4725.)))
    else:
        return (3.0*(np.sin(kR)-kR*np.cos(kR))/kR**3)**2


#Sheth-Tormen (ST) halo mass function
def ST_halo_f(sigma):
    nu = delta_c_collapse/sigma
    anu2 = 0.707*nu**2
    return 0.644*(1.+1./anu2**0.3)*np.sqrt(anu2/np.pi/2.)*np.exp(-anu2/2.)


#ST halo bias
def ST_halo_bias(sigma):
    return 1. - (1.+np.log(ST_halo_f(sigma*np.exp(-0.01))/ST_halo_f(sigma*np.exp(0.01)))/0.02)/delta_c_collapse


##peak of the joint Poisson&Gaussian distribution (Poisson distribution with its parameter nbar Gaussianly distributed)
def peak_Poisson(nbar, sigma_nbar, n):
    if(n == 0):
        return max(nbar - sigma_nbar**2, 1.e-5)
    xp = (nbar + sigma_nbar**2)/(1.+sigma_nbar**2/n)
    while( abs(n/xp - (xp-nbar)/sigma_nbar**2 - 1.) > 1.e-10 ):  #Newton iteration
        xp += (n/xp - (xp-nbar)/sigma_nbar**2 - 1.)/(n/xp**2 + 1./sigma_nbar**2)
    return xp


#joint Poisson&Gaussian distribution (Poisson distribution with its parameter nbar Gaussianly distributed)
#input nbar and its uncertainty, return P(N_theory = n)
def prob_num_Poisson(nbar, sigma_nbar, n):
    xp = peak_Poisson(nbar, sigma_nbar, n)
    logs = n*np.log(xp) - xp - ((xp-nbar)/sigma_nbar)**2/2.
    xmin = xp/1.01
    while( n*np.log(xmin)-xmin - ((xmin-nbar)/sigma_nbar)**2/2. > logs - 30. and xmin > 1.e-5 ):
        xmin /= 1.01
    xmax = xp*1.01
    while( n*np.log(xmax)-xmax - ((xmax-nbar)/sigma_nbar)**2/2. > logs - 30. ):
        xmax *= 1.01
    s = 0.
    w = 0.
    nsteps = 512
    dx = (xmax-xmin)/(nsteps-1)
    for i in range(nsteps):
        x = xmin + i * dx
        tmp = np.exp(-((x-nbar)/sigma_nbar)**2/2.)
        w += tmp
        s += tmp*(x**n*np.exp(-x))
    return s/w*np.exp(-gammaln(n+1.))
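
#a minimal self-check sketch for the probability utilities above; the helper below and its
#numbers are illustrative only (it is defined but never called by the pipeline)
def _demo_prob_utilities():
    #three hypothetical candidates with membership probabilities 0.9, 0.5, 0.1
    p_tot = prob_combine([0.9, 0.5, 0.1])  #[P(0 in bin), P(1 in bin), P(2 in bin), P(3 in bin)]
    assert abs(sum(p_tot) - 1.) < 1.e-10  #the probabilities sum to 1 by construction
    #P(N_theory = 3) for nbar = 3 with a 20% Gaussian uncertainty on nbar
    return p_tot, prob_num_Poisson(nbar = 3., sigma_nbar = 0.6, n = 3)
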
def prob_eq_greater_Poisson(nbar, sigma_nbar, nmax):
    ns = max(int(nbar + np.sqrt(sigma_nbar**2+nbar)*5) + 1, nmax+2)
    p_eq = np.zeros(ns)  #P(N_theory = 0, 1, 2, ...)
    p_greater = np.zeros(nmax)  #P(N_theory >= 0, 1, 2, ...)
    for n in range(ns):
        p_eq[n] = prob_num_Poisson(nbar, sigma_nbar, n)
    s = sum(p_eq)
    if(s > 1.):
        if(s < 1.002):
            p_eq /= s  #fix numeric errors
        else:
            print("Error in prob_eq_greater_Poisson: sum(p) > 1")
            exit()
    p_greater[0] = 1.
    for i in range(1, nmax):
        if(p_greater[i-1] - p_eq[i-1] > 1.e-3):
            p_greater[i] = p_greater[i-1] - p_eq[i-1]
        else:  #avoid loss of significance when subtracting nearly equal numbers
            p_greater[i] = sum(p_eq[i:ns])
    return p_eq[0:nmax], p_greater


#base sampler: a Dirac delta distribution at the median
class rand_sampler:

    def __init__(self, median):
        self.median = median
        self.name = "delta_function"

    def cdf(self, x):
        if(x >= self.median):
            return 1.
        else:
            return 0.

    def ppf(self, p):
        return self.median

    def rvs(self, size=1):
        r = np.random.rand(size)
        return np.array([ self.ppf(x) for x in r ])


#skew triangular distribution
class skew_triangular(rand_sampler):

    def __init__(self, median, upper_bound, lower_bound):
        assert(upper_bound > median and lower_bound < median)
        self.median = median
        self.upper_bound = upper_bound
        self.lower_bound = lower_bound
        self.upper_slope = 1./(self.upper_bound - self.median)**2
        self.lower_slope = 1./(self.median - self.lower_bound)**2
        self.name = "skew_triangular"

    def pdf(self, x):
        if(x >= self.upper_bound or x <= self.lower_bound):
            return 0.
        if(x > self.median):
            return self.upper_slope * (self.upper_bound - x)
        else:
            return self.lower_slope * (x - self.lower_bound)

    def cdf(self, x):
        if(x >= self.upper_bound):
            return 1.
        if(x <= self.lower_bound):
            return 0.
        if(x > self.median):
            return 1. - self.upper_slope * (self.upper_bound - x)**2/2.
        else:
            return self.lower_slope * (x - self.lower_bound)**2/2.

    def ppf(self, p):
        if(p < 0.5):
            return np.sqrt(p*2.)*(self.median - self.lower_bound) + self.lower_bound
        else:
            return self.upper_bound - np.sqrt((1.-p)*2.)*(self.upper_bound - self.median)


#skew rectangular distribution
class skew_rectangular(rand_sampler):

    def __init__(self, median, upper_bound, lower_bound):
        assert(upper_bound > median and lower_bound < median)
        self.median = median
        self.upper_bound = upper_bound
        self.lower_bound = lower_bound
        self.p_upper = 0.5/(self.upper_bound - self.median)
        self.p_lower = 0.5/(self.median - self.lower_bound)
        self.name = "skew_rectangular"

    def pdf(self, x):
        if(x >= self.upper_bound or x <= self.lower_bound):
            return 0.
        if(x > self.median):
            return self.p_upper
        else:
            return self.p_lower

    def cdf(self, x):
        if(x >= self.upper_bound):
            return 1.
        if(x <= self.lower_bound):
            return 0.
        if(x > self.median):
            return 1. - self.p_upper * (self.upper_bound - x)
        else:
            return self.p_lower * (x - self.lower_bound)

    def ppf(self, p):
        if(p < 0.5):
            return self.lower_bound + p/self.p_lower
        else:
            return self.upper_bound - (1.-p)/self.p_upper
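
#a minimal sanity-check sketch for the two samplers above; the bounds are illustrative and the
#helper is defined but never called: ppf should invert cdf
def _demo_skew_samplers():
    tri = skew_triangular(median = 0., upper_bound = 0.4, lower_bound = -0.2)
    rect = skew_rectangular(median = 0., upper_bound = 0.4, lower_bound = -0.2)
    for p in (0.1, 0.5, 0.9):
        assert abs(tri.cdf(tri.ppf(p)) - p) < 1.e-10
        assert abs(rect.cdf(rect.ppf(p)) - p) < 1.e-10
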
#generalized skew normal distribution
class skew_normal(rand_sampler):

    def __init__(self, median, sig_upper, sig_lower):
        assert(sig_upper > 0. and sig_lower > 0.)
        self.median = median
        self.sig_upper = sig_upper
        self.sig_lower = sig_lower
        self.name = "skew_normal"
        if(sig_upper > sig_lower):
            self.sign = 1.
            self.sig_small = sig_lower
            self.sig_big = sig_upper
        else:
            self.sign = -1.
            self.sig_small = sig_upper
            self.sig_big = sig_lower
        self.nonlinear_map = False
        sigrat = self.sig_big / self.sig_small
        #compute a
        if(sigrat == 1.):
            self.a = 0.
        elif(sigrat < 1.001):
            self.a = ((sigrat - 1.)/0.0726)**(1./3.)
        elif(sigrat < 1.488):
            x = np.log(sigrat - 1.)
            self.a = np.exp(x*(x*(x*(x*(x*(x*(x*(x*(x*(x*(x*2.83269244e-07+1.75642561e-05)+4.77035076e-04)+7.46879829e-03)+7.46585387e-02)+4.98582710e-01)+2.26225149e+00)+6.95544240e+00)+1.41932241e+01)+1.84154004e+01)+1.43915426e+01)+6.20830294e+00)
        elif(sigrat <= 1.54985):
            self.a = ( -np.log(1.54985043696967-sigrat)/0.455 ) ** (0.9 - 0.25*(1.54985043696967-sigrat))
        else:
            self.a = 22.744027206558968
        if(sigrat > 1.5498504):
            self.nonlinear_map = True
        if(self.nonlinear_map):
            #compute lambda
            self.rs_median = 0.6744897501960816
            self.rs_sig_lower = 0.47431608753370647
            self.rs_sig_upper = 0.7351189590973726
            self.scale = self.rs_sig_lower / self.sig_small
            mu = self.rs_sig_upper/self.sig_big/self.scale  # mu < 1
            self.lam = 1. - mu*(1.-np.tanh(1./mu))
            for i in range(50):
                self.lam = 1. - mu*(1.-np.tanh(self.lam/mu))
            if(self.lam >= 1.):
                self.lam = 1.-1.e-14
            self.lamscale = self.lam * self.scale
            self.tanhscale = self.lamscale / self.rs_sig_upper
            self.llscale = (1.-self.lam)*self.scale
            self.narr = 1024
            self.y_arr = np.linspace(0., 4./self.tanhscale, self.narr)
            self.rsy_arr = np.array([ self.llscale * y + self.rs_sig_upper * np.tanh(self.tanhscale * y) for y in self.y_arr ])
            self.y_of_rsy = interp1d(self.rsy_arr, self.y_arr, copy=False, assume_sorted=True)
        else:
            self.rs_median = skewnorm.median(self.a)
            self.rs_sig_lower = self.rs_median - skewnorm.ppf(0.15865525393145707, self.a)
            self.rs_sig_upper = skewnorm.ppf(0.8413447460685429, self.a) - self.rs_median
            self.scale = self.rs_sig_lower / self.sig_small

    def rsx_of_x(self, x):  #convert x to skewnorm variable
        y = (x - self.median)*self.sign
        if(y > 0. and self.nonlinear_map):
            return self.llscale * y + self.rs_sig_upper * np.tanh(self.tanhscale * y) + self.rs_median
        else:
            return y*self.scale + self.rs_median

    def rsx_drsx_of_x(self, x):  #convert x to skewnorm variable, also returning the Jacobian
        y = (x - self.median)*self.sign
        if(y > 0. and self.nonlinear_map):
            rsx = self.llscale * y + self.rs_sig_upper * np.tanh(self.tanhscale * y) + self.rs_median
            drsx = self.llscale + self.lamscale / np.cosh(self.tanhscale * y)**2
        else:
            rsx = y*self.scale + self.rs_median
            drsx = self.scale
        return rsx, drsx

    def nonlinear_y_of_rsy(self, rsy):  #here I assume nonlinear_map = True
        if(rsy > 0.):
            if(rsy < self.rsy_arr[self.narr-1]):
                return self.y_of_rsy(rsy)
            else:
                return (rsy - self.rs_sig_upper * np.tanh(self.tanhscale * ((rsy - self.rs_sig_upper) / self.llscale))) / self.llscale
        else:
            return rsy/self.scale

    def full_y_of_rsy(self, rsy):
        if(rsy > 0. and self.nonlinear_map):
            if(rsy < self.rsy_arr[self.narr-1]):
                return self.y_of_rsy(rsy)
            else:
                return (rsy - self.rs_sig_upper * np.tanh(self.tanhscale * ((rsy - self.rs_sig_upper) / self.llscale))) / self.llscale
        else:
            return rsy/self.scale

    def x_of_rsx(self, rsx):
        return self.full_y_of_rsy(rsx - self.rs_median)*self.sign + self.median

    def cdf(self, x):
        if(self.sign > 0.):
            return skewnorm.cdf(self.rsx_of_x(x), self.a)
        else:
            return 1. - skewnorm.cdf(self.rsx_of_x(x), self.a)

    def ppf(self, p):
        if(self.sign > 0.):
            rsx = skewnorm.ppf(p, self.a)
        else:
            rsx = skewnorm.ppf(1.-p, self.a)
        return self.x_of_rsx(rsx)

    def pdf(self, x):
        rsx, drsx = self.rsx_drsx_of_x(x)
        return skewnorm.pdf(rsx, self.a) * drsx

    def rvs(self, size=1):
        rsy = skewnorm.rvs(a = self.a, size = size) - self.rs_median
        if(self.nonlinear_map):
            for i in range(size):
                rsy[i] = self.nonlinear_y_of_rsy(rsy[i])
            return rsy*self.sign + self.median
        else:
            return rsy*(self.sign/self.scale) + self.median
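
#a minimal usage sketch (illustrative numbers, defined but never called): a measurement quoted
#as 8.7 (+0.3/-0.2) at 68% confidence; the 16th/84th percentiles should come out roughly
#sig_lower below and sig_upper above the median
def _demo_skew_normal():
    sampler = skew_normal(median = 8.7, sig_upper = 0.3, sig_lower = 0.2)
    assert abs(sampler.cdf(8.7) - 0.5) < 1.e-6  #the median maps to cumulative probability 1/2
    return sampler.ppf(0.15865525393145707), sampler.ppf(0.8413447460685429)  #roughly (8.5, 9.0)
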
class massive_galaxy:

    #sys_sampler can be "none" (no systematic errors), "tri" (skew triangular), "rect" (skew rectangular), or "norm" (skew normal)
    def __init__(self, logm_median, stat_logm_sig_upper, stat_logm_sig_lower, z_median,
                 stat_z_sig_upper = 0., stat_z_sig_lower = 0.,
                 sys_logm_sig_upper = 0., sys_logm_sig_lower = 0.,
                 sys_z_sig_upper = 0., sys_z_sig_lower = 0., sys_sampler = 'tri'):
        self.logm_median = logm_median
        self.z_median = z_median
        if(stat_logm_sig_upper == 0. or stat_logm_sig_lower == 0.):  #delta distribution
            self.stat_logm_sampler = rand_sampler(logm_median)
        else:  #use skew normal
            self.stat_logm_sampler = skew_normal(median = logm_median, sig_upper = stat_logm_sig_upper, sig_lower = stat_logm_sig_lower)
        if(stat_z_sig_upper == 0. or stat_z_sig_lower == 0.):
            self.stat_z_sampler = rand_sampler(z_median)
        else:
            self.stat_z_sampler = skew_normal(median = z_median, sig_upper = stat_z_sig_upper, sig_lower = stat_z_sig_lower)
        if(sys_logm_sig_upper == 0. or sys_logm_sig_lower == 0. or sys_sampler == 'none'):
            self.sys_logm_sampler = rand_sampler(0.)
        else:  #use skew triangular/rectangular/normal; in the skew normal case, I assume the bounds are 2 sigma
            if(sys_sampler == 'tri'):
                self.sys_logm_sampler = skew_triangular(median = 0., upper_bound = sys_logm_sig_upper, lower_bound = -sys_logm_sig_lower)
            elif(sys_sampler == "rect"):
                self.sys_logm_sampler = skew_rectangular(median = 0., upper_bound = sys_logm_sig_upper, lower_bound = -sys_logm_sig_lower)
            else:
                self.sys_logm_sampler = skew_normal(median = 0., sig_upper = sys_logm_sig_upper/2., sig_lower = sys_logm_sig_lower/2.)
        if(sys_z_sig_upper == 0. or sys_z_sig_lower == 0. or sys_sampler == 'none'):
            self.sys_z_sampler = rand_sampler(0.)
        else:  #use skew triangular/rectangular/normal
            if(sys_sampler == 'tri'):
                self.sys_z_sampler = skew_triangular(median = 0., upper_bound = sys_z_sig_upper, lower_bound = -sys_z_sig_lower)
            elif(sys_sampler == "rect"):
                self.sys_z_sampler = skew_rectangular(median = 0., upper_bound = sys_z_sig_upper, lower_bound = -sys_z_sig_lower)
            else:
                self.sys_z_sampler = skew_normal(median = 0., sig_upper = sys_z_sig_upper/2., sig_lower = sys_z_sig_lower/2.)

    #the probability that m > 10^logm_cut and z_min < z < z_max; when mz_correlated = True,
    #we force the systematic displacements of logm and z to have the same sign
    def prob_in(self, logm_cut, z_min, z_max, mz_correlated = True):
        if(self.sys_logm_sampler.name == "delta_function" and self.sys_z_sampler.name == "delta_function"):
            return (1.-self.stat_logm_sampler.cdf(logm_cut))*(self.stat_z_sampler.cdf(z_max)-self.stat_z_sampler.cdf(z_min))
        hnp = 256
        fnp = hnp*2
        p1 = 0.
        q1 = 0.
        p2 = 0.
        q2 = 0.
        for i in range(hnp):
            p = (i+0.5)/fnp
            p1 += (1.-self.stat_logm_sampler.cdf(logm_cut - self.sys_logm_sampler.ppf(p)))
            dz = self.sys_z_sampler.ppf(p)
            q1 += (self.stat_z_sampler.cdf(z_max - dz) - self.stat_z_sampler.cdf(z_min - dz))
        for i in range(hnp, fnp):
            p = (i+0.5)/fnp
            p2 += (1.-self.stat_logm_sampler.cdf(logm_cut - self.sys_logm_sampler.ppf(p)))
            dz = self.sys_z_sampler.ppf(p)
            q2 += (self.stat_z_sampler.cdf(z_max - dz) - self.stat_z_sampler.cdf(z_min - dz))
        if(mz_correlated):
            return (p1*q1 + p2*q2)/(fnp*hnp)
        else:
            return (p1+p2)*(q1+q2)/(fnp**2)
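
#a minimal usage sketch with an illustrative (hypothetical) galaxy, defined but never called:
#logM* = 10.9 (+0.2/-0.3) stat, z = 8.1 (+0.4/-0.5) stat, plus smaller systematic ranges
def _demo_massive_galaxy():
    hg = massive_galaxy(logm_median = 10.9, stat_logm_sig_upper = 0.2, stat_logm_sig_lower = 0.3,
                        z_median = 8.1, stat_z_sig_upper = 0.4, stat_z_sig_lower = 0.5,
                        sys_logm_sig_upper = 0.1, sys_logm_sig_lower = 0.1,
                        sys_z_sig_upper = 0.2, sys_z_sig_lower = 0.2, sys_sampler = 'tri')
    #probability that the galaxy truly has logM* > 10.5 and 7 < z < 10
    return hg.prob_in(logm_cut = 10.5, z_min = 7., z_max = 10.)
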
def load_massive_galaxies(filename, stat_only = False, sys_sampler = 'tri', z_column = 3, m_column = 8):
    f = np.loadtxt(filename)
    nrows = f.shape[0]
    ncols = f.shape[1]
    hgs = []
    if(stat_only):
        assert(abs(z_column - m_column) > 2)
        for i in range(nrows):
            hgs.append( massive_galaxy(z_median = f[i, z_column],
                                       stat_z_sig_lower = f[i, z_column] - f[i, z_column+1],
                                       stat_z_sig_upper = f[i, z_column+2] - f[i, z_column],
                                       logm_median = f[i, m_column],
                                       stat_logm_sig_lower = f[i, m_column] - f[i, m_column+1],
                                       stat_logm_sig_upper = f[i, m_column+2] - f[i, m_column],
                                       sys_sampler = sys_sampler) )
    else:
        assert(abs(z_column - m_column) > 4)
        for i in range(nrows):
            hgs.append( massive_galaxy(z_median = f[i, z_column],
                                       stat_z_sig_lower = f[i, z_column] - f[i, z_column+1],
                                       stat_z_sig_upper = f[i, z_column+2] - f[i, z_column],
                                       logm_median = f[i, m_column],
                                       stat_logm_sig_lower = f[i, m_column] - f[i, m_column+1],
                                       stat_logm_sig_upper = f[i, m_column+2] - f[i, m_column],
                                       sys_z_sig_lower = f[i, z_column] - f[i, z_column+3],
                                       sys_z_sig_upper = f[i, z_column+4] - f[i, z_column],
                                       sys_logm_sig_lower = f[i, m_column] - f[i, m_column+3],
                                       sys_logm_sig_upper = f[i, m_column+4] - f[i, m_column],
                                       sys_sampler = sys_sampler) )
    return hgs


class Cosmology:

    Omega_m = Planck_Omega_m
    Omega_k = 0.
    hlittle = Planck_hlittle
    Omega_b = Planck_Omega_b
    sigma_8 = Planck_sigma_8
    n_s = Planck_n_s
    zmax = 50.
    amin = 1./51.

    def Hsqofa(self, a):
        return None

    def Hofa(self, a):
        return np.sqrt(self.Hsqofa(a))

    def __init__(self, nsteps = 0, a = None, H = None):
        self.Omega_r = 4.187e-5/self.hlittle**2
        self.Omega_Lambda = 1. - self.Omega_m - self.Omega_r
        self.H0Mpc = self.hlittle / 2.99792458e3
        self.H0 = self.hlittle * 100.
        self.ombh2 = self.Omega_b * self.hlittle**2
        self.omch2 = (self.Omega_m - self.Omega_b)*self.hlittle**2
        self.zmax = 1./self.amin - 1.

    def __set_chi_D__(self, nsteps = 0, a = None, H = None, D = None, f = None):
        if(nsteps > 0):
            self.amin = a[0]
            self.zmax = 1./self.amin - 1.
            z = np.empty(nsteps)
            chi = np.empty(nsteps)
            chi[0] = 0.
            z[0] = 0.
            for i in range(nsteps-1):
                z[i+1] = 1./a[nsteps-i-2] - 1.
                chi[i+1] = chi[i] + 4.*(np.sqrt(a[nsteps-i-1])-np.sqrt(a[nsteps-i-2]))/(H[nsteps-i-2]*a[nsteps-i-2]**1.5 + H[nsteps-i-1]*a[nsteps-i-1]**1.5)
            self.chiofz_interp = interp1d(z, chi, kind = "cubic", assume_sorted = True)
            self.chimax = chi[nsteps-1]
            self.intchi_edge = 2./(H[0]*a[0]**1.5)
            self.Dofa_interp = interp1d(a, D, kind = "cubic", assume_sorted = True)
            self.fofa_interp = interp1d(a, f, kind = "cubic", assume_sorted = True)
            self.Dbya = D[0] / self.amin
        pars = camb.CAMBparams()
        pars.set_cosmology(H0 = self.H0, ombh2 = self.ombh2, omch2 = self.omch2)
        pars.InitPower.set_params(ns = self.n_s)
        pars.set_matter_power(redshifts = [ self.zmax ], kmax = 2.)
        pars.NonLinear = camb.model.NonLinear_none
        results = camb.get_results(pars)
        npoints = 600
        kh, z, pk = results.get_matter_power_spectrum(minkh = 1.e-4, maxkh = np.pi*4.235/8., npoints = npoints)
        s2 = 0.
        for i in range(npoints-1):
            s2 += (kh[i]**3*pk[0][i]+kh[i+1]**3*pk[0][i+1])*tophat_window_sq(np.sqrt(kh[i]*kh[i+1])*8.)*np.log(kh[i+1]/kh[i])
        s2 = s2/2.
        s2 += (kh[0]**3*pk[0][0]*tophat_window_sq(kh[0]*8.)*2./(6.+self.n_s) + kh[npoints-1]**3*pk[0][npoints-1]*tophat_window_sq(kh[npoints-1]*8.)/(self.n_s+2.))
        s2 = s2 / (2.*np.pi**2)
        self.matterpower_interp = interp1d(np.log(kh), np.log(pk[0]*(self.sigma_8**2/s2)), kind = "cubic", bounds_error = False, fill_value = "extrapolate", assume_sorted = True)
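
    #note on the normalization above: sigma_8^2 = int dlnk [k^3 P(k)/(2 pi^2)] |W(8k)|^2 with k in h/Mpc,
    #so rescaling P(k) by sigma_8^2/s2 pins the stored spectrum to the input sigma_8; combined with
    #D(z=0) = 1, MatterPowerSpectrum below effectively returns the z = 0 spectrum; the /(6.+n_s) and
    #/(n_s+2.) terms are approximate corrections for the k -> 0 and k -> infinity tails of the integral
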
    #below are shared functions that are actually useful

    def Dofa(self, a):
        if(a <= self.amin):
            return self.Dbya * a
        else:
            return self.Dofa_interp(a)

    def fofa(self, a):
        if(a <= self.amin):
            return 1.
        else:
            return self.fofa_interp(a)

    def chiofz(self, z):  #return chi in unit of c/H_0
        if(z <= self.zmax):
            return self.chiofz_interp(z)
        else:
            return self.chimax + self.intchi_edge * (np.sqrt(self.amin) - np.sqrt(1./(1.+z)))

    def dchibydz(self, z):  #d chi/dz in Mpc/h
        return 2.99792458e3/self.Hofa(1./(1.+z))

    def ComovingDistance(self, z):  #input: redshift; output: comoving distance in Mpc/h
        chi = self.chiofz(z)
        if( abs(self.Omega_k*chi**2) < 0.01 ):
            return chi*(1.+self.Omega_k*chi**2/6.)*2.99792458e3
        elif(self.Omega_k > 0.):
            return np.sinh(np.sqrt(self.Omega_k)*chi)/np.sqrt(self.Omega_k)*2.99792458e3
        else:
            return np.sin(np.sqrt(-self.Omega_k)*chi)/np.sqrt(-self.Omega_k)*2.99792458e3

    def ComovingDistance_2p(self, z1, z2):  #comoving distance between two redshifts, in Mpc/h
        chi = self.chiofz(max(z1, z2)) - self.chiofz(min(z1, z2))
        if( abs(self.Omega_k*chi**2) < 0.01 ):
            return chi*(1.+self.Omega_k*chi**2/6.)*2.99792458e3
        elif(self.Omega_k > 0.):
            return np.sinh(np.sqrt(self.Omega_k)*chi)/np.sqrt(self.Omega_k)*2.99792458e3
        else:
            return np.sin(np.sqrt(-self.Omega_k)*chi)/np.sqrt(-self.Omega_k)*2.99792458e3

    def LuminosityDistance(self, z):  #input: redshift; output: luminosity distance in Mpc/h
        return self.ComovingDistance(z)*(1.+z)

    def AngularDiameterDistance(self, z):  #input: redshift; output: angular diameter distance in Mpc/h
        return self.ComovingDistance(z)/(1.+z)

    def DistanceModuli(self, z):  #input: redshift; output: distance modulus
        return 5.*np.log10(self.LuminosityDistance(z)/self.hlittle) + 25.

    def dV_by_dzdS(self, z):  #comoving volume per solid angle per redshift
        return self.ComovingDistance(z)**2 * self.dchibydz(z)

    def dV_by_dS(self, z_min, z_max):  #comoving volume per solid angle (Simpson rule with half-steps)
        nz = int((z_max-z_min)/0.04) + 1
        dz = (z_max-z_min)/nz
        vol = (self.dV_by_dzdS(z_min)+self.dV_by_dzdS(z_max))/2. + 2.*self.dV_by_dzdS(z_max-dz/2.)
        for i in range(nz-1):
            vol += ( 2.*self.dV_by_dzdS(z_min+(i+0.5)*dz) + self.dV_by_dzdS(z_min+(i+1.)*dz) )
        return vol*(dz/3.)

    def ComovingVolume(self, z_min, z_max, sky_area):
        return self.dV_by_dS(z_min, z_max) * sky_area

    def GrowthFactor(self, z):
        return self.Dofa(1./(1.+z))

    def GrowthRate(self, z):
        return self.fofa(1./(1.+z))

    def MatterPowerSpectrum(self, kh):
        return np.exp(self.matterpower_interp(np.log(kh)))

    def MatterPowerSpectrumAtz(self, kh, z):
        return np.exp(self.matterpower_interp(np.log(kh))) * self.GrowthFactor(z)**2

    def sigma2_tophat(self, Rh):
        kh_min = 0.01/Rh
        kh_max = np.pi*2.25/Rh
        lnkh_min = np.log(kh_min)
        lnkh_max = np.log(kh_max)
        nkh = int((lnkh_max - lnkh_min)/0.02)
        dlnkh = (lnkh_max - lnkh_min)/nkh
        pk = self.MatterPowerSpectrum(kh_min)
        s2 = 0.
        lnkh = lnkh_min + dlnkh/2.
        for i in range(nkh):
            kh = np.exp(lnkh)
            s2 += kh**3 * self.MatterPowerSpectrum(kh) * tophat_window_sq(kh*Rh)
            lnkh += dlnkh
        s2 *= dlnkh
        s2 += (kh_min**3*pk*tophat_window_sq(kh_min*Rh)/(6.+self.n_s) + kh_max**3 * self.MatterPowerSpectrum(kh_max) * tophat_window_sq(kh_max*Rh)/(2.+self.n_s))
        return s2 / (2.*np.pi**2)
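
    #both variance integrals in this class follow sigma^2 = int dlnk [k^3 P(k)/(2 pi^2)] |W|^2
    #with the appropriate window function; as a quick check, sigma2_tophat(8.) should return
    #roughly sigma_8^2 by construction of the power spectrum normalization
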
    def sigma2_pencil(self, x_angle, y_angle, z_min, z_max):  #here zh >> xh, yh gives a pencil-like shape
        r = self.ComovingDistance((z_min+z_max)/2.)
        xh = x_angle * r
        yh = y_angle * r
        zh = self.ComovingDistance_2p(z_min, z_max)
        nx = 30
        ny = 30
        nz = 21  #must be an odd number
        dlnkx = 4./nx
        dlnky = 4./ny
        dlnkz = 3./nz
        dvol = dlnkx*dlnky*dlnkz
        kxhmax = np.pi/xh
        kyhmax = np.pi/yh
        kzhmax = np.pi/zh
        ints2 = np.zeros(nz)
        for iz in range(nz):
            kzh = np.exp((iz-nz)*dlnkz)*kzhmax
            for ix in range(nx):
                kxh = np.exp((ix-nx)*dlnkx)*kxhmax
                for iy in range(ny):
                    kyh = np.exp((iy-ny)*dlnky)*kyhmax
                    ints2[iz] += self.MatterPowerSpectrum(np.sqrt(kxh**2+kyh**2+kzh**2))*(kxh*kyh)
            ints2[iz] *= kzh
        return (sum(ints2[2:nz-1:2])*2. + sum(ints2[1:nz-1:2])*4. + (ints2[0] + ints2[nz-1]))*(dvol/np.pi**3/3.)*self.GrowthFactor((z_min+z_max)/2.)**2

    def Rh_of_M(self, M):  #M in unit of M_sun/h
        return 0.000095105 * (M/self.Omega_m)**(1./3.)

    def sigma_tophat_M(self, M):  #M in unit of M_sun/h
        Rh = self.Rh_of_M(M)
        return np.sqrt(self.sigma2_tophat(Rh))

    def get_dndlnM(self, M_min, M_max, z_min, z_max):
        num_M = int(np.log(M_max/M_min)/0.12)+2
        num_z = int((z_max-z_min)/0.01)+2
        lnM_list = np.linspace(np.log(M_min), np.log(M_max), num_M)
        z_list = np.linspace(z_min, z_max, num_z)
        D_list = np.empty(num_z)
        for i in range(num_z):
            D_list[i] = self.GrowthFactor(z_list[i])
        dlnM = 0.04
        pfac = np.exp(dlnM/2.)
        dndlnM = np.empty( (num_z, num_M) )
        bias = 0.
        totw = 0.
        for i in range(num_M):
            M = np.exp(lnM_list[i])
            Rh = self.Rh_of_M(M)
            sig = self.sigma_tophat_M(M)
            dlnsig_dlnM = np.log(self.sigma_tophat_M(M*pfac)/self.sigma_tophat_M(M/pfac))/dlnM
            for iz in range(num_z):
                sigz = sig * D_list[iz]
                dndlnM[iz, i] = -ST_halo_f(sigz) * dlnsig_dlnM / ( np.pi*4./3.*Rh**3 )
                w = dndlnM[iz, i] * self.dV_by_dzdS(z_list[iz])
                totw += w
                bias += ST_halo_bias(sigz) * w
        average_bias = bias/totw
        return lnM_list, z_list, dndlnM, average_bias

    #stellar mass_cut in unit of M_sun/h; fstar_10 is the fraction of baryons in stars in a 10^10 M_sun halo;
    #fstar_10 * (M_halo/10^10 M_sun)^alphastar_10 is the fraction of baryons in stars in a halo with mass M_halo
    def num_massive_galaxy(self, z_min, z_max, sky_area, mass_cut, fstar_10 = 0.06, alphastar_10 = 0.5):
        # SF_efficiency = fstar_10 * (halo mass cut/self.hlittle/1.e10)**alphastar_10
        M_min = (mass_cut / (self.Omega_b/self.Omega_m) / fstar_10 * 10.**(10.*alphastar_10))**(1./(1.+alphastar_10))
        M_max = max(M_min * 10., 1.e10)
        lnM_list, z_list, dndlnM, ave_bias = self.get_dndlnM(M_min, M_max, z_min, z_max)
        nbar = 0.
        nz = len(z_list)
        nM = len(lnM_list)
        for iz in range(nz):
            tmp = dndlnM[iz, 0]/2.
            iM = 2
            while(iM < nM):
                tmp += dndlnM[iz, iM-1]*2. + dndlnM[iz, iM]
                iM += 2
            if(iM-1 < nM):
                tmp += dndlnM[iz, iM-1]
            nbar += tmp/1.5 * self.dV_by_dzdS(z_list[iz])
        nbar *= (lnM_list[1]-lnM_list[0])*(z_list[1]-z_list[0])*sky_area*halo_filling_factor
        #now approximately estimate the cosmic variance
        cosmic_variance = np.sqrt(self.sigma2_pencil(x_angle = np.sqrt(sky_area), y_angle = np.sqrt(sky_area), z_min = z_min, z_max = z_max))*ave_bias*cosmic_variance_fudge_factor
        return nbar, cosmic_variance  #average number <n>, and the fractional uncertainty sigma(n)/<n> due to cosmic variance
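
    #worked example for the M_min inversion above (illustrative numbers): with fstar_10 = 0.1 and
    #alphastar_10 = 0, a stellar mass cut of 10^10 M_sun/h requires a halo of at least
    #M_min = mass_cut/(Omega_b/Omega_m)/fstar_10 ~ 1e10/(0.156*0.1) ~ 6.4e11 M_sun/h
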
    def prob_to_see_obs(self, massive_galaxy_samples, z_min, z_max, sky_area, mass_cut, fstar_10 = 0.06, alphastar_10 = 0.5):
        nmax = len(massive_galaxy_samples)+1
        p_is_in = []
        logm_cut = np.log10(mass_cut/self.hlittle)  #the astrophysical parameter M does not contain 1/h
        for hg in massive_galaxy_samples:
            p_is_in.append( hg.prob_in(z_min = z_min, z_max = z_max, logm_cut = logm_cut) )
        p_tot_in = prob_combine(p_is_in)
        nbar, cosmic_variance = self.num_massive_galaxy(z_min, z_max, sky_area, mass_cut, fstar_10, alphastar_10)
        p_eq, p_greater = prob_eq_greater_Poisson(nbar, nbar*cosmic_variance, nmax)
        return sum(p_greater*p_tot_in), nbar, cosmic_variance, p_eq, p_greater, p_is_in, p_tot_in


class flatLCDM_Cosmology(Cosmology):

    def __init__(self, Omega_m = Planck_Omega_m, hlittle = Planck_hlittle, Omega_b = Planck_Omega_b, sigma_8 = Planck_sigma_8, n_s = Planck_n_s):
        self.Omega_m = Omega_m
        self.hlittle = hlittle
        self.Omega_b = Omega_b
        self.sigma_8 = sigma_8
        self.n_s = n_s
        super().__init__()
        super().__set_chi_D__()

    def Hsqofa(self, a):
        return self.Omega_Lambda + (self.Omega_m + self.Omega_r/a)/a**3

    def Dofa(self, a):  #analytic approximation of the linear growth factor, normalized so that D(z=0) ~ 1
        return np.sqrt(self.Omega_m/a**3 + self.Omega_Lambda) * a**0.25 * ((10.+self.Omega_m)/(11.*self.Omega_m/a**3 + 10.*self.Omega_Lambda))**0.75

    def fofa(self, a):  #growth rate approximation f ~ Omega_m(a)^0.55
        return (self.Omega_m/(self.Omega_Lambda*a**3 + self.Omega_m + self.Omega_r/a))**0.55

    def chiofz(self, z):  #analytic fit of the comoving distance in unit of c/H_0
        return 2.0826/self.Omega_m**0.395*(1./(1.+0.47*self.Omega_m)**0.105 - 1./(1.+z)**0.185/(1.-self.Omega_m+1.47*self.Omega_m*(1.+z)**3)**0.105)


class w0waCDM_Cosmology(Cosmology):

    ## rho_DE/rho_{DE,0} as a function of a
    def rhoDE_ratio(self, a):
        return np.exp(-3.*((1.+self.w0+self.wa)*np.log(a)+self.wa*(1.-a)))

    def Hsqofa(self, a):
        return ((self.Omega_r/a + self.Omega_m)/a + self.Omega_k)/a**2 + self.Omega_Lambda*self.rhoDE_ratio(a)

    def dotHofa(self, a):
        return -2*self.Omega_r/a**4 - 1.5*self.Omega_m/a**3 - self.Omega_k/a**2 - 1.5*(1.+self.w0+self.wa*(1-a))*self.Omega_Lambda*self.rhoDE_ratio(a)

    def __init__(self, Omega_m = Planck_Omega_m, hlittle = Planck_hlittle, Omega_b = Planck_Omega_b, sigma_8 = Planck_sigma_8, n_s = Planck_n_s, w0 = -1., wa = 0., Omega_k = 0.):
        self.Omega_k = Omega_k
        self.Omega_m = Omega_m
        self.hlittle = hlittle
        self.Omega_b = Omega_b
        self.sigma_8 = sigma_8
        self.n_s = n_s
        self.w0 = w0
        self.wa = wa
        super().__init__()
        #set initial conditions
        Hsq_ini = self.Hsqofa(self.amin)
        dotH_ini = self.dotHofa(self.amin)
        coef_1 = 4+dotH_ini/Hsq_ini
        coef_0 = 3+dotH_ini/Hsq_ini-1.5/Hsq_ini/self.amin**3*Omega_m
        eps = coef_0/(coef_1 - coef_0 - 1.)
        G_ini = 1. + eps  #this is an arbitrary normalization; will renormalize in the end
        dGdN_ini = -eps
        nsteps = 500
        lna = np.linspace(np.log(self.amin), 0., nsteps)
        a = np.exp(lna)
        H = np.array([ self.Hofa(aa) for aa in a ])
        Dbya = odeint(linear_growth_equation_var, [ G_ini, dGdN_ini ], lna, args = ( self, ))
        #now renormalize to D(z=0) = 1
        D = Dbya[:, 0] / Dbya[nsteps-1, 0] * a
        #compute the growth rate f = dlnD/dlna = 1 + dlnG/dlna
        f = 1. + Dbya[:, 1]/Dbya[:, 0]
        #interpolate to get the functions D(a), f(a)
        super().__set_chi_D__(nsteps = nsteps, a = a, H = H, D = D, f = f)
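
#a minimal cross-check sketch (defined but never called; the percent-level figure is a rough
#expectation, not a guarantee): with w0 = -1, wa = 0 the ODE-based growth factor of
#w0waCDM_Cosmology should approximately reproduce the analytic flat-LCDM fit
def _demo_growth_crosscheck(z = 5.):
    lcdm = flatLCDM_Cosmology()
    w0wa = w0waCDM_Cosmology(w0 = -1., wa = 0.)
    return lcdm.GrowthFactor(z), w0wa.GrowthFactor(z)  #expect agreement at roughly the percent level
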
class PAge_Cosmology(Cosmology):

    page = 0.9503
    eta = 0.3583

    def Hoft(self, t):  #input H_0*t, return H/H_0
        return 1+(2./3.)*(1-self.eta*t/self.page)*(1./t - 1./self.page)

    def aoft(self, t):  #input H_0*t, return a
        return (t/self.page)**(2.0/3.0) * np.exp(self.eta/3.0 * ((t/self.page)**2 - 1) + (self.page-(2.0/3.0)*(1.0+self.eta))*(t/self.page-1.0))

    def __init__(self, page = 0.9503, eta = 0.3583, Omega_m = Planck_Omega_m, Omega_k = 0., hlittle = Planck_hlittle, Omega_b = Planck_Omega_b, sigma_8 = Planck_sigma_8, n_s = Planck_n_s):
        self.page = page
        self.eta = eta
        self.Omega_m = Omega_m
        self.Omega_k = Omega_k
        self.hlittle = hlittle
        self.Omega_b = Omega_b
        self.sigma_8 = sigma_8
        self.n_s = n_s
        super().__init__()
        self.tmin = 2.e-2  #start from early time
        D_ini = 1.
        H_ini = self.Hoft(self.tmin)
        dotD_ini = H_ini*D_ini
        nsteps = 512
        t = np.linspace(self.tmin, self.page, nsteps)
        a = np.array([ self.aoft(tau) for tau in t ])
        self.tofa_interp = interp1d(a, t, kind = "cubic", assume_sorted = True)
        a[nsteps-1] = 1.
        H = np.array([ self.Hoft(tau) for tau in t ])
        D = odeint(linear_growth_equation, [ D_ini, dotD_ini ], t, args = ( self, ))
        super().__set_chi_D__(nsteps = nsteps, a = a, H = H, D = D[:, 0]/D[nsteps-1, 0], f = D[:, 1]/D[:, 0]/H)

    def tofa(self, a):
        if(a < self.amin):
            return self.tmin * (a/self.amin)**1.5
        else:
            return self.tofa_interp(a)

    def Hofa(self, a):
        return self.Hoft(self.tofa(a))

    def Hsqofa(self, a):
        return self.Hofa(a)**2


#specify the parameters ( Omega_m, Omega_b, hlittle, Omega_k, sigma_8, n_s, w0, wa, page, eta ) when you initialize the cosmology object
#default parameters are the Planck 2018 best fit
cosmo = flatLCDM_Cosmology()
#cosmo = flatLCDM_Cosmology(Omega_m = 0.3, hlittle = 0.7)
#cosmo = PAge_Cosmology(page = 0.9503, eta = 0.3583, Omega_m = 0.3)
#cosmo = w0waCDM_Cosmology(w0 = -1., wa = 1./3., Omega_m = 0.3, Omega_k = 0., sigma_8 = 0.8, n_s = 0.96, Omega_b = Planck_Omega_b)

mass_cut = 1.e10 * cosmo.hlittle  #stellar mass cut in M_sun/h
sky_area = 38.*(np.pi/180/60)**2  #38 arcmin^2 in steradians
z_min = 7.
z_max = 10.

#load the massive galaxies
#original file from Labbe et al. 2023
#galaxies = load_massive_galaxies("samples.txt", stat_only = False, sys_sampler = "norm", z_column = 3, m_column = 8)
#some of the galaxies in this file have been updated with recent spectroscopic observations
galaxies = load_massive_galaxies("samples_with_zspec.txt", stat_only = False, sys_sampler = "tri", z_column = 3, m_column = 8)
#sys_sampler can be "none", "tri", "rect", or "norm" for Dirac delta, skew triangular, skew rectangular, and skew normal, respectively

#compute the probability of N_observed <= N_theory
prob, nbar, cosmic_variance, p_eq, p_greater, p_is_in, p_tot_in = cosmo.prob_to_see_obs( massive_galaxy_samples = galaxies, z_min = z_min, z_max = z_max, sky_area = sky_area, mass_cut = mass_cut, fstar_10 = 0.1, alphastar_10 = 0.)
print('<N> = ', nbar, ' x (1 +/- ', cosmic_variance, ')')
print('P ( N_th = 0, 1, 2, ... ) = ', p_eq)
#print('P ( N_th >= 0, 1, 2, ... ) = ', p_greater)
#print('P ( gal0, gal1, gal2, ... is in ) = ', p_is_in)
print('P ( N_obs = 0, 1, 2, ... ) = ', p_tot_in)
print('P(N_obs <= N_theory) = ', prob, ", equivalent to a Gaussian ", -norm.ppf(prob), "sigma tail")
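
#the same machinery can scan other configurations; an illustrative (commented-out) variant:
#for zmin, zmax in [ (7., 8.5), (8.5, 10.) ]:
#    prob, nbar, cosmic_variance = cosmo.prob_to_see_obs( massive_galaxy_samples = galaxies, z_min = zmin, z_max = zmax, sky_area = sky_area, mass_cut = mass_cut, fstar_10 = 0.1, alphastar_10 = 0.)[0:3]
#    print(zmin, zmax, prob, nbar, cosmic_variance)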