# Source code for madarrays.waveform
# -*- coding: utf-8 -*-
# ######### COPYRIGHT #########
#
# Copyright(c) 2018
# -----------------
#
# * Laboratoire d'Informatique et Systèmes <http://www.lis-lab.fr/>
# * Université d'Aix-Marseille <http://www.univ-amu.fr/>
# * Centre National de la Recherche Scientifique <http://www.cnrs.fr/>
# * Université de Toulon <http://www.univ-tln.fr/>
#
# Contributors
# ------------
#
# * Ronan Hamon <firstname.lastname_AT_lis-lab.fr>
# * Valentin Emiya <firstname.lastname_AT_lis-lab.fr>
# * Florent Jaillet <firstname.lastname_AT_lis-lab.fr>
#
# Description
# -----------
#
# Python package for audio data structures with missing entries
#
# Licence
# -------
# This file is part of madarrays.
#
# madarrays is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# ######### COPYRIGHT #########
"""Definition of a masked waveform.
.. moduleauthor:: Ronan Hamon
.. moduleauthor:: Valentin Emiya
.. moduleauthor:: Florent Jaillet
"""
import warnings
from fractions import Fraction
import numpy as np
import resampy
import simpleaudio as sa
from scipy.io import wavfile
from scipy.signal import hilbert
import matplotlib.pyplot as plt
from .mad_array import MadArray
# Sampling frequencies accepted by ``Waveform.to_wavfile``: writing a wav file
# raises a ValueError when the waveform's ``fs`` is not a member of this set.
VALID_IO_FS = {1, 8000, 16000, 32000, 48000, 11025, 22050, 44100, 88200}
def _check_compatibility_fs(w1, w2):
    """Ensure that two waveforms share the same sampling frequency.

    Parameters
    ----------
    w1, w2 : objects exposing an ``fs`` attribute
        The two waveforms to compare.

    Raises
    ------
    ValueError
        If ``w1.fs`` and ``w2.fs`` differ.
    """
    first_fs, second_fs = w1.fs, w2.fs
    if first_fs == second_fs:
        return
    raise ValueError(
        'Waveforms do not have the same fs: {} and {}.'.format(
            first_fs, second_fs))
class Waveform(MadArray):
    """Subclass of MadArray to handle mono and stereo audio signals.

    :class:`Waveform` inherits from :class:`~madarrays.mad_array.MadArray` and
    adds an attribute :attr:`fs` to store the sampling frequency, as well as
    methods to facilitate the manipulation of audio files.

    .. _type_entry_waveform:

    **Type of entries**: audio data entries can have different types, that are
    associated with specific constraints on the values:

    * *float*: the values are float between -1 and 1;
    * *int*: the values are integers between a range that depends on the
      precision.
    * *complex*: the real and imaginary parts are float between -1 and 1.

    High-precision types (float128, int64, complex256) may lead to problems and
    should be used with caution.

    The casting of a waveform in a different dtype depends on the current dtype
    and the desired dtype (complex casting is similar to float casting):

    * *Integer-to-real* casting is performed by applying on each entry
      :math:`x` the function :math:`f(x)=\\frac{x - z}{2^{n-1}}`, where the
      source integral type is coded with :math:`n` bits, and :math:`z` is the
      integer associated with zero, i.e., :math:`z=0` for a signed type (`int`)
      and :math:`z=2^{n-1}` for an unsigned type (`uint`).
    * *Real-to-integer* casting is performed by applying on each entry
      :math:`x` the function :math:`f(x)=\\lfloor\\left(x + 1\\right) 2^{n-1} +
      m\\rfloor`, where the target integral type is coded with :math:`n` bits,
      and :math:`m` is the minimum integer value, i.e., :math:`m=-2^{n-1}` for
      a signed type (`int`) and :math:`m=0` for an unsigned type (`uint`);
    * *Real-to-real* casting is obtained by a basic rounding operation;
    * *Integer-to-integer* casting is obtained by chaining an
      integer-to-float64 casting and a float64-to-integer casting.

    Clipping is performed for unexpected values:

    * When casting to `float`, values outside :math:`[-1, 1]` are clipped;
    * When casting to `int`, values outside the minimum and maximum values
      allowed by the integral type are clipped:

      * :math:`\\left[-2^{n-1}, 2^{n-1}-1\\right]` for :math:`n`-bits signed
        integers;
      * :math:`\\left[0, 2^{n}-1\\right]` for :math:`n`-bits unsigned
        integers.

    These constraints are only applied when calling explicitly the method
    :meth:`astype`.

    .. _masking:

    **Masking**: Waveform allows for complex entries, but only real-like
    masking is permitted, i.e. it is not possible to mask only the phase or the
    amplitude. In particular, this implies that the attribute
    :attr:`_complex_masking` inherited from
    :class:`~madarrays.mad_array.MadArray` is always equal to False.

    Parameters
    ----------
    data : nd-array [N] or [N, 2]
        Audio samples, as a N-length vector for a mono signal or a
        [N, 2]-shape array for a stereo signal
    fs : int or float, optional
        Sampling frequency of the original signal, in Hz. If None and
        ``data`` is a Waveform, use ``data.fs``, otherwise it is set to 1.
    mask : nd-array, optional
        Boolean mask with True values for missing samples. Its shape must be
        the same as ``data``.
    masked_indexing :
        See :class:`~madarrays.mad_array.MadArray`.
    """

    def __new__(cls, data, fs=None, mask=None, masked_indexing=None):
        if fs is None:
            fs = data.fs if isinstance(data, Waveform) else 1

        # Singleton dimensions are dropped before building the MadArray
        data = data.squeeze()

        if (isinstance(data, MadArray) and data._complex_masking
                and mask is None):
            warnmsg = ('Construction of a Waveform from a MadArray with '
                       'complex masking: the original masking data are '
                       'discarded and a boolean mask set to False is used.')
            warnings.warn(warnmsg)
            # Create mask to force boolean masking. The builtin ``bool`` is
            # used because the ``np.bool`` alias was removed in NumPy 1.24.
            mask = np.zeros_like(data, dtype=bool)

        obj = MadArray.__new__(cls=cls, data=data, mask=mask,
                               masked_indexing=masked_indexing)

        if obj.ndim > 2 or (obj.ndim == 2 and obj.shape[1] > 2):
            raise ValueError('`data` should be either mono or stereo.')

        # add the new attribute to the created instance (validated by the
        # ``fs`` setter)
        obj.fs = fs
        return obj

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        """Apply a ufunc, checking and propagating the sampling frequency.

        For binary operations between two Waveforms, their sampling
        frequencies must match. A MadArray result is viewed as a Waveform
        whose ``fs`` is taken from the first Waveform found in ``inputs``.
        """
        if (len(inputs) == 2
                and isinstance(inputs[0], Waveform)
                and isinstance(inputs[1], Waveform)):
            _check_compatibility_fs(*inputs)

        output = super().__array_ufunc__(ufunc, method, *inputs, **kwargs)

        if isinstance(output, MadArray):
            output = output.view(Waveform)
            # Take fs from the first Waveform operand instead of blindly
            # indexing inputs[1], which may not exist or not be a Waveform.
            fs = next((x.fs for x in inputs if isinstance(x, Waveform)), None)
            if fs is not None:
                output.fs = fs
        return output

    def __array_finalize__(self, obj):
        super().__array_finalize__(obj)
        # Inherit Waveform-specific attributes from the source object, with
        # defaults when the source does not define them.
        self._fs = getattr(obj, '_fs', 1)
        self._play_object = getattr(obj, '_play_object', None)

    def __reduce__(self):
        # Append the sampling frequency to the pickled state of the parent
        pickled_state = super().__reduce__()
        new_state = pickled_state[2] + (self._fs, )
        return pickled_state[0], pickled_state[1], new_state

    def __setstate__(self, state):
        # The last item of the state is the sampling frequency added by
        # ``__reduce__``; the play object is not restored.
        self._play_object = None
        self._fs = state[-1]
        super().__setstate__(state[0:-1])

    @property
    def fs(self):
        """Sampling frequency of the audio signal (int or float).

        The signal is not resampled when the sampling frequency is modified.

        Raises
        ------
        ValueError
            If `fs` is not strictly positive (when setting the value).
        """
        return self._fs

    @fs.setter
    def fs(self, fs):
        if fs <= 0:
            errmsg = 'fs is not strictly positive (given: {})'
            raise ValueError(errmsg.format(fs))
        self._fs = fs

    @property
    def rms(self):
        """Root mean square of the signal, missing samples being replaced
        by zeros (float).

        Raises
        ------
        NotImplementedError
            If data is with an integer type.
        """
        if np.issubdtype(self.dtype, np.integer):
            errmsg = 'RMS is not available for integer data types'
            raise NotImplementedError(errmsg)
        return np.sqrt(np.mean(np.abs(self.to_np_array(0))**2))

    @property
    def length(self):
        """Length of the signal, in samples."""
        return self.shape[0]

    @property
    def duration(self):
        """Duration of the signal, in seconds."""
        return self.length / self.fs

    @property
    def n_channels(self):
        """Number of channels (1 for mono, 2 for stereo)."""
        return 1 if self.ndim == 1 else self.shape[1]

    @property
    def time_axis(self):
        """Time axis, i.e. the time in seconds of each sample."""
        return np.arange(self.length) / self.fs

    def is_stereo(self):
        """Indicates whether the signal is stereo or not."""
        return self.n_channels == 2

    def set_rms(self, rms):
        """Set root mean square of the signal, in place.

        Parameters
        ----------
        rms : float
            Root mean square of the signal.

        Raises
        ------
        ValueError
            If `rms` is negative.
        NotImplementedError
            If data is with an integer type.
        """
        if rms < 0:
            errmsg = '`rms` should be a positive float (given: {})'
            raise ValueError(errmsg.format(rms))
        # Normalize to unit RMS, then scale to the target value
        self[:] /= self.rms
        self[:] *= rms

    def resample(self, fs):
        """Resample the audio signal, in place.

        Can be only performed on a waveform without missing data.

        Note that if the current or the new sampling frequencies are not
        integers, the new sampling frequency may be different from the
        desired value since the resampling method only allows input and
        output frequencies of type ``int``. In this case, a warning is
        raised.

        Parameters
        ----------
        fs : int or float
            New sampling frequency.

        Raises
        ------
        ValueError
            If `fs` is not a positive number.
            If `self` has missing samples.
        UserWarning
            If the new sampling frequency had to be adjusted.
        """
        assert np.issubdtype(type(fs), np.integer) \
            or np.issubdtype(type(fs), np.floating)

        if fs <= 0:
            errmsg = '`fs` should be a positive number (given: {})'
            raise ValueError(errmsg.format(fs))

        if np.issubdtype(type(fs), np.floating) \
                or np.issubdtype(type(self.fs), np.floating):
            # Find a good rational number to approximate the ratio between
            # sampling frequencies
            fs_ratio = Fraction(fs / self.fs)
            fs_ratio = fs_ratio.limit_denominator(10000)

            # Sampling frequencies used for the resampling (need to be int)
            resample_new_fs = fs_ratio.numerator
            resample_old_fs = fs_ratio.denominator

            # Adjust new sampling frequency
            new_fs = fs_ratio * self.fs
            if new_fs != fs:
                warnings.warn('New sampling frequency adjusted to {} instead '
                              'of {} in order to use ``int`` values in '
                              '``resample``.'.format(new_fs, fs))
                fs = new_fs
        else:
            resample_new_fs = fs
            resample_old_fs = self.fs

        if self.is_masked():
            errmsg = 'Waveform has missing entries.'
            raise ValueError(errmsg)

        if resample_new_fs != resample_old_fs:
            x = self.to_np_array()
            y = resampy.resample(x, resample_old_fs, resample_new_fs, axis=0)
            self.resize(y.shape, refcheck=False)
            self[:] = y
            self._fs = fs
            # The data were resized: rebuild an all-False mask of matching
            # shape (builtin ``bool``: ``np.bool`` was removed in NumPy 1.24).
            self._mask = np.zeros_like(self, dtype=bool)

    def plot(self, x_axis_label='Time (s)', y_axis_label='',
             cpx_mode='real', fill_value=0, **kwargs):
        """Plot the signal.

        Parameters
        ----------
        x_axis_label : str
            Label of the x axis.
        y_axis_label : str
            Label of the y axis.
        cpx_mode : {'real', 'imag', 'both'}
            In case of a complex-valued signal, plot the real part only
            ('real'), the imaginary part only ('imag'), or both curves
            ('both'). This option has no effect if the signal is real-valued.
        fill_value : scalar or None
            Value used to fill missing data for display. If None, the existing
            value is used (e.g., for clipping).

        Other Parameters
        ----------------
        kwargs
            See :func:`matplotlib.pyplot.plot`.

        Returns
        -------
        List of Line
            See :func:`matplotlib.pyplot.plot`.

        Raises
        ------
        ValueError
            If the signal is complex and `cpx_mode` is unknown.
        """
        x = self.to_np_array(fill_value)
        t = self.time_axis

        if np.issubdtype(x.dtype, np.complexfloating):
            if cpx_mode == 'real':
                lines = plt.plot(t, np.real(x), **kwargs)
            elif cpx_mode == 'imag':
                lines = plt.plot(t, np.imag(x), **kwargs)
            elif cpx_mode == 'both':
                lines_real = plt.plot(t, np.real(x), **kwargs)
                lines_imag = plt.plot(t, np.imag(x), **kwargs)
                lines = lines_real + lines_imag
            else:
                errmsg = 'Unknown complex mode (given: {}).'
                raise ValueError(errmsg.format(cpx_mode))
        else:
            lines = plt.plot(t, x, **kwargs)

        # add axis labels
        if x_axis_label is not None:
            plt.xlabel(x_axis_label)
        if y_axis_label is not None:
            plt.ylabel(y_axis_label)
        return lines

    def plot_mask(self,
                  x_axis_label='Time (s)',
                  y_axis_label='',
                  **kwargs):
        """Plot the mask as a step function of time.

        Parameters
        ----------
        x_axis_label : str
            Label of the time axis (horizontal axis).
        y_axis_label : str
            Label of the vertical axis.

        Other Parameters
        ----------------
        kwargs
            Passed to :func:`matplotlib.pyplot.step`.
        """
        plt.step(
            self.time_axis, self._mask, linewidth=2, where='pre', **kwargs)
        plt.ylim(-0.25, 1.25)
        plt.yticks([0, 1])
        plt.xlabel(x_axis_label)
        plt.ylabel(y_axis_label)
        plt.grid()

    def to_wavfile(self, filename, dtype=None):
        """Save the waveform as an audio wav file.

        Missing samples are replaced by zeros.

        Parameters
        ----------
        filename : str or :class:`pathlib.Path`
            Name of the file including paths and extension
        dtype : data-type or None, optional
            Type of the entries of the wav file. If None, use current dtype of
            data. ``float64`` and complex types are stored as ``float32``.

        Raises
        ------
        UserWarning
            If the signal is complex.
        ValueError
            If the sampling frequency is not an integer from the set of
            supported frequencies ``madarrays.waveform.VALID_IO_FS``.
            If the data to be written are not finite.
        NotImplementedError
            If dtype is not supported by the current implementation.

        See also
        --------
        scipy.io.wavfile.write
        """
        if self.fs not in VALID_IO_FS:
            errmsg = '`fs` is not a valid sampling frequency (given: {}).'
            raise ValueError(errmsg.format(self.fs))

        if dtype is None:
            dtype = self.dtype

        y = self.to_np_array(0)

        if np.iscomplexobj(y):
            warnmsg = 'Complex values have been stored by keeping only the '\
                'real part, the imaginary part has been discarded.'
            warnings.warn(warnmsg)
            y = np.real(y)

        # A complex target type cannot be stored: fall back to float32
        if np.issubdtype(dtype, np.complexfloating):
            dtype = np.float32

        if dtype == np.float64:
            dtype = np.float32
        elif dtype == np.int64:
            raise NotImplementedError

        # NOTE(review): this looks like a plain NumPy cast (no Waveform
        # rescaling), assuming ``to_np_array`` returns an ndarray -- confirm
        # this is intended when `dtype` differs in kind from the data.
        y = y.astype(dtype=dtype)

        if not np.all(np.isfinite(y)):
            errmsg = 'Data should be finite.'
            raise ValueError(errmsg)

        wavfile.write(str(filename), data=y, rate=self.fs)

    @staticmethod
    def from_wavfile(filename, dtype=np.float64, conversion_to_mono=None):
        """Load an audio file and return a Waveform object.

        Parameters
        ----------
        filename : str or :class:`pathlib.Path`
            Name of the audio file
        dtype : data-type, optional
            Output data type. If None, the data type from the wavfile is kept.
            See :ref:`Type of Entry<type_entry_waveform>`.
        conversion_to_mono : {'left', 'right', 'mean'} or None, optional
            If None (default), no conversion is performed. If str, select the
            appropriate channel (left or right), or the mean between the left
            and right channels if stereo.

        Returns
        -------
        :class:`Waveform`

        Raises
        ------
        ValueError
            If the file has several channels and `conversion_to_mono` is
            neither None nor one of the supported modes.
        """
        fs, x = wavfile.read(str(filename))

        if dtype is None:
            dtype = x.dtype

        # The conversion is only considered for multi-channel data
        if x.ndim > 1 and conversion_to_mono is not None:
            if conversion_to_mono == 'left':
                x = x[:, 0]
            elif conversion_to_mono == 'right':
                x = x[:, 1]
            elif conversion_to_mono == 'mean':
                x = np.mean(x, axis=1)
            else:
                errmsg = '`conversion_to_mono` is invalid (given: {}).'
                raise ValueError(errmsg.format(conversion_to_mono))

        return Waveform(x, fs=fs).astype(dtype=dtype)

    def fade(self, mode='both', fade_duration=None, fade_length=None):
        """Apply fade-in and/or fade-out, in place.

        Half a hanning window with the specified length or duration is used
        (exactly one parameter `fade_duration` or `fade_length` must be set).

        Parameters
        ----------
        mode : {'both', 'in', 'out'}, optional
            Signal to be processed.
        fade_duration : float, optional
            Duration, in seconds, of the fade effect.
        fade_length : int, optional
            Length, in samples, of the fade effect.

        Raises
        ------
        ValueError
            If neither `fade_duration` nor `fade_length` is given.
            If both `fade_duration` and `fade_length` are given.
            If `fade_duration` or `fade_length` are greater than the
            duration or length of the waveform.
            If `fade_duration` or `fade_length` are negative.
            If `mode` is invalid.
        """
        if fade_duration is None and fade_length is None:
            errmsg = 'Either `fade_duration` or `fade_length` must be given.'
            raise ValueError(errmsg)

        if fade_duration is not None and fade_length is not None:
            errmsg = '`fade_duration` and `fade_length` cannot be given '\
                'simultaneously.'
            raise ValueError(errmsg)

        if mode not in ['both', 'in', 'out']:
            errmsg = 'Mode is unknown: {}.'
            raise ValueError(errmsg.format(mode))

        if fade_length is None:
            fade_length = int(np.round(fade_duration * self.fs))

        if fade_length == 0:
            # Nothing to do for an empty fade
            return

        if fade_length < 0:
            errmsg = 'Fade length cannot be negative.'
            raise ValueError(errmsg)

        if fade_length > self.shape[0]:
            errmsg = 'Fade length ({}) is greater than data length ({})'
            raise ValueError(errmsg.format(fade_length, self.shape[0]))

        # Rising half of a squared-sine window, from 0 towards 1
        w = np.sin(np.pi / 2 / fade_length * np.arange(fade_length))**2

        if mode in ['both', 'in']:
            self[:len(w)] *= w

        if mode in ['both', 'out']:
            # The reversed window is applied to the end of the signal
            self[-len(w):] *= w[-1::-1]

    def show_player(self):
        """Show the player when using jupyter notebook.

        Returns
        -------
        `IPython.display.Audio` or None
            Audio widget, or None if IPython is not available (a message is
            printed in this case).

        Raises
        ------
        ValueError
            If sampling frequency is 1.
        UserWarning
            If the signal is complex.
        """
        # Only the import is guarded so that unrelated ImportErrors raised
        # later are not silently swallowed.
        try:
            from IPython.display import Audio
        except ImportError:
            print('This method should only be called in notebooks.')
            return None

        if self.fs == 1:
            errmsg = 'Invalid sampling frequency: {}Hz'
            raise ValueError(errmsg.format(self.fs))

        if np.iscomplexobj(self):
            warnmsg = 'Only the real part is played.'
            warnings.warn(warnmsg)

        x = np.real(self).to_np_array(0)
        return Audio(data=x, rate=self.fs)

    def play(self, scale=False):
        """Play the audio file.

        Parameters
        ----------
        scale : bool
            If True, the data is normalized by maximum value.

        Returns
        -------
        `simpleaudio.PlayObject`
            Object useful to handle the played audio stream.

        Raises
        ------
        ValueError
            If sampling frequency is 1.
        UserWarning
            If the signal is complex.
        """
        if self.fs == 1:
            errmsg = 'Invalid sampling frequency: {}Hz'
            raise ValueError(errmsg.format(self.fs))

        if np.iscomplexobj(self):
            warnmsg = 'Only the real part is played.'
            warnings.warn(warnmsg)

        x = np.real(self)

        if scale:
            x = x.astype(np.float64)
            maxima = np.max(np.abs(self))
            if maxima > 0:
                # Scale just below full range
                x[:] = x / maxima * (1 - np.finfo(np.float64).eps)

        # 16-bit samples, missing entries replaced by zeros
        x = x.astype(dtype=np.int16).to_np_array(0)

        self._play_object = sa.play_buffer(x, num_channels=self.n_channels,
                                           bytes_per_sample=2,
                                           sample_rate=self.fs)
        # Return the play object, as documented
        return self._play_object

    def stop(self):
        """Stop playing (does nothing if the sound is not played)."""
        if self.is_playing():
            self._play_object.stop()

    def is_playing(self):
        """Indicates whether the sound is currently being played.

        Returns
        -------
        bool
            True if the sound is currently being played, False otherwise.
        """
        return (hasattr(self._play_object, 'is_playing') and
                self._play_object.is_playing())

    def get_analytic_signal(self, N=None, axis=0):
        """Return the analytic waveform.

        Parameters
        ----------
        N : int, optional
            Number of Fourier components. If None, set to x.shape[axis].
        axis : int, optional
            Axis along which to do the transformation.

        Returns
        -------
        Waveform

        Raises
        ------
        TypeError
            If Waveform has missing samples.

        See Also
        --------
        scipy.signal.hilbert
        """
        if np.any(self.get_unknown_mask()):
            errmsg = 'Waveform has missing samples.'
            raise TypeError(errmsg)
        y = hilbert(self, N=N, axis=axis)
        return Waveform(data=y, fs=self.fs)

    def clip(self, min_value=None, max_value=None):
        """Clip audio data according to given bound values, in place.

        For complex data, the real and imaginary parts are clipped
        separately. See :ref:`Type of Entry<type_entry_waveform>` for details.

        Parameters
        ----------
        min_value : float, optional
            Clipping is applied below this value. If None, the minimum of
            the data is used.
        max_value : float, optional
            Clipping is applied above this value. If None, the maximum of
            the data is used.

        Raises
        ------
        UserWarning
            If some coefficients are actually clipped.

        Notes
        -----
        Masked values can be also clipped.
        """
        if min_value is None:
            min_value = np.min(self)
        if max_value is None:
            max_value = np.max(self)

        if np.issubdtype(self.dtype, np.complexfloating):
            w_real = Waveform(np.real(self))
            w_imag = Waveform(np.imag(self))

            # Warnings raised while clipping each part are recorded and
            # re-emitted with a prefix identifying the part.
            with warnings.catch_warnings(record=True) as w:
                warnings.simplefilter("always")
                w_real.clip(min_value=min_value, max_value=max_value)
            warnmsg = 'Real part of the complex entries: {}'
            for w_i in w:
                warnings.warn(warnmsg.format(str(w_i.message)))

            with warnings.catch_warnings(record=True) as w:
                warnings.simplefilter("always")
                w_imag.clip(min_value=min_value, max_value=max_value)
            warnmsg = 'Imaginary part of the complex entries: {}'
            for w_i in w:
                warnings.warn(warnmsg.format(str(w_i.message)))

            self[:] = np.array(w_real) + 1j * np.array(w_imag)
        else:
            ind_min = self < min_value
            if np.any(ind_min):
                self[ind_min] = min_value
                warnmsg = '{} values lower than {} have been clipped.'
                warnings.warn(warnmsg.format(self.dtype, min_value))

            ind_max = self > max_value
            if np.any(ind_max):
                self[ind_max] = max_value
                warnmsg = '{} values greater than {} have been clipped.'
                warnings.warn(warnmsg.format(self.dtype, max_value))

    def astype(self, dtype):
        """Cast the waveform into a given type (float, int, or complex).

        Parameters
        ----------
        dtype : data-type
            Output data type. See :ref:`Type of Entry<type_entry_waveform>`.

        Returns
        -------
        Waveform
            Waveform with the desired dtype, with the same mask of missing
            samples and the same sampling frequency.

        Raises
        ------
        TypeError
            If argument `dtype` is an unsupported data type.
        UserWarning
            If some coefficients are actually clipped.
        """
        y = self.copy()

        if (np.issubdtype(self.dtype, np.floating) or
                np.issubdtype(self.dtype, np.complexfloating)):
            # Source is real or complex
            if (np.issubdtype(dtype, np.floating) or
                    np.issubdtype(dtype, np.complexfloating)):
                y.clip(min_value=-1, max_value=(1 - np.finfo(dtype).eps))
                y = np.asarray(y, dtype=dtype)
            elif np.issubdtype(dtype, np.integer):
                # Map [-1, 1) onto the full range of the target integer type
                target_type_info = np.iinfo(dtype)
                int_range = target_type_info.max - target_type_info.min + 1
                y = np.floor((np.real(y) + 1) / 2 * int_range +
                             target_type_info.min)
                y.clip(min_value=np.iinfo(dtype).min,
                       max_value=np.iinfo(dtype).max)
                y = np.asarray(y, dtype=dtype)
            else:
                errmsg = 'Unsupported target type {}.'
                raise TypeError(errmsg.format(dtype))
        else:
            # Source is integer
            src_iinfo = np.iinfo(y.dtype)
            if np.issubdtype(dtype, np.floating):
                # Map the full range of the source integer type onto [-1, 1)
                int_range = src_iinfo.max - src_iinfo.min + 1
                zero = src_iinfo.min + int_range // 2
                y = ((super(type(y), y).astype(dtype) - zero)
                     / (int_range // 2))
                y.clip(min_value=-1, max_value=1 - np.finfo(dtype).eps)
                y = np.asarray(y, dtype=dtype)
            elif np.issubdtype(dtype, np.integer):
                # Integer-to-integer: chain through float64
                y = y.astype(dtype=np.float64)
                y = y.astype(dtype=dtype)
                y = np.asarray(y)
            elif np.issubdtype(dtype, np.complexfloating):
                y = y.astype(dtype=np.float64)
                y = np.asarray(y, dtype=dtype)
            else:
                errmsg = 'Unsupported target type {}.'
                raise TypeError(errmsg.format(dtype))

        return Waveform(y, mask=self.get_unknown_mask(), fs=self.fs)

    def __eq__(self, other):
        # Comparing two Waveforms requires identical sampling frequencies
        if isinstance(other, Waveform):
            _check_compatibility_fs(self, other)
        return super().__eq__(other)

    def __str__(self):
        # Reuse the parent representation, adding fs and length information
        string = super().__str__()
        string = string.replace('MadArray', 'Waveform, fs={}Hz, length={}')
        return string.format(self.fs, self.length)

    def __repr__(self):
        string = super().__repr__()
        string = string.replace('MadArray', 'Waveform')
        return string