# Source code for libretro.drivers.microphone.generator

"""
:class:`.MicrophoneDriver` implementation that returns samples produced by a generator function.

.. seealso::

    :class:`.MicrophoneDriver`
        The protocol this driver implements.
"""

from array import array
from collections.abc import Callable, Collection, Iterable, Iterator, Sequence
from itertools import repeat
from typing import override

from libretro.api.microphone import (
    INTERFACE_VERSION,
    retro_microphone,
    retro_microphone_params,
)

from .driver import Microphone, MicrophoneDriver

# One item taken from a microphone source: a single sample, a sequence of samples,
# or None (GeneratorMicrophone.read appends one zero-valued sample for None).
MicrophoneInput = int | Sequence[int] | None
# An iterator that yields MicrophoneInput items; this is what GeneratorMicrophone
# stores and advances with next().
MicrophoneInputIterator = Iterator[MicrophoneInput]
# Any iterable of MicrophoneInput items; GeneratorMicrophone wraps it with iter().
MicrophoneInputIterable = Iterable[MicrophoneInput]
# A zero-argument callable returning a MicrophoneInputIterator; GeneratorMicrophone
# calls it once in its constructor.
MicrophoneInputGeneratorFunction = Callable[[], MicrophoneInputIterator]
# Everything accepted as an audio source: an iterable or a generator function.
MicrophoneSource = MicrophoneInputIterable | MicrophoneInputGeneratorFunction


[docs] class GeneratorMicrophone(Microphone): """ A :class:`.Microphone` that returns samples produced by a generator or iterable. Use this to feed scripted or recorded audio into a core under test without needing a real microphone. .. seealso:: :class:`.Microphone` The protocol this class implements. """ _generator_state: MicrophoneInputIterator _handle: retro_microphone
[docs] def __init__(self, generator: MicrophoneSource | None, params: retro_microphone_params | None): """ Initialize the microphone with a sample source and capture parameters. :param generator: The audio source. A generator function is called once to obtain its iterator; an iterable is used directly. :obj:`None` produces an infinite stream of silence. :param params: The capture parameters to advertise, or :obj:`None` to default to 44.1 kHz mono. :raises TypeError: If ``generator`` is neither a callable, an iterable, nor :obj:`None`. """ self._params = params or retro_microphone_params(44100) self._enabled = False self._closed = False self._overflow = array("h") self._handle = retro_microphone(id(self)) match generator: case None: self._generator_state = repeat(0) case Callable(): self._generator_state = generator() case Iterable() as it: self._generator_state = iter(it) case _: raise TypeError( f"GeneratorMicrophone requires a generator function or an iterable as input; got {type(generator).__name__}" )
[docs] @override def close(self) -> None: self._closed = True
@property @override def params(self) -> retro_microphone_params | None: if self._closed: return None return self._params @property @override def state(self) -> bool: if self._closed: return False return self._enabled @state.setter @override def state(self, state: bool) -> None: if self._closed: raise RuntimeError("Cannot set state on a closed microphone") self._enabled = bool(state)
[docs] @override def read(self, frames: int) -> array[int] | None: if self._closed or not self._enabled or not frames: # If this mic is closed, off, or asked to provide zero samples... return None buffer = array("h", self._overflow) # array.clear() wasn't introduced until Python 3.13 del self._overflow[:] while len(buffer) < frames: # Until we have the requested number of frames in the buffer, # keep asking the generator for more input and filling the buffer with it. next_samples = next(self._generator_state, None) match next_samples: case None: buffer.append(0) case int(frame): buffer.append(frame) case array() as samples if samples.typecode == "h": buffer.extend(samples) case f if isinstance(f, Sequence): buffer.extend(f) case f: raise TypeError( f"MicrophoneInputGenerator must yield a signed 16-bit integer, an array of them, or None; got {type(f).__name__}" ) if len(buffer) > frames: # If we got more frames than requested, save the excess in the overflow buffer for next time. self._overflow.extend(buffer[frames:]) del buffer[frames:] assert len(buffer) <= frames return buffer
@property def handle(self) -> retro_microphone: """The opaque :class:`.retro_microphone` handle exposed to the core.""" return self._handle
class GeneratorMicrophoneDriver(MicrophoneDriver):
    """
    A :class:`.MicrophoneDriver` that opens :class:`GeneratorMicrophone` instances on demand.

    Each microphone the core opens is backed by the same source supplied at construction.
    Open microphones are tracked by the ``id`` of their :class:`.retro_microphone` handle;
    every per-microphone operation quietly ignores handles it does not know.

    .. seealso::

        :class:`.MicrophoneDriver`
            The protocol this class implements.
    """

    def __init__(
        self, generator: MicrophoneInputGeneratorFunction | MicrophoneInputIterable | None = None
    ):
        """
        Initialize the driver with a default sample source.

        :param generator: The audio source used for every microphone opened by the core,
            or :obj:`None` to default to silence.
            See :class:`GeneratorMicrophone` for accepted forms.
        """
        # Open microphones, keyed by the id stored in their handle.
        self._microphones: dict[int, GeneratorMicrophone] = {}
        self._generator = generator

    @property
    @override
    def version(self) -> int:
        """The microphone interface version this driver reports."""
        return INTERFACE_VERSION

    @override
    def open_mic(self, params: retro_microphone_params | None) -> retro_microphone | None:
        """
        Open a new :class:`GeneratorMicrophone` backed by this driver's source.

        :param params: The requested capture parameters, passed on to the microphone.
        :return: The handle of the newly opened microphone.
        """
        mic = GeneratorMicrophone(self._generator, params)
        handle = mic.handle
        self._microphones[handle.id] = mic
        return handle

    @override
    def close_mic(self, mic: retro_microphone) -> None:
        """
        Close the microphone behind ``mic`` and stop tracking it.

        Unknown handles are ignored.
        """
        mic_id = mic.id
        # Compare against None, as the other accessors do,
        # so the lookup never depends on the microphone object's truthiness.
        if (m := self._microphones.get(mic_id)) is not None:
            m.close()
            del self._microphones[mic_id]

    @override
    def get_mic_params(self, mic: retro_microphone) -> retro_microphone_params | None:
        """Return the parameters of the microphone behind ``mic``, or :obj:`None` if it is unknown."""
        mic_id = mic.id
        if (m := self._microphones.get(mic_id)) is not None:
            return m.params

        return None

    @override
    def get_mic_state(self, mic: retro_microphone) -> bool:
        """Return whether the microphone behind ``mic`` is enabled; :obj:`False` if it is unknown."""
        mic_id = mic.id
        if (m := self._microphones.get(mic_id)) is not None:
            return m.state

        return False

    @override
    def set_mic_state(self, mic: retro_microphone, state: bool) -> None:
        """Enable or disable the microphone behind ``mic``. Unknown handles are ignored."""
        mic_id = mic.id
        if (m := self._microphones.get(mic_id)) is not None:
            m.state = state

    @override
    def read_mic(self, mic: retro_microphone, frames: int) -> array[int] | None:
        """
        Read ``frames`` samples from the microphone behind ``mic``.

        :return: Whatever :meth:`GeneratorMicrophone.read` returns,
            or :obj:`None` if the handle is unknown.
        """
        mic_id = mic.id
        if (m := self._microphones.get(mic_id)) is not None:
            return m.read(frames)

        return None

    @property
    @override
    def microphones(self) -> Collection[GeneratorMicrophone]:
        """A live view of the microphones that are currently open."""
        return self._microphones.values()
# Public API of this module.
# MicrophoneInputIterable is exported alongside the other aliases because it is
# part of MicrophoneSource and of GeneratorMicrophoneDriver's constructor signature.
__all__ = [
    "GeneratorMicrophone",
    "GeneratorMicrophoneDriver",
    "MicrophoneInput",
    "MicrophoneInputIterator",
    "MicrophoneInputIterable",
    "MicrophoneInputGeneratorFunction",
    "MicrophoneSource",
]