Source code for lisbet.unsupervised

"""LISBET module for sequence segmentation and dimensionality reduction."""

import logging
import random
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
import yaml
from hmmlearn import hmm
from sklearn.preprocessing import MinMaxScaler
from tqdm.auto import tqdm
from umap import UMAP


def _one_hot(targets, num_classes):
    """One-hot encoding."""
    return np.eye(num_classes, dtype=int)[np.array(targets)]


def _get_embeddings(features_path, datafilter=None):
    """
    Retrieve embeddings for a given dataset.

    Parameters
    ----------
    features_path : str or None, optional
        Path to the directory containing precomputed features. If None, features are
        computed using the provided raw data.
    datafilter : str or None, optional
        Comma-separated string specifying filters for loading precomputed features.

    Returns
    -------
    embeddings : list
        A list of tuples, where each tuple contains the file name and corresponding
        embeddings as a NumPy array.

    Notes
    -----
    If `features_path` is None, the function computes features using the specified
    embedding model. If `features_path` is provided, precomputed features are loaded
    and filtered based on the specified criteria in `datafilter`.
    """
    # Load precomputed features and filter data, if requested
    featpaths = list(Path(features_path).rglob("features_*_embedding.csv"))
    embeddings = [
        (
            str(fpath.relative_to(features_path).parent),  # key
            pd.read_csv(fpath, index_col=0, dtype=np.float32).values,  # data
        )
        for fpath in tqdm(featpaths, desc="Loading embeddings")
        if datafilter is None
        or any(
            flt in str(fpath.relative_to(features_path))
            for flt in datafilter.split(",")
        )
    ]
    logging.info("Loaded %d sequences", len(embeddings))

    logging.debug([emb[0] for emb in embeddings])

    return embeddings


def _fit_hmm_func(n_components, num_iter, data, lengths, seed):
    """Fit HMM model, target function."""
    hmm_model = hmm.GaussianHMM(
        n_components=n_components,
        covariance_type="full",
        random_state=seed,
        n_iter=num_iter,
        tol=1e-3,
        verbose=False,
    )
    hmm_model.fit(data, lengths=lengths)
    hmm_history = list(hmm_model.monitor_.history)
    hmm_metrics = {
        "ll": hmm_model.score(data, lengths=lengths),
        "aic": hmm_model.aic(data, lengths=lengths),
        "bic": float(hmm_model.bic(data, lengths=lengths)),
    }

    return hmm_model, hmm_history, hmm_metrics


def _fit_hmm(
    min_n_components,
    max_n_components,
    num_iter,
    embeddings,
    frac,
    n_jobs,
    seed,
    output_path=None,
):
    """Fit HMM model."""
    # Random sample
    if frac is not None:
        assert 0 < frac <= 1, "frac must be in the (0, 1] range"

        rng = np.random.default_rng(seed=seed)
        indices = rng.choice(len(embeddings), size=int(np.ceil(frac * len(embeddings))))
        fit_embeddings = [embeddings[idx] for idx in indices]
        logging.info("Sampled %d sequences for model fitting", len(fit_embeddings))
    else:
        fit_embeddings = embeddings

    # Merge sequences
    fit_lengths = [emb[1].shape[0] for emb in fit_embeddings]
    fit_embeddings = np.concatenate([emb[1] for emb in fit_embeddings])

    # Make n_components range
    # NOTE: Shuffling is used mostly to improve user experience by making the ETA more
    #       accurate. An LPT scheduler would be more efficient, but it could severely
    #       degrade user experience as the ETA would be initially overestimated.
    n_components_range = list(range(min_n_components, max_n_components + 1))
    n_models = len(n_components_range)
    random.shuffle(n_components_range)

    # Fit model
    # NOTE: We are only parallelizing HMM fitting, not the prediction step or the disk
    #       I/O. This is because the latter two steps are not CPU-bound and we wanted to
    #       keep the parallelized code as simple as possible.
    # NOTE: Using the default loky backend raises an Exception due to a bug in joblib
    #       (see https://github.com/joblib/joblib/issues/1707). The issue has been
    #       fixed, but the patch will not be available until the next release of joblib,
    #       currently at version 1.4.2. In the meantime, we can use the threading
    #       backend via prefer="threads".
    parallel = joblib.Parallel(
        n_jobs=n_jobs, return_as="generator_unordered", prefer="threads"
    )
    fitting_results = parallel(
        joblib.delayed(_fit_hmm_func)(
            n_components, num_iter, fit_embeddings, fit_lengths, seed
        )
        for n_components in n_components_range
    )

    fitted_models = []
    for hmm_model, hmm_history, hmm_metrics in tqdm(
        fitting_results, total=n_models, desc="Fitting HMM models"
    ):
        logging.debug(
            "HMM with %d states: log-likelihood = %.2f, AIC = %.2f, BIC = %.2f",
            hmm_model.n_components,
            hmm_metrics["ll"],
            hmm_metrics["aic"],
            hmm_metrics["bic"],
        )

        fitted_models.append(hmm_model)

        # Store fitting results on file, if requested
        if output_path is not None:
            dst_path = Path(output_path) / "annotations"
            dst_path.mkdir(parents=True, exist_ok=True)

            # HMM model
            joblib.dump(
                hmm_model, dst_path / f"model_hmm{hmm_model.n_components}.joblib"
            )

            # History
            hmm_history_df = pd.DataFrame(hmm_history, columns=["History"])
            hmm_history_df.to_csv(dst_path / f"history_hmm{hmm_model.n_components}.csv")

            # Metrics
            with open(
                dst_path / f"metrics_hmm{hmm_model.n_components}.yaml",
                "w",
                encoding="utf-8",
            ) as f_yaml:
                yaml.safe_dump(hmm_metrics, f_yaml)

    return fitted_models


[docs] def segment_hmm( data_path, min_n_components=2, max_n_components=32, num_iter=10, data_filter=None, fit_frac=None, hmm_seed=None, n_jobs=-1, pretrained_path=None, output_path=None, ): """ Segment time series data using Hidden Markov Models. This function fits one or more HMM models to the embeddings and uses the models to segment the data into discrete states. Parameters ---------- data_path : str or Path Path to the directory containing LISBET embeddings. min_n_components : int, optional Minimum number of states to use in the HMM. max_n_components : int, optional Maximum number of states to use in the HMM. num_iter : int, default=10 Maximum number of iterations for the Baum-Welch algorithm. data_filter : callable, optional Function to filter the data before fitting. fit_frac : float, optional Fraction of data to use for fitting. If None, use all data. hmm_seed : int, optional Random seed for reproducibility. n_jobs : int, default=-1 Number of parallel jobs to run, -1 means using all processors. pretrained_path : str or Path, optional Path to the directory containing pretrained HMM models. If None, models are trained from scratch. output_path : str or Path, optional Path to save the results. If None, results are not saved to disk. Returns ------- predictions : dict Dictionary mapping the number of states to the predicted segments for each sequence. Raises ------ ValueError If min_n_components or max_n_components are smaller than 2, or max_n_components is smaller than min_n_components. """ # Calculate the number of models to fit if not (2 <= min_n_components <= max_n_components): raise ValueError("Must satisfy: 2 <= min_n_components <= max_n_components") # Get LISBET embeddings for the dataset embeddings = _get_embeddings(data_path, data_filter) if pretrained_path is None: # Fit HMM models hmm_models = _fit_hmm( min_n_components=min_n_components, max_n_components=max_n_components, num_iter=num_iter, embeddings=embeddings, frac=fit_frac, n_jobs=n_jobs, seed=hmm_seed, output_path=output_path, ) else: # Load pretrained HMM models hmm_models = [ joblib.load(pretrained_path / f"model_hmm{n}.joblib") for n in range(min_n_components, max_n_components + 1) ] # Segment all sequences and store predictions to disk, if requested predictions = {} for hmm_model in tqdm(hmm_models, desc="Segmenting sequences"): # HMM predictions all_lengths = [emb[1].shape[0] for emb in embeddings] all_embeddings = np.concatenate([emb[1] for emb in embeddings]) all_predictions = hmm_model.predict(all_embeddings, lengths=all_lengths) # Assign predictions to match the corresponding sequences hmm_predictions = [] for seq_idx, (key, seq) in enumerate(embeddings): # Find prediction boundaries for the current sequence seq_start = sum(all_lengths[:seq_idx]) seq_stop = seq_start + all_lengths[seq_idx] assert seq.shape[0] == all_lengths[seq_idx] logging.debug( "Sequence start = %d, Sequence stop = %d", seq_start, seq_stop ) # Extract prediction pred = all_predictions[seq_start:seq_stop] hmm_predictions.append((key, pred)) # Store predictions on file, if requested if output_path is not None: dst_path = Path(output_path) / "annotations" / key dst_path.mkdir(parents=True, exist_ok=True) # HMM motifs motifs = pd.DataFrame( _one_hot(pred, hmm_model.n_components), columns=[f"Motif_{i}" for i in range(hmm_model.n_components)], ) motifs.to_csv( dst_path / f"machineAnnotation_hmm{hmm_model.n_components}.csv" ) predictions[hmm_model.n_components] = hmm_predictions return predictions
[docs] def reduce_umap( data_path, num_dims=2, num_neighbors=60, data_filter=None, sample_size=None, sample_seed=None, umap_seed=None, output_path=None, ): """Dimensionality reduction using UMAP.""" # Get LISBET embeddings for the dataset embeddings = _get_embeddings(data_path, data_filter) # Random sample # NOTE: Sampling the embeddings after computing/reading them is inefficient. # Furthermore, the sampling logic should probably be part of the data reader. # For the moment we keep it here for simplicity, but we might rethink that. if sample_size is not None: rng = np.random.default_rng(seed=sample_seed) indices = rng.choice(len(embeddings), size=sample_size, replace=False) embeddings = [embeddings[idx] for idx in indices] logging.info("Sampled %d sequences", len(embeddings)) # Merge sequences lengths = [emb[1].shape[0] for emb in embeddings] all_embeddings = np.concatenate([emb[1] for emb in embeddings]) # Scale features for dataset all_embeddings = MinMaxScaler().fit_transform(all_embeddings) # Reduce embedding size using UMAP all_predictions = UMAP( n_neighbors=num_neighbors, n_components=num_dims, random_state=umap_seed, verbose=True, ).fit_transform(all_embeddings) # Store fitting results on file, if requested if output_path is not None: dst_path = Path(output_path) dst_path.mkdir(parents=True, exist_ok=True) # UMAP predictions predictions = [] for seq_idx, (key, seq) in enumerate(embeddings): # Find prediction boundaries for the current sequence seq_start = sum(lengths[:seq_idx]) seq_stop = seq_start + lengths[seq_idx] assert seq.shape[0] == lengths[seq_idx] logging.debug("Sequence start = %d, Sequence stop = %d", seq_start, seq_stop) # Extract prediction pred = all_predictions[seq_start:seq_stop] predictions.append((key, pred)) # Store results on file, if requested if output_path is not None: dst_path = ( Path(output_path) / "embeddings" / key / f"features_umap{num_dims}d{num_neighbors}_dimred.csv" ) dst_path.parent.mkdir(parents=True, exist_ok=True) umap_embeddings = pd.DataFrame(pred) umap_embeddings.to_csv(dst_path) return predictions