# Source code for tfitpy.indices.ppi

"""Protein-Protein Interaction (PPI) related indices.

The core methods accept a NetworkX graph as the PPI input, which enables the
use of multiple PPI network sources. Three sources are currently supported:

- HIPPIE PPI (:cite:t:`hippie`)
- StringDB PPI (:cite:t:`string`)
- BioGRID Database (:cite:t:`biogrid`)

Three wrapper functions are provided that accept a shared dataset cache.
The cache is described in the load_datasets() documentation.
"""

import networkx as nx
import numpy as np
from typing import Dict, List, Any, Union, Callable, Tuple
from tfitpy.utils import generate_tf_pairs
import pandas as pd
from scipy.stats import hypergeom

# -----
# Index 1 : PPI Shared Partners Index
# -----


def _get_ppi_partners(ppi_graph: nx.Graph, node: str) -> set:
    """Direct PPI partners (neighbor set) of *node*; empty set if absent."""
    if node in ppi_graph:
        return set(ppi_graph.neighbors(node))
    return set()


def _hypergeometric_pvalue(N: int, N1: int, N2: int, c: int) -> float:
    """Computes the upper-tail hypergeometric p-value for partner overlap.

    Calculates P(X >= c) where X ~ Hypergeometric(N, N1, N2).

    Args:
        N: Total population size (number of nodes in the PPI network).
        N1: Number of interaction partners for the first TF.
        N2: Number of interaction partners for the second TF.
        c: Observed number of shared partners.

    Returns:
        The upper-tail p-value as a float in the range [0, 1]. Returns 1.0
        if c exceeds the maximum possible overlap (min(N1, N2)).
    """
    if c > min(N1, N2):
        return 1.0
    return float(hypergeom.sf(c - 1, N, N1, N2))


def shared_partners_pairwise(
    tf1: str, tf2: str, ppi_graph: nx.Graph, background_size: int
) -> Tuple[float, float, int]:
    """Hypergeometric shared-partners score for a single TF pair.

    Looks up each TF's direct PPI partners, counts the overlap, and scores
    its significance as S = -log10(P), where P is the upper-tail
    hypergeometric p-value of observing at least that many shared partners.
    Based on :cite:t:`indices_2014`.

    Args:
        tf1: Identifier of the first TF.
        tf2: Identifier of the second TF.
        ppi_graph: An undirected NetworkX graph representing the PPI network.
        background_size: Population size N for the hypergeometric test,
            typically the number of nodes in ppi_graph.

    Returns:
        A tuple (S, p, c) where:
            S (float): -log10(P); 0.0 when either TF has no partners or the
                overlap is empty; inf when P underflows to zero.
            p (float): The p-value (1.0 in the degenerate cases above).
            c (int): The number of common partners.
    """
    neighbors_a = _get_ppi_partners(ppi_graph, tf1)
    neighbors_b = _get_ppi_partners(ppi_graph, tf2)
    size_a, size_b = len(neighbors_a), len(neighbors_b)
    overlap_count = len(neighbors_a & neighbors_b)

    # Degenerate pairs carry no signal: score 0, p-value 1.
    if size_a == 0 or size_b == 0 or overlap_count == 0:
        return 0.0, 1.0, 0

    p_value = _hypergeometric_pvalue(background_size, size_a, size_b, overlap_count)
    score = float("inf") if p_value <= 0.0 else float(-np.log10(p_value))
    return score, p_value, overlap_count
def shared_partners(
    sources: list, ppi_network: nx.Graph = None, pairs=None
) -> Tuple[float, pd.DataFrame]:
    """Shared PPI partners index for a TF regulatory module.

    Scores every TF pair derived from sources with the hypergeometric
    shared-partners test and aggregates the pairwise scores into a single
    module-level index by taking the mean. Based on :cite:t:`indices_2014`.

    Args:
        sources: List of source TF identifiers in the regulatory module.
        ppi_network: An undirected NetworkX graph representing the PPI
            network. Must be provided.
        pairs: Optional precomputed list of (tf1, tf2) tuples. When None,
            all unique pairs are generated from sources.

    Returns:
        A tuple (final_score, pairs_df) where:
            final_score (float): Mean hypergeometric score over all valid
                (finite) pairwise scores; 0.0 when none exist.
            pairs_df (DataFrame): One row per TF pair with columns:
                tf1, tf2, score, p_value, common_partners.

    Raises:
        ValueError: If ppi_network is None.
    """
    if ppi_network is None:
        raise ValueError("No graph provided")
    if pairs is None:
        pairs = generate_tf_pairs(sources)

    # Population size N for the hypergeometric test.
    background_size = len(ppi_network.nodes())

    records = []
    for tf1, tf2 in pairs:
        score, p_value, common = shared_partners_pairwise(
            tf1=tf1,
            tf2=tf2,
            ppi_graph=ppi_network,
            background_size=background_size,
        )
        records.append({
            "tf1": tf1,
            "tf2": tf2,
            "score": score,
            "p_value": p_value,
            "common_partners": common,
        })

    pairs_df = pd.DataFrame(records)

    # Infinite scores (p-value underflow) are excluded from the mean.
    finite_scores = pairs_df["score"].replace([np.inf, -np.inf], np.nan).dropna()
    final_score = float(np.mean(finite_scores)) if len(finite_scores) > 0 else 0.0
    return final_score, pairs_df
def _hypergeom_wrapper(db_key, sources, datasets, pairs=None): """Shared logic for hypergeometric wrappers.""" if datasets is None: raise ValueError( "datasets cache is required. Create cache with load_datasets() first.") if db_key not in datasets: raise ValueError(f"Dataset dependency missing: '{db_key}'") return shared_partners( sources=sources, ppi_network=datasets[db_key], pairs=pairs )
def shared_partners_hippie(
    sources: list, datasets=None, pairs=None, **kwargs
):
    """Shared-partners index computed on the HIPPIE PPI network."""
    return _hypergeom_wrapper("hippie", sources, datasets, pairs)
def shared_partners_stringdb(
    sources: list, datasets=None, pairs=None, **kwargs
):
    """Shared-partners index computed on the STRING PPI network."""
    return _hypergeom_wrapper("stringdb", sources, datasets, pairs)
def shared_partners_biogrid(
    sources: list, datasets=None, pairs=None, **kwargs
):
    """Shared-partners index computed on the BioGRID PPI network."""
    return _hypergeom_wrapper("biogrid", sources, datasets, pairs)
# =========|
# Index 2
# Shortest path score
# =========|
def shortest_path_pairwise(tf1: str, tf2: str, ppi_graph: nx.Graph) -> Tuple[float, float]:
    """Proximity score 1/d for the shortest PPI path between two TFs.

    Returns (0.0, inf) when either TF is absent from the graph or no path
    exists; a self-pair (d == 0) scores 1.0.
    """
    unreachable = (0.0, float('inf'))
    if tf1 not in ppi_graph or tf2 not in ppi_graph:
        return unreachable
    try:
        dist = nx.shortest_path_length(ppi_graph, tf1, tf2)
    except nx.NetworkXNoPath:
        return unreachable
    return (1.0 / dist, dist) if dist > 0 else (1.0, dist)
def shortest_path_score(
    sources: list, ppi_network: "nx.Graph" = None, pairs=None
) -> tuple:
    """Computes the aggregate shortest-path score for a TF regulatory module.

    For each TF pair derived from sources, computes the shortest-path
    proximity score and aggregates all pairwise scores into a single
    module-level index by taking the mean. Based on :cite:t:`indices_2014`.

    Args:
        sources: List of source TF identifiers in the regulatory module.
        ppi_network: An undirected NetworkX graph representing the PPI
            network. Must be provided.
        pairs: Optional precomputed list of (tf1, tf2) tuples. If None, all
            unique pairs are generated from sources via generate_tf_pairs().

    Returns:
        A tuple (final_score, pairs_df) where:
            final_score (float): The mean proximity score across all TF
                pairs. Returns 0.0 if no valid scores exist.
            pairs_df (DataFrame): A DataFrame with one row per TF pair,
                containing columns: tf1, tf2, score, path_length.

    Raises:
        ValueError: If ppi_network is None.
    """
    if ppi_network is None:
        raise ValueError("No graph provided")
    if pairs is None:
        pairs = generate_tf_pairs(sources)

    pair_results = []
    for tf1, tf2 in pairs:
        proximity, path_length = shortest_path_pairwise(tf1, tf2, ppi_network)
        pair_results.append({
            'tf1': tf1,
            'tf2': tf2,
            'score': proximity,
            'path_length': path_length,
        })

    # Explicit columns keep the frame well-formed even when pairs is empty;
    # without them an empty frame has no 'score' column and the selection
    # below raises KeyError.
    pairs_df = pd.DataFrame(
        pair_results, columns=['tf1', 'tf2', 'score', 'path_length'])

    # Drop any non-finite scores before averaging.
    valid_scores = pairs_df['score'].replace(
        [np.inf, -np.inf], np.nan).dropna()
    # Cast to a plain Python float so the return type matches
    # shared_partners() instead of leaking np.float64.
    final_score = float(np.mean(valid_scores)) if len(valid_scores) > 0 else 0.0
    return final_score, pairs_df
# =========|
# Optimized: single-pass all PPI scores
# =========|

# The three PPI databases every module-level PPI score is computed against.
_PPI_DB_KEYS = ["hippie", "stringdb", "biogrid"]


def _ppi_single_pass(
    pairs: list,
    graph: nx.Graph,
    background_size: int,
) -> Tuple[float, float]:
    """Single pass over pairs computing both shortest-path and shared-partners scores.

    For each pair, computes:
    - shortest path proximity score: 1/length (0.0 if no path)
    - shared partners hypergeometric score: -log10(p) (0.0 if no overlap)

    Uses a row-level neighbor cache so each TF's neighbor set is fetched
    only once per call, regardless of how many pairs it appears in.

    Args:
        pairs: List of (tf1, tf2) tuples.
        graph: NetworkX PPI graph for this database.
        background_size: Number of nodes in graph, used as hypergeometric N.

    Returns:
        Tuple of (shortest_path_score, shared_partners_score) — both are
        means across valid pairs, 0.0 if no valid scores exist.
    """
    # Row-level neighbor cache: avoids recomputing set(G.neighbors(tf))
    # for TFs that appear in multiple pairs within this row.
    neighbor_cache: dict = {}
    path_scores = []
    partner_scores = []

    for tf1, tf2 in pairs:
        # --- shortest path ---
        if tf1 not in graph or tf2 not in graph:
            path_scores.append(0.0)
        else:
            try:
                length = nx.shortest_path_length(graph, tf1, tf2)
                # A self-pair (length == 0) scores the maximum, 1.0.
                path_scores.append(1.0 / length if length > 0 else 1.0)
            except nx.NetworkXNoPath:
                path_scores.append(0.0)

        # --- shared partners (neighbor sets cached at row level) ---
        if tf1 not in neighbor_cache:
            neighbor_cache[tf1] = (
                set(graph.neighbors(tf1)) if tf1 in graph else set()
            )
        if tf2 not in neighbor_cache:
            neighbor_cache[tf2] = (
                set(graph.neighbors(tf2)) if tf2 in graph else set()
            )
        n1 = neighbor_cache[tf1]
        n2 = neighbor_cache[tf2]
        N1, N2 = len(n1), len(n2)
        c = len(n1 & n2)
        if N1 == 0 or N2 == 0 or c == 0:
            # No partners or no overlap: no signal for this pair.
            partner_scores.append(0.0)
        else:
            P = _hypergeometric_pvalue(background_size, N1, N2, c)
            # A p-value that underflows to 0.0 maps to an infinite score;
            # _safe_mean below filters non-finite values out of the mean.
            S = float("inf") if P <= 0.0 else -np.log10(P)
            partner_scores.append(S)

    # Aggregate: mean over valid (finite, non-nan) scores
    def _safe_mean(values: list) -> float:
        arr = np.array(values, dtype=float)
        arr = arr[np.isfinite(arr)]
        return float(np.mean(arr)) if len(arr) > 0 else 0.0

    return _safe_mean(path_scores), _safe_mean(partner_scores)


def _ppi_scores_from_cache(sources: list, pairs: list, cache: pd.DataFrame) -> dict:
    """Extract PPI scores from precomputed cache and aggregate.

    Args:
        sources: List of source TF identifiers; used only when pairs is None.
        pairs: Optional precomputed list of (tf1, tf2) tuples.
        cache: DataFrame indexed by sorted (tf1, tf2) tuples with one column
            per pairwise score (e.g. 'shortest_PPI_path_score_hippie').

    Returns:
        Dict mapping each of the six score column names to its mean across
        the requested pairs (rounded to 5 decimals); all 0.0 when none of
        the pairs exist in the cache.
    """
    from tfitpy.utils import generate_tf_pairs
    if pairs is None:
        pairs = generate_tf_pairs(sources)

    # Convert to sorted tuples for index lookup
    pair_tuples = [tuple(sorted([g1, g2])) for g1, g2 in pairs]

    # Fast index-based lookup using .loc with list of tuples
    try:
        relevant_rows = cache.loc[pair_tuples]
    except KeyError:
        # Some pairs not in cache - filter to existing ones
        existing_pairs = [p for p in pair_tuples if p in cache.index]
        if not existing_pairs:
            # None of the requested pairs are cached: neutral scores.
            return {
                "shortest_PPI_path_score_hippie": 0.0,
                "shortest_PPI_path_score_stringdb": 0.0,
                "shortest_PPI_path_score_biogrid": 0.0,
                "shared_PPI_partners_score_hippie": 0.0,
                "shared_PPI_partners_score_stringdb": 0.0,
                "shared_PPI_partners_score_biogrid": 0.0,
            }
        relevant_rows = cache.loc[existing_pairs]

    # Aggregate scores (mean, ignoring inf/nan)
    results = {}
    for db_key in ["hippie", "stringdb", "biogrid"]:
        path_col = f'shortest_PPI_path_score_{db_key}'
        partner_col = f'shared_PPI_partners_score_{db_key}'
        # Get valid scores
        path_scores = relevant_rows[path_col].replace(
            [np.inf, -np.inf], np.nan
        ).dropna()
        partner_scores = relevant_rows[partner_col].replace(
            [np.inf, -np.inf], np.nan
        ).dropna()
        # Compute means
        results[path_col] = round(
            float(path_scores.mean()) if len(path_scores) > 0 else 0.0, 5
        )
        results[partner_col] = round(
            float(partner_scores.mean()) if len(partner_scores) > 0 else 0.0, 5
        )
    return results
def ppi_all_scores(
    sources: list,
    datasets: dict = None,
    pairs: list = None,
    **kwargs,
) -> dict:
    """Compute all 6 PPI scores in a single pass per database.

    For each of the three PPI databases (hippie, stringdb, biogrid), makes
    one pass over all TF pairs to compute both the shortest-path proximity
    score and the shared-partners hypergeometric score simultaneously.
    Equivalent to calling shortest_path_score and shared_partners
    separately per database, but with half the graph traversals.

    Args:
        sources: List of source TF identifiers in the regulatory module.
        datasets: Dataset cache dict containing 'hippie', 'stringdb',
            'biogrid' NetworkX graphs. Must be provided. May optionally
            hold a 'pairwise_score_cache' DataFrame for the fast path.
        pairs: Optional precomputed list of (tf1, tf2) tuples. If None,
            generated from sources via generate_tf_pairs().

    Returns:
        Dict with 6 keys:
            shortest_PPI_path_score_{hippie,stringdb,biogrid}
            shared_PPI_partners_score_{hippie,stringdb,biogrid}

    Raises:
        ValueError: If datasets is None or any required db key is missing.
    """
    # Fast path: a precomputed pairwise score cache short-circuits the
    # per-database graph traversals entirely.
    if datasets is not None and 'pairwise_score_cache' in datasets:
        return _ppi_scores_from_cache(
            sources, pairs, datasets['pairwise_score_cache'])

    if datasets is None:
        raise ValueError(
            "datasets cache is required. Create cache with load_datasets() first.")

    missing = [k for k in _PPI_DB_KEYS if k not in datasets]
    if missing:
        raise ValueError(f"Dataset dependencies missing: {missing}")

    if pairs is None:
        pairs = generate_tf_pairs(sources)

    scores = {}
    for db in _PPI_DB_KEYS:
        network = datasets[db]
        path_mean, partner_mean = _ppi_single_pass(
            pairs, network, len(network.nodes()))
        scores[f"shortest_PPI_path_score_{db}"] = round(path_mean, 5)
        scores[f"shared_PPI_partners_score_{db}"] = round(partner_mean, 5)
    return scores
# Registry of PPI index methods: maps a method name to its implementation,
# required dataset-cache keys, result kind, and output column names.
# NOTE(review): presumably consumed by a generic index dispatcher elsewhere
# in the package — confirm against the caller.
PPI_METHODS = {
    "ppi": {
        # Entry point that computes all six scores in one call.
        "func": ppi_all_scores,
        # Dataset-cache keys this method depends on.
        "datasets": ["pairwise_score_cache"],
        # The callable returns a dict expanded into these DataFrame columns.
        "type": "df_columns",
        "cols": [
            "shortest_PPI_path_score_hippie",
            "shortest_PPI_path_score_stringdb",
            "shortest_PPI_path_score_biogrid",
            "shared_PPI_partners_score_hippie",
            "shared_PPI_partners_score_stringdb",
            "shared_PPI_partners_score_biogrid",
        ],
    },
}