Source code for kpop.population.population_multi

from .population_base import PopulationBase
from .population_list import PopulationList
from .population_single import Population
from ..libs import np, pd


[docs]class MultiPopulation(PopulationBase): """ A population formed by several sub-populations. """ is_multi_population = True @property def meta(self): # Collect common columns cols, *tail = [set(pop.meta.columns) for pop in self.populations] for new in tail: cols.intersection_update(new) cols = sorted(cols) # Create dataframe metas = [pop.meta[cols] for pop in self.populations] df = pd.DataFrame(columns=cols) for meta in metas: df = df.append(meta) return df def __init__(self, populations=(), freqs=None, **kwargs): if freqs is not None: raise ValueError( 'cannot specify frequencies on MultiPopulation objects' ) self.populations = PopulationList() for population in populations: self.add_population(population) super().__init__(**kwargs) def __len__(self): return sum(len(x) for x in self.populations) def _getitem_by_index(self, idx): i = idx for pop in self.populations: size = len(pop) if i >= size: i -= size else: return pop[i] raise IndexError(idx) def __iter__(self): for pop in self.populations: yield from pop def __add__(self, other): if isinstance(other, (Population, MultiPopulation)): populations = list(self.populations) return MultiPopulation(populations + list(other.populations)) return NotImplemented def __radd__(self, other): if isinstance(other, (Population, MultiPopulation)): populations = list(self.populations) return MultiPopulation([other] + populations) return NotImplemented def _as_array(self): stack = [pop._as_array() for pop in self.populations] return np.vstack(stack)
[docs] def add_population(self, population): """ Adds a new sub-population. Args: population: A :class:`Population` instance. """ for sub in self.populations: if population.id is not None and population.id == sub.id: raise ValueError( 'sub-population with id "%s" already present!' % sub.id ) self.populations.append(population)
[docs] def slice_indexes(self, indexes): """ Map indexes to a list of indexes for each sub-population. """ slices = [[] for pop in self.populations] breakpoints = np.add.accumulate([len(pop) for pop in self.populations]) for idx in indexes: for k, idx_max in enumerate(breakpoints): if idx < idx_max: if k > 0: idx -= breakpoints[k - 1] slices[k].append(idx) break else: raise IndexError('index out of bounds: %s' % idx) return [np.array(lst) for lst in slices]
def copy(self, populations=None, **kwargs): new = super().copy(**kwargs) if populations is not None: new.populations = PopulationList(populations) return new def keep_individuals(self, indexes, **kwargs): sliced_indexes = self.slice_indexes(indexes) data = zip(self.populations, sliced_indexes) populations = [pop.keep_individuals(indexes) for pop, indexes in data] return self.copy(populations=populations, **kwargs) def keep_loci(self, indexes, **kwargs): populations = [pop.keep_loci(indexes) for pop in self.populations] return self.copy(populations=populations, **kwargs) def map_alleles(self, alleles_mapping, **kwargs): map = alleles_mapping populations = [pop.map_alleles(map) for pop in self.populations] return self.copy(populations=populations, **kwargs)