#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Fix the correlation rotation of ASKAP MSs.
Converts the ASKAP standard correlations to the 'standard' correlations
This will make them compatible with most imagers (e.g. wsclean, CASA)
The new correlations are placed in a new column called 'CORRECTED_DATA'
"""
[docs]
__author__ = ["Alec Thomson"]
import asyncio
import logging
from pathlib import Path
from typing import AsyncGenerator, Awaitable, Optional
import astropy.units as u
import numpy as np
from casacore.tables import makecoldesc, table
from tqdm.auto import tqdm
from fixms.logger import TqdmToLogger, logger
[docs]
TQDM_OUT = TqdmToLogger(logger, level=logging.INFO)
logger.setLevel(logging.INFO)
# Stolen from https://stackoverflow.com/a/61478547
[docs]
async def gather_with_limit(limit: int, *coros: Awaitable):
"""Gather with a limit on the number of coroutines running at once.
Args:
limit (int): The number of coroutines to run at once
coros (Awaitable): The coroutines to run
Returns:
Awaitable: The result of the coroutines
"""
semaphore = asyncio.Semaphore(limit)
async def sem_coro(coro: Awaitable):
async with semaphore:
return await coro
return await asyncio.gather(*(sem_coro(c) for c in coros))
[docs]
async def set_pol_axis_coro(
ms: Path, pol_ang: u.Quantity, feed_idx: Optional[int] = None
) -> Awaitable:
with table((ms / "FEED").as_posix(), readonly=True, ack=False) as tf:
ms_feed = await asyncio.to_thread(tf.getcol, "RECEPTOR_ANGLE")
ms_feed = ms_feed * u.rad
# PAF is at 45deg to feeds
# 45 - feed_angle = pol_angle
pol_axes = -(ms_feed - 45.0 * u.deg)
# Backup the original RECEPTOR_ANGLE to INSTRUMENT_RECEPTOR_ANGLE
with table((ms / "FEED").as_posix(), readonly=False, ack=False) as tf:
colnames = tf.colnames()
# Only backup if the column does not exist
if "INSTRUMENT_RECEPTOR_ANGLE" not in colnames:
coldesc = makecoldesc(
"INSTRUMENT_RECEPTOR_ANGLE", tf.getcoldesc("RECEPTOR_ANGLE")
)
await asyncio.to_thread(tf.addcols, coldesc)
await asyncio.to_thread(
tf.putcol, "INSTRUMENT_RECEPTOR_ANGLE", ms_feed.to(u.rad).value
)
await asyncio.to_thread(tf.flush)
logger.info(
"Backed up the original RECEPTOR_ANGLE to INSTRUMENT_RECEPTOR_ANGLE"
)
if feed_idx is None:
assert (ms_feed[:, 0] == ms_feed[0, 0]).all() & (
ms_feed[:, 1] == ms_feed[0, 1]
).all(), "The RECEPTOR_ANGLE changes with time, please check the MS"
old_pol_ang = pol_axes[0, 0].to(u.deg)
else:
logger.debug(f"Extracting the third-axis orientation for {feed_idx=}")
old_pol_ang = pol_axes[feed_idx, 0].to(u.deg)
assert old_pol_ang == pol_ang, "Updated pol angle does not match the original!"
pol_axes_cor = pol_axes - pol_ang
# PAF is at 45deg to feeds
# 45 - feed_angle = pol_angle
# -> feed_angle = 45 - pol_angle
new_ms_feed = -(pol_axes_cor - 45.0 * u.deg)
with table((ms / "FEED").as_posix(), readonly=False, ack=False) as tf:
await asyncio.to_thread(
tf.putcol, "RECEPTOR_ANGLE", new_ms_feed.to(u.rad).value
)
await asyncio.to_thread(tf.flush)
[docs]
async def get_pol_axis_coro(
ms: Path, feed_idx: Optional[int] = None, col="RECEPTOR_ANGLE"
) -> Awaitable[u.Quantity]:
"""Get the polarization axis from the ASKAP MS. Checks are performed
to ensure this polarisation axis angle is constant throughout the observation.
Args:
ms (Path): The path to the measurement set that will be inspected
feed_idx (Optional[int], optional): Specify the entry in the FEED
table of `ms` to return. This might be required when a subset of a
measurement set has been extracted from an observation with a varying
orientation.
Returns:
astropy.units.Quantity: The rotation of the PAF throughout the observing.
"""
_known_cols = ("RECEPTOR_ANGLE", "INSTRUMENT_RECEPTOR_ANGLE")
if col not in _known_cols:
raise ValueError(f"Unknown column {col=}, please use one of {_known_cols}")
with table((ms / "FEED").as_posix(), readonly=True, ack=False) as tf:
ms_feed = await asyncio.to_thread(tf.getcol, col)
ms_feed = ms_feed * u.rad
# PAF is at 45deg to feeds
# 45 - feed_angle = pol_angle
pol_axes = -(ms_feed - 45.0 * u.deg)
if feed_idx is None:
assert (ms_feed[:, 0] == ms_feed[0, 0]).all() & (
ms_feed[:, 1] == ms_feed[0, 1]
).all(), f"The {col} changes with time, please check the MS"
feed_idx = 0
logger.debug(f"Extracting the third-axis orientation for {feed_idx=}")
pol_ang = pol_axes[feed_idx, 0].to(u.deg)
return pol_ang
[docs]
def get_pol_axis(
ms: Path, feed_idx: Optional[int] = None, col="RECEPTOR_ANGLE"
) -> u.Quantity:
"""Get the polarization axis from the ASKAP MS. Checks are performed
to ensure this polarisation axis angle is constant throughout the observation.
Args:
ms (Path): The path to the measurement set that will be inspected
feed_idx (Optional[int], optional): Specify the entry in the FEED
table of `ms` to return. This might be required when a subset of a
measurement set has been extracted from an observation with a varying
orientation.
Returns:
astropy.units.Quantity: The rotation of the PAF throughout the observing.
"""
return asyncio.run(get_pol_axis_coro(ms, feed_idx=feed_idx, col=col))
[docs]
def convert_correlations(
correlations: np.ndarray, pol_axis: u.Quantity, fix_stokes_factor: bool = True
) -> np.ndarray:
"""
Convert ASKAP standard correlations to the 'standard' correlations
Args:
correlations (np.ndarray): The correlations from the MS. Has shape (NCOR, NCHAN, 4)
pol_axis (astropy.units.Quantity): The polarization axis angle of the MS
fix_stokes_factor (bool, optional): Whether to fix the Stokes factor. Defaults to True.
Returns:
np.ndarray: The correlations in the 'standard' format
NOTES:
In general, ASKAP forms Stokes I, Q, U, V as follows:
.. code-block::
⎡I⎤ ⎡ 1 0 0 1 ⎤ ⎡XXₐ⎤
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎢Q⎥ ⎢sin(2⋅P) cos(2⋅P) cos(2⋅P) -sin(2⋅P)⎥ ⎢XYₐ⎥
⎢ ⎥ = ⎢ ⎥⋅⎢ ⎥
⎢U⎥ ⎢-cos(2⋅P) sin(2⋅P) sin(2⋅P) cos(2⋅P) ⎥ ⎢YXₐ⎥
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎣V⎦ ⎣ 0 -1.0⋅ⅈ 1.0⋅ⅈ 0 ⎦ ⎣YYₐ⎦
Where P is the polarization axis angle. In the common case of P=-45deg this becomes:
.. code-block::
⎡I⎤ ⎡1 0 0 1⎤ ⎡XXₐ⎤
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎢Q⎥ ⎢-1 0 0 1⎥ ⎢XYₐ⎥
⎢ ⎥ = ⎢ ⎥⋅⎢ ⎥
⎢U⎥ ⎢0 -1 -1 0⎥ ⎢YXₐ⎥
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎣V⎦ ⎣0 -1.0⋅ⅈ 1.0⋅ⅈ 0⎦ ⎣YYₐ⎦
or
.. code-block::
⎡I⎤ ⎡ XXₐ + YYₐ ⎤
⎢ ⎥ ⎢ ⎥
⎢Q⎥ ⎢ -XXₐ + YYₐ ⎥
⎢ ⎥ = ⎢ ⎥
⎢U⎥ ⎢ -XYₐ - YXₐ ⎥
⎢ ⎥ ⎢ ⎥
⎣V⎦ ⎣-ⅈ⋅XYₐ + 1.0⋅ⅈ⋅YXₐ⎦
If instead the P=+45deg then this becomes:
.. code-block::
⎡I⎤ ⎡1 0 0 1 ⎤ ⎡XXₐ⎤
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎢Q⎥ ⎢1 0 0 -1⎥ ⎢XYₐ⎥
⎢ ⎥ = ⎢ ⎥⋅⎢ ⎥
⎢U⎥ ⎢0 1 1 0 ⎥ ⎢YXₐ⎥
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎣V⎦ ⎣0 -1.0⋅ⅈ 1.0⋅ⅈ 0 ⎦ ⎣YYₐ⎦
or
.. code-block::
⎡I⎤ ⎡ XXₐ + YYₐ ⎤
⎢ ⎥ ⎢ ⎥
⎢Q⎥ ⎢ XXₐ - YYₐ ⎥
⎢ ⎥ = ⎢ ⎥
⎢U⎥ ⎢ XYₐ + YXₐ ⎥
⎢ ⎥ ⎢ ⎥
⎣V⎦ ⎣-ⅈ⋅XYₐ + 1.0⋅ⅈ⋅YXₐ⎦
However, most imagers (e.g. wsclean, CASA) expect
.. code-block::
⎡I⎤ ⎡0.5 0 0 0.5 ⎤ ⎡XX_w⎤
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎢Q⎥ ⎢0.5 0 0 -0.5⎥ ⎢XY_w⎥
⎢ ⎥ = ⎢ ⎥⋅⎢ ⎥
⎢U⎥ ⎢ 0 0.5 0.5 0 ⎥ ⎢YX_w⎥
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎣V⎦ ⎣ 0 -0.5⋅i 0.5⋅i 0 ⎦ ⎣YY_w⎦
or
.. code-block::
⎡I⎤ ⎡ 0.5⋅XX_w + 0.5⋅YY_w ⎤
⎢ ⎥ ⎢ ⎥
⎢Q⎥ ⎢ 0.5⋅XX_w - 0.5⋅YY_w ⎥
⎢ ⎥ = ⎢ ⎥
⎢U⎥ ⎢ 0.5⋅XY_w + 0.5⋅YX_w ⎥
⎢ ⎥ ⎢ ⎥
⎣V⎦ ⎣-0.5⋅i⋅XY_w + 0.5⋅i⋅YX_w⎦
To convert between the two, we can use the following matrix:
.. code-block::
⎡XX_w⎤ ⎡sin(2.0⋅P) + 1 cos(2.0⋅P) cos(2.0⋅P) 1 - sin(2.0⋅P)⎤ ⎡XXₐ⎤
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎢XY_w⎥ ⎢ -cos(2.0⋅P) sin(2.0⋅P) + 1 sin(2.0⋅P) - 1 cos(2.0⋅P) ⎥ ⎢XYₐ⎥
⎢ ⎥ = ⎢ ⎥⋅⎢ ⎥
⎢YX_w⎥ ⎢ -cos(2.0⋅P) sin(2.0⋅P) - 1 sin(2.0⋅P) + 1 cos(2.0⋅P) ⎥ ⎢YXₐ⎥
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎣YY_w⎦ ⎣1 - sin(2.0⋅P) -cos(2.0⋅P) -cos(2.0⋅P) sin(2.0⋅P) + 1⎦ ⎣YYₐ⎦
Where `_w` is the 'wsclean' format and _a is the 'ASKAP' format.
In the case of P=-45deg this becomes:
.. code-block::
⎡XX_w⎤ ⎡0 0 0 2⎤ ⎡XXₐ⎤
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎢XY_w⎥ ⎢0 0 -2 0⎥ ⎢XYₐ⎥
⎢ ⎥ = ⎢ ⎥⋅⎢ ⎥
⎢YX_w⎥ ⎢0 -2 0 0⎥ ⎢YXₐ⎥
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎣YY_w⎦ ⎣2 0 0 0⎦ ⎣YYₐ⎦
or
.. code-block::
⎡XX_w⎤ ⎡2⋅YYₐ ⎤
⎢ ⎥ ⎢ ⎥
⎢XY_w⎥ ⎢-2⋅YXₐ⎥
⎢ ⎥ = ⎢ ⎥
⎢YX_w⎥ ⎢-2⋅XYₐ⎥
⎢ ⎥ ⎢ ⎥
⎣YY_w⎦ ⎣2⋅XXₐ ⎦
And in the case of P=+45deg this becomes:
.. code-block::
⎡XX_w⎤ ⎡2 0 0 0⎤ ⎡XXₐ⎤
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎢XY_w⎥ ⎢0 2 0 0⎥ ⎢XYₐ⎥
⎢ ⎥ = ⎢ ⎥⋅⎢ ⎥
⎢YX_w⎥ ⎢0 0 2 0⎥ ⎢YXₐ⎥
⎢ ⎥ ⎢ ⎥ ⎢ ⎥
⎣YY_w⎦ ⎣0 0 0 2⎦ ⎣YYₐ⎦
or
.. code-block::
⎡XX_w⎤ ⎡2⋅XXₐ⎤
⎢ ⎥ ⎢ ⎥
⎢XY_w⎥ ⎢2⋅XYₐ⎥
⎢ ⎥ = ⎢ ⎥
⎢YX_w⎥ ⎢2⋅YXₐ⎥
⎢ ⎥ ⎢ ⎥
⎣YY_w⎦ ⎣2⋅YYₐ⎦
That is, when P=+45 H and V are perfectly aligned with X and Y, and
only the factor of 2 is required.
"""
theta = pol_axis.to(u.rad).value
correction_matrix = np.array(
[
[
np.sin(2.0 * theta) + 1,
np.cos(2.0 * theta),
np.cos(2.0 * theta),
1 - np.sin(2.0 * theta),
],
[
-np.cos(2.0 * theta),
np.sin(2.0 * theta) + 1,
np.sin(2.0 * theta) - 1,
np.cos(2.0 * theta),
],
[
-np.cos(2.0 * theta),
np.sin(2.0 * theta) - 1,
np.sin(2.0 * theta) + 1,
np.cos(2.0 * theta),
],
[
1 - np.sin(2.0 * theta),
-np.cos(2.0 * theta),
-np.cos(2.0 * theta),
np.sin(2.0 * theta) + 1,
],
]
)
# This is a matrix multiplication broadcasted along the time and channel axes
rotated_correlations = np.einsum("ij,...j->...i", correction_matrix, correlations)
if not fix_stokes_factor:
logger.warning("Not fixing Stokes factor! Multiplying by 0.5...")
rotated_correlations *= 0.5
return rotated_correlations
[docs]
async def get_data_chunk_generator(
ms: Path, chunksize: int, data_column: str = "DATA"
) -> AsyncGenerator[np.ndarray, None]:
"""Generator function that will yield a chunk of data from the `ms` data table.
Args:
ms (Path): Measurement set whose data will be iterated over
chunksize (int): The number of rows to process per chunk
data_column (str, optional): The column name of the data to iterate. Defaults to "DATA".
Yields:
Iterator[np.ndarray]: Chunk of datta to process
"""
with table(ms.as_posix(), readonly=True, ack=False) as tab:
data = await asyncio.to_thread(tab.__getattr__, data_column)
for i in range(0, len(data), chunksize):
yield np.array(data[i : i + chunksize])
[docs]
async def get_nchunks_coro(
ms: Path, chunksize: int, data_column: str = "DATA"
) -> Awaitable[int]:
"""Returns the number of chunks that are needed to iterator over the datacolumsn
using a specified `chunksize`.
Args:
ms (Path): Measurement sett thatt will be iterated over
chunksize (int): Size of a single chunk
data_column (str, optional): Name of the datacolumn that will be iterated over. Defaults to "DATA".
Returns:
int: Number of chunks in a measurement set
"""
with table(ms.as_posix(), readonly=True, ack=False) as tab:
return int(np.ceil(len(tab.__getattr__(data_column)) / chunksize))
[docs]
async def check_data_coro(
ms: Path, data_column: str, corrected_data_column: str
) -> Awaitable[bool]:
"""Check if the data in the `data_column`, with a correction applied, matches
the data in the `corrected_data_column`.
Args:
ms (Path): MeasurementSet to check
data_column (str): Data column to check
corrected_data_column (str): Corrected data column to check
Returns:
bool: If the data matches
"""
with table(ms.as_posix(), readonly=True) as tab:
idx = 0
data = tab.__getattr__(data_column).getcell(idx)
cor_data = tab.__getattr__(corrected_data_column).getcell(idx)
while (data == 0 + 0j).all():
idx += 1
data = tab.__getattr__(data_column).getcell(idx)
cor_data = tab.__getattr__(corrected_data_column).getcell(idx)
ang = await get_pol_axis_coro(ms)
return np.allclose(convert_correlations(data, ang), cor_data)
[docs]
def check_data(ms: Path, data_column: str, corrected_data_column: str) -> bool:
"""Check if the data in the `data_column`, with a correction applied, matches
the data in the `corrected_data_column`.
Args:
ms (Path): MeasurementSet to check
data_column (str): Data column to check
corrected_data_column (str): Corrected data column to check
Returns:
bool: If the data matches
"""
return asyncio.run(check_data_coro(ms, data_column, corrected_data_column))
[docs]
async def process_chunk(
tab: table,
data_column: str,
corrected_data_column: str,
pol_axis: u.Quantity,
fix_stokes_factor: bool,
start_row: int,
chunksize: int,
pbar: tqdm,
chunk_num: int,
):
"""Work on a chunk of data and write it back to the MS
Args:
data_chunk (np.ndarray): Chunk of MS rows
pol_axis (u.Quantity): Rotation angle of the polarization axis
fix_stokes_factor (bool): Whether to fix the Stokes factor
tab (table): MS table object
corrected_data_column (str): Corrected data column
start_row (int): Starting row of the chunk
"""
logger.debug(f"{chunk_num=} reading chunk...")
data = await asyncio.to_thread(tab.__getattr__, data_column)
data_chunk = await asyncio.to_thread(
np.array, data[start_row : start_row + chunksize]
)
logger.debug(f"{chunk_num=} converting chunk...")
data_chunk_cor = convert_correlations(
data_chunk,
pol_axis,
fix_stokes_factor=fix_stokes_factor,
)
logger.debug(f"{chunk_num=} writing chunk...")
await asyncio.to_thread(
tab.putcol,
corrected_data_column,
data_chunk_cor,
startrow=start_row,
nrow=len(data_chunk_cor),
)
logger.debug(f"{chunk_num=} done!")
pbar.update(1)
[docs]
async def fix_ms_corrs_coro(
ms: Path,
chunksize: int = 1000,
max_chunks: int = 1000,
data_column: str = "DATA",
corrected_data_column: str = "CORRECTED_DATA",
fix_stokes_factor: bool = True,
) -> Awaitable:
"""Apply corrections to the ASKAP visibilities to bring them inline with
what is expectede from other imagers, including CASA and WSClean. The
original data in `data_column` are copied to `corrected_data_column` before
the correction is applied. This is done to ensure that the original data
are not lost.
If 'corrected_data_column' is detected as an existing column then the
correction will not be applied.
Args:
ms (Path): Path of the ASKAP measurement set tto correct.
chunksize (int, optional): Size of chunked data to correct. Defaults to 10_000.
data_column (str, optional): The name of the data column to correct. Defaults to "DATA".
corrected_data_column (str, optional): The name of the corrected data column. Defaults to "CORRECTED_DATA".
fix_stokes_factor (bool, optional): Whether to fix the Stokes factor. Defaults to True.
"""
_function_args = [f"{key}={val}" for key, val in locals().items()]
logger.info(
f"Correcting {data_column} of {str(ms)}.", ms=ms, app_params=_function_args
)
# Do checks
with table(ms.as_posix(), readonly=True, ack=False) as tab:
cols = tab.colnames()
# Check if 'data_column' exists
if data_column not in cols:
logger.critical(
f"Column {data_column} does not exist in {ms}! Exiting...",
ms=ms,
app_params=_function_args,
)
return
# Check if 'corrected_data_column' exists
if corrected_data_column in cols:
logger.critical(
f"Column {corrected_data_column} already exists in {ms}! Exiting...",
ms=ms,
app_params=_function_args,
)
if await check_data_coro(ms, data_column, corrected_data_column):
logger.critical(
f"We checked the data in {data_column} against {corrected_data_column} and it looks like the correction has already been applied!",
ms=ms,
app_params=_function_args,
)
return
all_feed1 = await asyncio.to_thread(tab.getcol, "FEED1")
all_feed2 = await asyncio.to_thread(tab.getcol, "FEED2")
feed1 = np.unique(all_feed1)
feed2 = np.unique(all_feed2)
# For some ASKAP observations the orientation of the third-axis changes
# throughout the observation. For example, bandpass observations vary
# this direction as each beam cycles in the footprint cycles over the
# calibrator source.
assert len(feed1) == 1 and len(feed2) == 1, (
"Found more than one feed orientation!"
)
assert feed1[0] == feed2[0], (
f"The unique feed entries available in the data table differ, {feed1=} {feed2=}"
)
# The two assertions above should enforce enough constraint
# to make sure the rotation matrix constructed is correct
feed_idx = feed1[0]
# Get the polarization axis
pol_axis = await get_pol_axis_coro(ms, feed_idx=feed_idx)
logger.info(f"Polarization axis is {pol_axis}", ms=ms, app_params=_function_args)
# Check for pol_axis = 0deg - no correction needed
if pol_axis == 0 * u.deg and not fix_stokes_factor:
logger.critical(
"Pol axis is 0deg. No correction needed. Exiting...",
ms=ms,
app_params=_function_args,
)
return None
# Get the data chunk by chunk and convert the correlations
# then write them back to the MS in the 'data_column' column
nchunks = await get_nchunks_coro(ms, chunksize, data_column=data_column)
start_row = 0
with table(ms.as_posix(), readonly=False, ack=False) as tab:
desc = makecoldesc(data_column, tab.getcoldesc(data_column))
desc["name"] = corrected_data_column
try:
tab.addcols(desc)
except RuntimeError:
# This can happen if this correction/script has been run more
# than once on the same measurement set.
# Putting this here for interactive use when you might muck around with the MS
logger.critical(
(
f"Column {corrected_data_column} already exists in {ms}! You should never see this message! "
f"Possible an existing {data_column} has already been corrected. "
f"No correction will be applied. "
),
ms=ms,
app_params=_function_args,
)
return
# Only perform this correction if the data column was
# successfully renamed.
tasks = []
pbar = tqdm(total=nchunks, file=TQDM_OUT, desc="Correcting data")
# async for data_chunk in tqdm(data_chunks, total=nchunks, file=TQDM_OUT):
for chunk in range(nchunks):
# task = asyncio.create_task(
task = process_chunk(
tab=tab,
data_column=data_column,
corrected_data_column=corrected_data_column,
pol_axis=pol_axis,
fix_stokes_factor=fix_stokes_factor,
start_row=start_row,
chunksize=chunksize,
pbar=pbar,
chunk_num=chunk,
)
# )
start_row += chunksize
tasks.append(task)
await gather_with_limit(max_chunks, *tasks)
tab.flush()
logger.info(
f"Finished correcting {data_column} of {str(ms)}. Written to {corrected_data_column} column.",
ms=ms,
app_params=_function_args,
)
logger.info(
f"Updating the FEED table by a rotation of {pol_axis}",
ms=ms,
app_params=_function_args,
)
await set_pol_axis_coro(ms, pol_axis, feed_idx=feed_idx)
logger.info(
f"Updated the RECEPTOR_ANGLE column with a rotation of {pol_axis}",
ms=ms.as_posix(),
app_params=_function_args,
)
logger.info("Done!")
[docs]
def fix_ms_corrs(
ms: Path,
chunksize: int = 1000,
max_chunks: int = 1000,
data_column: str = "DATA",
corrected_data_column: str = "CORRECTED_DATA",
fix_stokes_factor: bool = True,
) -> None:
"""Apply corrections to the ASKAP visibilities to bring them inline with
what is expectede from other imagers, including CASA and WSClean. The
original data in `data_column` are copied to `corrected_data_column` before
the correction is applied. This is done to ensure that the original data
are not lost.
If 'corrected_data_column' is detected as an existing column then the
correction will not be applied.
Args:
ms (Path): Path of the ASKAP measurement set tto correct.
chunksize (int, optional): Size of chunked data to correct. Defaults to 10_000.
data_column (str, optional): The name of the data column to correct. Defaults to "DATA".
corrected_data_column (str, optional): The name of the corrected data column. Defaults to "CORRECTED_DATA".
fix_stokes_factor (bool, optional): Whether to fix the Stokes factor. Defaults to True.
"""
return asyncio.run(
fix_ms_corrs_coro(
ms=ms,
chunksize=chunksize,
max_chunks=max_chunks,
data_column=data_column,
corrected_data_column=corrected_data_column,
fix_stokes_factor=fix_stokes_factor,
)
)
[docs]
def cli():
import argparse
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument("ms", type=Path, help="The MS to fix")
parser.add_argument(
"--chunksize",
type=int,
default=1000,
help="The chunksize to use when reading the MS",
)
parser.add_argument(
"--max-chunks",
type=int,
default=1000,
help="The maximum number of chunks to process at once",
)
parser.add_argument(
"--data-column", type=str, default="DATA", help="The column to fix"
)
parser.add_argument(
"--corrected-data-column",
type=str,
default="CORRECTED_DATA",
help="The column to write the corrected data to",
)
parser.add_argument(
"--no-fix-stokes-factor",
dest="no_fix_stokes_factor",
action="store_true",
help="Don't fix the Stokes factor. Use this if you have *not* used ASKAPsoft. If you have used ASKAPsoft, you should leave this option alone.",
)
args = parser.parse_args()
fix_ms_corrs(
Path(args.ms),
chunksize=args.chunksize,
max_chunks=args.max_chunks,
data_column=args.data_column,
corrected_data_column=args.corrected_data_column,
fix_stokes_factor=not args.no_fix_stokes_factor,
)
if __name__ == "__main__":
cli()