# -*- coding: utf-8 -*-
# ######### COPYRIGHT #########
# Credits
# #######
#
# Copyright(c) 2020-2020
# ----------------------
#
# * Laboratoire d'Informatique et Systèmes <http://www.lis-lab.fr/>
# * Université d'Aix-Marseille <http://www.univ-amu.fr/>
# * Centre National de la Recherche Scientifique <http://www.cnrs.fr/>
# * Université de Toulon <http://www.univ-tln.fr/>
#
# Contributors
# ------------
#
# * `Valentin Emiya <mailto:valentin.emiya@lis-lab.fr>`_
# * `Ama Marina Krémé <mailto:ama-marina.kreme@lis-lab.fr>`_
#
# This package has been created thanks to the joint work with Florent Jaillet
# and Ronan Hamon on other packages.
#
# Description
# -----------
#
# Time frequency fading using Gabor multipliers
#
# Version
# -------
#
# * tffpy version = 0.1.4
#
# Licence
# -------
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# ######### COPYRIGHT #########
"""
Class :class:`GabMulTff` is the main object to solve a time-frequency fading
problem.
.. moduleauthor:: Valentin Emiya
"""
from time import perf_counter
from pathlib import Path
import numpy as np
from scipy.optimize import minimize_scalar, minimize
from matplotlib import pyplot as plt
from ltfatpy import plotdgtreal
from skpomade.range_approximation import \
adaptive_randomized_range_finder, randomized_range_finder
from skpomade.factorization_construction import evd_nystrom
from tffpy.tf_tools import GaborMultiplier
from tffpy.create_subregions import create_subregions
from tffpy.utils import dgt, plot_spectrogram, db
[docs]class GabMulTff:
"""
Time-frequency fading using Gabor multipliers
Main object to solve the TFF problem
Parameters
----------
x_mix : nd-array
Mix signal
mask : nd-array
Time-frequency mask
dgt_params : dict
DGT parameters
signal_params : dict
Signal parameters
tol_subregions : None or float
If None, the mask is considered as a single region. If float,
tolerance to split the mask into sub-regions using
:py:func:`~tffpy.create_subregions.create_subregions`.
fig_dir : str or Path
If not None, folder where figures are stored. If None, figures are
not plotted.
"""
def __init__(self, x_mix, mask, dgt_params, signal_params,
tol_subregions=None, fig_dir=None):
self.x_mix = x_mix
self.dgt_params = dgt_params
self.signal_params = signal_params
self.tol_subregions = tol_subregions
self.t_subreg = None
if tol_subregions is not None:
if np.issubdtype(mask.dtype, np.bool_):
t0 = perf_counter()
mask = create_subregions(mask_bool=mask,
dgt_params=dgt_params,
signal_params=signal_params,
tol=tol_subregions,
fig_dir=fig_dir)
self.t_subreg = perf_counter() - t0
n_areas = np.unique(mask).size - 1
self.mask = mask
else:
n_areas = 1
self.mask = mask > 0
self.gabmul_list = [GaborMultiplier(mask=(mask == i + 1),
dgt_params=dgt_params,
signal_params=signal_params)
for i in range(n_areas)]
self.s_vec_list = [None for i in range(n_areas)]
self.u_mat_list = [None for i in range(n_areas)]
self.uh_x_list = [None for i in range(n_areas)]
self.t_arrf = [None for i in range(n_areas)]
self.t_evdn = [None for i in range(n_areas)]
self.t_uh_x = [None for i in range(n_areas)]
self.fig_dir = fig_dir
if fig_dir is not None:
fig_dir = Path(fig_dir)
fig_dir.mkdir(parents=True, exist_ok=True)
@property
def n_areas(self):
"""
Number of sub-regions
"""
return len(self.u_mat_list)
[docs] def compute_decomposition(self, tolerance_arrf, proba_arrf, rand_state=0):
"""
Decompose each Gabor multiplier using a random EVD
The decomposition is obtained using
:py:func:`skpomade.range_approximation.adaptive_randomized_range_finder`
followed by :py:func:`skpomade.factorization_construction.evd_nystrom`.
The rank of each decomposition is estimated using parameters
`tolerance_arrf` and `proba_arrf`.
Running times to compute the range approximation, the
EVD itself and the additional matrix products for subsequent
computations are stored in attributes `t_arrf`, `t_evdn` and
`t_uh_x`, respectively.
Parameters
----------
tolerance_arrf : float
Tolerance for
:py:func:`~skpomade.range_approximation.adaptive_randomized_range_finder`
proba_arrf : float
Probability of error for
:py:func:`~skpomade.range_approximation.adaptive_randomized_range_finder`
rand_state : RandomState, int or None
If RandomState, random generator.
If int or None, random seed used to initialize the pseudo-random
number generator.
"""
if rand_state is None:
rand_state = np.random.RandomState(None)
if np.issubdtype(type(rand_state), np.dtype(int).type):
rand_state = np.random.RandomState(rand_state)
for i in range(self.n_areas):
print('Random EVD of Gabor multiplier #{}'.format(i))
print('#coefs in mask: {} ({:.1%} missing)'
.format(np.sum(self.gabmul_list[i].mask),
np.sum(self.gabmul_list[i].mask)
/ self.gabmul_list[i].mask.size))
t0 = perf_counter()
q_mat = adaptive_randomized_range_finder(a=self.gabmul_list[i],
tolerance=tolerance_arrf,
proba=proba_arrf, r=None,
rand_state=rand_state,
n_cols_Q=32)
self.t_arrf[i] = perf_counter() - t0
print('Q shape:', q_mat.shape)
t0 = perf_counter()
self.s_vec_list[i], self.u_mat_list[i] = \
evd_nystrom(a=self.gabmul_list[i], q_mat=q_mat)
self.t_evdn[i] = perf_counter() - t0
print('Running times:')
print(' - adaptive_randomized_range_finder: {} s'.format(
self.t_arrf[i]))
print(' - evd_nystrom: {} s'.format(self.t_evdn[i]))
t0 = perf_counter()
[docs] self.uh_x_list[i] = self.u_mat_list[i].T.conj() @ self.x_mix
self.t_uh_x[i] = perf_counter() - t0
def compute_decomposition_fixed_rank(self, rank, rand_state=0):
"""
Decompose each Gabor multiplier using a random EVD with given rank
The decomposition is obtained using
:py:func:`skpomade.range_approximation.randomized_range_finder`
followed by :py:func:`skpomade.factorization_construction.evd_nystrom`.
Running times are stored in attributes `t_rrf`, `t_evdn` and `t_uh_x`.
Parameters
----------
rank : int
Rank of the decompostion
rand_state : RandomState, int or None
If RandomState, random generator.
If int or None, random seed used to initialize the pseudo-random
number generator.
"""
if rand_state is None:
rand_state = np.random.RandomState(None)
if np.issubdtype(type(rand_state), np.dtype(int).type):
rand_state = np.random.RandomState(rand_state)
t_rrf = [None for i in range(self.n_areas)]
t_evdn = [None for i in range(self.n_areas)]
t_uh_x = [None for i in range(self.n_areas)]
for i in range(self.n_areas):
print('Random EVD of Gabor multiplier #{}'.format(i))
print('#coefs in mask: {} ({:.1%})'
.format(np.sum(self.gabmul_list[i].mask),
np.sum(self.gabmul_list[i].mask)
/ self.gabmul_list[i].mask.size))
t0 = perf_counter()
q_mat = randomized_range_finder(a=self.gabmul_list[i],
n_l=rank,
rand_state=rand_state)
t_rrf[i] = perf_counter() - t0
print('Q shape:', q_mat.shape)
t0 = perf_counter()
self.s_vec_list[i], self.u_mat_list[i] = \
evd_nystrom(a=self.gabmul_list[i], q_mat=q_mat)
t_evdn[i] = perf_counter() - t0
print('Running times:')
print(' - randomized_range_finder: {} s'.format(t_rrf[i]))
print(' - evd_nystrom: {} s'.format(t_evdn[i]))
t0 = perf_counter()
[docs] self.uh_x_list[i] = self.u_mat_list[i].T.conj() @ self.x_mix
t_uh_x[i] = perf_counter() - t0
def compute_estimate(self, lambda_coef):
"""
Compute the signal estimate for a given hyperparameter
:math:`\lambda_i` for each sub-region :math:`i`.
Prior decomposition should have been performed using
:meth:`compute_decomposition` or
:meth:`compute_decomposition_fixed_rank`.
Parameters
----------
lambda_coef : nd-array or float
If nd-array, hyperparameters :math:`\lambda_i` for each sub-region
:math:`i`. If float, the same value :math:`\lambda` is used
for each sub-region.
Returns
-------
nd-array
Reconstructed signal
"""
if isinstance(lambda_coef, np.ndarray):
assert lambda_coef.size == self.n_areas
else:
lambda_coef = np.full(self.n_areas, fill_value=lambda_coef)
x = self.x_mix.copy()
for i in range(self.n_areas):
gamma_vec = lambda_coef[i] * self.s_vec_list[i] \
/ (1 - (1 - lambda_coef[i]) * self.s_vec_list[i])
[docs] x -= self.u_mat_list[i] @ (gamma_vec * self.uh_x_list[i])
return x
def compute_lambda(self, x_mix, e_target=None):
"""
Estimate hyperparameters :math:`\lambda_i` from target energy in each
sub-region :math:`i`.
Parameters
----------
x_mix : nd-array
Mix signal
e_target : nd-array or None
Target energy for each sub-region/. If None, function
:py:func:`estimate_energy_in_mask` is used to estimate the
target energies.
Returns
-------
lambda_est : nd-array
Hyperparameters :math:`\lambda_i` for each sub-region :math:`i`.
t_est : nd-array
Running time to estimate each hyperparameter.
"""
if e_target is None:
e_target = estimate_energy_in_mask(
x_mix=x_mix, mask=self.mask,
dgt_params=self.dgt_params, signal_params=self.signal_params,
fig_dir=self.fig_dir)
t_est = np.empty(self.n_areas)
lambda_est = np.empty(self.n_areas)
for i_area in range(self.n_areas):
mask_i = self.mask == i_area + 1
def obj_fun_est(lambda_coef):
x = self.compute_estimate(lambda_coef)
x_tf_masked = mask_i * self.gabmul_list[i_area].dgt(x)
e_mask = np.linalg.norm(x_tf_masked, 'fro') ** 2
return np.abs(e_target[i_area] - e_mask)
t0 = perf_counter()
sol_est = minimize_scalar(obj_fun_est, bracket=[0, 1],
method='brent')
t_est[i_area] = perf_counter() - t0
lambda_est[i_area] = sol_est.x
return lambda_est, t_est
[docs]def reconstruction(x_mix, lambda_coef, u_mat, s_vec):
return GabMulTff(x_mix=x_mix, u_mat=u_mat, s_vec=s_vec)(lambda_coef)
[docs]def estimate_energy_in_mask(x_mix, mask, dgt_params, signal_params,
fig_dir=None, prefix=None):
"""
Estimate energy in time-frequency mask
Parameters
----------
x_mix : nd-array
Mix signal
mask: nd-array
Time-frequency mask for each sub-region
dgt_params : dict
DGT parameters
signal_params : dict
Signal parameters
fig_dir : str or Path
If not None, folder where figures are stored. If None, figures are
not plotted.
prefix : str
If not None, this prefix is used when saving the figures.
Returns
-------
nd-array
Estimated energy in each sub-region.
"""
x_tf_mat = dgt(sig=x_mix, dgt_params=dgt_params)
x_tf_mat[mask > 0] = np.nan
e_f_mean = np.nanmean(np.abs(x_tf_mat) ** 2, axis=1)
mask = mask.astype(int)
n_areas = np.unique(mask).size - 1
estimated_energy = np.empty(n_areas)
[docs] e_mat = e_f_mean[:, None] @ np.ones((1, x_tf_mat.shape[1]))
e_mat[mask == 0] = 0
for i_area in range(n_areas):
mask_i = mask == i_area + 1
estimated_energy[i_area] = np.sum(e_mat * mask_i)
if fig_dir is not None:
fig_dir = Path(fig_dir)
fig_dir.mkdir(parents=True, exist_ok=True)
if prefix is None:
prefix = ''
else:
prefix = prefix + '_'
dynrange = 100
c_max = np.nanmax(db(x_tf_mat))
clim = c_max - dynrange, c_max
fs = signal_params['fs']
plt.figure()
plot_spectrogram(x=x_mix, dgt_params=dgt_params, fs=fs, clim=clim)
plt.title('Mix')
plt.savefig(fig_dir / '{}mix.pdf'.format(prefix))
plt.figure()
plotdgtreal(coef=np.sqrt(e_mat), a=dgt_params['hop'],
M=dgt_params['n_bins'], fs=fs, clim=clim)
plt.title('Mask filled with average energy (total: {})'
.format(estimated_energy))
plt.savefig(fig_dir / '{}filled_mask.pdf'.format(prefix))
x_tf_mat[mask > 0] = np.sqrt(e_mat[mask > 0])
plt.figure()
plotdgtreal(coef=x_tf_mat, a=dgt_params['hop'],
M=dgt_params['n_bins'], fs=fs, clim=clim)
plt.title('Mix filled with average energy (total: {})'
.format(estimated_energy))
plt.savefig(fig_dir / '{}filled_mask.pdf'.format(prefix))
plt.figure()
plt.plot(db(e_f_mean) / 2)
plt.xlabel('Frequency index')
plt.ylabel('Average energy')
plt.title('Average energy per frequency bin in mix')
plt.savefig(fig_dir / '{}average_energy.pdf'.format(prefix))
e_f_mean_check = np.mean(np.abs(x_tf_mat) ** 2, axis=1)
plt.figure()
plt.plot(db(e_f_mean) / 2, label='Before filling')
plt.plot(db(e_f_mean_check) / 2, '--', label='After filling')
plt.xlabel('Frequency index')
plt.ylabel('Average energy')
plt.title('Average energy per frequency bin in mix')
plt.legend()
plt.savefig(fig_dir / '{}average_energy_check.pdf'.format(prefix))
return estimated_energy
def compute_lambda_oracle_sdr(gmtff, x_wb):
"""
Compute oracle value for hyperparameter :math:`\lambda_i` from true
solution.
If only one region is considered, the Brent's algorithm is used (see
:py:func:`scipy.optimize.minimize_scalar`). If multiple sub-regions are
considered the BFGS
algorithm is used (see :py:func:`scipy.optimize.minimize`).
Parameters
----------
gmtff : GabMulTff
x_wb : nd-array
True signal for the wideband source.
Returns
-------
lambda_oracle : nd-array
Oracle values for hyperparameters :math:`\lambda_i` for each
sub-region :math:`i`.
t_oracle : nd-array
Running times for the computation of each hyperparameter
"""
t0 = perf_counter()
def obj_fun_oracle(lambda_coef):
return np.linalg.norm(x_wb - gmtff.compute_estimate(lambda_coef))
if gmtff.tol_subregions is None:
sol_oracle = minimize_scalar(obj_fun_oracle,
bracket=[0, 1], method='brent')
lambda_oracle = np.array([sol_oracle.x])
else:
sol_oracle = minimize(obj_fun_oracle,
np.ones(gmtff.n_areas),
method='BFGS',
options={'disp': True})
lambda_oracle = sol_oracle.x
t_oracle = perf_counter() - t0
return lambda_oracle, t_oracle