# -*- coding: utf-8 -*-
# ######### COPYRIGHT #########
#
# Copyright(c) 2018
# -----------------
#
# * Laboratoire d'Informatique et Systèmes <http://www.lis-lab.fr/>
# * Université d'Aix-Marseille <http://www.univ-amu.fr/>
# * Centre National de la Recherche Scientifique <http://www.cnrs.fr/>
# * Université de Toulon <http://www.univ-tln.fr/>
#
# Contributors
# ------------
#
# * Ronan Hamon <firstname.lastname_AT_lis-lab.fr>
# * Valentin Emiya <firstname.lastname_AT_lis-lab.fr>
# * Florent Jaillet <firstname.lastname_AT_lis-lab.fr>
#
# Description
# -----------
#
# Python package for audio data structures with missing entries
#
# Licence
# -------
# This file is part of madarrays.
#
# madarrays is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# ######### COPYRIGHT #########
"""Definition of a masked array.
.. moduleauthor:: Ronan Hamon
.. moduleauthor:: Valentin Emiya
.. moduleauthor:: Florent Jaillet
"""
import numpy as np
def _merge_masks(ma1, ma2):
    """Build the mask arguments describing the union of two masks.

    The returned dictionary is meant to be unpacked as keyword arguments
    when creating the :class:`MadArray` object resulting from an operation
    between ``ma1`` and ``ma2``.

    Parameters
    ----------
    ma1 : MadArray
        First masked array to consider.
    ma2 : MadArray
        Second masked array to consider.

    Returns
    -------
    dict
        ``{'mask': ...}`` when neither operand uses complex masking,
        ``{'mask_magnitude': ..., 'mask_phase': ...}`` otherwise.
    """
    # Boolean masking on both sides: a plain element-wise union is enough.
    if not (ma1._complex_masking or ma2._complex_masking):
        return {'mask': (ma1.get_unknown_mask()) | (ma2.get_unknown_mask())}

    def _magnitude_and_phase(ma):
        # An operand with a boolean mask hides both components at once, so
        # its 'any' mask is used for the magnitude and for the phase.
        if ma._complex_masking:
            return (ma.get_unknown_mask('magnitude'),
                    ma.get_unknown_mask('phase'))
        return ma.get_unknown_mask('any'), ma.get_unknown_mask('any')

    magnitude_1, phase_1 = _magnitude_and_phase(ma1)
    magnitude_2, phase_2 = _magnitude_and_phase(ma2)

    return {'mask_magnitude': np.logical_or(magnitude_1, magnitude_2),
            'mask_phase': np.logical_or(phase_1, phase_2)}
# Names of the numpy ufuncs whose result is handed back as a plain
# ``numpy.ndarray`` (without mask) by ``MadArray.__array_ufunc__``.
UFUNC_NOT_RETURNING_MADARRAYS = [
    # bitwise operations and shifts
    'bitwise_and', 'bitwise_or', 'bitwise_xor', 'invert',
    'left_shift', 'right_shift',
    # comparisons
    'greater', 'greater_equal', 'less', 'less_equal', 'not_equal', 'equal',
    # logical operations
    'logical_and', 'logical_or', 'logical_xor', 'logical_not',
    # element-wise extrema
    'maximum', 'minimum', 'fmax', 'fmin',
    # value classification
    'isfinite', 'isinf', 'isnan', 'isnat', 'signbit',
    # miscellaneous floating-point routines
    'copysign', 'nextafter', 'spacing', 'modf', 'frexp', 'fmod',
]
class MadArray(np.ndarray):
    r"""Subclass of numpy.ndarray to handle data with missing elements.

    .. _type_entry_madarray:

    **Type of entry**: entries of array can be *int*, *float*, or *complex*.

    .. _masking_madarray:

    **Masking**: the masking of entries has two different modes:

    * Entries can be either masked or not masked, leading to a boolean mask,
      whose entries are equal to True if the corresponding data entry is
      masked, or False otherwise.
      This is the default mode and the mode selected when specifying ``mask``
      during creation.
    * Complex entries can have only the magnitude or phase component masked,
      or both. The resulting mask has integer entries, equal to:

      * *0* if the phase and the magnitude are not masked (known magnitude
        and phase);
      * *1* if only the phase is masked (known magnitude, unknown phase);
      * *2* if only the magnitude is masked (unknown magnitude, known
        phase);
      * *3* if the magnitude and the phase are masked (unknown magnitude and
        phase).

      This mode is selected when specifying ``mask_magnitude`` and/or
      ``mask_phase`` during creation.
      Entries are converted to a complex type.

    If entries are complex values and ``mask`` is given during creation, both
    the magnitude and phase are masked and the boolean mask mode is used.

    .. _indexing_madarray:

    **Indexing**: two different modes to index a :class:`MadArray` object are
    implemented:

    * a :class:`MadArray` object with shape corresponding to the indices is
      returned, with both the data matrix and the mask properly indexed. This
      is the default mode;
    * a :class:`MadArray` object with unchanged shape is returned, where
      non-indexed entries are set as masked. This mode is selected by setting
      the parameter ``masked_indexing`` to True.

    .. _numpy_behaviour_madarray:

    **Numpy behaviour**: it is possible to use standard operations (+, -, /,
    //, \*, T) between two :class:`MadArray` objects, likewise operations
    between numpy arrays. The resulting object has a mask consisting of the
    union of the masks of the operands. It is also possible to use pickle
    operations to jointly store the data and the mask.

    Parameters
    ----------
    data : array_like
        Multidimensional array. See :ref:`Type of Entry<type_entry_madarray>`.
    mask : boolean array_like, optional
        Mask for boolean masking mode.
        See :ref:`Masking<masking_madarray>`.
    mask_magnitude : boolean array_like or None, optional
        Magnitude mask for masking with complex data.
        See :ref:`Masking<masking_madarray>`.
    mask_phase : boolean array_like or None, optional
        Phase mask for masking with complex data.
        See :ref:`Masking<masking_madarray>`.
    masked_indexing : bool or None, optional
        Indicate how the indexing is performed. If None, the value is taken
        from ``data`` when it is a :class:`MadArray`, and set to False
        otherwise. See :ref:`Indexing<indexing_madarray>`.
    **kwargs
        Extra keyword arguments forwarded to :func:`numpy.array` when
        converting ``data``.

    Raises
    ------
    ValueError
        If ``mask`` is given together with ``mask_magnitude`` or
        ``mask_phase``, or if a mask and the data have different shapes.
    TypeError
        If the dtype of the data is not integer, floating or complex.

    Warnings
    --------
    This class inherits from ndarray or subclass of ndarray. Instances can be
    then manipulated like ndarrays (e.g., indexation). While some methods have
    been implemented taking into account the mask, some may cause unexpected
    behavior (e.g., mean).

    See also
    --------
    :mod:`numpy.doc.subclassing`.

    Notes
    -----
    This class implements an alternative masked array different from
    :class:`numpy.ma.MaskedArray`. The reason of this choice is that it is only
    used as a container of a ndarray and a mask. No masked operations are
    needed.
    """

    def __new__(cls, data, mask=None, mask_magnitude=None, mask_phase=None,
                masked_indexing=None, **kwargs):
        """Create the array and attach its mask and indexing/masking modes."""
        if mask is not None and mask_magnitude is not None:
            errmsg = ('Parameters mask and mask_magnitude are mutually '
                      'exclusive')
            raise ValueError(errmsg)

        if mask is not None and mask_phase is not None:
            errmsg = 'Parameters mask and mask_phase are mutually exclusive'
            raise ValueError(errmsg)

        _data = np.array(data, **kwargs)

        if not (np.issubdtype(_data.dtype, np.floating) or
                np.issubdtype(_data.dtype, np.integer) or
                np.issubdtype(_data.dtype, np.complexfloating)):
            # Report the dtype of the converted array: ``data`` itself may be
            # a plain sequence without a ``dtype`` attribute.
            errmsg = 'Invalid dtype: {}'
            raise TypeError(errmsg.format(_data.dtype))

        # Select the masking mode: explicit arguments win, then the mode of
        # ``data`` when it is already a MadArray, then boolean masking.
        if mask is not None:
            complex_masking = False
        elif mask_magnitude is not None or mask_phase is not None:
            complex_masking = True
        elif isinstance(data, MadArray):
            complex_masking = data._complex_masking
        else:
            complex_masking = False

        if masked_indexing is None:
            if isinstance(data, MadArray):
                masked_indexing = data._masked_indexing
            else:
                masked_indexing = False

        if not complex_masking:
            if mask is None:
                if isinstance(data, MadArray):
                    mask = data.get_unknown_mask()
                else:
                    mask = np.zeros(_data.shape, dtype=bool)
            else:
                mask = np.array(mask, dtype=bool)
                if mask.shape != _data.shape:
                    errmsg = "Mask shape {} and data shape {} not compatible."
                    raise ValueError(errmsg.format(mask.shape, _data.shape))
        else:
            if not np.issubdtype(_data.dtype, np.complexfloating):
                _data = _data.astype(complex)

            if mask_magnitude is None:
                if isinstance(data, MadArray) and mask_phase is None:
                    # Neither mask is given: inherit both from ``data``.
                    mask_magnitude = data.get_unknown_mask('magnitude')
                    mask_phase = data.get_unknown_mask('phase')
                else:
                    mask_magnitude = np.zeros(_data.shape, dtype=bool)
            else:
                mask_magnitude = np.array(mask_magnitude, dtype=bool)
                if mask_magnitude.shape != _data.shape:
                    errmsg = 'Magnitude mask shape {} and data shape {} not '\
                        'compatible.'
                    raise ValueError(errmsg.format(mask_magnitude.shape,
                                                   _data.shape))

            if mask_phase is None:
                mask_phase = np.zeros(_data.shape, dtype=bool)
            else:
                mask_phase = np.array(mask_phase, dtype=bool)
                if mask_phase.shape != _data.shape:
                    errmsg = 'Phase mask shape {} and data shape {} not '\
                        'compatible.'
                    raise ValueError(errmsg.format(
                        mask_phase.shape, _data.shape))

            # Encode both boolean masks in a single integer mask:
            # 0 = all known, 1 = phase unknown, 2 = magnitude unknown,
            # 3 = both unknown.
            mask = np.zeros(_data.shape, dtype=np.uint8)
            mask[np.logical_and(mask_phase, ~mask_magnitude)] = 1
            mask[np.logical_and(~mask_phase, mask_magnitude)] = 2
            mask[np.logical_and(mask_phase, mask_magnitude)] = 3

        # create the object
        obj = np.ndarray.__new__(cls, _data.shape, dtype=_data.dtype)
        # Ellipsis assignment also works for zero-dimensional data.
        obj[...] = _data
        obj._mask = mask
        obj._masked_indexing = masked_indexing
        obj._complex_masking = complex_masking

        return obj

    def __array_finalize__(self, obj):
        """Propagate the mask and modes from ``obj`` to a new view/copy."""
        if obj is None:
            return
        self._mask = getattr(obj, '_mask', None)
        self._complex_masking = getattr(obj, '_complex_masking', None)
        self._masked_indexing = getattr(obj, '_masked_indexing', None)

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        """Apply a numpy ufunc on the underlying data and wrap the results.

        The ufunc is evaluated on plain ``ndarray`` views of the inputs. For
        a direct call of a ufunc that is not listed in
        ``UFUNC_NOT_RETURNING_MADARRAYS``, each result is returned as a
        :class:`MadArray`; its mask is the union of the masks when all the
        inputs are :class:`MadArray` objects, and the mask of the first
        :class:`MadArray` input otherwise. In all other cases, plain
        ``ndarray`` objects are returned.

        Raises
        ------
        ValueError
            If all the inputs are :class:`MadArray` objects and one of the
            first two uses complex masking.
        """
        mad_inputs = [input_ for input_ in inputs
                      if isinstance(input_, MadArray)]
        args = [input_.view(np.ndarray) if isinstance(input_, MadArray)
                else input_
                for input_ in inputs]

        if len(inputs) > 1 and len(mad_inputs) == len(inputs):
            if inputs[0]._complex_masking or inputs[1]._complex_masking:
                errmsg = 'Operation not permitted when complex masking.'
                raise ValueError(errmsg)
            # Going through the constructor validates the merged mask against
            # the shape of the first operand.
            mask = MadArray(inputs[0],
                            **_merge_masks(inputs[0], inputs[1]))._mask
            reference = inputs[0]
        else:
            # ``self`` is the fallback when the MadArray only appears as an
            # output of the ufunc.
            reference = mad_inputs[0] if mad_inputs else self
            mask = reference._mask
        complex_masking = reference._complex_masking
        masked_indexing = reference._masked_indexing

        outputs = kwargs.pop('out', None)
        if outputs:
            kwargs['out'] = tuple(output.view(np.ndarray)
                                  for output in outputs)
        else:
            outputs = (None,) * ufunc.nout

        results = super().__array_ufunc__(ufunc, method, *args, **kwargs)
        if results is NotImplemented:
            return NotImplemented
        if method == 'at':
            # In-place operation without any return value.
            return None
        if ufunc.nout == 1:
            # Multi-output ufuncs already return a tuple of results.
            results = (results,)

        return_madarray = (
            method == '__call__' and
            ufunc.__name__ not in UFUNC_NOT_RETURNING_MADARRAYS)

        new_results = []
        for result, output in zip(results, outputs):
            if output is not None:
                new_results.append(output)
            elif return_madarray:
                wrapped = np.asarray(result).view(MadArray)
                wrapped._mask = mask
                wrapped._complex_masking = complex_masking
                wrapped._masked_indexing = masked_indexing
                new_results.append(wrapped)
            else:
                new_results.append(np.asarray(result).view(np.ndarray))

        return new_results[0] if len(new_results) == 1 else tuple(new_results)

    def __getitem__(self, index):
        """Index the data and the mask according to the indexing mode."""
        if getattr(self, '_masked_indexing', None):
            # Masked indexing: keep the shape and mask non-selected entries.
            selected = np.zeros(self.shape, dtype=bool)
            selected[index] = True
            return MadArray(self, mask=np.logical_or(~selected, self._mask))

        out_arr = super().__getitem__(index)
        # ``__array_finalize__`` gave the view the full mask of its parent;
        # scalars returned by numpy carry no ``_mask`` attribute.
        if getattr(out_arr, '_mask', None) is not None:
            out_arr._mask = out_arr._mask[index]
        return out_arr

    def __reduce__(self):
        """Append the mask and the modes to the pickled state."""
        pickled_state = super().__reduce__()
        new_state = pickled_state[2] + (self._mask, self._complex_masking,
                                        self._masked_indexing)
        return pickled_state[0], pickled_state[1], new_state

    def __setstate__(self, state):
        """Restore the mask and the modes appended by ``__reduce__``."""
        self._mask = state[-3]
        self._complex_masking = state[-2]
        self._masked_indexing = state[-1]
        super().__setstate__(state[0:-3])

    @property
    def n_missing_data(self):
        """Number of missing data (int or tuple of int).

        Number of masked coefficients with boolean masking. Tuple with the
        numbers of masked coefficients in the magnitude and phase masks,
        in that order, with complex masking.
        """
        if self._complex_masking:
            return (np.sum(self.get_unknown_mask('magnitude')),
                    np.sum(self.get_unknown_mask('phase')))
        else:
            return np.sum(self.get_unknown_mask())

    @property
    def ratio_missing_data(self):
        """Ratio of missing data (float or tuple of float).

        Ratio of masked coefficients with boolean masking. Tuple with the
        ratios of masked coefficients in the magnitude and phase masks,
        in that order, with complex masking.
        """
        if self._complex_masking:
            return (np.average(self.get_unknown_mask('magnitude')),
                    np.average(self.get_unknown_mask('phase')))
        else:
            return np.average(self.get_unknown_mask())

    def is_masked(self):
        """Indicate if one or several elements are masked."""
        return np.any(self._mask)

    def to_np_array(self, fill_value=None):
        """Return a numpy array.

        If ``fill_value`` is not None, masked elements are replaced according
        to the masking mode:

        * with boolean masking, by ``fill_value``;
        * with complex masking, missing entries are replaced either by:

          * a complex number with the known magnitude value without the
            phase information if only the phase is masked;
          * a complex number of magnitude 1 with the known phase if only the
            magnitude is masked;
          * ``fill_value`` if both magnitude and phase are masked.

        Parameters
        ----------
        fill_value : scalar or None
            Value used to fill masked elements. If None, the initial value is
            kept.

        Returns
        -------
        nd-array
        """
        data = np.array(self)
        if fill_value is not None:
            if self._complex_masking:
                upom = self.get_unknown_mask('phase only')
                umom = self.get_unknown_mask('magnitude only')
                data[upom] = np.abs(data[upom])
                data[umom] = np.exp(1j * np.angle(data[umom]))
                data[self.get_unknown_mask('all')] = fill_value
            else:
                data[self.get_unknown_mask()] = fill_value
        return data

    def __eq__(self, other):
        """Element-wise equality.

        Against another :class:`MadArray`, entries are equal when the data
        (with masked entries filled with 0) and the masks are both equal.
        Against any other object, only the data are compared.
        """
        if isinstance(other, MadArray):
            return np.logical_and(self.to_np_array(0) == other.to_np_array(0),
                                  self._mask == other._mask)
        else:
            return np.array(self) == other

    def __ne__(self, other):
        """Element-wise negation of ``__eq__``."""
        return np.logical_not(self == other)

    def is_equal(self, other):
        """Indicate if ``other`` is a MadArray equal to this one.

        Parameters
        ----------
        other : object
            Object to compare with.

        Returns
        -------
        bool
            True if ``other`` is a :class:`MadArray` with the same entries
            (as defined by ``==``), the same masking mode and the same
            indexing mode, False otherwise.
        """
        if not isinstance(other, MadArray):
            return False
        if not np.all(self == other):
            return False
        if not (self._complex_masking == other._complex_masking and
                self._masked_indexing == other._masked_indexing):
            return False
        return True

    @property
    def T(self):
        """Transpose of the MadArray."""
        return self.transpose()

    def copy(self):
        """Return a new MadArray built from this one."""
        return MadArray(self)

    def transpose(self, *axes):
        """Transpose the data and the mask.

        Parameters
        ----------
        *axes : None, tuple of ints, or ints, optional
            Permutation of the axes, as accepted by
            :meth:`numpy.ndarray.transpose`. If omitted, the order of the
            axes is reversed.

        Returns
        -------
        MadArray
        """
        mat = super().transpose(*axes)
        mat._mask = mat._mask.transpose(*axes)
        return mat

    def __str__(self):
        """Summary of the missing data followed by the entries.

        Masked entries are displayed as ``x``.
        """
        arr = np.array(self)

        # Integers cannot hold NaN, which is used to mark masked entries.
        if np.issubdtype(self.dtype, np.integer):
            arr = arr.astype(np.float64)

        arr[self.get_unknown_mask()] = np.nan
        arr_str = np.ndarray.__str__(arr)

        if np.isrealobj(arr):
            arr_str = arr_str.replace('nan', '  x')
        else:
            arr_str = arr_str.replace('nan+0.j', '  x    ')

        # Drop the decimal points introduced by the conversion to float.
        if np.issubdtype(self.dtype, np.integer):
            arr_str = arr_str.replace('.', '')

        if self._complex_masking:
            n_all_unknown = np.count_nonzero(self.get_unknown_mask('all'))
            string = 'MadArray, dtype={0}, ' \
                     '{1[0]} missing magnitudes ({2[0]:.1%}) ' \
                     'and {1[1]} missing phases ({2[1]:.1%}), ' \
                     ' including {3} missing magnitudes and phases jointly ' \
                     '({4:.1%})\n{5}'
            return string.format(self.dtype,
                                 self.n_missing_data,
                                 self.ratio_missing_data,
                                 n_all_unknown,
                                 n_all_unknown/self.size,
                                 arr_str)
        else:
            string = 'MadArray, dtype={}, {} missing entries ({:.1%})\n{}'
            return string.format(self.dtype,
                                 self.n_missing_data,
                                 self.ratio_missing_data, arr_str)

    def __repr__(self):
        """Short representation based on the identity of the object."""
        string = '<MadArray at {}>'
        return string.format(hex(id(self)))

    def get_known_mask(self, mask_type='all'):
        """Boolean mask for known coefficients.

        Compute the boolean mask marking known coefficients as True.

        Parameters
        ----------
        mask_type : str, optional
            Type of mask:

            - ``all``: mark coefficients for which both the magnitude and the
              phase are known,
            - ``any``: mark coefficients for which the magnitude or the phase
              are known (including when both the magnitude and the phase are
              known),
            - ``magnitude``: mark coefficients for which the magnitude is
              known,
            - ``phase``: mark coefficients for which the phase is known,
            - ``magnitude only``: mark coefficients for which the magnitude
              is known and the phase is unknown,
            - ``phase only``: mark coefficients for which the phase is known
              and the magnitude is unknown.

        Returns
        -------
        mask : boolean nd-array
            Boolean array with entries set to True if the corresponding value
            in the object is known.

        Raises
        ------
        ValueError
            If ``mask_type`` has an invalid value.
        """
        if mask_type == 'all':
            return ~self.get_unknown_mask('any')
        elif mask_type == 'any':
            return ~self.get_unknown_mask('all')
        elif mask_type == 'magnitude':
            return ~self.get_unknown_mask('magnitude')
        elif mask_type == 'phase':
            return ~self.get_unknown_mask('phase')
        elif mask_type == 'magnitude only':
            # Known magnitude with unknown phase.
            return self.get_unknown_mask('phase only')
        elif mask_type == 'phase only':
            # Known phase with unknown magnitude.
            return self.get_unknown_mask('magnitude only')

        errmsg = 'Invalid value for mask_type: {}'.format(mask_type)
        raise ValueError(errmsg)

    def get_unknown_mask(self, mask_type='any'):
        """Boolean mask for unknown coefficients.

        Compute the boolean mask marking unknown coefficients as True.

        Parameters
        ----------
        mask_type : str, optional
            Type of mask:

            - ``any``: mark coefficients for which the magnitude or the phase
              are unknown (including when both the magnitude and the phase
              are unknown),
            - ``all``: mark coefficients for which both the magnitude and the
              phase are unknown,
            - ``magnitude``: mark coefficients for which the magnitude is
              unknown,
            - ``phase``: mark coefficients for which the phase is unknown,
            - ``magnitude only``: mark coefficients for which the magnitude
              is unknown and the phase is known,
            - ``phase only``: mark coefficients for which the phase is
              unknown and the magnitude is known.

        Returns
        -------
        mask : boolean nd-array
            Boolean array with values set to True if the corresponding value
            in the object is unknown.

        Raises
        ------
        ValueError
            If ``mask_type`` has an invalid value.
        """
        if self._complex_masking:
            if mask_type == 'any':
                return self._mask != 0
            elif mask_type == 'all':
                return self._mask == 3
            elif mask_type == 'magnitude':
                return (self._mask == 3) | (self._mask == 2)
            elif mask_type == 'phase':
                return (self._mask == 3) | (self._mask == 1)
            elif mask_type == 'magnitude only':
                return self._mask == 2
            elif mask_type == 'phase only':
                return self._mask == 1
        else:
            # With boolean masking, magnitude and phase are always masked
            # together, so the "only" masks are empty.
            if mask_type in ('any', 'all', 'magnitude', 'phase'):
                return np.copy(self._mask)
            elif mask_type in ('magnitude only', 'phase only'):
                return np.zeros_like(self._mask, dtype=bool)

        errmsg = 'Invalid value for mask_type: {}'.format(mask_type)
        raise ValueError(errmsg)