Skip to content

sarlo.transform

Converting between data types or deriving new data products

AGB_fitting(cross_pol_backscatter, AGB)

Fits cross polarized backscatter to Above Ground Biomass

Parameters:

Name Type Description Default
cross_pol_backscatter

gamma power of the cross polarized SAR backscatter.

required
AGB

LiDAR derived above ground biomass used for fitting SAR gamma power backscatter

required

Returns:

Type Description
y

[y = b + M (HV)(x), R2 = ___]

Source code in sarlo/transform.py
def AGB_fitting(cross_pol_backscatter, AGB):
    """Fits cross polarized backscatter to Above Ground Biomass

    Args:
        cross_pol_backscatter:
            gamma power of the cross polarized SAR backscatter.
        AGB:
            LiDAR derived above ground biomass used for fitting SAR gamma power backscatter

    Return:
        y:
            [y = b + M (HV)**(x), R**2 = ___]
    """
    # biota.LoadTile("/users/jamesash/desktop/", 40, -121, 2017)
    # interpolate.interp1d(cross_pol_backscatter, AGB, kind="quadratic")
    pass

DN_to_GammaDB(DN)

Converts digital number SAR data to dB

Parameters:

Name Type Description Default
DN

Digital number formatted SAR data

required

Returns:

Type Description
Gamma_dB

SAR data in dB format

Source code in sarlo/transform.py
def DN_to_GammaDB(DN):
    """Converts digital number SAR data to dB

    Args:
        DN: Digital number formatted SAR data

    Return:
        Gamma_dB: SAR data in dB format
    """
    data = []
    for n in DN:
        data.append(n ** 2)
    Gamma_dB = 10 * np.log10(data) - 83

    return Gamma_dB

GammadB_to_Pwr(Gamma_dB)

Converts dB data to power

Parameters:

Name Type Description Default
Gamma_dB

SAR dB data

required

Returns:

Type Description
Gamma_pw

SAR power data

Source code in sarlo/transform.py
def GammadB_to_Pwr(Gamma_dB):
    """Converts dB data to power

    Args:
        Gamma_dB: SAR dB data

    Return:
        Gamma_pw: SAR power data
    """
    Gamma_pw = 10 ** (0.1 * Gamma_dB)

    return Gamma_pw

MCH(query_hv, query_hh, model)

Mean Canopy height from ALOS HH and HV dB backscatter and processed through statsmodel.api

Arg

query_hv: list of all ALOS HV tiles to process query_hh: list of all ALOS HH tiles to process model: statsmodel.api linear model

Returns:

Type Description

None.

Source code in sarlo/transform.py
def MCH(query_hv, query_hh, model):
    """Mean Canopy height from ALOS HH and HV dB backscatter and processed through statsmodel.api

    Arg:
        query_hv: list of all ALOS HV tiles to process
        query_hh: list of all ALOS HH tiles to process
        model: statsmodel.api linear model

    Return:
        None.
    """

    zipped = zip(query_hv, query_hh)
    for hv, hh in zipped:
        now = datetime.now()
        current_time = now.strftime("%H:%M:%S")
        print("{}/{} start time: ".format(hv, hh), current_time)

        alos_hv = rxr.open_rasterio(hv)
        alos_hh = rxr.open_rasterio(hh)

        alos_hv_resampled = alos_hv.rio.reproject(
            dst_crs="EPSG:32610", resolution=(100, 100), resampling=Resampling.bilinear
        )

        alos_hh_resampled = alos_hh.rio.reproject(
            dst_crs="EPSG:32610", resolution=(100, 100), resampling=Resampling.bilinear
        )

        alos_hh_values = DN_to_GammaDB(alos_hh_resampled.values.flatten())
        alos_hv_values = DN_to_GammaDB(alos_hv_resampled.values.flatten())

        alos_hh_values = pd.DataFrame(alos_hh_values)
        alos_hv_values = pd.DataFrame(alos_hv_values)

        alos_hh_values.columns = ["ALOS_hh"]
        alos_hv_values.columns = ["ALOS_hv"]

        df = pd.concat([alos_hv_values["ALOS_hv"], alos_hh_values["ALOS_hh"]], axis=1)
        X = df[["ALOS_hv", "ALOS_hh"]]
        X = sm.add_constant(X)

        MCH = model.predict(X)

        MCH = 10 ** MCH

        alos_hh_resampled.values = np.array(MCH).reshape(alos_hh_resampled.values.shape)
        path = os.path.dirname(hv)
        alos_hh_resampled.rio.to_raster(os.path.join(path, "MCH.tif"))

        X = []
        MCH = []
        df = []
        alos_hv = []
        alos_hh = []
        alos_hh_resampled = []
        alos_hv_resampled = []
        # alos_hv_values = []
        # alos_hh_values = []

amp_to_sigma(DN, K)

Converts amplitude backscatter to sigma nought

Parameters:

Name Type Description Default
DN

dataset of backscatter in form of amplitude

required
K int

calibration factor which varies depending on the SAR sensor and processor system used

required

Returns:

Type Description
Sigma_nought

which is the radar backscatter per unit area. The unit of σo is [m2/m2], expressed in decibel (dB).

Source code in sarlo/transform.py
def amp_to_sigma(DN, K: int):
    """Converts amplitude backscatter to sigma nought

    Args:
        DN: dataset of backscatter in form of amplitude
        K: calibration factor which varies depending on the SAR sensor and processor system used

    Return:
        Sigma_nought: which is the radar backscatter per unit area. The unit of σo is [m2/m2], expressed in decibel (dB).
    """
    sigma_nought = (
        10 * np.log(DN ** 2, out=np.zeros_like(DN, dtype=float), where=(DN > 0)) + K
    )

    return sigma_nought

configure_FSH_processing(self)

Using class Sentinel1_process for FSH preprocessing

Parameters:

Name Type Description Default
self required

Returns:

Type Description

None.

Source code in sarlo/transform.py
def configure_FSH_processing(self):
    """Using class Sentinel1_process for FSH preprocessing

    Args:
        self:

    Returns:
        None.

    """
    if not os.path.exists(
        os.path.join(
            self.process_dir,
            "f{0}_o{1}/int_{2}_{3}/".format(
                self.frame1, self.relative_orbit1, self.date1, self.date2
            ),
        )
    ):
        os.mkdir(
            os.path.join(
                self.process_dir,
                "f{0}_o{1}/int_{2}_{3}/".format(
                    self.frame1, self.relative_orbit1, self.date1, self.date2
                ),
            )
        )

    ftm = [
        "merged/resampOnlyImage.amp.geo",
        "merged/resampOnlyImage.amp.geo.xml",
        "merged/topophase.cor.geo",
        "merged/topophase.cor.geo.xml",
        "topsProc.xml",
    ]

    for file in ftm:
        os.system(
            "mv {0} {1}".format(
                os.path.join(self.process_dir, file),
                "f{0}_o{1}/int_{2}_{3}/".format(
                    self.frame1, self.relative_orbit1, self.date1, self.date2
                ),
            )
        )

    flag_txt = "001 {0}_{1}_{2}_VH_{3}_VH {2} {3} {0} {1} VH".format(
        self.frame1, self.relative_orbit1, self.date1, self.date2
    )
    print("writing flagfile")
    with open(os.path.join(self.process_dir, "flagfile.txt"), "w") as fid:
        fid.write(flag_txt)

gamma_base(baseline, incident_angle, llambda, range_pixel_res, slant_range)

Computes the geometric decorrelation of coherence.

Parameters:

Name Type Description Default
baseline int

distance between satellites

required
incident_angle int

angle from line perpendicular to earth through object to slant range line

required
llambda int

wavelength

required
slant_range int

distance between satellite and the object

required
range_pixel_res int

pixel horiontal resolution

required

Returns:

Type Description
gamma_base

geometric decorrelation correction [coherence / gamma_base]

Source code in sarlo/transform.py
def gamma_base(
    baseline: int,
    incident_angle: int,
    llambda: int,
    range_pixel_res: int,
    slant_range: int,
):
    """Computes the geometric decorrelation of coherence.

    Args:
        baseline: distance between satellites
        incident_angle: angle from line perpendicular to earth through object to slant range line
        llambda: wavelength
        slant_range: distance between satellite and the object
        range_pixel_res: pixel horiontal resolution

    Returns:
        gamma_base: geometric decorrelation correction [coherence / gamma_base]
    """
    gamma_base = 1 - (
        2
        * math.fabs(baseline)
        * math.cos(incident_angle / 180 * math.pi)
        * range_pixel_res
        / math.sin(incident_angle / 180 * math.pi)
        / llambda
        / slant_range
    )

    return gamma_base

mean_wo_nan(A)

mean of dateset without NaN values

Parameters:

Name Type Description Default
A int

dateset

required

Returns:

Type Description
B

mean of dateset without NaN

Source code in sarlo/transform.py
def mean_wo_nan(A: int):
    """mean of dateset without NaN values

    Args:
        A: dateset

    Return:
        B: mean of dateset without NaN
    """

    # Copy and flatten A
    B = A.copy().flatten("F")

    # Remove NaN values from B
    B = B[~np.isnan(B)]

    # Return the mean of B
    return np.mean(B)

pwr_to_sigma(DN, K)

Converts amplitude backscatter to sigma nought

Parameters:

Name Type Description Default
DN

dataset of backscatter in form of amplitude

required
K int

calibration factor which varies depending on the SAR sensor and processor system used

required

Returns:

Type Description
Sigma_nought

which is the radar backscatter per unit area. The unit of σo is [m2/m2], expressed in decibel (dB).

Source code in sarlo/transform.py
def pwr_to_sigma(DN, K: int):
    """Converts amplitude backscatter to sigma nought

    Args:
        DN: dataset of backscatter in form of amplitude
        K: calibration factor which varies depending on the SAR sensor and processor system used

    Return:
        Sigma_nought: which is the radar backscatter per unit area. The unit of σo is [m2/m2], expressed in decibel (dB).
    """
    sigma_nought = (
        10 * np.log(DN, out=np.zeros_like(DN, dtype=float), where=(DN != 0)) + K
    )

    return sigma_nought

remove_corr_bias(coherence, numLooks)

Removes range and azimuth correction bias.

Parameters:

Name Type Description Default
coherence

coherence from interferogram generation

required
numLooks int

range * azimuth looks

required

Returns:

Type Description
YC

corrected coherence

Source code in sarlo/transform.py
def remove_corr_bias(coherence, numLooks: int):
    """Removes range and azimuth correction bias.

    Args:
        coherence: coherence from interferogram generation
        numLooks: range * azimuth looks

    Returns:
        YC: corrected coherence
    """
    k = 1.0
    D = np.double(np.arange(0, 1, 0.01))
    m = np.array([])
    for i in range(D.shape[0]):
        m = np.append(
            m,
            mp.gamma(numLooks)
            * mp.gamma(1 + k / 2)
            / mp.gamma(numLooks + k / 2)
            * mp.hyper(
                [1 + k / 2, numLooks, numLooks], [numLooks + k / 2, 1], D[i] ** 2
            )
            * (1 - D[i] ** 2) ** numLooks,
        )
    m = np.double(m)
    p = np.arange(0, min(m), 0.01)
    p = np.double(p)
    m = np.append(np.append(p, m), np.array([1]))
    D = np.append(np.append(0 * p, D), np.array([1]))

    # Run interpolation
    set_interp = interpolate.interp1d(m, D, kind="cubic")
    YC = set_interp(
        numLooks
    )  # Corrects coherence values for sameness based on number of looks
    plt.plot(m, D, "o", coherence, YC, "-")
    return YC

remove_temporal_decorrelation(width, height, coherence)

Removes coherence decorrelation associated with temporal gradient

Parameters:

Name Type Description Default
width int

width of dateset (number of columns)

required
height int

length of dateset (number of lines)

required
coherence

coherence file to be corrected

required

Returns:

Type Description
coherence

corrected for large temporal gradient

Source code in sarlo/transform.py
def remove_temporal_decorrelation(width: int, height: int, coherence):
    """Removes coherence decorrelation associated with temporal gradient

    Args:
        width: width of dateset (number of columns)
        height: length of dateset (number of lines)
        coherence: coherence file to be corrected

    Return:
        coherence: corrected for large temporal gradient
    """
    y = np.linspace(1, width, width)
    x = np.linspace(1, height, height)
    [X, Y] = np.meshgrid(x, y)
    A = np.vstack(
        [
            X[~np.isnan(coherence)],
            Y[~np.isnan(coherence)],
            np.ones(np.size(coherence[~np.isnan(coherence)])),
        ]
    ).T
    coeff = np.linalg.lstsq(A, coherence[~np.isnan(coherence)])[0]
    corr_vs = coherence - X * coeff[0] - Y * coeff[1]
    corr_vs[corr_vs > 1] = 1
    corr_vs[corr_vs < 0] = 0

    return corr_vs

remove_thermal_noise(root_tag, noiselevel, coherence, amplitude, numLooks)

Remove thermal noise from amplitude to correct coherence

Parameters:

Name Type Description Default
root_tag str

root tag of ISCE processing .PROC file

required
noiseLevel

Noise level contribution to amplitude

required
coherence

coherence file from interferometry

required
amplitude

amplitude file from interferometry

required
numLooks int

range x azimuth looks

required

Returns:

Type Description
coherence

coherence corrected for noise influence

Source code in sarlo/transform.py
def remove_thermal_noise(
    root_tag: str, noiselevel: int, coherence, amplitude, numLooks: int
):
    """Remove thermal noise from amplitude to correct coherence

    Args:
        root_tag: root tag of ISCE processing .PROC file
        noiseLevel: Noise level contribution to amplitude
        coherence: coherence file from interferometry
        amplitude: amplitude file from interferometry
        numLooks: range x azimuth looks

    Return:
        coherence: coherence corrected for noise influence
    """
    amp1 = amplitude[::2, :]
    amp2 = amplitude[1::2, :]

    amp1 = np.power(amp1, 2)
    amp2 = np.power(amp2, 2)

    amp1[amp1 <= 0] = np.nan
    amp2[amp2 <= 0] = np.nan
    coherence[coherence <= 0] = np.nan
    if noiselevel == 0.0:
        if root_tag[0] == "i":
            # ALOS thermal noise level (insarApp)
            N1 = (
                55.5 ** 2
            )  # Squared because amplitude was also squared. Noise is added to the system because
            N2 = 55.5 ** 2
        elif root_tag[0] == "s":
            # ALOS thermal noise level (stripmapApp)
            N1 = (55.5 / 81) ** 2
            N2 = (55.5 / 81) ** 2
        elif root_tag[0] == "t":
            # Sentinel thermal noise removal (topsApp)
            N1 = 0
            N2 = 0  # not configured yet
        else:
            raise Exception("invalid *Proc.xml file!!!")
    else:
        N1 = noiselevel
        N2 = noiselevel

    S1 = amp1 - N1
    g_th_1 = np.zeros(S1.shape)
    g_th_1[S1 > N1] = np.sqrt(S1[S1 > N1] / (S1[S1 > N1] + N1))
    g_th_1[np.isnan(S1)] = np.NaN
    g_th_1[S1 <= N1] = np.NaN

    # transforming data with f(x) = (x/(x+N))**.5
    # What percent of amplitude coherence can be attributed to Noiselevel
    S2 = amp2 - N2
    g_th_2 = np.zeros(S2.shape)
    g_th_2[S2 > N2] = np.sqrt(S2[S2 > N2] / (S2[S2 > N2] + N2))
    g_th_2[np.isnan(S2)] = np.NaN
    g_th_2[S2 <= N2] = np.NaN

    g_th = g_th_1 * g_th_2
    # g_th is generated thermal noise for each satellite pass. Multiplied together we get the adjustment we need to make for coherence

    # Correlation must fall between 0 - 1 : Coherence and Correlation of synonomous
    coherence[coherence < 0] = 0
    coherence[coherence > 1] = 1
    coherence = remove_corr_bias(
        coherence, numLooks
    )  # Corr_bias is used to re-estimate the actual correlation
    coherence[coherence < 0] = 0

    # Correcting automatic noiselevel
    coherence = coherence / g_th

    return coherence

vertical_wavenumber(llambda, slant_range, incident_angle, baseline)

Computes the vertical wavenumber from a Radar.

Parameters:

Name Type Description Default
llambda int

wavelength

required
slant_range int

distance between satellite and the object

required
incident_angle int

angle from line perpendicular to earth through object to slant range line

required
baseline int

distance perpendicular to the Satellite flight direction between two repeat orbits

required

Returns:

Type Description
kz

vertical wavenumber

Source code in sarlo/transform.py
def vertical_wavenumber(
    llambda: int, slant_range: int, incident_angle: int, baseline: int
):
    """Computes the vertical wavenumber from a Radar.

    Args:
        llambda: wavelength
        slant_range: distance between satellite and the object
        incident_angle: angle from line perpendicular to earth through object to slant range line
        baseline: distance perpendicular to the Satellite flight direction between two repeat orbits

    Returns:
        kz: vertical wavenumber
    """
    kz = (
        -2
        * math.pi
        * 2
        / llambda
        / slant_range
        / math.sin(incident_angle / 180 * math.pi)
        * baseline
    )
    kz = math.fabs(kz)
    return kz
Back to top