Source code for TheCannon.dataset

from __future__ import (absolute_import, division, print_function)
import numpy as np
import sys
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from matplotlib import rc
rc('text', usetex=True)
rc('font', family='serif')
from .helpers.corner import corner
from .helpers import Table
from .find_continuum_pixels import * 
from .normalization import \
from .find_continuum_pixels import _find_contpix,_find_contpix_regions
from multiprocessing import cpu_count
from import fits
from astropy.table import Table

n_cpus = cpu_count()

PY3 = sys.version_info[0] > 2

if PY3:
    basestring = (str, bytes)
    basestring = (str, unicode)

[docs]class Dataset(object): """ A class to represent Cannon input: a dataset of spectra and labels """ def __init__(self, wl, tr_ID, tr_flux, tr_ivar, tr_label, test_ID, test_flux, test_ivar): """ Initiate a Dataset object Parameters ---------- wl: grid of wavelength values, onto which all spectra are mapped tr_ID: array of IDs of training objects tr_flux: array of flux values for training objects, [nobj, npix] tr_ivar: array [nobj, npix] of inverse variance values for training objects tr_label: array [nobj, nlabel] test_ID: array [nobj[ of IDs of test objects test_flux: array [nobj, npix] of flux values for test objects test_ivar: array [nobj, npix] of inverse variance values for test objects """ print("Loading dataset") print("This may take a while...") self.wl = wl self.tr_ID = tr_ID self.tr_flux = tr_flux self.tr_ivar = tr_ivar self.tr_label = tr_label self.test_ID = test_ID self.test_flux = test_flux self.test_ivar = test_ivar self._label_names = None self.ranges = None # calculate SNR self.tr_SNR = np.array( [self._SNR(*s) for s in zip(tr_flux, tr_ivar)]) self.test_SNR = np.array( [self._SNR(*s) for s in zip(test_flux, test_ivar)]) def _SNR(self, flux, ivar): """ Calculate the SNR of a spectrum, ignoring bad pixels Parameters ---------- flux: numpy ndarray pixel intensities ivar: numpy ndarray inverse variances corresponding to flux Returns ------- SNR: float """ take = ivar > 0 SNR = float(np.median(flux[take]*(ivar[take]**0.5))) return SNR
[docs] def set_label_names(self, names): """ Set the label names for plotting Parameters ---------- names: ndarray or list The names of the labels used for plotting, ex. in LaTeX syntax """ self._label_names = names
[docs] def get_plotting_labels(self): """ Return the label names used make plots Returns ------- label_names: ndarray The label names """ if self._label_names is None: print("No label names yet!") return None else: return self._label_names
[docs] def bin_flux(flux, ivar): """ bin two neighboring flux values """ if np.sum(ivar)==0: return np.sum(flux)/2. return np.average(flux, weights=ivar)
[docs] def smooth_spectrum(wl, flux, ivar): """ Bins down one spectrum Parameters ---------- wl: numpy ndarray wavelengths flux: numpy ndarray flux values ivar: numpy ndarray inverse variances associated with fluxes Returns ------- wl: numpy ndarray updated binned pixel wavelengths flux: numpy ndarray updated binned flux values ivar: numpy ndarray updated binned inverse variances """ # if odd, discard the last point if len(wl)%2 == 1: wl = np.delete(wl, -1) flux = np.delete(flux, -1) ivar = np.delete(ivar, -1) wl = wl.reshape(-1,2) ivar = ivar.reshape(-1,2) flux = flux.reshape(-1,2) wl_binned = np.mean(wl, axis=1) ivar_binned = np.sqrt(np.sum(ivar**2, axis=1)) flux_binned = np.array([bin_flux(f,w) for f,w in zip(flux, ivar)]) return wl_binned, flux_binned, ivar_binned
[docs] def smooth_spectra(wl, fluxes, ivars): """ Bins down a block of spectra """ output = np.asarray( [smooth_spectrum(wl, flux, ivar) for flux,ivar in zip(fluxes, ivars)]) return output
[docs] def smooth_dataset(self): """ Bins down all of the spectra and updates the dataset """ output = smooth_spectra(self.wl, self.tr_flux, self.tr_ivar) self.wl = output[:,0,:] self.tr_flux = output[:,1,:] self.tr_ivar = output[:,2,:] output = smooth_spectra(self.wl, self.test_flux, self.test_ivar) self.test_flux = output[:,1,:] self.test_ivar = output[:,2,:]
[docs] def diagnostics_SNR(self): """ Plots SNR distributions of ref and test object spectra """ print("Diagnostic for SNRs of reference and survey objects") fig = plt.figure() data = self.test_SNR plt.hist(data, bins=int(np.sqrt(len(data))), alpha=0.5, facecolor='r', label="Survey Objects") data = self.tr_SNR plt.hist(data, bins=int(np.sqrt(len(data))), alpha=0.5, color='b', label="Ref Objects") plt.legend(loc='upper right') #plt.xscale('log') plt.title("SNR Comparison Between Reference and Survey Objects") #plt.xlabel("log(Formal SNR)") plt.xlabel("Formal SNR") plt.ylabel("Number of Objects") return fig
[docs] def diagnostics_ref_labels(self): """ Plots all training labels against each other """ return self._label_triangle_plot(self.tr_label)
def _label_triangle_plot(self, label_vals): """Make a triangle plot for the selected labels Parameters ---------- label_vals: numpy ndarray values of the labels """ labels = [r"$%s$" % l for l in self.get_plotting_labels()] print("Plotting every label against every other") fig = corner(label_vals, labels=labels, show_titles=True, title_args={"fontsize":12}) return fig
[docs] def make_contmask(self, fluxes, ivars, frac): """ Identify continuum pixels using training spectra Does this for each region of the spectrum if dataset.ranges is not None Parameters ---------- fluxes: ndarray Flux data values ivars: ndarray Inverse variances corresponding to flux data values frac: float The fraction of pixels that should be identified as continuum Returns ------- contmask: ndarray Mask with True indicating that the pixel is continuum """ print("Finding continuum pixels...") if self.ranges is None: print("assuming continuous spectra") contmask = _find_contpix(self.wl, fluxes, ivars, frac) else: print("taking spectra in %s regions" %len(self.ranges)) contmask = _find_contpix_regions( self.wl, fluxes, ivars, frac, self.ranges) print("%s pixels returned as continuum" %sum(contmask)) return contmask
[docs] def set_continuum(self, contmask): """ Set the contmask attribute Parameters ---------- contmask: ndarray Mask with True indicating that the pixel is continuum """ self.contmask = contmask
[docs] def fit_continuum(self, deg, ffunc, n_proc=1): """ Fit a continuum to the continuum pixels Parameters ---------- deg: int Degree of the fitting function ffunc: str Type of fitting function, 'sinusoid' or 'chebyshev' Returns ------- tr_cont: ndarray Flux values corresponding to the fitted continuum of training objects test_cont: ndarray Flux values corresponding to the fitted continuum of test objects """ print("Fitting Continuum...") if self.ranges == None: tr_cont = _find_cont_fitfunc( self.tr_flux, self.tr_ivar, self.contmask, deg, ffunc, n_proc=n_proc) test_cont = _find_cont_fitfunc( self.test_flux, self.test_ivar, self.contmask, deg, ffunc, n_proc=n_proc) else: print("Fitting Continuum in %s Regions..." %len(self.ranges)) tr_cont = _find_cont_fitfunc_regions( self.tr_flux, self.tr_ivar, self.contmask, deg, self.ranges, ffunc, n_proc=n_proc) test_cont = _find_cont_fitfunc_regions( self.test_flux, self.test_ivar, self.contmask, deg, self.ranges, ffunc, n_proc=n_proc) return tr_cont, test_cont
[docs] def continuum_normalize_training_q(self, q, delta_lambda, n_proc=1, verbose=True): """ Continuum normalize the training set using a running quantile Parameters ---------- q: float The quantile cut delta_lambda: float The width of the pixel range used to calculate the median Modified by: [08 Jun 2016] Bo Zhang (NAOC): add multiprocessing option Note: Multiprocessing calls for more memory on computer. """ # don't use too many process assert n_proc >= 1 and n_proc <= 100*n_cpus if n_proc > n_cpus: print('@Bo Zhang: n_proc is larger than your numbers of physical CPUs!') print("Continuum normalizing the tr set using running quantile...") if n_proc == 1: # use old version (single process) print('##########################################################') print('@Bo Zhang: you will use only 1 process ...') print(' i.e., the original TheCannon version') print('##########################################################') if self.ranges is None: return _cont_norm_running_quantile( self.wl, self.tr_flux, self.tr_ivar, q=q, delta_lambda=delta_lambda, verbose=verbose) else: return _cont_norm_running_quantile_regions( self.wl, self.tr_flux, self.tr_ivar, q=q, delta_lambda=delta_lambda, ranges=self.ranges, verbose=verbose) else: # use new version (multi process) print('##########################################################') print('@Bo Zhang: you will use ** %d ** processes ... ' % n_proc) print('Note: Multiprocessing calls for more memory on computer!') print('##########################################################') if self.ranges is None: return _cont_norm_running_quantile_mp( self.wl, self.tr_flux, self.tr_ivar, q=q, delta_lambda=delta_lambda, n_proc=n_proc, verbose=verbose) else: return _cont_norm_running_quantile_regions_mp( self.wl, self.tr_flux, self.tr_ivar, q=q, delta_lambda=delta_lambda, ranges=self.ranges, n_proc=n_proc, verbose=verbose)
[docs] def continuum_normalize(self, cont): """ Continuum normalize spectra, in chunks if spectrum has regions Parameters ---------- cont: ndarray Flux values corresponding to the continuum Returns ------- norm_tr_flux: ndarray Normalized flux values for the training objects norm_tr_ivar: ndarray Rescaled inverse variance values for the training objects norm_test_flux: ndarray Normalized flux values for the test objects norm_test_ivar: numpy ndarray Rescaled inverse variance values for the test objects """ tr_cont, test_cont = cont if self.ranges is None: print("assuming continuous spectra") norm_tr_flux, norm_tr_ivar = _cont_norm( self.tr_flux, self.tr_ivar, tr_cont) norm_test_flux, norm_test_ivar = _cont_norm( self.test_flux, self.test_ivar, test_cont) else: print("taking spectra in %s regions" %(len(self.ranges))) norm_tr_flux, norm_tr_ivar = _cont_norm_regions( self.tr_flux, self.tr_ivar, tr_cont, self.ranges) norm_test_flux, norm_test_ivar = _cont_norm_regions( self.test_flux, self.test_ivar, test_cont, self.ranges) return norm_tr_flux, norm_tr_ivar, norm_test_flux, norm_test_ivar
[docs] def continuum_normalize_gaussian_smoothing(self, L): """ Continuum normalize using a Gaussian-weighted smoothed spectrum Parameters ---------- dataset: Dataset the dataset to continuum normalize L: float the width of the Gaussian used for weighting """ norm_tr_flux, norm_tr_ivar, norm_test_flux, norm_test_ivar = \ _cont_norm_gaussian_smooth(self, L) self.tr_flux = norm_tr_flux self.tr_ivar = norm_tr_ivar self.test_flux = norm_test_flux self.test_ivar = norm_test_ivar
[docs] def diagnostics_test_step_flagstars(self): """ Write files listing stars whose inferred labels lie outside 2 standard deviations from the reference label space """ label_names = self.get_plotting_labels() nlabels = len(label_names) reference_labels = self.tr_label test_labels = self.test_label_vals test_IDs = np.array(self.test_ID) mean = np.mean(reference_labels, 0) stdev = np.std(reference_labels, 0) lower = mean - 2 * stdev upper = mean + 2 * stdev for i in range(nlabels): label_name = label_names[i] test_vals = test_labels[:,i] warning = np.logical_or(test_vals < lower[i], test_vals > upper[i]) filename = "flagged_stars_%s.txt" % i with open(filename, 'w') as output: for star in test_IDs[warning]: output.write('{0:s}\n'.format(star)) print("Reference label %s" % label_name) print("flagged %s stars beyond 2-sig of ref labels" % sum(warning)) print("Saved list %s" % filename)
[docs] def diagnostics_survey_labels(self): """ Plot all survey labels against each other """ fig = self._label_triangle_plot(self.test_label_vals)
[docs] def diagnostics_1to1(self, figname="1to1_label"): """ Plots survey labels vs. training labels, color-coded by survey SNR """ snr = self.test_SNR label_names = self.get_plotting_labels() nlabels = len(label_names) reference_labels = self.tr_label test_labels = self.test_label_vals for i in range(nlabels): name = label_names[i] orig = reference_labels[:,i] cannon = test_labels[:,i] # calculate bias and scatter scatter = np.round(np.std(orig-cannon),5) bias = np.round(np.mean(orig-cannon),5) low = np.minimum(min(orig), min(cannon)) high = np.maximum(max(orig), max(cannon)) fig = plt.figure(figsize=(10,6)) gs = gridspec.GridSpec(1,2,width_ratios=[2,1], wspace=0.3) ax1 = plt.subplot(gs[0]) ax2 = plt.subplot(gs[1]) ax1.plot([low, high], [low, high], 'k-', linewidth=2.0, label="x=y") ax1.set_xlim(low, high) ax1.set_ylim(low, high) ax1.legend(fontsize=14, loc='lower right') pl = ax1.scatter(orig, cannon, marker='x', c=snr, vmin=50, vmax=200, alpha=0.7) cb = plt.colorbar(pl, ax=ax1, orientation='horizontal') cb.set_label('SNR from Test Set', fontsize=12) textstr = 'Scatter: %s \nBias: %s' %(scatter, bias) ax1.text(0.05, 0.95, textstr, transform=ax1.transAxes, fontsize=14, verticalalignment='top') ax1.tick_params(axis='x', labelsize=14) ax1.tick_params(axis='y', labelsize=14) ax1.set_xlabel("Reference Value", fontsize=14) ax1.set_ylabel("Cannon Test Value", fontsize=14) ax1.set_title("1-1 Plot of Label " + r"$%s$" % name) diff = cannon-orig npoints = len(diff) mu = np.mean(diff) sig = np.std(diff) #ax2.hist(diff, orientation='horizontal') ax2.hist(diff, range=[-3*sig,3*sig], color='k', bins=int(np.sqrt(npoints)), orientation='horizontal', alpha=0.3, histtype='stepfilled') ax2.tick_params(axis='x', labelsize=14) ax2.tick_params(axis='y', labelsize=14) ax2.set_xlabel("Count", fontsize=14) ax2.set_ylabel("Difference", fontsize=14) ax2.axhline(y=0, c='k', lw=3, label='Difference=0') ax2.set_title("Training Versus Test Labels for $%s$" %name, fontsize=14) ax2.legend(fontsize=14) figname_full = "%s_%s.png" %(figname, i) plt.savefig(figname_full) print("Diagnostic for label output vs. input") print("Saved fig %s" % figname_full) plt.close()
[docs] def set_test_label_vals(self, vals): """ Set test label values Parameters ---------- vals: ndarray Test label values """ self.test_label_vals = vals
[docs] def diagnostics_best_fit_spectra(self, model): """ Plot results of best-fit spectra for ten random test objects """ overlay_spectra(model, self)
def write_to_fits(self, filepath, **kwargs): hl = convert_to_hdulist(self) print('@Bo Zhang: writting to fits [%s] ...' % filepath) hl.writeto(filepath, **kwargs) print('@Bo Zhang: ---------------------------------------------------')
def convert_to_hdulist(ds): """ transform TheCannon dataset into fits HDU list """ print('@Bo Zhang: ---------------------------------------------------') print('@Bo Zhang: transforming TheCannon data cubes into HDU list ...') print('@Bo Zhang: ---------------------------------------------------') # initialize data list data_list = [ds.wl, ds.ranges, ds.contmask*1, ds.tr_ID, ds.tr_SNR, ds.tr_flux, ds.tr_ivar, ds.tr_label, ds.test_ID, ds.test_SNR, ds.test_flux, ds.test_ivar] name_list = ['wl', 'ranges', 'contmask', 'tr_ID', 'tr_SNR', 'tr_flux', 'tr_ivar', 'tr_label', 'test_ID', 'test_SNR', 'test_flux', 'test_ivar'] hdu_format_list_rw = ['image', 'image', 'image', 'table', 'image', 'image', 'image', 'image', 'table', 'image', 'image', 'image'] # construct Primary header header = fits.Header() header['author'] = 'Bo Zhang (@NAOC)' header['version'] = 'v1.0' header['last_modified'] = 'Sat 11 Jun 2016' header['data'] = 'TheCannon dataset object' # initialize HDU list print('@Bo Zhang: initializing the HDU list ...') hl = [fits.hdu.PrimaryHDU(header=header)] # construct HDU list assert len(data_list) == len(name_list) n_hdus = len(data_list) for i in xrange(n_hdus): print('@Bo Zhang: transforming HDU [%d/%d]: %s ...' % (i+1, n_hdus, name_list[i])) if hdu_format_list_rw[i] == 'table': data = Table(data=data_list[i].reshape((-1, 1)), names=[name_list[i]]) hl.append(fits.BinTableHDU(data.as_array())) else: hl.append(fits.ImageHDU(data=np.array(data_list[i]), name=name_list[i])) print('@Bo Zhang: ---------------------------------------------------') return fits.HDUList(hl)