Source code for hfs.helpers

"""
Collection of helper methods for the feature selection algorithms.
"""
import math
from fractions import Fraction

import networkx as nx
import numpy as np
from networkx.algorithms.simple_paths import all_simple_paths


def getRelevance(xdata, ydata, node):
    """
    Gather relevance for a given node.

    Parameters
    ----------
    node : int
        Node for which the relevance should be obtained.
    xdata : {array-like, sparse matrix}, shape (n_samples, n_features)
            The training input samples.
    ydata : array-like, shape (n_samples,)
            The target values. An array of int.
    """
    p1 = (
        Fraction(
            xdata[(xdata[:, node] == 1) & (ydata == 1)].shape[0],
            xdata[(xdata[:, node] == 1)].shape[0],
        )
        if xdata[(xdata[:, node] == 1)].shape[0] != 0
        else 0
    )
    p2 = (
        Fraction(
            xdata[(xdata[:, node] == 0) & (ydata == 1)].shape[0],
            xdata[(xdata[:, node] == 0)].shape[0],
        )
        if xdata[(xdata[:, node] == 0)].shape[0] != 0
        else 0
    )
    p3 = 1 - p1
    p4 = 1 - p2

    rel = (p1 - p2) ** 2 + (p3 - p4) ** 2
    return rel


def checkData(dag, x_data, y_data):
    """Checks whether the given dataset satisfies the 0-1-propagation on the DAG.

    The 0-1-propagation property states that if there is a directed edge (u, v)
    in the DAG, then whenever node u has a value of 1 in the dataset, node v
    must have a value of 1 for the same instance.

    Parameters
    ----------
    dag : networkx.DiGraph
        The Directed Acyclic Graph representing the hierarchy structure.
    x_data : numpy.ndarray
            An array containing the input features of the dataset.
    y_data : numpy.ndarray
            An array containing the corresponding output labels of the dataset.

    Raises
    ----------
    ValueError: If the dataset violates the 0-1-propagation property
    on any of the edges in the DAG.

    """
    data = np.column_stack((x_data, y_data))
    edges = list(nx.edge_dfs(dag, source=0, orientation="original"))
    for edge in edges:
        for idx in range(len(data)):
            if data[idx, edge[0]] == 0 and data[idx, edge[1]] == 1:
                raise ValueError(
                    f"Test instance {idx} violates 0-1-propagation \
                    on edge ({edge[0]}, {edge[1]})"
                    f"{data[idx]}"
                )


def get_irrelevant_leaves(x_identifier, digraph):
    """Get leaves from the given graph that contain no relevant information.

    A leaf is considered irrelevant if it meets the following criteria:
    - It is not part of the specified x_identifier list.
    - It is not the "ROOT" node, which typically represents the starting
      node of the DAG.

    Parameters
    ----------
    x_identifier :  list
                    A list containing node identifiers that are considered
                    relevant
    digraph : networkx.DiGraph
            The Directed Acyclic Graph (DAG) from which irrelevant leaves
            will be identified.

    Returns
    ----------
    selected_leaves : list
                    A list of leaf nodes that contain no relevant information.
    """
    selected_leaves = [
        x
        for x in digraph.nodes()
        if digraph.out_degree(x) == 0
        and digraph.in_degree(x) == 1
        and x not in x_identifier
        and x != "ROOT"
    ]
    return selected_leaves


def get_leaves(graph: nx.DiGraph):
    """Get the leaf nodes from the given directed acyclic graph (DAG).

    A leaf node is a node in the graph that meets the following criteria:
    - It has no outgoing edges (out_degree == 0).
    - It has at least one incoming edge (in_degree > 0), indicating it
      has one or more parent nodes.

    Parameters
    ----------
    graph : networkx.DiGraph
            The Directed Acyclic Graph (DAG) from which the leaf nodes
            will be identified.

    Returns
    ----------
    leaves : list
            A list of leaf nodes found in the DAG.
    """
    leaves = [
        node
        for node in graph
        if graph.in_degree(node) > 0 and graph.out_degree(node) == 0
    ]
    return leaves


def shrink_dag(x_identifier, digraph):
    """Remove irrelevant leaf nodes from the given DAG.

    Parameters
    ----------
    x_identifier : list
            A list containing node identifiers that are considered relevant
    digraph : networkx.DiGraph
            The Directed Acyclic Graph (DAG) from which irrelevant leaf nodes
            will be removed.

    Returns
    ----------
    digraph : networkx.DiGraph
            The resulting DAG after removing all irrelevant leaf nodes.

    """
    leaves = get_irrelevant_leaves(x_identifier, digraph)
    while leaves:
        for x in leaves:
            digraph.remove_node(x)
        leaves = get_irrelevant_leaves(x_identifier, digraph)
    return digraph


def connect_dag(x_identifiers, hierarchy: nx.DiGraph):
    """
    Connects digraph (DAG), so that every node not in x_identifiers is removed from the DAG, and an new edge with its predecessor is built.

    Parameters
    ----------
    hierarchy : networkx.DiGraph
                The Directed Acyclic Graph (DAG) representing the hierarchy.

    """
    top_sort = nx.topological_sort(hierarchy)

    # node i = 0: source is either in or not in, as they are no predecessors,
    # there should not be any artificial edge
    # i: for each pred there is a direct edge to the pred and iff pred not in x_ide
    #       also to their pred2. (it does not matter if pred2 is really in x, if it is not,
    #       the edge will be removed later anyway)
    # i+1: if i is in -> no artificial edge on this path needed
    #       if i is not -> artifical edge to every pred of i, so each path going through i
    #       will be continued, if i is removed later

    for node in list(top_sort):
        preds = list(hierarchy.predecessors(node))
        for pred in preds:
            new_connections = []
            if pred not in x_identifiers:
                for pred_of_pred in hierarchy.predecessors(pred):
                    new_connections.append(pred_of_pred)
                for new_connection in new_connections:
                    hierarchy.add_edge(new_connection, node)

    # remove all nodes (and edges) that are not in x_identifier
    x_identifiers_set = set(x_identifiers)
    nodes_to_remove = [
        node for node in hierarchy.nodes if node not in x_identifiers_set
    ]
    hierarchy.remove_nodes_from(nodes_to_remove)

    return hierarchy


def create_hierarchy(hierarchy: nx.DiGraph):
    """Create a virtual root node to connect disjoint hierarchies.

    Parameters
    ----------
    hierarchy : networkx.DiGraph
                The Directed Acyclic Graph (DAG) representing the hierarchy.

    Returns
    ----------
    hierarchy : networkx.DiGraph
                The final hierarchy graph.
    """

    roots = [x for x in hierarchy.nodes() if hierarchy.in_degree(x) == 0]
    # create parent node to join hierarchies
    for root_node in roots:
        hierarchy.add_edge("ROOT", root_node)
    if not roots:
        hierarchy.add_node("ROOT")

    return hierarchy


def get_paths(graph: nx.DiGraph, reverse=False):
    """Get all the paths from the "ROOT" node to the leaf nodes in the input graph.

    Parameters
    ----------
    graph : networkx.DiGraph
            The Directed Acyclic Graph (DAG) for which paths need to be found.
    reverse : bool
            If True, the order of nodes in each path will be reversed,
            effectively giving the paths from leaf nodes to the "ROOT" node.

    Returns
    ----------
    paths : list
            A list node lists which represent paths.
    """
    leaves = get_leaves(graph)
    paths = list(all_simple_paths(graph, "ROOT", leaves))
    if reverse:
        for path in paths:
            path.reverse()
    return paths


[docs]def get_columns_for_numpy_hierarchy(hierarchy: nx.DiGraph, num_columns: int): """Get mapping from hierarchy nodes to columns after hierarchy transformation. If each node in the hierarchy is named after a column's index this methods will give you the mapping from column index to node name of the node after the graph was transformed to a numpy array and back. During this transformation the node names are lost and afterwards each node is named after its index in hierarchy.nodes. Parameters ---------- hierarchy : networkx.DiGraph The Directed Acyclic Graph (DAG) representing the hierarchy. num_columns : bool The number of columns in the dataset. Returns ---------- columns : list A mapping from nodes to columns. """ columns = [] for node in range(num_columns): index = list(hierarchy.nodes()).index(node) if node in hierarchy.nodes else -1 columns.append(index) return columns
def normalize_score(score, max_value): """Normalize the given score using logarithmic scaling and a maximum value. Parameters ---------- score : float or int The score to be normalized. max_value : float or int The maximum of the scores in the corresponding row. Returns ---------- float or int : The normalized score after applying logarithmic scaling. """ if score != 0: score = math.log(1 + (score / max_value)) + 1 return score def compute_aggregated_values( X, hierarchy: nx.DiGraph, columns: list[int], node="ROOT" ): """Recursively aggregate features in X by summing up their children's values. The method traverses the given Directed Acyclic Graph (DAG) hierarchy starting from the specified node, and recursively aggregates the values from its children nodes up to the specified root node. To caculate all values start form "ROOT". Parameters ---------- X : {array-like, sparse matrix} The input array with the original data. hierarchy : networkx.DiGraph The Directed Acyclic Graph (DAG) representing the hierarchical structure. columns : list The mapping from the hierarchy graph's nodes to the columns in X. A list of ints. If this parameter is None the columns in X and the corresponding nodes in the hierarchy are expected to be in the same order. node : {int, str} The starting node for aggregation. Default is "ROOT". Returns ---------- X : numpy.ndarray The input array `X` with the aggregated values based on the provided hierarchy. """ if hierarchy.out_degree(node) == 0: return X children = hierarchy.successors(node) aggregated = np.zeros((X.shape[0])) for child in list(children): X = compute_aggregated_values(X, hierarchy, columns, node=child) aggregated = np.add(aggregated, X[:, columns.index(child)]) if node != "ROOT": aggregated = np.add(aggregated, X[:, columns.index(node)]) column_index = columns.index(node) X[:, column_index] = aggregated return X