Source code for libretro.drivers.microphone.generator
"""
:class:`.MicrophoneDriver` implementation that returns samples produced by a generator function.
.. seealso::
:class:`.MicrophoneDriver`
The protocol this driver implements.
"""
from array import array
from collections.abc import Callable, Collection, Iterable, Iterator, Sequence
from itertools import repeat
from typing import override
from libretro.api.microphone import (
INTERFACE_VERSION,
retro_microphone,
retro_microphone_params,
)
from .driver import Microphone, MicrophoneDriver
MicrophoneInput = int | Sequence[int] | None
MicrophoneInputIterator = Iterator[MicrophoneInput]
MicrophoneInputIterable = Iterable[MicrophoneInput]
MicrophoneInputGeneratorFunction = Callable[[], MicrophoneInputIterator]
MicrophoneSource = MicrophoneInputIterable | MicrophoneInputGeneratorFunction
[docs]
class GeneratorMicrophone(Microphone):
"""
A :class:`.Microphone` that returns samples produced by a generator or iterable.
Use this to feed scripted or recorded audio into a core under test
without needing a real microphone.
.. seealso::
:class:`.Microphone`
The protocol this class implements.
"""
_generator_state: MicrophoneInputIterator
_handle: retro_microphone
[docs]
def __init__(self, generator: MicrophoneSource | None, params: retro_microphone_params | None):
"""
Initialize the microphone with a sample source and capture parameters.
:param generator: The audio source.
A generator function is called once to obtain its iterator;
an iterable is used directly.
:obj:`None` produces an infinite stream of silence.
:param params: The capture parameters to advertise,
or :obj:`None` to default to 44.1 kHz mono.
:raises TypeError: If ``generator`` is neither a callable, an iterable, nor :obj:`None`.
"""
self._params = params or retro_microphone_params(44100)
self._enabled = False
self._closed = False
self._overflow = array("h")
self._handle = retro_microphone(id(self))
match generator:
case None:
self._generator_state = repeat(0)
case Callable():
self._generator_state = generator()
case Iterable() as it:
self._generator_state = iter(it)
case _:
raise TypeError(
f"GeneratorMicrophone requires a generator function or an iterable as input; got {type(generator).__name__}"
)
[docs]
@override
def close(self) -> None:
self._closed = True
@property
@override
def params(self) -> retro_microphone_params | None:
if self._closed:
return None
return self._params
@property
@override
def state(self) -> bool:
if self._closed:
return False
return self._enabled
@state.setter
@override
def state(self, state: bool) -> None:
if self._closed:
raise RuntimeError("Cannot set state on a closed microphone")
self._enabled = bool(state)
[docs]
@override
def read(self, frames: int) -> array[int] | None:
if self._closed or not self._enabled or not frames:
# If this mic is closed, off, or asked to provide zero samples...
return None
buffer = array("h", self._overflow)
# array.clear() wasn't introduced until Python 3.13
del self._overflow[:]
while len(buffer) < frames:
# Until we have the requested number of frames in the buffer,
# keep asking the generator for more input and filling the buffer with it.
next_samples = next(self._generator_state, None)
match next_samples:
case None:
buffer.append(0)
case int(frame):
buffer.append(frame)
case array() as samples if samples.typecode == "h":
buffer.extend(samples)
case f if isinstance(f, Sequence):
buffer.extend(f)
case f:
raise TypeError(
f"MicrophoneInputGenerator must yield a signed 16-bit integer, an array of them, or None; got {type(f).__name__}"
)
if len(buffer) > frames:
# If we got more frames than requested, save the excess in the overflow buffer for next time.
self._overflow.extend(buffer[frames:])
del buffer[frames:]
assert len(buffer) <= frames
return buffer
@property
def handle(self) -> retro_microphone:
"""The opaque :class:`.retro_microphone` handle exposed to the core."""
return self._handle
[docs]
class GeneratorMicrophoneDriver(MicrophoneDriver):
"""
A :class:`.MicrophoneDriver` that opens :class:`GeneratorMicrophone` instances on demand.
Each microphone the core opens is backed by the same source supplied at construction.
.. seealso::
:class:`.MicrophoneDriver`
The protocol this class implements.
"""
[docs]
def __init__(
self, generator: MicrophoneInputGeneratorFunction | MicrophoneInputIterable | None = None
):
"""
Initialize the driver with a default sample source.
:param generator: The audio source used for every microphone opened by the core,
or :obj:`None` to default to silence.
See :class:`GeneratorMicrophone` for accepted forms.
"""
self._microphones: dict[int, GeneratorMicrophone] = {}
self._generator = generator
@property
@override
def version(self) -> int:
return INTERFACE_VERSION
[docs]
@override
def open_mic(self, params: retro_microphone_params | None) -> retro_microphone | None:
mic = GeneratorMicrophone(self._generator, params)
handle = mic.handle
self._microphones[handle.id] = mic
return handle
[docs]
@override
def close_mic(self, mic: retro_microphone) -> None:
mic_id = mic.id
if m := self._microphones.get(mic_id):
m.close()
del self._microphones[mic_id]
[docs]
@override
def get_mic_params(self, mic: retro_microphone) -> retro_microphone_params | None:
mic_id = mic.id
if (m := self._microphones.get(mic_id)) is not None:
return m.params
return None
[docs]
@override
def get_mic_state(self, mic: retro_microphone) -> bool:
mic_id = mic.id
if (m := self._microphones.get(mic_id)) is not None:
return m.state
return False
[docs]
@override
def set_mic_state(self, mic: retro_microphone, state: bool) -> None:
mic_id = mic.id
if (m := self._microphones.get(mic_id)) is not None:
m.state = state
[docs]
@override
def read_mic(self, mic: retro_microphone, frames: int) -> array[int] | None:
mic_id = mic.id
if (m := self._microphones.get(mic_id)) is not None:
return m.read(frames)
return None
@property
@override
def microphones(self) -> Collection[GeneratorMicrophone]:
return self._microphones.values()
__all__ = [
"GeneratorMicrophone",
"GeneratorMicrophoneDriver",
"MicrophoneInput",
"MicrophoneInputIterator",
"MicrophoneInputGeneratorFunction",
"MicrophoneSource",
]