Skip to content

myco.scalers

Methods for applying data transformations to rescale feature and response data

ClassBalancer

Bases: BaseEstimator

Compute balanced class weights for categorical data

Source code in myco/scalers.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
class ClassBalancer(BaseEstimator):
    """Derive "balanced" per-class weights and map them onto samples.

    `fit` stores a `{class value: weight}` dict on `weights`; `transform`
    looks those weights up for every sample it is handed.
    """

    weights: dict = None

    def __init__(self):
        pass

    def _format_ydata(self, y: np.ndarray):
        """Collapse multi-band (one-hot) data to one band of argmax indices.

        Input whose last axis has length 1 (or 0) is returned untouched.
        """
        if y.shape[-1] > 1:
            class_index = np.argmax(y, axis=-1)
            return np.expand_dims(class_index, axis=-1)
        return y

    def fit(self, y: np.ndarray) -> None:
        """Compute one balanced weight for every class observed in `y`."""
        labels = self._format_ydata(y)
        observed = np.unique(labels)
        balanced = class_weight.compute_class_weight(
            "balanced", y=labels.flatten(), classes=observed
        )
        self.weights = {label: weight for label, weight in zip(observed, balanced)}

    def transform(self, y: np.ndarray) -> np.ndarray:
        """Return the fitted class weight of each sample as float32.

        Raises:
            KeyError: if `y` contains a class that is missing from `weights`.
        """
        labels = self._format_ydata(y)
        # only pass along the weights of classes present in this array
        present = {}
        for label in np.unique(labels):
            present[label] = self.weights[label]
        sample_weights = class_weight.compute_sample_weight(present, labels)
        return sample_weights.astype(np.float32)

    def fit_transform(self, y: np.ndarray) -> np.ndarray:
        """Fit the class weights on `y`, then return the per-sample weights."""
        self.fit(y)
        sample_weights = self.transform(y)
        return sample_weights

fit(y)

Compute balanced class weights

Source code in myco/scalers.py
227
228
229
230
231
232
233
234
def fit(self, y: np.ndarray) -> None:
    """Compute one balanced weight for every class observed in `y`.

    The result is stored on `self.weights` as a `{class value: weight}` dict.
    """
    labels = self._format_ydata(y)
    observed = np.unique(labels)
    balanced = class_weight.compute_class_weight(
        "balanced", y=labels.flatten(), classes=observed
    )
    self.weights = {label: weight for label, weight in zip(observed, balanced)}

fit_transform(y)

Compute and apply class weights to each sample

Source code in myco/scalers.py
243
244
245
246
def fit_transform(self, y: np.ndarray) -> np.ndarray:
    """Fit the class weights on `y`, then return the per-sample weights."""
    self.fit(y)
    sample_weights = self.transform(y)
    return sample_weights

transform(y)

Apply class weights to each sample in an array

Source code in myco/scalers.py
236
237
238
239
240
241
def transform(self, y: np.ndarray) -> np.ndarray:
    """Return the fitted class weight of each sample as float32.

    Raises:
        KeyError: if `y` contains a class that is missing from `self.weights`.
    """
    labels = self._format_ydata(y)
    # only pass along the weights of classes present in this array
    present = {}
    for label in np.unique(labels):
        present[label] = self.weights[label]
    sample_weights = class_weight.compute_sample_weight(present, labels)
    return sample_weights.astype(np.float32)

NetworkScaler

Bases: BaseEstimator

Class for applying scalers to [height, width, nbands] ndarrays.

Source code in myco/scalers.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class NetworkScaler(BaseEstimator):
    """Class for applying scalers to [height, width, nbands] ndarrays.

    Arrays are flattened to (n_samples, n_bands) before being handed to the
    wrapped scaler. Samples whose first band equals the nodata value are
    skipped, and results are reshaped back to the input's leading dimensions.
    """

    scaler: BaseEstimator = None
    is_fit: bool = False
    n_classes: int = None

    def __init__(self, scaler: BaseEstimator):
        self.scaler = scaler

    def _reshape_ndarray(self, array: np.ndarray) -> np.ndarray:
        """Converts an ndarray to a [nsamples, nbands] array for scaling."""
        return np.reshape(array, (-1, array.shape[-1]))

    def _get_valid_locations(self, array: np.ndarray, srcnodata: float):
        """Returns a boolean mask that is True where the first band is not nodata"""
        return array[..., 0] != srcnodata

    def _get_valid_onehot_locations(self, array: np.ndarray):
        """Returns a boolean mask that is True where the sum over axis 1 is non-zero"""
        return array.sum(axis=1) != 0

    def _get_sample_subset(self, size: int, n_subset: int):
        """Creates a sorted random sample (without replacement) of 1-d array indices"""
        random_generator = np.random.default_rng()
        subset = random_generator.choice(size, n_subset, replace=False)
        subset.sort()
        return subset

    def _get_transform_shape(self, array: np.ndarray):
        """Determine the shape of the transformed array (last axis = n_classes)"""
        output_shape = list(array.shape)
        output_shape[-1] = self.n_classes
        return output_shape

    def _get_inverse_transform_shape(self, array: np.ndarray):
        """Determine the shape of the inverse-transformed array.

        One-hot and ordinal encodings collapse back to a single band; any
        other scaler keeps the input shape.
        """
        output_shape = list(array.shape)
        if isinstance(self.scaler, (OneHotEncoder, OrdinalEncoder)):
            output_shape[-1] = 1
        return output_shape

    def _get_chunk_idxs(
        self, array: np.ndarray, chunk_size: int = scaling_config.transform_chunksize
    ) -> Tuple[list, list]:
        """Get the start/stop indices to read an array chunk-by-chunk.

        Returns:
            Two equally long lists with the start and (exclusive) stop index
            of every chunk along the first axis. The last chunk is shortened
            so it never runs past the end of the array.
        """
        total_samples = array.shape[0]
        # plain Python int: the previous np.int16 cast overflowed once more
        # than 32767 chunks were needed
        n_chunks = int(np.ceil(total_samples / chunk_size))
        start = [i * chunk_size for i in range(n_chunks)]
        stop = [min(begin + chunk_size, total_samples) for begin in start]
        return start, stop

    def _reshape_and_mask(self, array: np.ndarray, srcnodata: float) -> np.ndarray:
        """Convert an nd array to a (n_samples, n_bands) array and removes nodata"""
        reshaped = self._reshape_ndarray(array)
        valid = self._get_valid_locations(reshaped, srcnodata)
        if valid.sum() == len(reshaped):
            # nothing to drop, so skip the boolean-index copy
            return reshaped
        return reshaped[valid].reshape(-1, reshaped.shape[-1])

    def _sample_chunk(self, array: np.ndarray, srcnodata, n_random) -> np.ndarray:
        """Draw up to `n_random` random valid samples from an array chunk"""
        samples = self._reshape_and_mask(array, srcnodata)
        n_samples = samples.shape[0]
        if n_samples <= n_random:
            return samples
        subset = self._get_sample_subset(n_samples, n_random)
        return samples[subset]

    def _sample_array_in_chunks(
        self, array: np.ndarray, srcnodata: float, max_samples: int
    ) -> np.ndarray:
        """Process an array in chunks to get a random sample of valid pixels.

        Every chunk contributes the same fraction of its pixels, chosen so
        that roughly `max_samples` pixels are drawn over the whole array.

        Raises:
            ValueError: if no chunk yields any valid sample.
        """
        if hasattr(array, "get_chunk_idxs"):
            # array-like objects may bring their own chunking scheme
            cstart, cstop = array.get_chunk_idxs()
        else:
            cstart, cstop = self._get_chunk_idxs(array)
        chunk_size = cstop[0] - cstart[0]
        # np.prod: `np.product` was a deprecated alias removed in NumPy 2.0
        total_samples = np.prod(array.shape[:-1])
        sample_fraction = max_samples / total_samples
        total_samples_per_chunk = np.prod((chunk_size, *array.shape[1:-1]))
        random_samples_per_chunk = int(sample_fraction * total_samples_per_chunk)
        samples = []
        for start, stop in zip(cstart, cstop):
            sub_samples = self._sample_chunk(
                array[start:stop], srcnodata, random_samples_per_chunk
            )
            if len(sub_samples) > 0:
                samples.append(sub_samples)
        if not samples:
            # same exception type np.concatenate would raise, clearer message
            raise ValueError("No valid (non-nodata) samples found in the array.")
        return np.concatenate(samples, axis=0)

    def _format_nodata(
        self, array: np.ndarray, srcnodata: int = None, dstnodata: int = None
    ) -> Tuple[int, int]:
        """Parses passed nodata values and arrays to retrieve appropriate nodata values.

        `srcnodata` falls back to `array.nodata` when the array has that
        attribute; `dstnodata` falls back to the resolved `srcnodata`.
        """
        if srcnodata is None and hasattr(array, "nodata"):
            srcnodata = array.nodata
        dstnodata = srcnodata if dstnodata is None else dstnodata
        return srcnodata, dstnodata

    def fit(
        self,
        array: np.ndarray,
        srcnodata: float = None,
        max_samples: int = scaling_config.max_samples,
    ) -> None:
        """Fits the scaler to data.

        Args:
            array: data whose last axis holds the bands.
            srcnodata: value marking invalid samples (checked on the first band).
            max_samples: upper bound on the samples used for fitting; values
                <= 0 use every valid sample.
        """
        srcnodata, _ = self._format_nodata(array, srcnodata, None)
        if max_samples > 0:
            # np.prod: `np.product` was a deprecated alias removed in NumPy 2.0
            total_samples = np.prod(array.shape[:-1])
            if max_samples >= total_samples:
                reshaped = self._reshape_and_mask(array, srcnodata)
            else:
                reshaped = self._sample_array_in_chunks(array, srcnodata, max_samples)
                n_random = len(reshaped)
                if n_random > max_samples:
                    # per-chunk sampling can overshoot; trim to the cap
                    subset = self._get_sample_subset(n_random, max_samples)
                    reshaped = reshaped[subset]
        else:
            reshaped = self._reshape_and_mask(array, srcnodata)

        # update class attributes
        self.scaler.fit(reshaped)
        self.is_fit = True
        if isinstance(self.scaler, OneHotEncoder):
            self.n_classes = len(self.scaler.categories_[0])
        elif isinstance(self.scaler, ClassBalancer):
            self.n_classes = 1  # class weights should always be 1d output
        elif isinstance(self.scaler, OrdinalEncoder):
            self.n_classes = self.scaler.ymax - self.scaler.ymin
        else:
            self.n_classes = array.shape[-1]

    def transform(
        self, array: np.ndarray, srcnodata: float = None, dstnodata: float = None
    ) -> np.ndarray:
        """Apply the scaler to data.

        Invalid samples are filled with `dstnodata` (0 when it resolves to
        None); the output's last axis has `n_classes` entries.
        """
        assert self.is_fit, "Scaler has not been fit yet."
        srcnodata, dstnodata = self._format_nodata(array, srcnodata, dstnodata)
        output_shape = self._get_transform_shape(array)
        reshaped = self._reshape_ndarray(array)
        valid = self._get_valid_locations(reshaped, srcnodata)
        transformed = np.zeros((reshaped.shape[0], output_shape[-1]))
        if dstnodata is not None:
            transformed += dstnodata
        transformed[valid] = self.scaler.transform(reshaped[valid]).reshape(
            (-1, output_shape[-1])
        )

        return transformed.reshape(output_shape)

    def fit_transform(
        self,
        array: np.ndarray,
        srcnodata: float = None,
        dstnodata: float = None,
        max_samples: int = scaling_config.max_samples,
    ) -> np.ndarray:
        """Fit and apply the scaler to data"""
        self.fit(array, srcnodata, max_samples)
        return self.transform(array, srcnodata, dstnodata)

    def inverse_transform(
        self, array: np.ndarray, srcnodata: float = None, dstnodata: float = None
    ) -> np.ndarray:
        """Convert from scaled to unscaled units.

        Invalid samples are filled with `dstnodata` (0 when it resolves to None).
        """
        assert self.is_fit, "Scaler has not been fit yet."
        srcnodata, dstnodata = self._format_nodata(array, srcnodata, dstnodata)
        output_shape = self._get_inverse_transform_shape(array)
        reshaped = self._reshape_ndarray(array)
        valid = self._get_valid_locations(reshaped, srcnodata)
        inverse = np.zeros((reshaped.shape[0], output_shape[-1]))
        if dstnodata is not None:
            inverse += dstnodata
        inverse[valid] = self.scaler.inverse_transform(reshaped[valid])

        return inverse.reshape(output_shape)

fit(array, srcnodata=None, max_samples=scaling_config.max_samples)

Fits the scaler to data

Source code in myco/scalers.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def fit(
    self,
    array: np.ndarray,
    srcnodata: float = None,
    max_samples: int = scaling_config.max_samples,
) -> None:
    """Fits the scaler to data.

    Args:
        array: data whose last axis holds the bands.
        srcnodata: value marking invalid samples; resolved through
            `self._format_nodata` when None.
        max_samples: upper bound on the samples used for fitting; values
            <= 0 use every valid sample.
    """
    srcnodata, _ = self._format_nodata(array, srcnodata, None)
    if max_samples > 0:
        # np.prod: `np.product` was a deprecated alias removed in NumPy 2.0
        total_samples = np.prod(array.shape[:-1])
        if max_samples >= total_samples:
            reshaped = self._reshape_and_mask(array, srcnodata)
        else:
            reshaped = self._sample_array_in_chunks(array, srcnodata, max_samples)
            n_random = len(reshaped)
            if n_random > max_samples:
                # per-chunk sampling can overshoot; trim to the cap
                subset = self._get_sample_subset(n_random, max_samples)
                reshaped = reshaped[subset]
    else:
        reshaped = self._reshape_and_mask(array, srcnodata)

    # update class attributes
    self.scaler.fit(reshaped)
    self.is_fit = True
    if isinstance(self.scaler, OneHotEncoder):
        self.n_classes = len(self.scaler.categories_[0])
    elif isinstance(self.scaler, ClassBalancer):
        self.n_classes = 1  # class weights should always be 1d output
    elif isinstance(self.scaler, OrdinalEncoder):
        self.n_classes = self.scaler.ymax - self.scaler.ymin
    else:
        self.n_classes = array.shape[-1]

fit_transform(array, srcnodata=None, dstnodata=None, max_samples=scaling_config.max_samples)

Fit and apply the scaler to data

Source code in myco/scalers.py
187
188
189
190
191
192
193
194
195
196
def fit_transform(
    self,
    array: np.ndarray,
    srcnodata: float = None,
    dstnodata: float = None,
    max_samples: int = scaling_config.max_samples,
) -> np.ndarray:
    """Fit the scaler on `array`, then return the transformed `array`."""
    self.fit(array, srcnodata, max_samples)
    scaled = self.transform(array, srcnodata, dstnodata)
    return scaled

inverse_transform(array, srcnodata=None, dstnodata=None)

Convert from scaled to unscaled units

Source code in myco/scalers.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def inverse_transform(
    self, array: np.ndarray, srcnodata: float = None, dstnodata: float = None
) -> np.ndarray:
    """Map scaled data back to unscaled units.

    Samples flagged invalid by `self._get_valid_locations` are not passed to
    the scaler; they hold `dstnodata` (0 when it resolves to None).
    """
    assert self.is_fit, "Scaler has not been fit yet."
    srcnodata, dstnodata = self._format_nodata(array, srcnodata, dstnodata)
    out_shape = self._get_inverse_transform_shape(array)
    flat = self._reshape_ndarray(array)
    keep = self._get_valid_locations(flat, srcnodata)
    # pre-fill the float64 output with the nodata value
    fill_value = 0.0 if dstnodata is None else dstnodata
    restored = np.full((flat.shape[0], out_shape[-1]), fill_value, dtype=np.float64)
    restored[keep] = self.scaler.inverse_transform(flat[keep])

    return restored.reshape(out_shape)

transform(array, srcnodata=None, dstnodata=None)

Apply the scaler to data.

Source code in myco/scalers.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def transform(
    self, array: np.ndarray, srcnodata: float = None, dstnodata: float = None
) -> np.ndarray:
    """Run the fitted scaler over every valid sample of `array`.

    Samples flagged invalid by `self._get_valid_locations` are not passed to
    the scaler; they hold `dstnodata` (0 when it resolves to None).
    """
    assert self.is_fit, "Scaler has not been fit yet."
    srcnodata, dstnodata = self._format_nodata(array, srcnodata, dstnodata)
    out_shape = self._get_transform_shape(array)
    n_out = out_shape[-1]
    flat = self._reshape_ndarray(array)
    keep = self._get_valid_locations(flat, srcnodata)
    # pre-fill the float64 output with the nodata value
    fill_value = 0.0 if dstnodata is None else dstnodata
    scaled = np.full((flat.shape[0], n_out), fill_value, dtype=np.float64)
    scaled_valid = self.scaler.transform(flat[keep])
    scaled[keep] = scaled_valid.reshape((-1, n_out))

    return scaled.reshape(out_shape)

OrdinalBalancer

Bases: BaseEstimator

Compute sample weights for ordinal data by wrapping the OrdinalEncoder and RegressionBalancer

Source code in myco/scalers.py
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
class OrdinalBalancer(BaseEstimator):
    """Sample weights for ordinal data, built on OrdinalEncoder and RegressionBalancer."""

    balancer: RegressionBalancer = None
    encoder: OrdinalEncoder = None
    method: str = None

    def __init__(self, method: str = scaling_config.regression_scaling_method):
        """Create an ordinal data weights balancer.

        The range of discrete ordinal values is measured from the `y` data
            passed to `fit`; the frequency of each value is then used to
            increase the weights for rare samples. The number of bins is not
            a constructor argument: `fit` derives it from that range.

        Args:
            method: the method for transforming absolute sample frequency
                per-bin to a scaled weight value. options include
                ['linear', 'log', 'sqrt']
        """
        self.method = method
        self.encoder = OrdinalEncoder()

    def fit(self, y: np.ndarray) -> None:
        """Measure the ordinal range of `y` and fit one weight per ordinal value"""
        self.encoder.fit(y)
        ordinal_span = self.encoder.ymax - self.encoder.ymin
        # one bin per integer between ymin and ymax, both ends included
        self.balancer = RegressionBalancer(n_bins=1 + ordinal_span, method=self.method)
        self.balancer.fit(y)

    def transform(self, y: np.ndarray) -> np.ndarray:
        """Look up the fitted weight of each sample.

        Input with more than one column is first decoded through the
        encoder's `inverse_transform`.
        """
        is_encoded = y.shape[1] > 1
        ordinal = self.encoder.inverse_transform(y) if is_encoded else y
        return self.balancer.transform(ordinal)

    def fit_transform(self, y: np.ndarray) -> np.ndarray:
        """Fit the ordinal weights on `y`, then return the per-sample weights"""
        self.fit(y)
        sample_weights = self.transform(y)
        return sample_weights

__init__(method=scaling_config.regression_scaling_method)

Create an ordinal data weights balancer.

Computes the range of ordinal discrete bins across the observed y data, then uses the frequency of those bins to increase the weights for rare samples.

Parameters:

Name Type Description Default
n_bins

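the number of uniformly-spaced bins to compute weights for. Note: the constructor signature above does not accept this argument; `fit` derives the bin count from the fitted ordinal range (`1 + ymax - ymin`).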

required
method str

the method for transforming absolute sample frequency per-bin to a scaled weight value. options include ['linear', 'log', 'sqrt']

scaling_config.regression_scaling_method
Source code in myco/scalers.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
def __init__(self, method: str = scaling_config.regression_scaling_method):
    """Create an ordinal data weights balancer.

    The range of discrete ordinal values is measured from the observed `y`
        data; the frequency of each value is then used to increase the
        weights for rare samples. The number of bins is not a constructor
        argument.

    Args:
        method: the method for transforming absolute sample frequency
            per-bin to a scaled weight value. options include
            ['linear', 'log', 'sqrt']
    """
    self.method = method
    self.encoder = OrdinalEncoder()

fit(y)

Compute discrete weights for a response dataset

Source code in myco/scalers.py
381
382
383
384
385
386
def fit(self, y: np.ndarray) -> None:
    """Measure the ordinal range of `y` and fit one weight per ordinal value"""
    self.encoder.fit(y)
    ordinal_span = self.encoder.ymax - self.encoder.ymin
    # one bin per integer between ymin and ymax, both ends included
    self.balancer = RegressionBalancer(n_bins=1 + ordinal_span, method=self.method)
    self.balancer.fit(y)

fit_transform(y)

Compute and apply ordinal weights to each sample

Source code in myco/scalers.py
394
395
396
397
def fit_transform(self, y: np.ndarray) -> np.ndarray:
    """Fit the ordinal weights on `y`, then return the per-sample weights"""
    self.fit(y)
    sample_weights = self.transform(y)
    return sample_weights

transform(y)

Apply ordinal weights to each sample in an array

Source code in myco/scalers.py
388
389
390
391
392
def transform(self, y: np.ndarray) -> np.ndarray:
    """Look up the fitted weight of each sample.

    Input with more than one column is first decoded through the encoder's
    `inverse_transform`.
    """
    is_encoded = y.shape[1] > 1
    ordinal = self.encoder.inverse_transform(y) if is_encoded else y
    return self.balancer.transform(ordinal)

OrdinalEncoder

Bases: BaseEstimator

Transform ordered count data into pseudo-one hot encoded classes

Source code in myco/scalers.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
class OrdinalEncoder(BaseEstimator):
    """Encode ordered count data as cumulative, one-hot-like indicator columns"""

    ymin: int = None
    ymax: int = None

    def __init__(self):
        pass

    def fit(self, y: np.ndarray) -> None:
        """Record the smallest and largest value of `y`, truncated to int"""
        lowest = int(np.min(y))
        highest = int(np.max(y))
        self.ymin = lowest
        self.ymax = highest

    def transform(self, y: np.ndarray) -> np.ndarray:
        """Encode the first column of `y` as an (n_samples, ymax - ymin) array.

        Column `k` is 1 where the sample value exceeds `ymin + k`, else 0,
        stored in the dtype of `y`.
        """
        thresholds = np.arange(self.ymin, self.ymax)
        first_band = y[:, 0]
        # compare every sample against every threshold in one broadcast
        exceeds = first_band[:, np.newaxis] > thresholds
        return exceeds.astype(y.dtype)

    def fit_transform(self, y: np.ndarray) -> np.ndarray:
        """Compute and apply ordinal transformations to each sample"""
        self.fit(y)
        encoded = self.transform(y)
        return encoded

    def inverse_transform(self, y: np.ndarray) -> np.ndarray:
        """Decode to ordinal values: sum each row's columns and add `ymin`"""
        exceeded_count = np.sum(y, axis=1, keepdims=True)
        return exceeded_count + self.ymin

fit(y)

Compute the range of ordinal values

Source code in myco/scalers.py
334
335
336
337
def fit(self, y: np.ndarray) -> None:
    """Record the smallest and largest value of `y`, truncated to int"""
    lowest = int(np.min(y))
    highest = int(np.max(y))
    self.ymin = lowest
    self.ymax = highest

fit_transform(y)

Compute and apply ordinal transformations to each sample

Source code in myco/scalers.py
348
349
350
351
def fit_transform(self, y: np.ndarray) -> np.ndarray:
    """Compute and apply ordinal transformations to each sample"""
    self.fit(y)
    encoded = self.transform(y)
    return encoded

inverse_transform(y)

Revert transformed data to the original ordinal space

Source code in myco/scalers.py
353
354
355
def inverse_transform(self, y: np.ndarray) -> np.ndarray:
    """Decode to ordinal values: sum each row's columns and add `ymin`"""
    exceeded_count = np.sum(y, axis=1, keepdims=True)
    return exceeded_count + self.ymin

transform(y)

Convert ordered data into an ordinal-encoded (n_samples, n_classes) array

Source code in myco/scalers.py
339
340
341
342
343
344
345
346
def transform(self, y: np.ndarray) -> np.ndarray:
    """Encode the first column of `y` as an (n_samples, ymax - ymin) array.

    Column `k` is 1 where the sample value exceeds `ymin + k`, else 0, stored
    in the dtype of `y`.
    """
    thresholds = np.arange(self.ymin, self.ymax)
    first_band = y[:, 0]
    # compare every sample against every threshold in one broadcast
    exceeds = first_band[:, np.newaxis] > thresholds
    return exceeds.astype(y.dtype)

RegressionBalancer

Bases: BaseEstimator

Compute binned class weights for continuous data

Source code in myco/scalers.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
class RegressionBalancer(BaseEstimator):
    """Compute binned class weights for continuous data"""

    n_bins: int = None
    method: str = None
    kb: BaseEstimator = None
    weights: dict = None

    def __init__(
        self,
        n_bins: int = scaling_config.n_regression_bins,
        method: str = scaling_config.regression_scaling_method,
    ):
        """Create a regression data weights balancer.

        Computes uniformly-spaced discrete bins across the range of `y`
            data, then uses the frequency of those bins to increase the
            weights for rare samples.

        The 'log' method uses an inverse log probability to compute sample
            weights a) because much of our data is exponentially distributed
            and b) because inverse linear proportions will create really
            high weight values at the tails of distributions

        It can also fit square root-transformed sample weights ('sqrt'),
            which increase sample weights for rare bins while compressing
            the spread of the inverse linear weights.

        Args:
            n_bins: the number of uniformly-spaced bins to compute weights for.
            method: the method for transforming absolute sample frequency
                per-bin to a scaled weight value. options include
                ['linear', 'log', 'sqrt']
        """
        self.n_bins = n_bins
        self.method = method
        self.kb = KBinsDiscretizer(n_bins=n_bins, strategy="uniform", encode="ordinal")

    def fit(self, y: np.ndarray) -> None:
        """Compute discretized weights for a response dataset.

        Stores a `{bin index: float32 weight}` dict on `weights`. Bins that
        received no samples get a weight of 0.

        Raises:
            ValueError: if `method` is not one of 'linear', 'log', 'sqrt'.
        """
        if self.method not in ("linear", "log", "sqrt"):
            # previously an unknown method silently produced all-zero weights
            raise ValueError(
                f"Unknown weighting method {self.method!r}; "
                "expected one of ['linear', 'log', 'sqrt']"
            )

        # np.intp rather than np.uint8: bin ids above 255 would wrap around
        kbins = self.kb.fit_transform(y).astype(np.intp)
        freq, _ = np.histogram(kbins, bins=range(self.kb.n_bins + 1))
        proportions = freq / freq.sum()

        # empty bins are skipped by the ufuncs below and keep a zero weight
        where = proportions > 0
        weights = np.zeros_like(proportions)

        if self.method == "linear":
            np.divide(1.0, proportions, where=where, out=weights)
            weights /= weights.mean()

        elif self.method == "log":
            weights = -np.log10(proportions, where=where, out=weights)

        elif self.method == "sqrt":
            np.divide(1.0, proportions, where=where, out=weights)
            weights = np.sqrt(weights)
            weights /= weights.mean()

        classes = range(self.kb.n_bins)
        self.weights = dict(zip(classes, weights.astype(np.float32)))

    def transform(self, y: np.ndarray) -> np.ndarray:
        """Apply class weights to each sample in an array.

        Returns:
            float32 array holding the fitted weight of each sample's bin.
        """
        # np.intp rather than np.uint8: bin ids above 255 would wrap around
        bins = self.kb.transform(y).astype(np.intp)
        # only pass along the weights of bins present in this array
        valid_weights = {bin_id: self.weights[bin_id] for bin_id in np.unique(bins)}
        return class_weight.compute_sample_weight(valid_weights, bins).astype(
            np.float32
        )

    def fit_transform(self, y: np.ndarray) -> np.ndarray:
        """Compute and apply regression weights to each sample"""
        self.fit(y)
        return self.transform(y)

__init__(n_bins=scaling_config.n_regression_bins, method=scaling_config.regression_scaling_method)

Create a regression data weights balancer.

Computes uniformly-spaced discrete bins across the range of y data, then uses the frequency of those bins to increase the weights for rare samples.

By default, it uses an inverse log probability to compute sample weights a) because much of our data is exponentially distributed and b) because inverse linear proportions will create really high weight values at the tails of distributions

It can also fit square root-transformed sample weights, which increase sample weights for rare bins while reducing the extreme weight values that inverse linear proportions produce at the tails of distributions.

Parameters:

Name Type Description Default
n_bins int

the number of uniformly-spaced bins to compute weights for.

scaling_config.n_regression_bins
method str

the method for transforming absolute sample frequency per-bin to a scaled weight value. options include ['linear', 'log', 'sqrt']

scaling_config.regression_scaling_method
Source code in myco/scalers.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def __init__(
    self,
    n_bins: int = scaling_config.n_regression_bins,
    method: str = scaling_config.regression_scaling_method,
):
    """Create a regression data weights balancer.

    The range of `y` is split into uniformly-spaced discrete bins, and the
        frequency of each bin is used to raise the weights of rare samples.

    The 'log' method uses an inverse log probability to compute sample
        weights a) because much of our data is exponentially distributed
        and b) because inverse linear proportions will create really
        high weight values at the tails of distributions

    The 'sqrt' method fits square root-transformed sample weights, which
        increase sample weights for rare bins while compressing the spread
        of the inverse linear weights.

    Args:
        n_bins: the number of uniformly-spaced bins to compute weights for.
        method: the method for transforming absolute sample frequency
            per-bin to a scaled weight value. options include
            ['linear', 'log', 'sqrt']
    """
    self.kb = KBinsDiscretizer(n_bins=n_bins, strategy="uniform", encode="ordinal")
    self.method = method
    self.n_bins = n_bins

fit(y)

Compute discretized weights for a response dataset

Source code in myco/scalers.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def fit(self, y: np.ndarray) -> None:
    """Compute discretized weights for a response dataset.

    Stores a `{bin index: float32 weight}` dict on `self.weights`. Bins that
    received no samples get a weight of 0.

    Raises:
        ValueError: if `self.method` is not one of 'linear', 'log', 'sqrt'.
    """
    if self.method not in ("linear", "log", "sqrt"):
        # previously an unknown method silently produced all-zero weights
        raise ValueError(
            f"Unknown weighting method {self.method!r}; "
            "expected one of ['linear', 'log', 'sqrt']"
        )

    # np.intp rather than np.uint8: bin ids above 255 would wrap around
    kbins = self.kb.fit_transform(y).astype(np.intp)
    freq, _ = np.histogram(kbins, bins=range(self.kb.n_bins + 1))
    proportions = freq / freq.sum()

    # empty bins are skipped by the ufuncs below and keep a zero weight
    where = proportions > 0
    weights = np.zeros_like(proportions)

    if self.method == "linear":
        np.divide(1.0, proportions, where=where, out=weights)
        weights /= weights.mean()

    elif self.method == "log":
        weights = -np.log10(proportions, where=where, out=weights)

    elif self.method == "sqrt":
        np.divide(1.0, proportions, where=where, out=weights)
        weights = np.sqrt(weights)
        weights /= weights.mean()

    classes = range(self.kb.n_bins)
    self.weights = dict(zip(classes, weights.astype(np.float32)))

fit_transform(y)

Compute and apply regression weights to each sample

Source code in myco/scalers.py
319
320
321
322
def fit_transform(self, y: np.ndarray) -> np.ndarray:
    """Fit the bin weights on `y`, then return the per-sample weights"""
    self.fit(y)
    sample_weights = self.transform(y)
    return sample_weights

transform(y)

Apply class weights to each sample in an array

Source code in myco/scalers.py
310
311
312
313
314
315
316
317
def transform(self, y: np.ndarray) -> np.ndarray:
    """Apply class weights to each sample in an array.

    Returns:
        float32 array holding the fitted weight of each sample's bin.
    """
    # np.intp rather than np.uint8: bin ids above 255 would wrap around
    bins = self.kb.transform(y).astype(np.intp)
    # only pass along the weights of bins present in this array
    valid_weights = {bin_id: self.weights[bin_id] for bin_id in np.unique(bins)}
    return class_weight.compute_sample_weight(valid_weights, bins).astype(
        np.float32
    )

TFMinMaxScaler

Bases: MinMaxScaler

TF-enabled scaling for MinMaxScaler objects

Source code in myco/scalers.py
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
class TFMinMaxScaler(MinMaxScaler):
    """MinMaxScaler whose transforms operate on TensorFlow tensors.

    Reads the `min_`, `scale_`, `clip` and `feature_range` attributes from
    `self` and converts them to tensors of the input's dtype.
    """

    def transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Scale a tensor: multiply by `scale_`, add `min_`, optionally clip."""
        target_dtype = tensor.dtype
        offset = tf.convert_to_tensor(self.min_, dtype=target_dtype)
        factor = tf.convert_to_tensor(self.scale_, dtype=target_dtype)
        tensor *= factor
        tensor += offset
        if not self.clip:
            return tensor
        lower = tf.convert_to_tensor(self.feature_range[0], dtype=target_dtype)
        upper = tf.convert_to_tensor(self.feature_range[1], dtype=target_dtype)
        return tf.clip_by_value(tensor, lower, upper)

    def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Undo the scaling: subtract `min_`, then divide by `scale_`."""
        target_dtype = tensor.dtype
        offset = tf.convert_to_tensor(self.min_, dtype=target_dtype)
        factor = tf.convert_to_tensor(self.scale_, dtype=target_dtype)
        tensor -= offset
        tensor /= factor
        return tensor

inverse_transform(tensor)

Convert from scaled to unscaled units

Source code in myco/scalers.py
612
613
614
615
616
617
618
619
def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Undo the scaling: subtract `min_`, then divide by `scale_`."""
    target_dtype = tensor.dtype
    offset = tf.convert_to_tensor(self.min_, dtype=target_dtype)
    factor = tf.convert_to_tensor(self.scale_, dtype=target_dtype)
    tensor -= offset
    tensor /= factor
    return tensor

transform(tensor)

Apply the MinMaxScaler to tensor data.

Source code in myco/scalers.py
599
600
601
602
603
604
605
606
607
608
609
610
def transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Scale a tensor: multiply by `scale_`, add `min_`, optionally clip."""
    target_dtype = tensor.dtype
    offset = tf.convert_to_tensor(self.min_, dtype=target_dtype)
    factor = tf.convert_to_tensor(self.scale_, dtype=target_dtype)
    tensor *= factor
    tensor += offset
    if not self.clip:
        return tensor
    lower = tf.convert_to_tensor(self.feature_range[0], dtype=target_dtype)
    upper = tf.convert_to_tensor(self.feature_range[1], dtype=target_dtype)
    return tf.clip_by_value(tensor, lower, upper)

TFOneHotEncoder

Bases: OneHotEncoder

TF-enabled scaling for OneHotEncoder objects

Source code in myco/scalers.py
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
class TFOneHotEncoder(OneHotEncoder):
    """TF-enabled scaling for OneHotEncoder objects

    Reads ``categories_`` as a flat sequence of category values, one per
    encoded output column.
    """

    def transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Apply the OneHotEncoder to tensor data.

        Args:
            tensor: class values to encode; assumed to be 2-D with shape
                ``(n_samples, 1)`` -- TODO confirm against callers.

        Returns:
            tensor in the input dtype with one 0/1 indicator column per
            entry of ``categories_``, joined along axis 1.
        """
        dtype = tensor.dtype
        # casting the equality mask yields the indicator map directly; the
        # maps are joined along axis 1 so that every row holds the encoding
        # of one sample (joining along axis 0 stacked whole classes on top of
        # each other and scrambled samples once reshaped to (..., n_classes))
        class_maps = [
            tf.cast(tf.math.equal(tensor, category), dtype)
            for category in self.categories_
        ]
        return tf.concat(class_maps, axis=1)

    def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Convert from scaled to unscaled units

        Args:
            tensor: 2-D tensor with one column per entry of ``categories_``.

        Returns:
            1-D tensor in the input dtype holding, for every row, the
            category value of the column with the largest entry.
        """
        dtype = tensor.dtype
        categories = tf.cast(tf.convert_to_tensor(self.categories_), dtype)
        # the winning class is chosen per row (axis 1); reducing over axis 0
        # would pick a sample per class instead of a class per sample
        class_idxs = tf.argmax(tensor, axis=1)
        # a direct lookup also returns negative category values unchanged
        return tf.gather(categories, class_idxs)

inverse_transform(tensor)

Convert from scaled to unscaled units

Source code in myco/scalers.py
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Convert from scaled to unscaled units

    Args:
        tensor: 2-D tensor with one column per entry of ``categories_``.

    Returns:
        1-D tensor in the input dtype holding, for every row, the
        category value of the column with the largest entry.
    """
    dtype = tensor.dtype
    categories = tf.cast(tf.convert_to_tensor(self.categories_), dtype)
    # the winning class is chosen per row (axis 1); reducing over axis 0
    # would pick a sample per class instead of a class per sample
    class_idxs = tf.argmax(tensor, axis=1)
    # a direct lookup also returns negative category values unchanged
    return tf.gather(categories, class_idxs)

transform(tensor)

Apply the OneHotEncoder to tensor data.

Source code in myco/scalers.py
625
626
627
628
629
630
631
632
633
634
635
636
637
638
def transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Apply the OneHotEncoder to tensor data.

    Args:
        tensor: class values to encode; assumed to be 2-D with shape
            ``(n_samples, 1)`` -- TODO confirm against callers.

    Returns:
        tensor in the input dtype with one 0/1 indicator column per
        entry of ``categories_``, joined along axis 1.
    """
    dtype = tensor.dtype
    # casting the equality mask yields the indicator map directly; the
    # maps are joined along axis 1 so that every row holds the encoding
    # of one sample (joining along axis 0 stacked whole classes on top of
    # each other and scrambled samples once reshaped to (..., n_classes))
    class_maps = [
        tf.cast(tf.math.equal(tensor, category), dtype)
        for category in self.categories_
    ]
    return tf.concat(class_maps, axis=1)

TFOrdinalEncoder

Bases: OrdinalEncoder

TF-enabled scaling for OrdinalEncoder objects

Source code in myco/scalers.py
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
class TFOrdinalEncoder(OrdinalEncoder):
    """TF-enabled scaling for OrdinalEncoder objects

    Reads the ``ymin`` and ``ymax`` attributes; one output column is
    produced for every integer step in ``[ymin, ymax)``.
    """

    def transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Apply the OrdinalEncoder to tensor data.

        Args:
            tensor: values to encode; assumed to be 2-D with shape
                ``(n_samples, 1)`` -- TODO confirm against callers.

        Returns:
            tensor in the input dtype whose column ``i`` is 1 where the
            input is greater than ``ymin + i`` and 0 elsewhere.
        """
        dtype = tensor.dtype
        thresholds = np.arange(self.ymin, self.ymax)
        # casting the comparison mask gives the 0/1 map directly and does
        # not need a fully defined static shape like scatter_nd does
        class_maps = [
            tf.cast(tf.math.greater(tensor, threshold), dtype)
            for threshold in thresholds
        ]
        return tf.concat(class_maps, axis=1)

    def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Convert from scaled to unscaled units

        Sums the columns of each row (keeping the reduced axis) and adds
        ``ymin`` to recover the ordinal value.
        """
        return tf.reduce_sum(tensor, axis=1, keepdims=True) + self.ymin

inverse_transform(tensor)

Convert from scaled to unscaled units

Source code in myco/scalers.py
678
679
680
681
682
def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Convert from scaled to unscaled units

    Sums the columns of each row (keeping the reduced axis) and adds
    ``ymin`` to recover the ordinal value.
    """
    return tf.reduce_sum(tensor, axis=1, keepdims=True) + self.ymin

transform(tensor)

Apply the OrdinalEncoder to tensor data.

Source code in myco/scalers.py
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
def transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Apply the OrdinalEncoder to tensor data.

    Args:
        tensor: values to encode; assumed to be 2-D with shape
            ``(n_samples, 1)`` -- TODO confirm against callers.

    Returns:
        tensor in the input dtype whose column ``i`` is 1 where the
        input is greater than ``ymin + i`` and 0 elsewhere.
    """
    dtype = tensor.dtype
    thresholds = np.arange(self.ymin, self.ymax)
    # casting the comparison mask gives the 0/1 map directly and does
    # not need a fully defined static shape like scatter_nd does
    class_maps = [
        tf.cast(tf.math.greater(tensor, threshold), dtype)
        for threshold in thresholds
    ]
    return tf.concat(class_maps, axis=1)

TFPCA

Bases: PCA

TF-enabled scaling for PCA objects

Source code in myco/scalers.py
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
class TFPCA(PCA):
    """TF-enabled scaling for PCA objects

    Reads the ``mean_``, ``components_`` and ``explained_variance_``
    attributes together with the ``whiten`` parameter.
    """

    def transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Apply the PCA to tensor data.

        Centers the data on ``mean_`` (when it is not None), projects it
        onto the transposed ``components_`` and, when ``whiten`` is set,
        divides by the square root of ``explained_variance_``.
        """
        dtype = tensor.dtype
        if self.mean_ is not None:
            # subtract the converted tensor, not the raw attribute, so the
            # operand carries the same dtype as the input
            mean = tf.convert_to_tensor(self.mean_, dtype=dtype)
            tensor -= mean
        components = tf.convert_to_tensor(self.components_, dtype=dtype)
        transformed = tf.tensordot(tensor, tf.transpose(components), 1)
        if self.whiten:
            ev = tf.convert_to_tensor(self.explained_variance_, dtype=dtype)
            transformed /= tf.sqrt(ev)
        return transformed

    def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Convert from scaled to unscaled units

        Projects the data back through ``components_`` (scaled per
        component by the square root of ``explained_variance_`` when
        ``whiten`` is set) and adds ``mean_``.
        """
        dtype = tensor.dtype
        mean = tf.convert_to_tensor(self.mean_, dtype=dtype)
        components = tf.convert_to_tensor(self.components_, dtype=dtype)
        if self.whiten:
            ev = tf.convert_to_tensor(self.explained_variance_, dtype=dtype)
            # column vector so each component row is scaled by its own std
            std = tf.expand_dims(tf.sqrt(ev), 1)
            inverse = tf.tensordot(tensor, std * components, 1) + mean
        else:
            inverse = tf.tensordot(tensor, components, 1) + mean

        return inverse

inverse_transform(tensor)

Convert from scaled to unscaled units

Source code in myco/scalers.py
701
702
703
704
705
706
707
708
709
710
711
712
713
def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Project PCA-transformed tensor data back to unscaled units.

    The data are multiplied through ``components_`` and shifted by
    ``mean_``; when ``whiten`` is set each component row is first scaled
    by the square root of its ``explained_variance_`` entry.
    """
    target_dtype = tensor.dtype
    offset = tf.convert_to_tensor(self.mean_, dtype=target_dtype)
    basis = tf.convert_to_tensor(self.components_, dtype=target_dtype)
    if self.whiten:
        variance = tf.convert_to_tensor(
            self.explained_variance_, dtype=target_dtype
        )
        # column vector so the scaling is applied row-wise to the basis
        basis = tf.expand_dims(tf.sqrt(variance), 1) * basis
    return tf.tensordot(tensor, basis, 1) + offset

transform(tensor)

Apply the PCA to tensor data.

Source code in myco/scalers.py
688
689
690
691
692
693
694
695
696
697
698
699
def transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Apply the PCA to tensor data.

    Centers the data on ``mean_`` (when it is not None), projects it
    onto the transposed ``components_`` and, when ``whiten`` is set,
    divides by the square root of ``explained_variance_``.
    """
    dtype = tensor.dtype
    if self.mean_ is not None:
        # subtract the converted tensor, not the raw attribute, so the
        # operand carries the same dtype as the input
        mean = tf.convert_to_tensor(self.mean_, dtype=dtype)
        tensor -= mean
    components = tf.convert_to_tensor(self.components_, dtype=dtype)
    transformed = tf.tensordot(tensor, tf.transpose(components), 1)
    if self.whiten:
        ev = tf.convert_to_tensor(self.explained_variance_, dtype=dtype)
        transformed /= tf.sqrt(ev)
    return transformed

TFRobustScaler

Bases: RobustScaler

TF-enabled scaling for RobustScaler objects

Source code in myco/scalers.py
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
class TFRobustScaler(RobustScaler):
    """RobustScaler whose transform methods operate on tensors."""

    def transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Center and scale tensor data with the RobustScaler parameters.

        ``center_`` is subtracted when ``with_centering`` is set, then the
        data are divided by ``scale_`` when ``with_scaling`` is set.
        """
        target_dtype = tensor.dtype
        if self.with_centering:
            tensor -= tf.convert_to_tensor(self.center_, dtype=target_dtype)
        if self.with_scaling:
            tensor /= tf.convert_to_tensor(self.scale_, dtype=target_dtype)
        return tensor

    def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Undo the robust scaling, returning data in unscaled units.

        The steps of ``transform`` are reversed: multiply by ``scale_``
        when ``with_scaling`` is set, then add ``center_`` when
        ``with_centering`` is set.
        """
        target_dtype = tensor.dtype
        if self.with_scaling:
            tensor *= tf.convert_to_tensor(self.scale_, dtype=target_dtype)
        if self.with_centering:
            tensor += tf.convert_to_tensor(self.center_, dtype=target_dtype)
        return tensor

inverse_transform(tensor)

Convert from scaled to unscaled units

Source code in myco/scalers.py
738
739
740
741
742
743
744
745
746
747
def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Undo the robust scaling, returning data in unscaled units.

    Multiplies by ``scale_`` when ``with_scaling`` is set, then adds
    ``center_`` when ``with_centering`` is set.
    """
    target_dtype = tensor.dtype
    if self.with_scaling:
        tensor *= tf.convert_to_tensor(self.scale_, dtype=target_dtype)
    if self.with_centering:
        tensor += tf.convert_to_tensor(self.center_, dtype=target_dtype)
    return tensor

transform(tensor)

Apply the RobustScaler to tensor data.

Source code in myco/scalers.py
727
728
729
730
731
732
733
734
735
736
def transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Center and scale tensor data with the RobustScaler parameters.

    ``center_`` is subtracted when ``with_centering`` is set, then the
    data are divided by ``scale_`` when ``with_scaling`` is set.
    """
    target_dtype = tensor.dtype
    if self.with_centering:
        tensor -= tf.convert_to_tensor(self.center_, dtype=target_dtype)
    if self.with_scaling:
        tensor /= tf.convert_to_tensor(self.scale_, dtype=target_dtype)
    return tensor

TFScaler

Extend fitted sklearn scalers to apply inverse/transform methods to tensors

Source code in myco/scalers.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
class TFScaler:
    """Extend fitted sklearn scalers to apply inverse/transform methods to tensors

    Each supported scaler is mirrored by a TF* counterpart that receives a
    copy of the fitted attributes; the counterparts are stored in
    ``tfscalers`` and applied in order by ``transform`` and in reverse order
    by ``inverse_transform``.
    """

    # TF-enabled counterparts of the wrapped scaler(s), in application order
    tfscalers: list = None
    # number of output classes; only set when a one-hot/ordinal encoder is wrapped
    n_classes_: int = None

    def __init__(self, scaler: BaseEstimator):
        """Create a TFScaler to support applying inverse/transform methods to tensors.

        Args:
            scaler: a fitted sklearn scaler or a myco NetworkScaler.
                Multiple scalers fitted using a Pipeline method will be
                applied in series.

        Raises:
            NotImplementedError: for PowerTransformer, QuantileTransformer
                and any scaler type without a TF counterpart.
        """
        self.tfscalers = []
        self.n_classes_ = None

        # extract the scalers wrapped by NetworkScaler
        if isinstance(scaler, NetworkScaler):
            scaler = scaler.scaler

        # since pipeline data are an iterable of scalers we'll create
        # an iterable for normal scalers to loop over
        if not isinstance(scaler, Pipeline):
            scaler = [scaler]

        for s in scaler:
            params = s.get_params()

            if isinstance(s, MinMaxScaler):
                tfscaler = TFMinMaxScaler(**params)
                tfscaler.feature_range = s.feature_range
                tfscaler.scale_ = s.scale_
                tfscaler.min_ = s.min_

            elif isinstance(s, OneHotEncoder):
                tfscaler = TFOneHotEncoder(**params)
                # only the categories of the first encoded feature are used
                tfscaler.categories_ = s.categories_[0]
                self.n_classes_ = len(tfscaler.categories_)

            elif isinstance(s, OrdinalEncoder):
                # NOTE(review): relies on ymin/ymax attributes, so this is
                # presumably a project-specific OrdinalEncoder -- confirm
                tfscaler = TFOrdinalEncoder()
                tfscaler.ymin = s.ymin
                tfscaler.ymax = s.ymax
                self.n_classes_ = s.ymax - s.ymin

            elif isinstance(s, PCA):
                tfscaler = TFPCA(**params)
                tfscaler.explained_variance_ = s.explained_variance_
                tfscaler.components_ = s.components_
                tfscaler.mean_ = s.mean_

            elif isinstance(s, PowerTransformer):
                raise NotImplementedError

            elif isinstance(s, QuantileTransformer):
                raise NotImplementedError

            elif isinstance(s, RobustScaler):
                tfscaler = TFRobustScaler(**params)
                tfscaler.center_ = s.center_
                tfscaler.scale_ = s.scale_

            elif isinstance(s, StandardScaler):
                tfscaler = TFStandardScaler(**params)
                tfscaler.mean_ = s.mean_
                tfscaler.scale_ = s.scale_

            else:
                raise NotImplementedError(f"TFScaler of type {type(s)} not supported")

            self.tfscalers.append(tfscaler)

    def _format_nodata(self, srcnodata: float = None, dstnodata: float = None) -> tuple:
        """Parses passed nodata values to retrieve appropriate nodata values

        Whichever of the two values is None takes the value of the other one;
        both stay None when neither is passed.

        Returns:
            the (srcnodata, dstnodata) pair
        """
        dstnodata = srcnodata if dstnodata is None else dstnodata
        srcnodata = dstnodata if srcnodata is None else srcnodata
        return srcnodata, dstnodata

    def _get_transform_shape(self, tensor: tf.Tensor) -> tf.TensorShape:
        """Get the shape of the output tensor

        The input shape is kept, except that the last dimension is replaced
        by ``n_classes_`` when that attribute is set.
        """
        shape = list(tensor.shape)
        if self.n_classes_ is not None:
            shape[-1] = self.n_classes_
        return tf.TensorShape(shape)

    def _get_inverse_transform_shape(self, tensor: tf.Tensor) -> tf.TensorShape:
        """Determine the shape of the output array

        The input shape is kept unless ``n_classes_`` is set, in which case
        the last dimension is collapsed to size 1.
        """
        if self.n_classes_ is None:
            return tensor.shape
        else:
            return tf.expand_dims(tensor[..., -1], -1).shape

    def _reshape_tensor(self, tensor: tf.Tensor) -> tf.Tensor:
        """Flatten every leading dimension so the tensor is 2-D, keeping the last dimension"""
        return tf.reshape(tensor, (-1, tensor.shape[-1]))

    def _get_valid_locations(self, tensor: tf.Tensor, srcnodata: float) -> tuple:
        """Get boolean indices for valid array locations

        Returns:
            a (valid, invalid) pair of boolean tensors reduced over the last
            axis: ``valid`` is True where no entry equals srcnodata and
            ``invalid`` is True where every entry equals srcnodata. Locations
            with a mix of nodata and data entries are False in both.
        """
        valid = tf.math.not_equal(tensor, srcnodata)
        valid = tf.reduce_all(valid, axis=-1)
        invalid = tf.math.equal(tensor, srcnodata)
        invalid = tf.reduce_all(invalid, axis=-1)
        return valid, invalid

    def transform(
        self, tensor: tf.Tensor, srcnodata: float = None, dstnodata: float = None
    ) -> tf.Tensor:
        """Apply the sklearn transform method(s) to tensor data.

        Args:
            tensor: n-dimensional tensor to transform
            srcnodata: the input nodata value to ignore
            dstnodata: the value to assign to output nodata pixels

        Returns:
            scaled/transformed tensor data
        """
        srcnodata, dstnodata = self._format_nodata(srcnodata, dstnodata)
        output_shape = self._get_transform_shape(tensor)
        reshaped = self._reshape_tensor(tensor)

        # masking is only performed when a nodata value is known
        apply_mask = srcnodata is not None
        if apply_mask:
            # NOTE(review): the scatter shape is captured before the scalers
            # run, so a scaler that changes the last dimension looks like it
            # would not scatter back cleanly -- confirm
            scatter_shape = reshaped.shape
            bool_valid, bool_invalid = self._get_valid_locations(reshaped, srcnodata)
            inds_valid = tf.where(bool_valid)
            inds_invalid = tf.where(bool_invalid)
            ndvals = tf.boolean_mask(reshaped, bool_invalid)
            if srcnodata != dstnodata:
                # shift the nodata rows from the source to the destination value
                nd_diff = dstnodata - srcnodata
                ndvals = tf.add(ndvals, nd_diff)
            # only the valid rows are passed through the scalers
            reshaped = tf.boolean_mask(reshaped, bool_valid)

        for scaler in self.tfscalers:
            reshaped = scaler.transform(reshaped)

        if apply_mask:
            # scatter_nd zero-fills unset rows, so adding the two scattered
            # tensors merges the scaled rows with the nodata rows
            tvalid = tf.scatter_nd(inds_valid, reshaped, scatter_shape)
            tinvalid = tf.scatter_nd(inds_invalid, ndvals, scatter_shape)
            reshaped = tf.add(tvalid, tinvalid)

        transformed = tf.reshape(reshaped, output_shape)

        return transformed

    def inverse_transform(
        self, tensor: tf.Tensor, srcnodata: float = None, dstnodata: float = None
    ) -> tf.Tensor:
        """Apply the sklearn inverse_transform method(s) to tensor data

        Args:
            tensor: n-dimensional tensor to inverse transform
            srcnodata: the input nodata value to ignore
            dstnodata: the value to assign to output nodata pixels

        Returns:
            tensor transformed to its original unscaled range
        """
        srcnodata, dstnodata = self._format_nodata(srcnodata, dstnodata)
        output_shape = self._get_inverse_transform_shape(tensor)
        reshaped = self._reshape_tensor(tensor)

        # masking is only performed when a nodata value is known
        apply_mask = srcnodata is not None
        if apply_mask:
            scatter_shape = reshaped.shape
            bool_valid, bool_invalid = self._get_valid_locations(reshaped, srcnodata)
            inds_valid = tf.where(bool_valid)
            inds_invalid = tf.where(bool_invalid)
            ndvals = tf.boolean_mask(reshaped, bool_invalid)
            if srcnodata != dstnodata:
                # shift the nodata rows from the source to the destination value
                nd_diff = dstnodata - srcnodata
                ndvals = tf.add(ndvals, nd_diff)
            # only the valid rows are passed through the scalers
            reshaped = tf.boolean_mask(reshaped, bool_valid)

        # undo the scalers in the opposite order they were applied
        for scaler in self.tfscalers[::-1]:
            reshaped = scaler.inverse_transform(reshaped)

        if apply_mask:
            # scatter_nd zero-fills unset rows, so adding the two scattered
            # tensors merges the unscaled rows with the nodata rows
            tvalid = tf.scatter_nd(inds_valid, reshaped, scatter_shape)
            tinvalid = tf.scatter_nd(inds_invalid, ndvals, scatter_shape)
            reshaped = tf.add(tvalid, tinvalid)

        # fall back to the 2-D result when it cannot take the expected shape
        try:
            inverse_transformed = tf.reshape(reshaped, output_shape)
        except ValueError:
            inverse_transformed = reshaped

        return inverse_transformed

__init__(scaler)

Create a TFScaler to support applying inverse/transform methods to tensors.

Parameters:

Name Type Description Default
scaler BaseEstimator

a fitted sklearn scaler or a myco NetworkScaler. Multiple scalers fitted using a Pipeline method will be applied in series.

required
Source code in myco/scalers.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
def __init__(self, scaler: BaseEstimator):
    """Build the TF-enabled counterpart(s) of a fitted scaler.

    Args:
        scaler: a fitted sklearn scaler or a myco NetworkScaler. The
            steps of a Pipeline are wrapped one by one and applied in
            series.

    Raises:
        NotImplementedError: for PowerTransformer, QuantileTransformer
            and any scaler type without a TF counterpart.
    """
    self.tfscalers = []
    self.n_classes_ = None

    # a NetworkScaler keeps the actual scaler in its `scaler` attribute
    if isinstance(scaler, NetworkScaler):
        scaler = scaler.scaler

    # a lone scaler is wrapped in a list so it shares the Pipeline loop
    steps = scaler if isinstance(scaler, Pipeline) else [scaler]

    for step in steps:
        params = step.get_params()

        if isinstance(step, MinMaxScaler):
            wrapped = TFMinMaxScaler(**params)
            wrapped.feature_range = step.feature_range
            wrapped.scale_ = step.scale_
            wrapped.min_ = step.min_

        elif isinstance(step, OneHotEncoder):
            wrapped = TFOneHotEncoder(**params)
            # only the categories of the first encoded feature are used
            wrapped.categories_ = step.categories_[0]
            self.n_classes_ = len(wrapped.categories_)

        elif isinstance(step, OrdinalEncoder):
            wrapped = TFOrdinalEncoder()
            wrapped.ymin = step.ymin
            wrapped.ymax = step.ymax
            self.n_classes_ = step.ymax - step.ymin

        elif isinstance(step, PCA):
            wrapped = TFPCA(**params)
            wrapped.explained_variance_ = step.explained_variance_
            wrapped.components_ = step.components_
            wrapped.mean_ = step.mean_

        elif isinstance(step, (PowerTransformer, QuantileTransformer)):
            # no tensor implementation exists for these transformers
            raise NotImplementedError

        elif isinstance(step, RobustScaler):
            wrapped = TFRobustScaler(**params)
            wrapped.center_ = step.center_
            wrapped.scale_ = step.scale_

        elif isinstance(step, StandardScaler):
            wrapped = TFStandardScaler(**params)
            wrapped.mean_ = step.mean_
            wrapped.scale_ = step.scale_

        else:
            raise NotImplementedError(f"TFScaler of type {type(step)} not supported")

        self.tfscalers.append(wrapped)

inverse_transform(tensor, srcnodata=None, dstnodata=None)

Apply the sklearn inverse_transform method(s) to tensor data

Parameters:

Name Type Description Default
tensor tf.Tensor

n-dimensional tensor to inverse transform

required
srcnodata float

the input nodata value to ignore

None
dstnodata float

the value to assign to output nodata pixels

None

Returns:

Type Description
tf.Tensor

tensor transformed to its original unscaled range

Source code in myco/scalers.py
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
def inverse_transform(
    self, tensor: tf.Tensor, srcnodata: float = None, dstnodata: float = None
) -> tf.Tensor:
    """Run the wrapped scalers' inverse_transform over tensor data.

    The scalers are applied in the reverse of their stored order. When a
    nodata value is known, only rows free of nodata are passed through
    the scalers and the all-nodata rows are written back afterwards.

    Args:
        tensor: n-dimensional tensor to inverse transform
        srcnodata: the input nodata value to ignore
        dstnodata: the value to assign to output nodata pixels

    Returns:
        tensor transformed to its original unscaled range; the 2-D
        intermediate is returned as-is if it cannot be reshaped to the
        expected output shape
    """
    srcnodata, dstnodata = self._format_nodata(srcnodata, dstnodata)
    output_shape = self._get_inverse_transform_shape(tensor)
    flat = self._reshape_tensor(tensor)

    masked = srcnodata is not None
    if masked:
        flat_shape = flat.shape
        is_valid, is_nodata = self._get_valid_locations(flat, srcnodata)
        valid_idx = tf.where(is_valid)
        nodata_idx = tf.where(is_nodata)
        nodata_vals = tf.boolean_mask(flat, is_nodata)
        if srcnodata != dstnodata:
            # move the nodata rows from the source to the destination value
            nodata_vals = tf.add(nodata_vals, dstnodata - srcnodata)
        flat = tf.boolean_mask(flat, is_valid)

    for scaler in reversed(self.tfscalers):
        flat = scaler.inverse_transform(flat)

    if masked:
        # scatter_nd zero-fills, so the sum merges both sets of rows
        restored = tf.scatter_nd(valid_idx, flat, flat_shape)
        nodata_rows = tf.scatter_nd(nodata_idx, nodata_vals, flat_shape)
        flat = tf.add(restored, nodata_rows)

    try:
        return tf.reshape(flat, output_shape)
    except ValueError:
        return flat

transform(tensor, srcnodata=None, dstnodata=None)

Apply the sklearn transform method(s) to tensor data.

Parameters:

Name Type Description Default
tensor tf.Tensor

n-dimensional tensor to transform

required
srcnodata float

the input nodata value to ignore

None
dstnodata float

the value to assign to output nodata pixels

None

Returns:

Type Description
tf.Tensor

scaled/transformed tensor data

Source code in myco/scalers.py
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
def transform(
    self, tensor: tf.Tensor, srcnodata: float = None, dstnodata: float = None
) -> tf.Tensor:
    """Run the wrapped scalers' transform over tensor data.

    The scalers are applied in their stored order. When a nodata value is
    known, only rows free of nodata are passed through the scalers and
    the all-nodata rows are written back afterwards.

    Args:
        tensor: n-dimensional tensor to transform
        srcnodata: the input nodata value to ignore
        dstnodata: the value to assign to output nodata pixels

    Returns:
        scaled/transformed tensor data
    """
    srcnodata, dstnodata = self._format_nodata(srcnodata, dstnodata)
    output_shape = self._get_transform_shape(tensor)
    flat = self._reshape_tensor(tensor)

    masked = srcnodata is not None
    if masked:
        flat_shape = flat.shape
        is_valid, is_nodata = self._get_valid_locations(flat, srcnodata)
        valid_idx = tf.where(is_valid)
        nodata_idx = tf.where(is_nodata)
        nodata_vals = tf.boolean_mask(flat, is_nodata)
        if srcnodata != dstnodata:
            # move the nodata rows from the source to the destination value
            nodata_vals = tf.add(nodata_vals, dstnodata - srcnodata)
        flat = tf.boolean_mask(flat, is_valid)

    for scaler in self.tfscalers:
        flat = scaler.transform(flat)

    if masked:
        # scatter_nd zero-fills, so the sum merges both sets of rows
        scaled_rows = tf.scatter_nd(valid_idx, flat, flat_shape)
        nodata_rows = tf.scatter_nd(nodata_idx, nodata_vals, flat_shape)
        flat = tf.add(scaled_rows, nodata_rows)

    return tf.reshape(flat, output_shape)

TFStandardScaler

Bases: StandardScaler

TF-enabled scaling for StandardScaler objects

Source code in myco/scalers.py
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
class TFStandardScaler(StandardScaler):
    """StandardScaler whose transform methods operate on tensors."""

    def transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Standardize tensor data with the StandardScaler parameters.

        ``mean_`` is subtracted when ``with_mean`` is set, then the data
        are divided by ``scale_`` when ``with_std`` is set.
        """
        target_dtype = tensor.dtype
        if self.with_mean:
            tensor -= tf.convert_to_tensor(self.mean_, dtype=target_dtype)
        if self.with_std:
            tensor /= tf.convert_to_tensor(self.scale_, dtype=target_dtype)
        return tensor

    def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
        """Undo the standardization, returning data in unscaled units.

        The steps of ``transform`` are reversed: multiply by ``scale_``
        when ``with_std`` is set, then add ``mean_`` when ``with_mean`` is
        set.
        """
        target_dtype = tensor.dtype
        if self.with_std:
            tensor *= tf.convert_to_tensor(self.scale_, dtype=target_dtype)
        if self.with_mean:
            tensor += tf.convert_to_tensor(self.mean_, dtype=target_dtype)
        return tensor

inverse_transform(tensor)

Convert from scaled to unscaled units

Source code in myco/scalers.py
764
765
766
767
768
769
770
771
772
773
def inverse_transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Undo the standardization, returning data in unscaled units.

    Multiplies by ``scale_`` when ``with_std`` is set, then adds
    ``mean_`` when ``with_mean`` is set.
    """
    target_dtype = tensor.dtype
    if self.with_std:
        tensor *= tf.convert_to_tensor(self.scale_, dtype=target_dtype)
    if self.with_mean:
        tensor += tf.convert_to_tensor(self.mean_, dtype=target_dtype)
    return tensor

transform(tensor)

Apply the StandardScaler to tensor data.

Source code in myco/scalers.py
753
754
755
756
757
758
759
760
761
762
def transform(self, tensor: tf.Tensor) -> tf.Tensor:
    """Standardize tensor data with the StandardScaler parameters.

    ``mean_`` is subtracted when ``with_mean`` is set, then the data are
    divided by ``scale_`` when ``with_std`` is set.
    """
    target_dtype = tensor.dtype
    if self.with_mean:
        tensor -= tf.convert_to_tensor(self.mean_, dtype=target_dtype)
    if self.with_std:
        tensor /= tf.convert_to_tensor(self.scale_, dtype=target_dtype)
    return tensor

get_names()

Return a list of the available scalers supported in configuration

Source code in myco/scalers.py
820
821
822
def get_names() -> list:
    """List the keys of the SUPPORTED scaler registry."""
    names = list(SUPPORTED.keys())
    return names

get_scaler(name)

Return an initialized scaler object by name

Source code in myco/scalers.py
825
826
827
828
def get_scaler(name: str) -> BaseEstimator:
    """Look up the scaler registered under ``name`` in SUPPORTED.

    An assertion rejects names that ``get_names`` does not list.
    """
    valid_names = get_names()
    assert name in valid_names, f"Invalid scaler: {name}"
    scaler = SUPPORTED[name]
    return scaler

get_weighting_names()

Return a list of available sample weight transformers

Source code in myco/scalers.py
831
832
833
def get_weighting_names() -> list:
    """List the keys of the WEIGHTS sample-weight transformer registry."""
    names = list(WEIGHTS.keys())
    return names