Source code for hfs.lazyHierarchicalFeatureSelector

from abc import ABC

import networkx as nx
import numpy as np
from sklearn.metrics import classification_report
from sklearn.naive_bayes import BernoulliNB

from .base import HierarchicalEstimator
from .helpers import checkData, getRelevance
from .metrics import conditional_mutual_information


[docs]class LazyHierarchicalFeatureSelector(HierarchicalEstimator, ABC): """ Abstract class used for all lazy hierarchical feature selection methods. Every method should implement the method select_and_predict. """
[docs] def __init__(self, hierarchy: np.ndarray = None): # todo G = None """ Initialize a LazyHierarchicalFeatureSelector with the required data. Parameters ---------- hierarchy : np.ndarray The hierarchy graph as an adjacency matrix. """ self.hierarchy = hierarchy
[docs] def fit(self, X, y=None): """ Implementing the fit function for Sklearn compatibility. Parameters ---------- X : {array-like, sparse matrix}, shape (n_samples, n_features) The training input samples. y : array-like, shape (n_samples,) The target values. An array of int. Returns ------- self : object Fitted estimator. """ X = self._validate_data( X, ) self.n_features_ = X.shape[1] return self
[docs] def fit_selector(self, X_train, y_train, X_test, columns=None): """ Fit LazyHierarchicalFeatureSelector class. Due to laziness fitting of parameters as well as predictions are obtained per instance. Parameters ---------- X_train : {numpy array} of shape (n_samples, n_features) The training input samples. X_test : {numpy array} of shape (n_samples, n_features) The test input samples. converted into a sparse ``csc_matrix``. y_train : array-like of shape (n_samples, n_levels) The target values, i.e., hierarchical class labels for classification. """ # Create DAG self.n_features_ = X_train.shape[1] self._set_hierarchy() self._hierarchy.remove_node("ROOT") if columns: self._columns = columns else: self._columns = list(range(self.n_features_)) mapping = {value: index for index, value in enumerate(self._columns)} self._hierarchy = nx.relabel_nodes(self._hierarchy, mapping) self._xtrain = X_train self._ytrain = y_train self._xtest = X_test self._features = np.zeros(shape=X_test.shape) self._feature_length = np.zeros(self._xtest.shape[1], dtype=int) # Validate data checkData(self._hierarchy, self._xtrain, self._ytrain) # Get relevance of each node self._relevance = {} for node in self._hierarchy: self._relevance[node] = getRelevance(self._xtrain, self._ytrain, node) self._sorted_relevance = sorted(self._relevance, key=self._relevance.get) self._instance_status = {} for node in self._hierarchy: self._instance_status[node] = 1
[docs] def select_and_predict( self, predict=True, saveFeatures=False, estimator=BernoulliNB() ): """ Select features lazy for each test instance amd optionally predict target value of test instances. To be implemented by children. Parameters ---------- predict : {bool} true if predictions shall be obtained. saveFeatures : {bool} true if features selected for each test instance shall be saved. estimator : sklearn-compatible estimator. Estimator to use for predictions. Returns ------- predictions for test input samples if predict = false, returns empty array """ pass
def _get_nonredundant_features(self, idx): """ Get nonredundant features without relevance score. Basic functionality of the algorithm HIP proposed by Wan & Freitas. Parameters ---------- idx : int Index of test instance for which the features shall be selected. """ for node in self._hierarchy: self._instance_status[node] = 1 for node in self._hierarchy: if self._xtest[idx][node] == 1: for anc in self._hierarchy.predecessors(node): self._instance_status[anc] = 0 else: for desc in self._hierarchy.successors(node): self._instance_status[desc] = 0 def _get_nonredundant_features_relevance(self, idx): """ Get nonredundant features based on relevance score. Basic functionality of the HNB algorithm proposed by Wan & Freitas. Parameters ---------- idx : Index of test instance for which the features shall be selected. """ for node in self._hierarchy: self._instance_status[node] = 1 for node in self._hierarchy: if node == "ROOT": continue if self._xtest[idx][node] == 1: for anc in nx.ancestors(self._hierarchy, node): if self._relevance[anc] <= self._relevance[node]: self._instance_status[anc] = 0 else: for desc in nx.descendants(self._hierarchy, node): if self._relevance[desc] <= self._relevance[node]: self._instance_status[desc] = 0 def _get_nonredundant_features_mr(self, idx): """ Get nonredundant features based on the MR considering all pathes. Basic functionality of the HIP algorithm proposed by Wan & Freitas. Parameters ---------- idx : Index of test instance for which the features shall be selected. """ top_sort = list(nx.topological_sort(self._hierarchy)) reverse_top_sort = reversed(top_sort) mr = {} for node in top_sort: # correctness: as each predecessor lies on the same path as the current node # one can be removed. # # not several nodes per path: # contradiction - nodes a and b (a<b) selected and are on the same path. # each node a+1..b-1 between would save a as most relevant node: # 0: mr[a] = a # 1: mr[a+1] = a+1, if rel[mr[pred]=a] > rel[mr[a+1]=a+1]: mr[a+1]=a # and status[a+1] = remove # i: mr[a+i] = a+i, if rel[mr[pred]=a] > rel[mr[a+i]=a+i]: mr[a+i]=a # and status[a+i] = remove # When b is processed, mr[b-1] = a, so either rel[a]>rel[b] -> mr[b]=b is removed # and set to a OR mr[b-1]=a is removed and mr[b] stays b. # # at least one node per path: As there is a maximum relevant node in each path # this node will stay mr[node] = node and not be exchanged through mr[pred]. # Then the condition is never met on this path # so self._instance_status[mr[node]] = 0 never executed. # mr[node] = [] more_rel_nodes = [node] if self._xtest[idx][node]: # preds are 1 because of 0-1-propagation for pred in self._hierarchy.predecessors(node): # get most relevant nodes seen on the paths until current node for _mr in mr[pred]: # if their is a node on the path more important then current node if self._relevance[_mr] > self._relevance[node]: self._instance_status[node] = 0 # save this node for next iterations (steps on path) more_rel_nodes.append(_mr) else: # save current node as most important. # there can be several paths, in this case, several nodes are saved self._instance_status[_mr] = 0 more_rel_nodes.append(node) mr[node] = more_rel_nodes for node in reverse_top_sort: mr[node] = [] more_rel_nodes = [node] if not self._xtest[idx][node]: mr[node] = node for suc in self._hierarchy.successors(node): # get most relevant nodes seen on paths until current node for _mr in mr[suc]: if self._relevance[_mr] > self._relevance[node]: # each node not selected will removed self._instance_status[node] = 0 more_rel_nodes.append(_mr) else: self._instance_status[_mr] = 0 more_rel_nodes.append(node) mr[node] = more_rel_nodes def _get_top_k(self): """ Get k highest-ranked features by relevance. """ counter = 0 for node in reversed(self._sorted_relevance): if node == "ROOT": continue if (counter < self.k or not self.k) and self._instance_status[node]: counter += 1 else: self._instance_status[node] = 0 def _build_mst(self): """ Build minium spanning tree for each possible edge in the feature tree. """ edges = self._hierarchy.edges self._edge_status = np.zeros((self.n_features_, self.n_features_)) self._cmi = np.zeros((self.n_features_, self.n_features_)) self._sorted_edges = [] for node1 in self._hierarchy.nodes: for node2 in self._hierarchy.nodes: if node1 == node2: continue self._cmi[node1][node2] = conditional_mutual_information( self._xtrain[:, node1], self._xtrain[:, node2], self._ytrain ) self._edge_status[node1][node2] = 1 sorted_indices = np.argsort(self._cmi, axis=None) for index in sorted_indices: coordinates = divmod(index, self.n_features_) if coordinates[0] < coordinates[1]: self._sorted_edges.append(coordinates) def _get_nonredundant_features_from_mst(self, idx): """ Get nonredundant features from MST. Basic functionality of the algorithm TAN proposed by Wan & Freitas. Parameters ---------- idx : int Index of test instance for which the features shall be selected. """ UDAG = nx.Graph() for node1 in self._hierarchy: self._instance_status[node1] = 0 for node2 in self._hierarchy: self._edge_status[node1][node2] = 1 representants = [i for i in range(self.n_features_)] members = {} for i in range(self.n_features_): members[i] = [i] # get paths reachable_nodes = {} for node in self._hierarchy: reachable_nodes[node] = [] for des in nx.descendants(self._hierarchy, node): reachable_nodes[node].append(des) # select edges for edge in self._sorted_edges: if ( self._edge_status[edge[0]][edge[1]] # check redundancy: same path and same value and ( self._xtest[idx][edge[0]] != self._xtest[idx][edge[1]] or ( edge[0] not in reachable_nodes[edge[1]] and edge[1] not in reachable_nodes[edge[1]] ) ) # check if circle in UDAG using the property, that edge (a,b) infers circle iff a und b # are members of the same component and representants[edge[0]] != representants[edge[1]] ): UDAG.add_edge(edge[0], edge[1]) self._edge_status[edge[0]][edge[1]] = 0 # merge: change the representatives of the smaller component if len(members[representants[edge[0]]]) <= len( members[representants[edge[1]]] ): for m in members[edge[0]]: representants[m] = representants[edge[1]] members[representants[edge[1]]].append(m) else: for m in members[edge[1]]: representants[m] = representants[edge[0]] members[representants[edge[0]]].append(m) # remove all edges with redundant ancestors or descendants of e0 and e1 for selected_node in [edge[0], edge[1]]: for neighbor_node in nx.ancestors(self._hierarchy, selected_node): if ( self._xtest[idx][selected_node] == self._xtest[idx][neighbor_node] ): # alternative: collect all and then delete in sorted_edges self._edge_status[:, neighbor_node] = 0 self._edge_status[neighbor_node][:] = 0 for neighbor_node in nx.descendants(self._hierarchy, selected_node): if ( self._xtest[idx][selected_node] == self._xtest[idx][neighbor_node] ): self._edge_status[:, neighbor_node] = 0 self._edge_status[neighbor_node][:] = 0 self._instance_status[edge[0]] = 1 self._instance_status[edge[1]] = 1 def _predict(self, idx, estimator): """ Predicts for an instance of the test set. Parameters ---------- idx : int Index of test instance which shall be predicted. estimator : sklearn-compatible estimator Estimator to use for predictions. Returns ------- prediction : bool prediction of test instance's target value. """ features = [nodes for nodes, status in self._instance_status.items() if status] features_in_dataset = [] for feature in features: features_in_dataset.append(self._columns.index(feature)) clf = estimator clf.fit(self._xtrain[:, features_in_dataset], self._ytrain) return clf.predict(self._xtest[idx][features_in_dataset].reshape(1, -1))
[docs] def get_score(self, ytest, predictions): """ Returns score of the predictions. Note that recall of the positive class is known as “sensitivity”; recall of the negative class is “specificity” Parameters ---------- ytest : 1d array-like, or label indicator array / sparse matrix truth values of y. predictions : 1d array-like, or label indicator array / sparse matrix obtained predictions. Returns ------- report : dict metrics of prediction. """ avg_feature_length = 0 for idx in range(0, self._xtest.shape[0] - 1): avg_feature_length += self._feature_length[idx] / self._xtrain.shape[1] avg_feature_length = avg_feature_length / (len(self._feature_length)) score = classification_report( y_true=ytest, y_pred=predictions, output_dict=True ) score["sensitivityxspecificity"] = float(score["0"]["recall"]) * float( score["1"]["recall"] ) score["compression"] = avg_feature_length return score
[docs] def get_features(self): """ Get selected features. Parameters ---------- None Returns ------- features : numpy array Boolean value at index states if feature is selected. """ return self._features