import astropy.constants as const
import astropy.units as u
from astropy.units import Quantity
import numpy as np
__all__ = [
'create_spectral_array',
'convert_spectral_resolution',
'obs_to_rest',
'rest_to_obs',
'obs_to_vel',
'vel_to_obs',
'rest_to_vel',
'vel_to_rest',
'measure_snr_peak',
'measure_snr_profile_fit',
'measure_snr_profile_observed',
'estimate_rms_profile_fit',
'estimate_rms_peak',
]
[docs]
def create_spectral_array(nchan, cdelt, crpix, crval):
"""
Create a spectral axis array.
Parameters
----------
nchan : `~int`
Number of channels in the spectrum.
cdelt : `~float` or `~astropy.units.Quantity`
Width of a channel.
crpix : `~float`
Reference channel of the spectrum.
crval : `~float`
Value of the reference channel.
Returns
-------
spectral_array : `~numpy.ndarray` or `~astropy.units.Quantity`
Returned spectral axis array with units if cdelt is a quantity.
"""
# if cdelt has units
if isinstance(cdelt, Quantity):
first_chan = crval - (cdelt.value * crpix)
last_chan = crval + (cdelt.value * (nchan-1 - crpix))
spectral_array = np.linspace(first_chan, last_chan, nchan) * cdelt.unit
# if cdelt is adimensional
else:
first_chan = crval - (cdelt * crpix)
last_chan = crval + (cdelt * (nchan-1 - crpix))
spectral_array = np.linspace(first_chan, last_chan, nchan)
return spectral_array
[docs]
def convert_spectral_resolution(spectral_res, rest_freq):
"""
Convert a channel width between velocity and frequency.
Parameters
----------
spectral_res : `~astropy.units.Quantity`
Width of channel in velocity or frequency units.
rest_freq : `~astropy.units.Quantity`
Rest frequency of the spectrum.
Returns
-------
`~astropy.units.Quantity`
Spectral rest converted to the other units.
"""
if not isinstance(spectral_res, u.Quantity):
raise TypeError("spectral_res must be an astropy Quantity,"
+ f"not {type(spectral_res)}")
if spectral_res.unit.is_equivalent(u.Hz):
spectral_res_vel = const.c * (spectral_res / rest_freq.to(spectral_res.unit))
return spectral_res_vel.to(u.km / u.s)
elif spectral_res.unit.is_equivalent(u.m / u.s):
spectral_res_freq = rest_freq * (spectral_res / const.c)
return spectral_res_freq.to(u.Hz)
else:
raise u.UnitConversionError(
f"Unit '{spectral_res.unit}' is not a valid spectral resolution unit. "
"Must be equivalent to Hz or km/s."
)
[docs]
def obs_to_rest(obs_freq, radial_velocity, doppler_convention="radio"):
"""
Convert observed frequency to rest frequency.
Parameters
----------
obs_freq : `~astropy.units.Quantity`
Observed frequency.
radial_velocity : `~astropy.units.Quantity`
Radial velocity of the source relative to the observer.
doppler_convention : {"radio", "relativistic"}, default: "radio"
The Doppler convention to use.
Returns
-------
rest_freq : `~astropy.units.Quantity`
Rest frequency.
"""
if not isinstance(obs_freq, u.Quantity):
raise TypeError("obs_freq must be an astropy Quantity.")
if not isinstance(radial_velocity, u.Quantity):
raise TypeError("vel must be an astropy Quantity.")
if doppler_convention == "radio":
rest_freq = _obs_to_rest_radio(obs_freq, radial_velocity)
elif doppler_convention == "relativistic":
rest_freq = _obs_to_rest_rel(obs_freq, radial_velocity)
else:
raise ValueError(
"Doppler convention can only be radio or relativistic.")
return rest_freq
[docs]
def rest_to_obs(rest_freq, radial_velocity, doppler_convention="radio"):
"""
Convert rest frequency to observed frequency.
Parameters
----------
rest_freq : `~astropy.units.Quantity`
Rest frequency.
radial_velocity : `~astropy.units.Quantity`
Radial velocity of the source relative to the observer.
doppler_convention : {"radio", "relativistic"}, default: "radio"
The Doppler convention to use.
Returns
-------
obs_freq : `~astropy.units.Quantity`
Observed frequency.
"""
if not isinstance(rest_freq, u.Quantity):
raise TypeError("rest_freq must be an astropy Quantity.")
if not isinstance(radial_velocity, u.Quantity):
raise TypeError("vel must be an astropy Quantity.")
if doppler_convention == "radio":
obs_freq = _rest_to_obs_radio(rest_freq, radial_velocity)
elif doppler_convention == "relativistic":
obs_freq = _rest_to_obs_rel(rest_freq, radial_velocity)
else:
raise ValueError(
"Doppler convention can only be radio or relativistic.")
return obs_freq
[docs]
def obs_to_vel(obs_freq, doppler_rest, doppler_convention="radio"):
"""
Convert observed frequency to velocity.
Parameters
----------
obs_freq : `~astropy.units.Quantity`
Observed frequency.
doppler_rest : `~astropy.units.Quantity`
Rest frequency of the line.
doppler_convention : {"radio", "relativistic"}, default: "radio"
The Doppler convention to use.
Returns
-------
vel : `~astropy.units.Quantity`
Velocity.
"""
if not isinstance(obs_freq, u.Quantity):
raise TypeError("obs_freq must be an astropy Quantity.")
if not isinstance(doppler_rest, u.Quantity):
raise TypeError("doppler_rest must be an astropy Quantity.")
if doppler_convention == "radio":
vel = _obs_to_vel_radio(obs_freq, doppler_rest)
elif doppler_convention == "relativistic":
vel = _obs_to_vel_rel(obs_freq, doppler_rest)
else:
raise ValueError(
"Doppler convention can only be radio or relativistic.")
return vel
[docs]
def vel_to_obs(vel, doppler_rest, doppler_convention="radio"):
"""
Convert velocity to observed frequency.
Parameters
----------
vel : `~astropy.units.Quantity`
Velocity.
doppler_rest : `~astropy.units.Quantity`
Rest frequency of the line.
doppler_convention : {"radio", "relativistic"}, default: "radio"
The Doppler convention to use.
Returns
-------
obs_freq : `~astropy.units.Quantity`
Observed frequency.
"""
if not isinstance(vel, u.Quantity):
raise TypeError("vel must be an astropy Quantity.")
if not isinstance(doppler_rest, u.Quantity):
raise TypeError("doppler_rest must be an astropy Quantity.")
if doppler_convention == "radio":
obs_freq = _vel_to_obs_radio(vel, doppler_rest)
elif doppler_convention == "relativistic":
obs_freq = _vel_to_obs_rel(vel, doppler_rest)
else:
raise ValueError(
"Doppler convention can only be radio or relativistic.")
return obs_freq
[docs]
def rest_to_vel(rest_freq, radial_velocity, doppler_rest, doppler_convention="radio"):
"""
Convert rest frequency to velocity.
Parameters
----------
rest_freq : `~astropy.units.Quantity`
Observed frequency.
radial_velocity : `~astropy.units.Quantity`
Radial velocity of the source relative to the observer.
doppler_rest : `~astropy.units.Quantity`
Rest frequency of the line.
doppler_convention : {"radio", "relativistic"}, default: "radio"
The Doppler convention to use.
Returns
-------
vel : `~astropy.units.Quantity`
Velocity.
"""
if not isinstance(rest_freq, u.Quantity):
raise TypeError("rest_freq must be an astropy Quantity.")
if not isinstance(radial_velocity, u.Quantity):
raise TypeError("radial_velocity must be an astropy Quantity.")
if not isinstance(doppler_rest, u.Quantity):
raise TypeError("doppler_rest must be an astropy Quantity.")
if doppler_convention not in ["radio", "relativistic"]:
raise ValueError(
"Doppler convention can only be radio or relativistic.")
obs_freq = rest_to_obs(rest_freq, radial_velocity, doppler_convention)
vel = obs_to_vel(obs_freq, doppler_rest, doppler_convention)
return vel
[docs]
def vel_to_rest(vel, radial_velocity, doppler_rest, doppler_convention="radio"):
"""
Convert velocty to rest frequency.
Parameters
----------
vel : `~astropy.units.Quantity`
Velocity.
radial_velocity : `~astropy.units.Quantity`
Radial velocity of the source relative to the observer.
doppler_rest : `~astropy.units.Quantity`
Rest frequency of the line.
doppler_convention : {"radio", "relativistic"}, default: "radio"
The Doppler convention to use.
Returns
-------
rest_freq : `~astropy.units.Quantity`
Rest frequency.
"""
if not isinstance(vel, u.Quantity):
raise TypeError("vel must be an astropy Quantity.")
if not isinstance(radial_velocity, u.Quantity):
raise TypeError("radial_velocity must be an astropy Quantity.")
if not isinstance(doppler_rest, u.Quantity):
raise TypeError("doppler_rest must be an astropy Quantity.")
if doppler_convention not in ["radio", "relativistic"]:
raise ValueError(
"Doppler convention can only be radio or relativistic.")
obs_freq = vel_to_obs(vel, doppler_rest, doppler_convention)
rest_freq = obs_to_rest(obs_freq, radial_velocity, doppler_convention)
return rest_freq
def _obs_to_rest_radio(obs_freq, vel):
return obs_freq / (1 - vel / const.c)
def _obs_to_rest_rel(obs_freq, vel):
beta = vel / const.c
return obs_freq * np.sqrt((1 + beta) / (1 - beta))
def _rest_to_obs_radio(rest_freq, vel):
return rest_freq * (1 - vel / const.c)
def _rest_to_obs_rel(rest_freq, vel):
beta = vel / const.c
return rest_freq * ((1 - beta) / (1 + beta))**0.5
def _obs_to_vel_radio(obs_freq, rest_freq):
return const.c * ((rest_freq - obs_freq) / rest_freq)
def _obs_to_vel_rel(obs_freq, rest_freq):
R = obs_freq / rest_freq
beta = (1 - R**2) / (1 + R**2)
return beta * const.c
def _vel_to_obs_radio(vel, rest_freq):
return rest_freq * (1 - vel / const.c)
def _vel_to_obs_rel(vel, rest_freq):
beta = (vel / const.c)
return rest_freq * ((1 - beta) / (1 + beta))**0.5
[docs]
def measure_snr_peak(peak_int, rms):
"""Measure the peak SNR of a line by providing peak intensity and rms"""
if (not isinstance(peak_int, u.Quantity)
and not isinstance (peak_int, int)
and not isinstance (peak_int, float)):
raise TypeError("'peak_int' must be a float or astropy Quantity.")
if (not isinstance(rms, u.Quantity)
and not isinstance (rms, int)
and not isinstance (rms, float)):
raise TypeError("'rms' must be a float or astropy Quantity.")
return peak_int / rms
[docs]
def measure_snr_profile_fit(area, fwhm, dv, rms):
"""
Measure the SNR of a line by integrating the emission on the fitted
gaussian profile
Parameters
----------
area : `float` or `~astropy.units.Quantity`
Area of the fitted gaussian line.
fwhm : `float` or `~astropy.units.Quantity`
FWHM of the fitted gaussian line.
dv : `float` or `~astropy.units.Quantity`
Channel width.
rms : `float` or `~astropy.units.Quantity`
Measured rms on the spectral data.
Returns
-------
snr : `float`
Measured integrated SNR.
"""
if (not isinstance(area, u.Quantity)
and not isinstance (area, int)
and not isinstance (area, float)):
raise TypeError("'area' must be a float or astropy Quantity.")
if (not isinstance(dv, u.Quantity)
and not isinstance (dv, int)
and not isinstance (dv, float)):
raise TypeError("'dv' must be a float or astropy Quantity.")
if (not isinstance(fwhm, u.Quantity)
and not isinstance (fwhm, int)
and not isinstance (fwhm, float)):
raise TypeError("'fwhm' must be a float or astropy Quantity.")
if (not isinstance(rms, u.Quantity)
and not isinstance (rms, int)
and not isinstance (rms, float)):
raise TypeError("'rms' must be a float or astropy Quantity.")
sigma_area = rms * dv * np.sqrt(fwhm/dv)
snr = area / sigma_area
return snr
[docs]
def measure_snr_profile_observed(x_array, y_array, v0, fwhm, dv, rms,
window_sigma_factor=3,
window_selection_method="fractional"):
"""
Measure the SNR of a line by integrating the emission on the observed
channels.
Note: Needs the velocity and width of a fitted gaussian line on the
observed line profile to select the channels.
Parameters
----------
x_array : `~np.array` or `~astropy.units.Quantity`
X axis array of the observed spectrum in velocity units.
y_array : `~np.array` or `~astropy.units.Quantity`
Y axis array of the observed spectrum.
v0 : `float` or `~astropy.units.Quantity`
Central velocity of the fitted gaussian line.
fwhm : `float` or `~astropy.units.Quantity`
FWHM of the fitted gaussian line.
dv : `float` or `~astropy.units.Quantity`
Channel width.
rms : `float` or `~astropy.units.Quantity`
Measured rms on the spectral data.
window_sigma_factor : `float`, default: 3
Number of sigma deviation to be used for each side of the gaussian
center as the selected window.
window_selection_method : {"fractional", "binary"}, default: "fractional"
Method for handling channels at the edge of the integration window:
- 'fractional' (Recommended): Weights boundary channels by the fraction
of their width that falls inside the selected window.
- 'binary': Boolean masking; includes the full flux of any channel
whose center is within the window boundaries.
Returns
-------
snr : `float`
Measured integrated SNR.
"""
if (not isinstance(x_array, u.Quantity)
and not isinstance (x_array, np.ndarray)):
raise TypeError("'x_array' must be a numpy array or astropy Quantity.")
if (not isinstance(y_array, u.Quantity)
and not isinstance (y_array, np.ndarray)):
raise TypeError("'y_array' must be a numpy array or astropy Quantity.")
if (not isinstance(v0, u.Quantity)
and not isinstance (v0, int)
and not isinstance (v0, float)):
raise TypeError("'area' must be a float or astropy Quantity.")
if (not isinstance(fwhm, u.Quantity)
and not isinstance (fwhm, int)
and not isinstance (fwhm, float)):
raise TypeError("'fwhm' must be a float or astropy Quantity.")
if (not isinstance(dv, u.Quantity)
and not isinstance (dv, int)
and not isinstance (dv, float)):
raise TypeError("'dv' must be a float or astropy Quantity.")
if (not isinstance(rms, u.Quantity)
and not isinstance (rms, int)
and not isinstance (rms, float)):
raise TypeError("'rms' must be a float or astropy Quantity.")
sigma_v = fwhm / (2 * np.sqrt(2 * np.log(2)))
if window_selection_method == "binary":
mask_hard = ((x_array >= v0 - window_sigma_factor*sigma_v) &
(x_array <= v0 + window_sigma_factor*sigma_v))
# sum of the flux in each channel multiplied by the channel width
area_obs_hard = np.sum(y_array[mask_hard]) * dv
N_eff_hard = mask_hard.sum()
sigma_area_hard = rms * dv * np.sqrt(N_eff_hard)
snr = area_obs_hard / sigma_area_hard
return snr
elif window_selection_method == "fractional":
vmin = v0 - window_sigma_factor*sigma_v
vmax = v0 + window_sigma_factor*sigma_v
v_left = x_array - dv/2
v_right = x_array + dv/2
# Create a mask with overlapping channels with contributions on each
# channel. 0 to 1. Channels that are only partially on the window have
# a number between 0 and 1. The ones entirely in the window are 1, the
# ones outside are 0.
f_overlap = np.clip((np.minimum(v_right, vmax)
- np.maximum(v_left, vmin))/dv, 0, 1)
area_obs_frac = np.sum(y_array * f_overlap * dv)
N_eff_frac = np.sum(f_overlap)
sigma_area_frac = rms * dv * np.sqrt(N_eff_frac)
snr = area_obs_frac / sigma_area_frac
return snr
else:
raise TypeError("'window_selection_method' must be 'fractional' or 'binary'.")
[docs]
def estimate_rms_profile_fit(area, fwhm, dv, snr):
"""
Measure the SNR of a line by integrating the emission on the fitted
gaussian profile
Parameters
----------
area : `float` or `~astropy.units.Quantity`
Area of the fitted gaussian line.
fwhm : `float` or `~astropy.units.Quantity`
FWHM of the fitted gaussian line.
dv : `float` or `~astropy.units.Quantity`
Channel width.
snr : `float` or `~astropy.units.Quantity`
Target SNR.
Returns
-------
rms : `float` or `~astropy.units.Quantity`
Estimated rms for the target SNR.
"""
sigma_area = area / snr
rms = sigma_area / (dv * np.sqrt(fwhm/dv))
return rms
[docs]
def estimate_rms_peak(peak_int, snr):
"""Estimate the rms for a given peak intensity and desired SNR of a line"""
rms = peak_int / snr
return rms