# -*- coding: utf-8 -*-
# ######### COPYRIGHT #########
# Credits
# #######
#
# Copyright(c) 2020-2020
# ----------------------
#
# * Laboratoire d'Informatique et Systèmes <http://www.lis-lab.fr/>
# * Université d'Aix-Marseille <http://www.univ-amu.fr/>
# * Centre National de la Recherche Scientifique <http://www.cnrs.fr/>
# * Université de Toulon <http://www.univ-tln.fr/>
#
# Contributors
# ------------
#
# * `Valentin Emiya <mailto:valentin.emiya@lis-lab.fr>`_
# * `Ama Marina Krémé <mailto:ama-marina.kreme@lis-lab.fr>`_
#
# This package has been created thanks to the joint work with Florent Jaillet
# and Ronan Hamon on other packages.
#
# Description
# -----------
#
# Time frequency fading using Gabor multipliers
#
# Version
# -------
#
# * tffpy version = 0.1.4
#
# Licence
# -------
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# ######### COPYRIGHT #########
"""
Class `SolveTffExperiment` uses the :class:`yafe.base.Experiment` experiment
framework to handle the main time-frequency fading experiment: It includes
loading the data, generating the problems, applying solvers, and exploiting
results.
See the `documentation <http://skmad-suite.pages.lis-lab.fr/yafe/>`_ of
package :py:mod:`yafe` for the technical details.
.. moduleauthor:: Valentin Emiya
"""
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
import pandas as pd
from pathlib import Path
from scipy.stats import linregress
from yafe import Experiment
from madarrays import Waveform
from tffpy.datasets import get_mix, get_dataset
from tffpy.tf_fading import GabMulTff, compute_lambda_oracle_sdr
from tffpy.interpolation_solver import solve_by_interpolation
from tffpy.utils import \
sdr, plot_spectrogram, is_div_spectrum, plot_mask, db, dgt
[docs]class SolveTffExperiment(Experiment):
"""
The main experiment to solve time-frequency fading problems with a
number of sounds mixtures and solvers.
Parameters
----------
force_reset : bool
If true, reset the experiment by erasing all previous results
in order to run it from scratch. If False, the existing results are
kept in order to proceed with the existing experiment.
suffix : str
Suffix that is appended to the name of the experiment, useful to
save results in a specific folder.
keep_eigenvectors : 'all' or list
Use this parameter to remove eigenvectors from
`GabMulTff` object after computing performance in order to save
space. In this case, the `GabMulTff` will not be usable anymore
after the computation of the performance results.
If 'all', all eigenvectors are kept. To keep only some eigenvectors,
set this parameter to the list of task IDs for which eigenvectors
should be kept (usefull if you want to use or plot some task data
after the experiments, e.g., using method `plot_task`). If the list
is empty, all eigenvectors will be removed.
"""
def __init__(self, force_reset=False, suffix='',
keep_eigenvectors='all'):
Experiment.__init__(self,
name='SolveTffExperiment' + suffix,
get_data=get_data,
get_problem=Problem,
get_solver=Solver,
measure=perf_measures,
force_reset=force_reset,
log_to_file=False,
log_to_console=False)
self.fig_dir = self.xp_path / 'figures'
# a little trick to save collections when computing performance
self.measure = lambda **x: perf_measures(**x, exp=self)
self.keep_eigenvectors = keep_eigenvectors
@property
def n_tasks(self):
"""
Number of tasks
Returns
-------
int
"""
return len(list((self.xp_path / 'tasks').glob('0*')))
[docs] @staticmethod
def get_experiment(setting='full', force_reset=False,
keep_eigenvectors=None):
"""
Get the experiment instance with default values in order to handle it.
Parameters
----------
setting : {'full', 'light'}
If 'full', the default values are set to run the full
experiment. If 'light', the default values are set to have a
very light experiment with few tasks, running fast, for test
purposes.
force_reset : bool
If true, reset the experiment by erasing all previous results
in order to run it from scratch. If False, the existing results are
kept in order to proceed with the existing experiment.
keep_eigenvectors = 'all' or list
See constructor of `SolveTffExperiment`. If None, default
values are used.
Returns
-------
SolveTffExperiment
"""
assert setting in ('full', 'light')
dataset = get_dataset()
# Set task parameters
data_params = dict(loc_source=list(dataset['localized'].keys()),
wideband_src=list(dataset['wideband'].keys()))
problem_params = dict(win_choice=['gauss 256', 'hann 512'],
wb_to_loc_ratio_db=8,
n_iter_closing=3, n_iter_opening=3,
closing_first=True,
delta_mix_db=0,
delta_loc_db=40,
or_mask=True,
crop=None,
fig_dir=None)
solver_params = dict(tol_subregions=[None, 1e-5],
tolerance_arrf=1e-3,
proba_arrf=1 - 1e-4,
rand_state=np.arange(10))
keep_eigenvectors = []
if setting == 'light':
data_params['loc_source'] = 'bird'
data_params['wideband_src'] = 'car'
problem_params['win_choice'] = ['gauss 64', 'hann 128']
problem_params['crop'] = 4096
problem_params['delta_loc_db'] = 20
problem_params['wb_to_loc_ratio_db'] = 16
solver_params['tolerance_arrf'] = 1e-2
solver_params['proba_arrf'] = 1 - 1e-2
keep_eigenvectors = [0, 1]
# Create Experiment
suffix = '' if setting == 'full' else '_Light'
exp = SolveTffExperiment(force_reset=force_reset,
suffix=suffix,
keep_eigenvectors=keep_eigenvectors)
exp.add_tasks(data_params=data_params,
problem_params=problem_params,
solver_params=solver_params)
exp.generate_tasks()
return exp
[docs] def export_task_params(self, csv_path=None):
"""
Export task parameters to a csv file and to a
:class:`pandas.DataFrame` object.
Parameters
----------
csv_path : str or Path
Name of the csv file to be written. If None, file is
located in the experiment folder with name 'task_params.csv'.
Returns
-------
pandas.DataFrame
"""
if csv_path is None:
csv_path = self.xp_path / 'task_params.csv'
else:
csv_path = Path(csv_path)
task_list = []
for i_task in range(self.n_tasks):
task = self.get_task_data_by_id(idt=i_task)
task_list.append({k + '_' + kk: task['task_params'][k][kk]
for k in task['task_params']
for kk in task['task_params'][k]})
df = pd.DataFrame(task_list)
df.to_csv(csv_path)
print('Task params exported to', csv_path)
return df
[docs] def generate_tasks(self):
"""
Generate tasks and export params to a csv file
See :py:meth:`yafe.Experiment.generate_tasks`
"""
Experiment.generate_tasks(self)
self.export_task_params()
[docs] def get_misc_file(self, task_params=None, idt=None):
"""
Get file with some additional task results.
This has been set up in order to pass additional data in a way that
could not be handled by the :py:mod:`yafe` framework.
Parameters
----------
task_params : dict
Task parameters.
idt : int
Task identifier. Either `task_params` or `idt` should be given
in order to specify the task.
Returns
-------
Path
File containing additional task results.
"""
if task_params is not None:
task = self.get_task_data_by_params(
data_params=task_params['data_params'],
problem_params=task_params['problem_params'],
solver_params=task_params['solver_params'])
idt = task['id_task']
elif idt is None:
raise ValueError('Either `task_params` or `idt` should be given.')
path_task = self.xp_path / 'tasks' / '{:06}'.format(idt)
return path_task / 'misc.npz'
[docs] def plot_results(self):
"""
Plot and save results of the experiment
"""
self.fig_dir.mkdir(parents=True, exist_ok=True)
print('Figures saved in {}'.format(self.fig_dir))
results = self.load_results(array_type='xarray')
results_std = results.std('solver_rand_state').squeeze()
results = results.mean('solver_rand_state').squeeze()
coords_dict = results.to_dict()['coords']
csv_path = self.fig_dir / 'exp_solve_pd.csv'
results.to_series().to_csv(csv_path, header=True)
print('number of nan values:',
np.sum(np.isnan(results.values)))
# Scatter plot for running times : tff-1 vs. tff-P
plt.figure()
x = []
y = []
for win_type in coords_dict['problem_win_choice']['data']:
t_tff1 = results.sel(measure=['t_lambda_tff', 't_arrf', 't_evdn'],
problem_win_choice=win_type,
solver_tol_subregions=None)
t_tff1 = t_tff1.sum(dim='measure')
not_none = coords_dict['solver_tol_subregions']['data'].copy()
not_none.remove(None)
not_none = not_none[0]
t_tffp = results.sel(measure=['t_lambda_tff', 't_arrf',
't_evdn', 't_subreg'],
problem_win_choice=win_type,
solver_tol_subregions=not_none)
t_tffp = t_tffp.sum(dim='measure')
if win_type[:5] == 'gauss':
win_type_label = 'Gauss'
else:
win_type_label = 'Hann'
slope, intercept, r_value, p_value, std_err = \
linregress(t_tff1.values.reshape(-1), t_tffp.values.reshape(-1))
print('Running times ({}): slope={}, intercept={}, 1/slope={},'
.format(win_type, slope, intercept, 1 / slope))
plt.plot(t_tff1.values.reshape(-1),
t_tffp.values.reshape(-1),
'+', label=win_type_label)
# plt.plot(t_tff1.values.reshape(-1),
# slope * t_tff1.values.reshape(-1) + intercept)
x.append(t_tff1.values.reshape(-1).copy())
y.append(t_tffp.values.reshape(-1).copy())
x = np.array(x)
y = np.array(y)
# for i in range(x.shape[1]):
# plt.plot(x[:, i], y[:, i], ':k')
x = x.reshape(-1)
y = y.reshape(-1)
I = np.logical_and(x!=0, y!=0)
x, y = x[I], y[I]
slope, intercept, r_value, p_value, std_err = linregress(x, y)
print('Running times (all): slope={}, intercept={}, 1/slope={}, '
'r_value={}, p_value={}, std_err={}'
.format(slope, intercept, 1 / slope, r_value, p_value, std_err))
print('Linear slope (not affine):')
print(np.vdot(x, y) / np.vdot(x, x), np.vdot(x, x) / np.vdot(x, y))
log_x, log_y = np.log10(x), np.log10(y)
slope, intercept, r_value, p_value, std_err = linregress(log_x, log_y)
print('Running times (all-log): slope={}, intercept={}, 1/slope={},'
'r_value={}, p_value={}, std_err={}'
.format(slope, intercept, 1 / slope, r_value, p_value, std_err))
print('Linear slope (not affine) - log:')
print(np.vdot(log_x, log_y) / np.vdot(log_x, log_x),
np.vdot(log_x, log_x) / np.vdot(log_x, log_y))
# plt.plot(x, slope * x + intercept)
plt.xlabel(r'Running time for TFF-1 (s)')
plt.ylabel(r'Running time for TFF-P (s)')
plt.legend()
plt.grid()
plt.savefig(self.fig_dir / 'running_times_exp.pdf')
plt.savefig(self.fig_dir / 'running_times_exp.png')
plt.xscale('log')
plt.yscale('log')
plt.savefig(self.fig_dir / 'running_times_exp_loglog.pdf')
plt.savefig(self.fig_dir / 'running_times_exp_loglog.png')
# Scatter plot for running times : tff-1 and tff-P vs. mask size
plt.figure()
symbol = '+'
for win_type in coords_dict['problem_win_choice']['data']:
mask_size_tff1 = results.sel(measure=['mask_size'],
problem_win_choice=win_type,
solver_tol_subregions=None).squeeze()
t_tff1 = results.sel(measure=['t_lambda_tff', 't_arrf', 't_evdn'],
problem_win_choice=win_type,
solver_tol_subregions=None)
t_tff1 = t_tff1.sum(dim='measure')
plt.plot(mask_size_tff1.values.reshape(-1),
t_tff1.values.reshape(-1),
symbol, label='{} - {}'.format('TFF-1', win_type))
not_none = coords_dict['solver_tol_subregions']['data'].copy()
not_none.remove(None)
not_none = not_none[0]
for win_type in coords_dict['problem_win_choice']['data']:
mask_size_tffp = results.sel(
measure=['mask_size'],
problem_win_choice=win_type,
solver_tol_subregions=not_none).squeeze()
t_tffp = results.sel(measure=['t_lambda_tff', 't_arrf',
't_evdn', 't_subreg'],
problem_win_choice=win_type,
solver_tol_subregions=not_none)
t_tffp = t_tffp.sum(dim='measure')
plt.plot(mask_size_tffp.values.reshape(-1),
t_tffp.values.reshape(-1),
symbol, label='{} - {}'.format('TFF-P', win_type))
plt.ylabel('Running time (s)')
plt.xlabel('Mask size')
plt.legend()
plt.grid()
plt.savefig(self.fig_dir / 'running_times_masksize_exp.pdf')
plt.savefig(self.fig_dir / 'running_times_masksize_exp.png')
plt.xscale('log')
plt.yscale('log')
plt.savefig(self.fig_dir / 'running_times_masksize_exp_loglog.pdf')
plt.savefig(self.fig_dir / 'running_times_masksize_exp_loglog.png')
# Scatter plot : SDR vs IS
plt.figure()
symbol = '+'
for k_measure in results.coords['measure'].values:
if k_measure[:3] == 'sdr':
sdr_res = results.sel(measure=[k_measure]).squeeze()
is_res = results.sel(measure=['is' + k_measure[3:]]).squeeze()
plt.plot(sdr_res.values.reshape(-1),
is_res.values.reshape(-1),
symbol,
label=k_measure[4:])
plt.legend()
plt.grid()
plt.yscale('log')
plt.savefig(self.fig_dir / 'sdr_vs_is.pdf')
plt.savefig(self.fig_dir / 'sdr_vs_is.png')
# Scatter plot : SDR vs IS with polygons
sdr_tff1 = results.sel(
measure='sdr_tff',
solver_tol_subregions=None).squeeze().values.reshape(-1)
sdr_tffp = results.sel(
measure='sdr_tff',
solver_tol_subregions=not_none).squeeze().values.reshape(-1)
sdr_tffo = results.sel(
measure='sdr_tffo',
solver_tol_subregions=not_none).squeeze().values.reshape(-1)
sdr_interp = results.sel(
measure='sdr_interp',
solver_tol_subregions=not_none).squeeze().values.reshape(-1)
sdr_zero = results.sel(
measure='sdr_zero',
solver_tol_subregions=not_none).squeeze().values.reshape(-1)
is_tff1 = results.sel(
measure='is_tff',
solver_tol_subregions=None).squeeze().values.reshape(-1)
is_tffp = results.sel(
measure='is_tff',
solver_tol_subregions=not_none).squeeze().values.reshape(-1)
is_tffo = results.sel(
measure='is_tffo',
solver_tol_subregions=not_none).squeeze().values.reshape(-1)
is_interp = results.sel(
measure='is_interp',
solver_tol_subregions=not_none).squeeze().values.reshape(-1)
is_zero = results.sel(
measure='is_zero',
solver_tol_subregions=not_none).squeeze().values.reshape(-1)
plt.figure()
symbol = '+'
for i in range(sdr_tff1.size):
plt.plot([sdr_tff1[i], sdr_tffp[i], sdr_tffo[i],
sdr_zero[i], sdr_interp[i], sdr_tff1[i]],
[is_tff1[i], is_tffp[i], is_tffo[i],
is_zero[i], is_interp[i], is_tff1[i]],
'k', alpha=0.2)
plt.plot(sdr_tff1, is_tff1, symbol, label='TFF-1')
plt.plot(sdr_tffp, is_tffp, symbol, label='TFF-P')
plt.plot(sdr_tffo, is_tffo, symbol, label='TFF-O')
plt.plot(sdr_interp, is_interp, symbol, label='Interp')
plt.plot(sdr_zero, is_zero, symbol, label='Zero fill')
plt.legend()
plt.grid()
plt.xlabel('SDR')
plt.ylabel('IS divergence')
plt.yscale('log')
plt.savefig(self.fig_dir / 'sdr_vs_is_polygons.pdf')
plt.savefig(self.fig_dir / 'sdr_vs_is_polygons.png')
@staticmethod
def _get_label(k, tol_subregions):
if k.endswith('tffo'):
label = 'TFF-O'
elif k.endswith('tff'):
label = 'TFF-1' if tol_subregions is None else 'TFF-P'
elif k.endswith('tffe'):
label = 'TFF-E'
elif k.endswith('interp'):
label = 'Interp'
elif k.endswith('zero'):
label = 'Zero fill'
elif k.endswith('mix'):
label = 'Mix'
else:
raise ValueError('Unknown key: ' + k)
return label
[docs] def plot_task(self, idt, fontsize=16):
"""
Plot and save figures for a specific task
Parameters
----------
idt : int
Task identifier
fontsize : int
Fontsize to be used in Figures.
"""
matplotlib.rcParams.update({'font.size': fontsize})
fig_dir = self.xp_path / 'figures' / 'tasks' / '{:06}'.format(idt)
fig_dir.mkdir(parents=True, exist_ok=True)
print('Save figures in:', fig_dir)
task = self.get_task_data_by_id(idt=idt)
misc_data = np.load(self.get_misc_file(idt=idt))
mask = task['problem_data']['mask']
dgt_params = task['problem_data']['dgt_params']
signal_params = task['problem_data']['signal_params']
x_mix = task['problem_data']['x_mix']
x_wb = task['solution_data']['x_wb']
tol_subregions = task['task_params']['solver_params']['tol_subregions']
gmtff = task['solved_data']['gmtff']
sdr_res = dict()
is_res = dict()
for k in task['result']:
k_suf = k.split('_')[-1]
if k.startswith('sdr'):
sdr_res[k_suf] = task['result'][k]
elif k.startswith('is'):
is_res[k_suf] = task['result'][k]
lambda_res = dict()
for k in misc_data:
k_suf = k.split('_')[-1]
lambda_res[k_suf] = misc_data[k]
plt.figure()
plot_mask(mask=mask, hop=dgt_params['hop'],
n_bins=dgt_params['n_bins'], fs=signal_params['fs'])
plt.title('Area: {} ({:.1%})'.format(mask.sum(), np.average(mask)))
plt.tight_layout()
plt.savefig(fig_dir / 'mask.pdf')
plt.figure()
for i_area in range(gmtff.n_areas):
s_vec = gmtff.s_vec_list[i_area]
plt.plot(s_vec, label='Sub-region {}'.format(i_area + 1))
plt.xlabel('k')
plt.ylabel('$\\sigma_k$')
plt.yscale('log')
plt.grid()
plt.legend()
plt.tight_layout()
plt.savefig(fig_dir / 'gabmul_eigenvalues.pdf')
# Results
def sdr_wb(lambda_coef):
return sdr(x_ref=x_wb, x_est=gmtff.compute_estimate(lambda_coef))
def is_wb(lambda_coef):
return is_div_spectrum(x_ref=x_wb,
x_est=gmtff.compute_estimate(lambda_coef))
def sdr_wb_1area(lambda_coef, i_area):
lambda_vec = np.ones(gmtff.n_areas)
lambda_vec[i_area] = lambda_coef
return sdr(x_ref=x_wb, x_est=gmtff.compute_estimate(lambda_vec))
l_range = 10 ** np.linspace(-10, 10, 100, endpoint=True)
if tol_subregions is None:
plt.figure()
plt.plot(l_range, [sdr_wb(i) for i in l_range], '-',
label='SDR')
for k in lambda_res:
plt.plot(lambda_res[k], sdr_wb(lambda_res[k]), 'o',
label=self._get_label(k=k,
tol_subregions=tol_subregions))
else:
plt.figure()
for i_area in range(gmtff.n_areas):
plt.plot(l_range,
[sdr_wb_1area(i, i_area) for i in l_range],
'-', label='SDR sub-reg {}'.format(i_area + 1))
for k in lambda_res:
label_prefix = self._get_label(k=k,
tol_subregions=tol_subregions)
if not isinstance(lambda_res[k], np.ndarray):
plt.plot(lambda_res[k], sdr_wb(lambda_res[k]),
'o', label=label_prefix)
continue
for i_area in range(gmtff.n_areas):
label = '{} {}'.format(label_prefix, i_area + 1)
plt.plot(lambda_res[k][i_area],
sdr_wb_1area(lambda_res[k][i_area], i_area),
'o', label=label)
plt.xlabel('$\\lambda$')
plt.ylabel('SDR (dB)')
plt.xscale('log')
plt.grid()
plt.legend()
plt.tight_layout()
plt.savefig(fig_dir / 'tuning_lambda.pdf')
if tol_subregions is None:
plt.figure()
plt.plot(l_range, [is_wb(i) for i in l_range], '-',
label='IS')
for k in lambda_res:
plt.plot(lambda_res[k], is_wb(lambda_res[k]), 'o',
label=self._get_label(k=k,
tol_subregions=tol_subregions))
plt.xlabel('$\\lambda$')
plt.ylabel('IS (dB)')
plt.xscale('log')
plt.grid()
plt.legend()
plt.tight_layout()
plt.savefig(fig_dir / 'tuning_lambda_IS.pdf')
fig, ax1 = plt.subplots()
color = 'tab:blue'
ax1.set_xlabel('$\\lambda$')
ax1.set_ylabel('SDR (dB)', color=color)
ax1.plot(l_range, [sdr_wb(i) for i in l_range], '-',
color=color)
for k in lambda_res:
ax1.plot(lambda_res[k], sdr_wb(lambda_res[k]), 'o',
label=self._get_label(k=k,
tol_subregions=tol_subregions))
ax1.tick_params(axis='y', labelcolor=color)
plt.xscale('log')
ax1.grid()
# instantiate a second axes that shares the same x-axis
ax2 = ax1.twinx()
color = 'tab:red'
ax2.set_xlabel('$\\lambda$')
ax2.set_ylabel('IS divergence', color=color)
ax2.plot(l_range, [is_wb(i) for i in l_range], '-',
color=color)
for k in lambda_res:
ax2.plot(lambda_res[k], is_wb(lambda_res[k]), 'o',
label=self._get_label(k=k,
tol_subregions=tol_subregions))
ax2.tick_params(axis='y', labelcolor=color)
fig.tight_layout()
ax2.legend()
plt.tight_layout()
plt.savefig(fig_dir / 'tuning_lambda_SDR_IS.pdf')
x_dict = dict()
for k in lambda_res:
x_dict[k] = gmtff.compute_estimate(lambda_res[k])
x_dict[k].to_wavfile(fig_dir / 'x_{}.wav'.format(k))
x_dict['interp'] = Waveform(solve_by_interpolation(
x_mix=x_mix, mask=mask, dgt_params=dgt_params,
signal_params=signal_params), fs=signal_params['fs'])
x_dict['interp'].to_wavfile(fig_dir / 'x_interp.wav')
for k in sdr_res:
print(self._get_label(k=k, tol_subregions=tol_subregions))
print(' - SDR: {:.1f}dB'.format(sdr_res[k]))
print(' - IS: {:.1f}'.format(is_res[k]))
if k in lambda_res:
print(' - lambda: ', lambda_res[k])
x_mix_tf = dgt(sig=x_mix, dgt_params=dgt_params)
x_max = db(x_mix_tf).max()
clim = x_max - 100, x_max
for k in x_dict:
plt.figure()
plot_spectrogram(x=x_dict[k], dgt_params=dgt_params,
fs=signal_params['fs'], clim=clim)
plt.title('{} - SDR={:.1f}dB - IS={:.1f}'
.format(self._get_label(k=k,
tol_subregions=tol_subregions),
sdr_res[k], is_res[k]))
plt.tight_layout()
plt.savefig(fig_dir / '{}.pdf'.format(k))
plt.figure()
plot_spectrogram(x=x_wb, dgt_params=dgt_params,
fs=signal_params['fs'],
clim=clim)
plt.title('True signal')
plt.tight_layout()
plt.savefig(fig_dir / 'spectrogram_true_wb_source.pdf')
[docs] def get_idt_from_params(self, data_params, problem_params, solver_params):
d = self.get_task_data_by_params(data_params=data_params,
problem_params=problem_params,
solver_params=solver_params)
return d['id_task']
[docs]def get_data(loc_source, wideband_src):
"""
Prepare the input data information for the :py:class:`SolveTffExperiment`
experiment.
This function is only embedding its input in a dictionary
Parameters
----------
loc_source : Path
File for the source localized in time-frequency (perturbation)
wideband_src : Path
File for the source of interest.
Returns
-------
dict
Dictionary to be given when calling the problemm (
see :py:meth:`Problem.__call__`), with keys `'loc_source'` and
`wideband_src`.
"""
return dict(loc_source=loc_source, wideband_src=wideband_src)
[docs]class Problem:
"""
Problem generation for the :py:class:`SolveTffExperiment` experiment.
Parameters
----------
crop : int or None
If not None, a cropped, centered portion of the sound will be
extracted with the specified length, in samples.
win_choice : str
String of the form 'name len' where 'name' is a window name and
'len' is a window length, e.g. 'hann 512', 'gauss 256.
delta_mix_db : float
Coefficient energy ratio, in dB, between the wideband source and the
localized source in the mixture in order to select coefficients in
the mask.
delta_loc_db : float
Dynamic range, in dB, for the localized source in order to select
coefficients in the mask.
wb_to_loc_ratio_db : float
Wideband source to localized source energy ratio to be adjusted in
the mix.
or_mask : bool
If True, the mask is build by taking the union of the two masks
obtained using thresholds `delta_mix_db` and `delta_loc_db`. If
False, the intersection is taken.
n_iter_closing : int
Number of successive morphological closings with radius 1 (a.k.a.
radius of one single closing)
n_iter_opening : int
Number of successive morphological openings with radius 1 (a.k.a.
radius of one single opening)
closing_first : bool
If True, morphological closings are applied first, followed by
openings. If False, the reverse way is used.
fig_dir : None or str or Path
If not None, folder where figures are stored. If None, figures are
not plotted.
"""
def __init__(self, crop, win_choice,
delta_mix_db, delta_loc_db, wb_to_loc_ratio_db, or_mask,
n_iter_closing, n_iter_opening, closing_first, fig_dir):
win_type, win_len_str = win_choice.split(sep=' ')
win_dur = int(win_len_str) / 8000
self.win_dur = win_dur
self.win_type = win_type
if win_type == 'gauss':
self.hop_ratio = 1 / 4
self.n_bins_ratio = 4
else:
self.hop_ratio = 1 / 8
self.n_bins_ratio = 2
self.n_iter_closing = n_iter_closing
self.n_iter_opening = n_iter_opening
self.closing_first = closing_first
self.delta_mix_db = delta_mix_db
self.delta_loc_db = delta_loc_db
self.wb_to_loc_ratio_db = wb_to_loc_ratio_db
self.or_mask = or_mask
self.crop = crop
self.fig_dir = fig_dir
[docs] def __call__(self, loc_source, wideband_src):
"""
Generate the problem from input data.
Parameters
----------
loc_source : Path
File for the source localized in time-frequency (perturbation)
wideband_src : Path
File for the source of interest.
Returns
-------
problem_data : dict
Dictionary to be given to a solver, with keys `'x_mix'` (mix
signal), `mask` (time-frequency mask), `dgt_params` (DGT
parameters) and `signal_params` (signal parameters).
solution_data : dict
Dictionary containing problem solutions, with keys `'x_loc'` (
localized signal ) and `x_wb` (wideband signal).
"""
x_mix, dgt_params, signal_params, mask, x_loc, x_wb = \
get_mix(loc_source=loc_source,
wideband_src=wideband_src,
crop=self.crop,
win_dur=self.win_dur,
win_type=self.win_type,
hop_ratio=self.hop_ratio,
n_bins_ratio=self.n_bins_ratio,
n_iter_closing=self.n_iter_closing,
n_iter_opening=self.n_iter_opening,
closing_first=self.closing_first,
delta_mix_db=self.delta_mix_db,
delta_loc_db=self.delta_loc_db,
wb_to_loc_ratio_db=self.wb_to_loc_ratio_db,
or_mask=self.or_mask,
fig_dir=self.fig_dir)
problem_data = dict(x_mix=x_mix, mask=mask,
dgt_params=dgt_params, signal_params=signal_params)
solution_data = dict(x_loc=x_loc, x_wb=x_wb)
return problem_data, solution_data
[docs]class Solver:
"""
Solver for the :py:class:`SolveTffExperiment` experiment.
This solver is computing
* the `TFF-1` of `TFF-P` solution (depending on parameter `tol_subregions`)
using a :py:class:`~tffpy.tf_fading.GabMulTff` instance
* the `Interp` solution using function
:py:func:`~tffpy.interpolation_solver.solve_by_interpolation`
Parameters
----------
tol_subregions : None or float
Tolerance to split the mask into sub-regions in
:py:class:`~tffpy.tf_fading.GabMulTff`.
tolerance_arrf : float
Tolerance for the randomized EVD in
:py:class:`~tffpy.tf_fading.GabMulTff`, see method
:py:meth:`~tffpy.tf_fading.GabMulTff.compute_decomposition`.
proba_arrf : float
Probability of error for the randomized EVD in
:py:class:`~tffpy.tf_fading.GabMulTff`, see method
:py:meth:`~tffpy.tf_fading.GabMulTff.compute_decomposition`.
"""
def __init__(self, tol_subregions, tolerance_arrf, proba_arrf,
rand_state=0):
self.tol_subregions = tol_subregions
self.tolerance_arrf = tolerance_arrf
self.proba_arrf = proba_arrf
self.rand_state = rand_state
[docs] def __call__(self, x_mix, mask, dgt_params, signal_params):
"""
Apply the solver to estimate solutions from the problem data.
The output dictionary is composed of data with keys:
* `'x_tff'`: solution estimated by :py:class:`~tffpy.tf_fading.GabMulTff`
* `'x_zero'`: solution when applying the Gabor
multiplier (i.e., :math:`\lambda=1`)
* `'x_interp'`: solution from function
:py:func:`~tffpy.interpolation_solver.solve_by_interpolation`
* `'gmtff'`: `GabMulTff` instance
* `'t_lambda_tff'`: running times to estimate hyperparameter in method
:py:meth:`~tffpy.tf_fading.GabMulTff.compute_lambda`
* `'t_arrf'`: running times to compute range approximation in method
:py:meth:`~tffpy.tf_fading.GabMulTff.compute_decomposition`
* `'t_evdn'`: running times to compute EVD in method
:py:meth:`~tffpy.tf_fading.GabMulTff.compute_decomposition`
* `'t_uh_x'`: running times to compute additional matrix products in
method :py:meth:`~tffpy.tf_fading.GabMulTff.compute_decomposition`
* `'t_subreg'`: running times to split mask into sub-regions in class
:py:class:`~tffpy.tf_fading.GabMulTff`
* `'lambda_tff'`: estimated values for hyper-parameters
:math:`\lambda_i` estimated by
:py:meth:`~tffpy.tf_fading.GabMulTff`.compute_lambda`
Parameters
----------
x_mix : nd-array
Mix signal
mask : nd-array
Time-frequency mask
dgt_params : dict
DGT parameters
signal_params : dict
Signal parameters
Returns
-------
dict
The estimated solution and additional information
"""
gmtff = GabMulTff(x_mix=x_mix, mask=mask, dgt_params=dgt_params,
signal_params=signal_params,
tol_subregions=self.tol_subregions)
gmtff.compute_decomposition(tolerance_arrf=self.tolerance_arrf,
proba_arrf=self.proba_arrf,
rand_state=self.rand_state)
# Estimate energy and lambda
lambda_tff, t_lambda_tff = gmtff.compute_lambda(x_mix=x_mix)
print('Running time to tune lambda (est): {} s'.format(t_lambda_tff))
x_tff = gmtff.compute_estimate(lambda_tff)
x_zero = gmtff.compute_estimate(1)
x_interp = solve_by_interpolation(
x_mix=x_mix, mask=mask, dgt_params=dgt_params,
signal_params=signal_params)
return dict(x_tff=x_tff, x_zero=x_zero, x_interp=x_interp, gmtff=gmtff,
t_lambda_tff=t_lambda_tff, t_arrf=gmtff.t_arrf,
t_evdn=gmtff.t_evdn, t_uh_x=gmtff.t_uh_x,
t_subreg=gmtff.t_subreg, lambda_tff=lambda_tff)
[docs]def perf_measures(task_params, source_data, problem_data,
solution_data, solved_data, exp=None):
"""
Performance measure, including computation of oracle solutions
Parameters
----------
task_params : dict
Task parameters
source_data : dict
Input data
problem_data : dict
Problem data
solution_data : dict
Solver output
solved_data : dict
True solution data
exp : SolveTffExperiment
The experiment
Returns
-------
dict
All data useful for result analysis including SDR and Itakura-Saito
performance, running times, hyperparameter values, mask size,
number of sub-regions, estimated rank (summed over sub-regions),
lowest singular value.
"""
x_tff = solved_data['x_tff']
x_zero = solved_data['x_zero']
gmtff = solved_data['gmtff']
lambda_tff = solved_data['lambda_tff']
x_interp = solved_data['x_interp']
x_mix = problem_data['x_mix']
x_wb = solution_data['x_wb']
# Trick for storing additional results
misc_file = exp.get_misc_file(task_params=task_params)
# Orcale SDR
lambda_tffo, t_lambda_tffo = compute_lambda_oracle_sdr(gmtff=gmtff, x_wb=x_wb)
x_tffo = gmtff.compute_estimate(lambda_tffo)
# Oracle true energy
e_target_tffe = np.empty(gmtff.n_areas)
x_wb_tf_mat = dgt(x_wb, dgt_params=gmtff.dgt_params)
for i_area in range(gmtff.n_areas):
mask_i = gmtff.mask == i_area + 1
x_wb_tf_masked = mask_i * x_wb_tf_mat
e_target_tffe[i_area] = \
np.linalg.norm(x_wb_tf_masked, 'fro') ** 2
lambda_tffe, t_lambda_tffe = gmtff.compute_lambda(
x_mix=x_mix, e_target=e_target_tffe)
x_tffe = gmtff.compute_estimate(lambda_tffe)
solutions = dict(tffo=x_tffo,
tff=x_tff,
tffe=x_tffe,
zero=x_zero,
mix=x_mix,
interp=x_interp)
sdr_res = {'sdr_' + k: sdr(x_ref=x_wb, x_est=x)
for k, x in solutions.items()}
is_res = {'is_' + k: is_div_spectrum(x_ref=x_wb, x_est=x)
for k, x in solutions.items()}
np.savez(misc_file,
lambda_tffe=lambda_tffe,
lambda_tffo=lambda_tffo,
lambda_tff=lambda_tff)
running_times = dict(t_lambda_tffe=np.sum(t_lambda_tffe),
t_lambda_tffo=np.sum(t_lambda_tffo),
t_lambda_tff=np.sum(solved_data['t_lambda_tff']),
t_arrf=np.sum(solved_data['t_arrf']),
t_evdn=np.sum(solved_data['t_evdn']),
t_uh_x=np.sum(solved_data['t_uh_x']),
t_subreg=solved_data['t_subreg']
)
features = dict(mask_size=np.sum(gmtff.mask > 0),
mask_ratio=np.mean(gmtff.mask > 0),
n_subregions=gmtff.n_areas,
rank_sum=np.sum([s.size for s in gmtff.s_vec_list]),
lowest_sv=np.min([np.min(s) for s in gmtff.s_vec_list])
)
idt = exp.get_idt_from_params(**task_params)
if exp.keep_eigenvectors != 'all' and idt not in exp.keep_eigenvectors:
sd = exp._read_item(type_item='solved_data', idt=idt)
sd['gmtff'].u_mat_list = [None for _ in sd['gmtff'].u_mat_list]
exp._write_item(type_item='solved_data', idt=idt, content=sd)
return dict(**running_times, **sdr_res, **is_res, **features)
[docs]def create_and_run_light_experiment():
"""
Create a light experiment and run it
"""
exp = SolveTffExperiment.get_experiment(setting='light', force_reset=True)
print('*' * 80)
print('Created experiment')
print(exp)
print(exp.display_status())
print('*' * 80)
print('Run task 0')
task_data = exp.get_task_data_by_id(idt=0)
print(task_data.keys())
print(task_data['task_params']['data_params'])
problem = exp.get_problem(
**task_data['task_params']['problem_params'])
print(problem)
print('*' * 80)
print('Run all')
exp.launch_experiment()
print('*' * 80)
print('Collect and plot results')
exp.collect_results()