Source code for libretro.drivers.environment.composite

"""
Default :class:`.EnvironmentDriver` that composes per-feature drivers into a single dispatcher.

.. seealso::

    :class:`.EnvironmentDriver`
        The protocol this implementation satisfies.
"""

# ruff: noqa: F405
# We need to import a _lot_ of symbols from libretro.api

from __future__ import annotations

import typing
from collections.abc import Sequence
from copy import deepcopy
from ctypes import (
    c_bool,
    c_char_p,
    c_double,
    c_float,
    c_int16,
    c_int32,
    c_uint,
    c_uint8,
    c_uint64,
    c_void_p,
    cast,
    memmove,
    pointer,
    sizeof,
)
from typing import TYPE_CHECKING, Literal, overload, override

from libretro.api import *  # noqa: F403
from libretro.api._utils import (
    MAX_POINTER_VALUE,
    deepcopy_array,
    from_zero_terminated,
    is_zeroed,
    memoryview_at,
)
from libretro.ctypes import TypedFunctionPointer, TypedPointer, c_void_ptr
from libretro.drivers.audio import AudioDriver
from libretro.drivers.camera import CameraDriver
from libretro.drivers.content import ContentDriver
from libretro.drivers.input import InputDriver
from libretro.drivers.led import LedDriver
from libretro.drivers.location import LocationDriver
from libretro.drivers.log import LogDriver
from libretro.drivers.message import MessageDriver
from libretro.drivers.microphone import MicrophoneDriver
from libretro.drivers.midi import MidiDriver
from libretro.drivers.options import OptionDriver
from libretro.drivers.path import PathDriver
from libretro.drivers.perf import PerfDriver
from libretro.drivers.power import PowerDriver
from libretro.drivers.rumble import RumbleDriver
from libretro.drivers.sensor import SensorDriver
from libretro.drivers.timing import TimingDriver
from libretro.drivers.user import UserDriver
from libretro.drivers.vfs import FileSystemDriver
from libretro.drivers.video import FrameBufferSpecial, VideoDriver
from libretro.drivers.video.driver import UnsupportedContextError

from .default import DefaultEnvironmentDriver
from .dict import DictEnvironmentDriver

# TODO: Match envcalls even if the experimental flag is unset (but still consider it for ABI differences)
if TYPE_CHECKING:
    from ctypes import _CDataType, _CFunctionType  # type: ignore

_return_on_raise = DictEnvironmentDriver.return_on_raise
# No need to apply this decorator to environment calls,
# since the base implementation of environment() in DictEnvironmentDriver does so.


[docs] class CompositeEnvironmentDriver[ _Audio: AudioDriver, _Input: InputDriver, _Video: VideoDriver, _Content: ContentDriver | None, _Message: MessageDriver | None, _Option: OptionDriver | None, _Path: PathDriver | None, _Rumble: RumbleDriver | None, _Sensor: SensorDriver | None, _Camera: CameraDriver | None, _Log: LogDriver | None, _Perf: PerfDriver | None, _Location: LocationDriver | None, _User: UserDriver | None, _Vfs: FileSystemDriver | None, _Led: LedDriver | None, _Midi: MidiDriver | None, _Timing: TimingDriver | None, _Mic: MicrophoneDriver | None, _Power: PowerDriver | None, ](DefaultEnvironmentDriver): """ :class:`.EnvironmentDriver` that composes individual feature drivers into one dispatcher. Each constructor argument supplies the driver responsible for one libretro subsystem (audio, input, video, etc.); ``None`` indicates the subsystem is not provided and the corresponding environment calls will return ``false``. """
[docs] @override def __init__( self, /, audio: _Audio, input: _Input, video: _Video, content: _Content = None, overscan: bool | None = None, message: _Message = None, options: _Option = None, path: _Path = None, rumble: _Rumble = None, sensor: _Sensor = None, camera: _Camera = None, log: _Log = None, perf: _Perf = None, location: _Location = None, user: _User = None, vfs: _Vfs = None, led: _Led = None, av_enable: AvEnableFlags | None = None, midi: _Midi = None, timing: _Timing = None, preferred_hw: HardwareContext | None = None, driver_switch_enable: bool | None = None, savestate_context: SavestateContext | None = None, jit_capable: bool | None = None, mic: _Mic = None, device_power: _Power = None, ): super().__init__() self._audio = audio if not isinstance(self._audio, AudioDriver): raise TypeError(f"Expected AudioDriver, got {type(self._audio).__qualname__}") self._input = input if not isinstance(self._input, InputDriver): raise TypeError(f"Expected InputDriver, got {type(self._input).__qualname__}") self._video = video if not isinstance(self._video, VideoDriver): raise TypeError(f"Expected VideoDriver, got {type(self._video).__qualname__}") self._content = content if self._content is not None and not isinstance(self._content, ContentDriver): raise TypeError( f"Expected ContentDriver or None, got {type(self._content).__qualname__}" ) self._overscan = overscan if self._overscan is not None and not isinstance(self._overscan, bool): raise TypeError(f"Expected bool or None, got {type(self._overscan).__qualname__}") self._message = message if self._message is not None and not isinstance(self._message, MessageDriver): raise TypeError( f"Expected MesasgeDriver or None, got {type(self._message).__qualname__}" ) self.__shutdown = False self._performance_level: int | None = None self._path = path if self._path is not None and not isinstance(self._path, PathDriver): raise TypeError(f"Expected PathDriver or None, got {type(self._path).__qualname__}") self._options = options if self._options is not None and not isinstance(self._options, OptionDriver): raise TypeError( f"Expected OptionDriver or None, got {type(self._options).__qualname__}" ) self._rumble = rumble if self._rumble is not None and not isinstance(self._rumble, RumbleDriver): raise TypeError( f"Expected RumbleDriver or None, got {type(self._rumble).__qualname__}" ) self._sensor = sensor if self._sensor is not None and not isinstance(self._sensor, SensorDriver): raise TypeError( f"Expected SensorDriver or None, got {type(self._sensor).__qualname__}" ) self._camera = camera if self._camera is not None and not isinstance(self._camera, CameraDriver): raise TypeError( f"Expected CameraDriver or None, got {type(self._camera).__qualname__}" ) self._log_driver = log if self._log_driver is not None and not isinstance(self._log_driver, LogDriver): raise TypeError( f"Expected LogDriver or None, got {type(self._log_driver).__qualname__}" ) self._perf = perf if self._perf is not None and not isinstance(self._perf, PerfDriver): raise TypeError(f"Expected PerfDriver or None, got {type(self._perf).__qualname__}") self._location = location if self._location is not None and not isinstance(self._location, LocationDriver): raise TypeError( f"Expected LocationDriver or None, got {type(self._location).__qualname__}" ) self._proc_address_callback: retro_get_proc_address_interface | None = None self._memory_maps: retro_memory_map | None = None self._user = user if self._user is not None and not isinstance(self._user, UserDriver): raise TypeError(f"Expected UserDriver or None, got {type(self._user).__qualname__}") self._supports_achievements: bool | None = None self._serialization_quirks: SerializationQuirks | None = None self._vfs = vfs if self._vfs is not None and not isinstance(self._vfs, FileSystemDriver): raise TypeError( f"Expected FileSystemDriver or None, got {type(self._vfs).__qualname__}" ) self._led = led if self._led is not None and not isinstance(self._led, LedDriver): raise TypeError(f"Expected LedDriver or None, got {type(self._led).__qualname__}") self._av_enable = av_enable if self._av_enable is not None and not isinstance(self._av_enable, AvEnableFlags): raise TypeError( f"Expected AvEnableFlags or None, got {type(self._av_enable).__qualname__}" ) self._midi = midi if self._midi is not None and not isinstance(self._midi, MidiDriver): raise TypeError(f"Expected MidiDriver or None, got {type(self._midi).__qualname__}") self._timing = timing if self._timing is not None and not isinstance(self._timing, TimingDriver): raise TypeError( f"Expected TimingDriver or None, got {type(self._timing).__qualname__}" ) self._preferred_hw = preferred_hw if self._preferred_hw is not None and not isinstance(self._preferred_hw, HardwareContext): raise TypeError( f"Expected HardwareContext or None, got {type(self._preferred_hw).__qualname__}" ) self._driver_switch_enable = driver_switch_enable if self._driver_switch_enable is not None and not isinstance( self._driver_switch_enable, bool ): raise TypeError( f"Expected bool or None, got {type(self._driver_switch_enable).__qualname__}" ) self._savestate_context = savestate_context if self._savestate_context is not None and not isinstance( self._savestate_context, SavestateContext ): raise TypeError( f"Expected SavestateContext or None, got {type(self._savestate_context).__qualname__}" ) self._jit_capable = jit_capable if self._jit_capable is not None and not isinstance(self._jit_capable, bool): raise TypeError(f"Expected bool or None, got {type(self._jit_capable).__qualname__}") self._mic = mic if self._mic is not None and not isinstance(self._mic, MicrophoneDriver): raise TypeError( f"Expected MicrophoneDriver or None, got {type(self._mic).__qualname__}" ) self._device_power = device_power if self._device_power is not None and not isinstance(self._device_power, PowerDriver): raise TypeError( f"Expected PowerDriver or None, got {type(self._device_power).__qualname__}" ) self._hw_render_callback: retro_hw_render_callback | None = None self._rumble_interface: retro_rumble_interface | None = None self._camera_callback: retro_camera_callback | None = None self._sensor_interface: retro_sensor_interface | None = None self._log_callback: retro_log_callback | None = None self._perf_callback: retro_perf_callback | None = None self._location_callback: retro_location_callback | None = None self._vfs_interface: retro_vfs_interface | None = None self._led_interface: retro_led_interface | None = None self._midi_interface: retro_midi_interface | None = None self._mic_interface: retro_microphone_interface | None = None
@property def audio(self) -> _Audio: """Return the :class:`.AudioDriver` supplied at construction time.""" return self._audio @property def input(self) -> _Input: """Return the :class:`.InputDriver` supplied at construction time.""" return self._input @property def video(self) -> _Video: """Return the :class:`.VideoDriver` supplied at construction time.""" return self._video @property def content(self) -> _Content: """Return the :class:`.ContentDriver` supplied at construction time, or ``None`` if absent.""" return self._content @property def sensor(self) -> _Sensor: """Return the :class:`.SensorDriver` supplied at construction time, or ``None`` if absent.""" return self._sensor @property def camera(self) -> _Camera: """Return the :class:`.CameraDriver` supplied at construction time, or ``None`` if absent.""" return self._camera @property def user(self) -> _User: """Return the :class:`.UserDriver` supplied at construction time, or ``None`` if absent.""" return self._user @property def path(self) -> _Path: """Return the :class:`.PathDriver` supplied at construction time, or ``None`` if absent.""" return self._path @property def timing(self) -> _Timing: """Return the :class:`.TimingDriver` supplied at construction time, or ``None`` if absent.""" return self._timing @property def rumble(self) -> _Rumble: """Return the :class:`.RumbleDriver` supplied at construction time, or ``None`` if absent.""" return self._rumble
[docs] @override @_return_on_raise(None) def video_refresh(self, data: c_void_ptr, width: int, height: int, pitch: int) -> None: # Handle the constants and their equivalent ints, just to be safe match data.value: case 0: # Passing NULL to retro_video_refresh_t means "redraw the frame" self._video.refresh(FrameBufferSpecial.DUPE, width, height, pitch) case int(i) if i == MAX_POINTER_VALUE: self._video.refresh(FrameBufferSpecial.HARDWARE, width, height, pitch) case int(): view = memoryview_at(data, pitch * height, readonly=True) assert len(view) == pitch * height, ( f"Expected view to have {pitch * height} bytes, got {len(view)} bytes" ) self._video.refresh(view, width, height, pitch) case _: raise TypeError( f"Expected FrameBufferSpecial, int, or c_void_p, got {type(data).__name__}" )
[docs] @override @_return_on_raise(None) def audio_sample(self, left: int, right: int) -> None: self._audio.sample(left, right)
[docs] @override @_return_on_raise(0) def audio_sample_batch(self, data: TypedPointer[c_int16], frames: int) -> int: sample_view = memoryview_at(data, frames * 2 * sizeof(c_int16)).cast("h") assert len(sample_view) == frames * 2, ( f"Expected view to have {frames * 2} samples, got {len(sample_view)} samples" ) return self._audio.sample_batch(sample_view)
[docs] @override @_return_on_raise(None) def input_poll(self) -> None: # TODO: Ensure this isn't called more than once per frame self._input.poll() if self._sensor is not None: self._sensor.poll()
[docs] @override @_return_on_raise(0) def input_state(self, port: Port, device: int, index: int, id: int) -> int: return self._input.state(port, InputDevice(device), index, id)
@property def rotation(self) -> Rotation: """ Return the screen :class:`.Rotation` currently set on the underlying :class:`.VideoDriver`. .. seealso:: :attr:`.EnvironmentCall.SET_ROTATION` The environment call that updates this value. """ return self._video.rotation @override def _set_rotation(self, rotation: TypedPointer[c_uint]) -> bool: if not rotation: raise ValueError("RETRO_ENVIRONMENT_SET_ROTATION doesn't accept NULL") self._video.rotation = Rotation(rotation[0]) return True @property def overscan(self) -> bool | None: """ Return whether the frontend wants overscan to be visible, or ``None`` if unset. .. seealso:: :attr:`.EnvironmentCall.GET_OVERSCAN` The environment call that queries this value. """ return self._overscan @overscan.setter def overscan(self, value: bool) -> None: self._overscan = bool(value) @overscan.deleter def overscan(self) -> None: self._overscan = None @override def _get_overscan(self, overscan: TypedPointer[c_bool]) -> bool: if self.overscan is None: return False if not overscan: raise ValueError("RETRO_ENVIRONMENT_GET_OVERSCAN doesn't accept NULL") overscan[0] = self.overscan return True @property def can_dupe(self) -> bool | None: """ Return whether the underlying :class:`.VideoDriver` supports frame duping. .. seealso:: :attr:`.EnvironmentCall.GET_CAN_DUPE` The environment call that queries this value. """ return self._video.can_dupe @override def _get_can_dupe(self, can_dupe: TypedPointer[c_bool]) -> bool: if not can_dupe: raise ValueError("RETRO_ENVIRONMENT_GET_CAN_DUPE doesn't accept NULL") if self._video.can_dupe is None: return False can_dupe[0] = self._video.can_dupe return True @property def message(self) -> _Message: """Return the :class:`.MessageDriver` supplied at construction time, or ``None`` if absent.""" return self._message @override def _set_message(self, message: TypedPointer[retro_message]) -> bool: if self._message is None: return False if not message: raise ValueError("RETRO_ENVIRONMENT_SET_MESSAGE doesn't accept NULL") return self._message.set_message(message[0]) @property def is_shutdown(self) -> bool: """ Return ``True`` if the core has requested a shutdown via the environment call. .. seealso:: :attr:`.EnvironmentCall.SHUTDOWN` The environment call that flips this flag. """ return self.__shutdown @override def _shutdown(self) -> bool: self.__shutdown = True # TODO: Add a shutdown driver? return True @property def performance_level(self) -> int | None: """ Return the performance level the core has reported, or ``None`` if unset. .. seealso:: :attr:`.EnvironmentCall.SET_PERFORMANCE_LEVEL` The environment call that sets this value. """ return self._performance_level @performance_level.setter def performance_level(self, value: int) -> None: self._performance_level = int(value) @performance_level.deleter def performance_level(self) -> None: self._performance_level = None @override def _set_performance_level(self, level: TypedPointer[c_uint]) -> bool: if not level: raise ValueError("RETRO_ENVIRONMENT_SET_PERFORMANCE_LEVEL doesn't accept NULL") self._performance_level = level[0] return True @override def _get_system_directory(self, dir: TypedPointer[c_char_p]) -> bool: if self._path is None or self._path.system_dir is None: return False if not dir: raise ValueError("RETRO_ENVIRONMENT_GET_SYSTEM_DIRECTORY doesn't accept NULL") dir[0] = self._path.system_dir return True @property def pixel_format(self) -> PixelFormat: """ Return the :class:`.PixelFormat` currently set on the underlying :class:`.VideoDriver`. .. seealso:: :attr:`.EnvironmentCall.SET_PIXEL_FORMAT` The environment call that updates this value. """ return self._video.pixel_format @override def _set_pixel_format(self, fmt: TypedPointer[retro_pixel_format]) -> bool: if not fmt: raise ValueError("RETRO_ENVIRONMENT_SET_PIXEL_FORMAT doesn't accept NULL") self._video.pixel_format = PixelFormat(fmt[0]) return True @property def input_descriptors(self) -> Sequence[retro_input_descriptor] | None: """ Return the input descriptors registered by the core, or ``None`` if none were set. .. seealso:: :attr:`.EnvironmentCall.SET_INPUT_DESCRIPTORS` The environment call that registers these descriptors. """ return self._input.descriptors @override def _set_input_descriptors(self, descriptors: TypedPointer[retro_input_descriptor]) -> bool: if not descriptors: raise ValueError("RETRO_ENVIRONMENT_SET_INPUT_DESCRIPTORS doesn't accept NULL") self._input.descriptors = tuple(deepcopy(d) for d in from_zero_terminated(descriptors)) return True @property def keyboard_callback(self) -> retro_keyboard_callback | None: """ Return the keyboard callback registered by the core, or ``None`` if none was set. .. seealso:: :attr:`.EnvironmentCall.SET_KEYBOARD_CALLBACK` The environment call that registers this callback. """ return self._input.keyboard_callback @override def _set_keyboard_callback(self, callback: TypedPointer[retro_keyboard_callback]) -> bool: if not callback: raise ValueError("RETRO_ENVIRONMENT_SET_KEYBOARD_CALLBACK doesn't accept NULL") self._input.keyboard_callback = deepcopy(callback[0]) return True @override def _set_disk_control_interface( self, callback: TypedPointer[retro_disk_control_callback] ) -> bool: return False # TODO: Implement @override def _set_hw_render(self, callback: TypedPointer[retro_hw_render_callback]) -> bool: if not callback: raise ValueError("RETRO_ENVIRONMENT_SET_HW_RENDER doesn't accept NULL") core_callback = callback[0] try: self._video.set_context(core_callback) except UnsupportedContextError: return False if not self._hw_render_callback: self._hw_render_callback = retro_hw_render_callback( get_current_framebuffer=retro_hw_get_current_framebuffer_t( self._get_current_framebuffer ), get_proc_address=retro_hw_get_proc_address_t(self._get_hw_proc_address), ) core_callback.get_current_framebuffer = self._hw_render_callback.get_current_framebuffer core_callback.get_proc_address = self._hw_render_callback.get_proc_address # Give the core the callbacks that the video driver defines return True @_return_on_raise(0) def _get_current_framebuffer(self) -> int: return self._video.current_framebuffer or 0 @_return_on_raise(0) def _get_hw_proc_address(self, sym: bytes) -> int: proc = self._video.get_proc_address(sym) if not proc: return 0 return cast(proc, c_void_p).value or 0 @property def options(self) -> _Option: """Return the :class:`.OptionDriver` supplied at construction time, or ``None`` if absent.""" return self._options @override def _get_variable(self, variable: TypedPointer[retro_variable]) -> bool: if self._options is None: return False if variable: key = variable[0].key if key is not None: variable[0].value = self._options.get_variable(key) # This envcall supports passing NULL to query for support return True @override def _set_variables(self, variables: TypedPointer[retro_variable]) -> bool: if self._options is None: return False if variables: definitions = tuple(deepcopy(s) for s in from_zero_terminated(variables)) self._options.set_variables(definitions) else: self._options.set_variables(None) # This envcall supports passing NULL to query for support return True @override def _get_variable_update(self, updated: TypedPointer[c_bool]) -> bool: if self._options is None: return False if not updated: raise ValueError("RETRO_ENVIRONMENT_GET_VARIABLE_UPDATE doesn't accept NULL") updated[0] = self._options.variable_updated return True @property def support_no_game(self) -> bool | None: """ Return whether the core can run without content, or ``None`` if no content driver is set. .. seealso:: :attr:`.EnvironmentCall.SET_SUPPORT_NO_GAME` The environment call that sets this value. """ if self._content is None: return None return self._content.support_no_game @override def _set_support_no_game(self, support: TypedPointer[c_bool]) -> bool: if self._content is None: return False if not support: raise ValueError("RETRO_ENVIRONMENT_SET_SUPPORT_NO_GAME doesn't accept NULL") self._content.support_no_game = support[0] return True @override def _get_libretro_path(self, path: TypedPointer[c_char_p]) -> bool: if self._path is None or not self._path.libretro_path: return False if not path: raise ValueError("RETRO_ENVIRONMENT_GET_LIBRETRO_PATH doesn't accept NULL") path[0] = self._path.libretro_path return True @override def _set_frame_time_callback(self, callback: TypedPointer[retro_frame_time_callback]) -> bool: if self._timing is None: return False if not callback: raise ValueError("RETRO_ENVIRONMENT_SET_FRAME_TIME_CALLBACK doesn't accept NULL") self._timing.frame_time_callback = deepcopy(callback[0]) return True @override def _set_audio_callback(self, callback: TypedPointer[retro_audio_callback]) -> bool: if callback: self._audio.callbacks = deepcopy(callback[0]) return True @override def _get_rumble_interface(self, rumble: TypedPointer[retro_rumble_interface]) -> bool: if not rumble: raise ValueError("RETRO_ENVIRONMENT_GET_RUMBLE_INTERFACE doesn't accept NULL") if not self._rumble: return False if not self._rumble_interface: self._rumble_interface = retro_rumble_interface( retro_set_rumble_state_t(self._set_rumble_state) ) # So that even if the rumble/input drivers are swapped out, # the core still has valid function pointers tied to non-GC'd callable objects rumble[0] = self._rumble_interface return True @_return_on_raise(False) def _set_rumble_state(self, port: Port, effect: int, strength: int) -> bool: if self._rumble is None: return False return self._rumble.set_rumble_state(port, RumbleEffect(effect), strength) @override def _get_input_device_capabilities(self, capabilities: TypedPointer[c_uint64]) -> bool: if not capabilities: raise ValueError("RETRO_ENVIRONMENT_GET_INPUT_DEVICE_CAPABILITIES doesn't accept NULL") caps = self._input.device_capabilities if caps is None: return False capabilities[0] = caps return True @override def _get_sensor_interface(self, interface: TypedPointer[retro_sensor_interface]) -> bool: if not interface: raise ValueError("RETRO_ENVIRONMENT_GET_SENSOR_INTERFACE doesn't accept NULL") if not self._sensor: return False if not self._sensor_interface: self._sensor_interface = retro_sensor_interface( set_sensor_state=retro_set_sensor_state_t(self._set_sensor_state), get_sensor_input=retro_sensor_get_input_t(self._get_sensor_input), ) # So that even if the sensor/input drivers are swapped out, # the core still has valid function pointers tied to non-GC'd callable objects interface[0] = self._sensor_interface return True @_return_on_raise(False) def _set_sensor_state(self, port: Port, action: int, rate: int) -> bool: if self._sensor is None: return False if self._input.max_users is not None and port >= self._input.max_users: # If we have a max-user limit set, and the port number exceeds it... return False if action not in SensorAction: # If the action isn't a valid SensorAction, then we shouldn't pass it to the driver return False return self._sensor.set_sensor_state(port, SensorAction(action), rate) @_return_on_raise(0.0) def _get_sensor_input(self, port: Port, sensor: int) -> float: if self._sensor is None: return 0.0 if self._input.max_users is not None and port >= self._input.max_users: # If we have a max-user limit set, and the port number exceeds it... return 0.0 if sensor not in Sensor: # If the sensor isn't a valid Sensor, then we shouldn't pass it to the driver return 0.0 return self._sensor.get_sensor_input(port, Sensor(sensor)) @override def _get_camera_interface(self, interface: TypedPointer[retro_camera_callback]) -> bool: if self._camera is None: return False if not interface: raise ValueError("RETRO_ENVIRONMENT_GET_CAMERA_INTERFACE doesn't accept NULL") if not self._camera_callback: self._camera_callback = retro_camera_callback( start=self._camera_start, stop=self._camera_stop, ) callback = interface[0] self._camera.width = callback.width self._camera.height = callback.height self._camera.frame_raw_framebuffer = callback.frame_raw_framebuffer self._camera.frame_opengl_texture = callback.frame_opengl_texture self._camera.initialized = callback.initialized self._camera.deinitialized = callback.deinitialized callback.start = self._camera_callback.start callback.stop = self._camera_callback.stop return True @_return_on_raise(False) def _camera_start(self) -> bool: if self._camera is None: return False return self._camera.start() @_return_on_raise(None) def _camera_stop(self) -> None: if self._camera is not None: self._camera.stop() @property def log(self) -> _Log: """Return the :class:`.LogDriver` supplied at construction time, or ``None`` if absent.""" return self._log_driver @override def _get_log_interface(self, interface: TypedPointer[retro_log_callback]) -> bool: if self._log_driver is None: return False if not interface: raise ValueError("RETRO_ENVIRONMENT_GET_LOG_INTERFACE doesn't accept NULL") if not self._log_callback: self._log_callback = retro_log_callback(log=retro_log_printf_t(self._log)) interface[0] = self._log_callback return True @_return_on_raise(None) def _log(self, level: int, message: bytes): if self._log_driver is not None and level in LogLevel: self._log_driver.log(LogLevel(level), message) @property def perf(self) -> _Perf: """Return the :class:`.PerfDriver` supplied at construction time, or ``None`` if absent.""" return self._perf @override def _get_perf_interface(self, interface: TypedPointer[retro_perf_callback]) -> bool: if self._perf is None: return False if not interface: raise ValueError("RETRO_ENVIRONMENT_GET_PERF_INTERFACE doesn't accept NULL") if not self._perf_callback: self._perf_callback = retro_perf_callback( get_time_usec=self._get_time_usec, get_cpu_features=self._get_cpu_features, get_perf_counter=self._get_perf_counter, perf_register=self._perf_register, perf_start=self._perf_start, perf_stop=self._perf_stop, perf_log=self._perf_log, ) interface[0] = self._perf_callback return True @_return_on_raise(0) def _get_time_usec(self) -> int: if self._perf is None: return 0 return self._perf.get_time_usec() @_return_on_raise(0) def _get_cpu_features(self) -> int: if self._perf is None: return 0 return self._perf.get_cpu_features() @_return_on_raise(0) def _get_perf_counter(self) -> int: if self._perf is None: return 0 return self._perf.get_perf_counter() @_return_on_raise(None) def _perf_register(self, counter: TypedPointer[retro_perf_counter]): if self._perf is not None and counter: self._perf.perf_register(counter[0]) @_return_on_raise(None) def _perf_start(self, counter: TypedPointer[retro_perf_counter]): if self._perf is not None and counter: self._perf.perf_start(counter[0]) @_return_on_raise(None) def _perf_stop(self, counter: TypedPointer[retro_perf_counter]): if self._perf is not None and counter: self._perf.perf_stop(counter[0]) @_return_on_raise(None) def _perf_log(self): if self._perf is not None: self._perf.perf_log() @property def location(self) -> _Location: """Return the :class:`.LocationDriver` supplied at construction time, or ``None`` if absent.""" return self._location @override def _get_location_interface(self, interface: TypedPointer[retro_location_callback]) -> bool: if self._location is None: return False if not interface: raise ValueError("RETRO_ENVIRONMENT_GET_LOCATION_INTERFACE doesn't accept NULL") if not self._location_callback: self._location_callback = retro_location_callback( start=retro_location_start_t(self._location_start), stop=retro_location_stop_t(self._location_stop), get_position=retro_location_get_position_t(self._location_get_position), set_interval=retro_location_set_interval_t(self._location_set_interval), ) location = interface[0] self._location.initialized = location.initialized self._location.deinitialized = location.deinitialized location.start = self._location_callback.start location.stop = self._location_callback.stop location.get_position = self._location_callback.get_position location.set_interval = self._location_callback.set_interval return True @_return_on_raise(False) def _location_start(self) -> bool: if self._location is None: return False return self._location.start() @_return_on_raise(None) def _location_stop(self) -> None: if self._location is not None: self._location.stop() @_return_on_raise(False) def _location_get_position( self, lat: TypedPointer[c_double], lon: TypedPointer[c_double], horiz_accuracy: TypedPointer[c_double], vert_accuracy: TypedPointer[c_double], ) -> bool: if self._location is None: return False if not lat or not lon or not horiz_accuracy or not vert_accuracy: raise ValueError("retro_location_get_position_t doesn't accept NULL") position = self._location.get_position() if position is None: return False lat[0] = position.latitude or 0.0 lon[0] = position.longitude or 0.0 horiz_accuracy[0] = position.horizontal_accuracy or 0.0 vert_accuracy[0] = position.vertical_accuracy or 0.0 return True @_return_on_raise(None) def _location_set_interval(self, ms: int, distance: int) -> None: if self._location is not None: self._location.set_interval(ms, distance) @override def _get_core_assets_directory(self, dir: TypedPointer[c_char_p]) -> bool: if self._path is None or self._path.core_assets_dir is None: return False if not dir: raise ValueError("RETRO_ENVIRONMENT_GET_CORE_ASSETS_DIRECTORY doesn't accept NULL") dir[0] = self._path.core_assets_dir return True @override def _get_save_directory(self, dir: TypedPointer[c_char_p]) -> bool: if self._path is None or self._path.save_dir is None: return False if not dir: raise ValueError("RETRO_ENVIRONMENT_GET_SAVE_DIRECTORY doesn't accept NULL") dir[0] = self._path.save_dir return True @override def _set_system_av_info(self, info: TypedPointer[retro_system_av_info]) -> bool: if not info: return False # TODO: Provide a way to disable this envcall av_info = info[0] self._video.system_av_info = av_info self._audio.system_av_info = av_info self._system_av_info = deepcopy(av_info) return True @property def proc_address_callback(self) -> retro_get_proc_address_interface | None: """ Return the :c:type:`retro_get_proc_address_interface` registered by the core, if any. .. seealso:: :attr:`.EnvironmentCall.SET_PROC_ADDRESS_CALLBACK` The environment call that registers this interface. """ return self._proc_address_callback @overload def get_proc_address[T: _CFunctionType]( self, sym: str | bytes, funtype: type[T] ) -> T | None: ... @overload def get_proc_address( self, sym: str | bytes, funtype: None = None ) -> retro_proc_address_t | None: ... @overload def get_proc_address( self, sym: Literal[b"", ""], funtype: type[_CFunctionType] | None = None ) -> None: ... @overload def get_proc_address[R: _CDataType | None, **P]( self, sym: str | bytes, funtype: type[TypedFunctionPointer[R, P]] ) -> TypedFunctionPointer[R, P] | None: ...
[docs] def get_proc_address[T: _CFunctionType, R: _CDataType | None, **P]( self, sym: str | bytes, funtype: type[T] | type[TypedFunctionPointer[R, P]] | None = None ) -> retro_proc_address_t | T | TypedFunctionPointer[R, P] | None: """ Look up a function pointer the core exposed via its proc-address interface. :param sym: The name of the symbol to look up; an empty name returns ``None``. :param funtype: An optional :mod:`ctypes` function type to cast the returned pointer to; if omitted, a generic :c:type:`retro_proc_address_t` is returned. :return: The requested function pointer cast to ``funtype``, or ``None`` if the core has not registered a proc-address interface, the symbol is empty, or the lookup failed. """ if not sym: return None if not self._proc_address_callback: return None if not self._proc_address_callback.get_proc_address: return None if not (proc := self._proc_address_callback(sym)): return None if not funtype: return proc return cast(proc, funtype)
@override def _set_proc_address_callback( self, callback: TypedPointer[retro_get_proc_address_interface] ) -> bool: if not callback: self._proc_address_callback = None else: self._proc_address_callback = deepcopy(callback[0]) return True @property def subsystems(self) -> Sequence[retro_subsystem_info] | None: """ Return the subsystem info the core registered, or ``None`` if no content driver is set. .. seealso:: :attr:`.EnvironmentCall.SET_SUBSYSTEM_INFO` The environment call that registers this information. """ if self._content is None: return None return self._content.subsystem_info @override def _set_subsystem_info(self, info: TypedPointer[retro_subsystem_info]) -> bool: if self._content is None: return False if not info: raise ValueError("RETRO_ENVIRONMENT_SET_SUBSYSTEM_INFO doesn't accept NULL") self._content.subsystem_info = tuple(deepcopy(s) for s in from_zero_terminated(info)) return True @property def controller_info(self) -> Sequence[retro_controller_description] | None: """ Return the controller descriptions the core registered, or ``None`` if none were set. .. seealso:: :attr:`.EnvironmentCall.SET_CONTROLLER_INFO` The environment call that registers this information. """ return self._input.controller_info @override def _set_controller_info(self, info: TypedPointer[retro_controller_info]) -> bool: if not info: raise ValueError("RETRO_ENVIRONMENT_SET_CONTROLLER_INFO doesn't accept NULL") controller_info = info[0] array = ( deepcopy_array(controller_info.types, controller_info.num_types) if controller_info.types else None ) controller_infos = tuple(array) if array else () self._input.controller_info = controller_infos return True @property def memory_maps(self) -> retro_memory_map | None: """ Return the memory map the core registered, or ``None`` if none was set. .. seealso:: :attr:`.EnvironmentCall.SET_MEMORY_MAPS` The environment call that registers this information. """ return self._memory_maps @override def _set_memory_maps(self, maps: TypedPointer[retro_memory_map]) -> bool: if not maps: raise ValueError("RETRO_ENVIRONMENT_SET_MEMORY_MAPS doesn't accept NULL") self._memory_maps = deepcopy(maps[0]) return True @property def geometry(self) -> retro_game_geometry | None: """ Return the current frame geometry from the underlying :class:`.VideoDriver`. .. seealso:: :attr:`.EnvironmentCall.SET_GEOMETRY` The environment call that updates this value. """ return self._video.geometry @override def _set_geometry(self, geometry: TypedPointer[retro_game_geometry]) -> bool: if not geometry: raise ValueError("RETRO_ENVIRONMENT_SET_GEOMETRY doesn't accept NULL") self._video.geometry = geometry[0] return True @override def _get_username(self, username: TypedPointer[c_char_p]) -> bool: if self._user is None or self._user.username is None: return False if not username: raise ValueError("RETRO_ENVIRONMENT_GET_USERNAME doesn't accept NULL") username[0] = self._user.username return True @override def _get_language(self, language: TypedPointer[retro_language]) -> bool: if self._user is None or self._user.language is None: return False if not language: raise ValueError("RETRO_ENVIRONMENT_GET_LANGUAGE doesn't accept NULL") language[0] = self._user.language return True @override def _get_current_software_framebuffer( self, framebuffer: TypedPointer[retro_framebuffer] ) -> bool: if not framebuffer: raise ValueError( "RETRO_ENVIRONMENT_GET_CURRENT_SOFTWARE_FRAMEBUFFER doesn't accept NULL" ) core_fb = framebuffer[0] width = core_fb.width height = core_fb.height access = core_fb.access_flags fb = self._video.get_software_framebuffer(width, height, MemoryAccess(access)) if not fb: return False core_fb.data = fb.data core_fb.pitch = fb.pitch core_fb.format = fb.format core_fb.memory_flags = fb.memory_flags return True @override def _get_hw_render_interface(self, interface: TypedPointer[retro_hw_render_interface]) -> bool: if not interface: raise ValueError("RETRO_ENVIRONMENT_GET_HW_RENDER_INTERFACE doesn't accept NULL") driver_interface = self._video.hw_render_interface if not driver_interface: # This video driver doesn't provide (or need) a hardware render interface return False interface[0] = driver_interface return True @property def support_achievements(self) -> bool | None: """ Return whether the core supports achievements, or ``None`` if it has not declared. .. seealso:: :attr:`.EnvironmentCall.SET_SUPPORT_ACHIEVEMENTS` The environment call that sets this value. """ return self._supports_achievements @override def _set_support_achievements(self, support: TypedPointer[c_bool]) -> bool: if not support: raise ValueError("RETRO_ENVIRONMENT_SET_SUPPORT_ACHIEVEMENTS doesn't accept NULL") self._supports_achievements = support[0] return True @override def _set_hw_render_context_negotiation_interface( self, interface: TypedPointer[retro_hw_render_context_negotiation_interface] ) -> bool: return False # TODO: Implement @property def serialization_quirks(self) -> SerializationQuirks | None: """ Return the :class:`.SerializationQuirks` the core declared, or ``None`` if unset. .. seealso:: :attr:`.EnvironmentCall.SET_SERIALIZATION_QUIRKS` The environment call that sets these flags. """ return self._serialization_quirks @override def _set_serialization_quirks(self, quirks: TypedPointer[c_uint64]) -> bool: if not quirks: raise ValueError("RETRO_ENVIRONMENT_SET_SERIALIZATION_QUIRKS doesn't accept NULL") self._serialization_quirks = SerializationQuirks(quirks[0]) return True @property def hw_shared_context(self) -> bool: """ Return whether the underlying :class:`.VideoDriver` is configured for a shared HW context. .. seealso:: :attr:`.EnvironmentCall.SET_HW_SHARED_CONTEXT` The environment call that enables shared-context mode. """ return self._video.shared_context @override def _set_hw_shared_context(self) -> bool: self._video.shared_context = True return True @property def vfs(self) -> _Vfs: """Return the :class:`.FileSystemDriver` supplied at construction time, or :obj:`None` if absent.""" return self._vfs @override def _get_vfs_interface(self, vfs: TypedPointer[retro_vfs_interface_info]) -> bool: if self._vfs is None: return False if not vfs: raise ValueError("RETRO_ENVIRONMENT_GET_VFS_INTERFACE doesn't accept NULL") vfs_info = vfs[0] if vfs_info.required_interface_version > self._vfs.version: # If the core wants a higher version than what we offer... return False if self._vfs_interface is None: self._vfs_interface = retro_vfs_interface() if self._vfs.version >= 1: self._vfs_interface.get_path = retro_vfs_get_path_t(self._vfs_get_path) self._vfs_interface.open = retro_vfs_open_t(self._vfs_open) self._vfs_interface.close = retro_vfs_close_t(self._vfs_close) self._vfs_interface.size = retro_vfs_size_t(self._vfs_size) self._vfs_interface.tell = retro_vfs_tell_t(self._vfs_tell) self._vfs_interface.seek = retro_vfs_seek_t(self._vfs_seek) self._vfs_interface.read = retro_vfs_read_t(self._vfs_read) self._vfs_interface.write = retro_vfs_write_t(self._vfs_write) self._vfs_interface.flush = retro_vfs_flush_t(self._vfs_flush) self._vfs_interface.remove = retro_vfs_remove_t(self._vfs_remove) self._vfs_interface.rename = retro_vfs_rename_t(self._vfs_rename) if self._vfs.version >= 2: self._vfs_interface.truncate = retro_vfs_truncate_t(self._vfs_truncate) if self._vfs.version >= 3: self._vfs_interface.stat = retro_vfs_stat_t(self._vfs_stat) self._vfs_interface.mkdir = retro_vfs_mkdir_t(self._vfs_mkdir) self._vfs_interface.opendir = retro_vfs_opendir_t(self._vfs_opendir) self._vfs_interface.readdir = retro_vfs_readdir_t(self._vfs_readdir) self._vfs_interface.dirent_get_name = retro_vfs_dirent_get_name_t( self._vfs_dirent_get_name ) self._vfs_interface.dirent_is_dir = retro_vfs_dirent_is_dir_t( self._vfs_dirent_is_dir ) self._vfs_interface.closedir = retro_vfs_closedir_t(self._vfs_closedir) vfs_info.required_interface_version = self._vfs.version vfs_info.iface = pointer(self._vfs_interface) return True @_return_on_raise(typing.cast(bytes | None, None)) def _vfs_get_path(self, file: TypedPointer[retro_vfs_file_handle]) -> bytes | None: if self._vfs is None or not file: return None return self._vfs.get_path(file[0]) @_return_on_raise(0) def _vfs_open(self, path: bytes, mode: int, hints: int): if self._vfs is None or mode not in VfsFileAccess: return 0 file = self._vfs.open(path, VfsFileAccess(mode), VfsFileAccessHint(hints)) if not file: return 0 return cast(pointer(file), c_void_p).value or 0 @_return_on_raise(-1) def _vfs_close(self, file: TypedPointer[retro_vfs_file_handle]): if self._vfs is None or not file: return -1 return 0 if self._vfs.close(file[0]) else -1 @_return_on_raise(-1) def _vfs_size(self, file: TypedPointer[retro_vfs_file_handle]) -> int: if self._vfs is None or not file: return -1 return self._vfs.size(file[0]) @_return_on_raise(-1) def _vfs_truncate(self, file: TypedPointer[retro_vfs_file_handle], length: int) -> int: if self._vfs is None or not file or length < 0: return -1 return 0 if self._vfs.truncate(file[0], length) else -1 @_return_on_raise(-1) def _vfs_tell(self, file: TypedPointer[retro_vfs_file_handle]) -> int: if self._vfs is None or not file: return -1 return self._vfs.tell(file[0]) @_return_on_raise(-1) def _vfs_seek( self, file: TypedPointer[retro_vfs_file_handle], offset: int, whence: int ) -> int: if self._vfs is None or not file or whence not in VfsSeekPosition: return -1 return self._vfs.seek(file[0], offset, VfsSeekPosition(whence)) @_return_on_raise(-1) def _vfs_read( self, file: TypedPointer[retro_vfs_file_handle], data: c_void_ptr, size: int ) -> int: if self._vfs is None or not file or not data or size < 0: return -1 return self._vfs.read(file[0], memoryview_at(data, size, readonly=False)) @_return_on_raise(-1) def _vfs_write( self, file: TypedPointer[retro_vfs_file_handle], data: c_void_ptr, size: int ) -> int: if self._vfs is None or not file or not data or size < 0: return -1 return self._vfs.write(file[0], memoryview_at(data, size, readonly=True)) @_return_on_raise(-1) def _vfs_flush(self, file: TypedPointer[retro_vfs_file_handle]) -> int: if self._vfs is None or not file: return -1 return 0 if self._vfs.flush(file[0]) else -1 @_return_on_raise(-1) def _vfs_remove(self, path: bytes) -> int: if self._vfs is None or not path: return -1 return 0 if self._vfs.remove(path) else -1 @_return_on_raise(-1) def _vfs_rename(self, old_path: bytes, new_path: bytes) -> int: if self._vfs is None or not old_path or not new_path: return -1 return 0 if self._vfs.rename(old_path, new_path) else -1 @_return_on_raise(0) def _vfs_stat(self, path: bytes, size: TypedPointer[c_int32]) -> int: if self._vfs is None or not path: return 0 stat = self._vfs.stat(path) if not stat: return 0 vfs_stat, stat_size = stat if size: size[0] = stat_size return vfs_stat @_return_on_raise(-1) def _vfs_mkdir(self, path: bytes) -> int: if self._vfs is None or not path: return -1 return self._vfs.mkdir(path) @_return_on_raise(0) def _vfs_opendir(self, path: bytes, include_hidden: bool): if self._vfs is None or not path: return 0 dir = self._vfs.opendir(path, include_hidden) if not dir: return 0 return cast(pointer(dir), c_void_p).value or 0 @_return_on_raise(False) def _vfs_readdir(self, dir: TypedPointer[retro_vfs_dir_handle]) -> bool: if self._vfs is None or not dir: return False return self._vfs.readdir(dir[0]) @_return_on_raise(typing.cast(bytes | None, None)) def _vfs_dirent_get_name(self, dir: TypedPointer[retro_vfs_dir_handle]) -> bytes | None: if self._vfs is None or not dir: return None return self._vfs.dirent_get_name(dir[0]) @_return_on_raise(False) def _vfs_dirent_is_dir(self, dir: TypedPointer[retro_vfs_dir_handle]) -> bool: if self._vfs is None or not dir: return False return self._vfs.dirent_is_dir(dir[0]) @_return_on_raise(False) def _vfs_closedir(self, dir: TypedPointer[retro_vfs_dir_handle]) -> bool: if self._vfs is None or not dir: return False return self._vfs.closedir(dir[0]) @property def led(self) -> _Led: """Return the :class:`.LedDriver` supplied at construction time, or ``None`` if absent.""" return self._led @override def _get_led_interface(self, led: TypedPointer[retro_led_interface]) -> bool: if self._led is None: return False if not led: # This envcall supports passing NULL to query for support return True if not self._led_interface: self._led_interface = retro_led_interface( set_led_state=retro_set_led_state_t(self._set_led_state) ) led[0] = self._led_interface return True @_return_on_raise(None) def _set_led_state(self, led: int, state: int) -> None: if self._led is not None: self._led.set_led_state(led, state) @property def av_enable(self) -> AvEnableFlags | None: """ Return the :class:`.AvEnableFlags` the frontend exposes to the core, or ``None`` if unset. .. seealso:: :attr:`.EnvironmentCall.GET_AUDIO_VIDEO_ENABLE` The environment call that queries this value. """ return self._av_enable @av_enable.setter def av_enable(self, value: AvEnableFlags) -> None: if not isinstance(value, (int, AvEnableFlags)): raise TypeError(f"Expected AvEnableFlags, got {type(value).__name__}") self._av_enable = AvEnableFlags(value) @av_enable.deleter def av_enable(self) -> None: self._av_enable = None @override def _get_audio_video_enable(self, enable: TypedPointer[retro_av_enable_flags]) -> bool: if self._av_enable is None: return False if enable: enable[0] = self._av_enable # This envcall supports passing NULL to query for support return True # TODO: Derive this from the audio, video, and state drivers @property def midi(self) -> MidiDriver | None: """Return the :class:`.MidiDriver` supplied at construction time, or ``None`` if absent.""" return self._midi @override def _get_midi_interface(self, midi: TypedPointer[retro_midi_interface]) -> bool: if not self._midi: return False if not midi: # This envcall supports passing NULL to query for support return True if not self._midi_interface: self._midi_interface = retro_midi_interface( input_enabled=retro_midi_input_enabled_t(self._midi_input_enabled), output_enabled=retro_midi_output_enabled_t(self._midi_output_enabled), read=retro_midi_read_t(self._midi_read), write=retro_midi_write_t(self._midi_write), flush=retro_midi_flush_t(self._midi_flush), ) midi[0] = self._midi_interface return True @_return_on_raise(False) def _midi_input_enabled(self) -> bool: if self._midi is None: return False return self._midi.input_enabled @_return_on_raise(False) def _midi_output_enabled(self) -> bool: if self._midi is None: return False return self._midi.output_enabled @_return_on_raise(False) def _midi_read(self, byte: TypedPointer[c_uint8]) -> bool: if self._midi is None or not byte: return False result = self._midi.read() if result is None: return False byte[0] = result return True @_return_on_raise(False) def _midi_write(self, byte: int, delta_time: int) -> bool: if self._midi is None: return False return self._midi.write(byte, delta_time) @_return_on_raise(False) def _midi_flush(self) -> bool: if self._midi is None: return False return self._midi.flush() @override def _get_fastforwarding(self, is_fastforwarding: TypedPointer[c_bool]) -> bool: if self._timing is None: return False if self._timing.throttle_state is None: return False if not is_fastforwarding: raise ValueError("RETRO_ENVIRONMENT_GET_FASTFORWARDING doesn't accept NULL") is_fastforwarding[0] = ( ThrottleMode(self._timing.throttle_state.mode) == ThrottleMode.FAST_FORWARD ) return True @override def _get_target_refresh_rate(self, rate: TypedPointer[c_float]) -> bool: if self._timing is None or self._timing.target_refresh_rate is None: return False if not rate: raise ValueError("RETRO_ENVIRONMENT_GET_TARGET_REFRESH_RATE doesn't accept NULL") rate[0] = self._timing.target_refresh_rate return True @property def input_bitmasks(self) -> bool | None: """ Return whether the underlying :class:`.InputDriver` supports input bitmasks. .. seealso:: :attr:`.EnvironmentCall.GET_INPUT_BITMASKS` The environment call that queries this value. """ return self._input.bitmasks_supported @override def _get_input_bitmasks(self) -> bool: return bool(self._input.bitmasks_supported) @property def core_options_version(self) -> int | None: """ Return the core-options API version supported by the option driver, or ``None`` if absent. .. seealso:: :attr:`.EnvironmentCall.GET_CORE_OPTIONS_VERSION` The environment call that queries this value. """ if self._options is None: return None return self._options.version @override def _get_core_options_version(self, version: TypedPointer[c_uint]) -> bool: if self._options is None: return False if not version: raise ValueError("RETRO_ENVIRONMENT_GET_CORE_OPTIONS_VERSION doesn't accept NULL") version[0] = self._options.version return True @override def _set_core_options(self, options: TypedPointer[retro_core_option_definition]) -> bool: if self._options is None: return False if options: if self._options.version < 1: return False self._options.set_options(tuple(deepcopy(o) for o in from_zero_terminated(options))) else: self._options.set_options(None) # This envcall supports passing NULL to reset the options return True @override def _set_core_options_intl(self, options: TypedPointer[retro_core_options_intl]) -> bool: if self._options is None: return False if options: if self._options.version < 1: return False self._options.set_options_intl(options[0]) else: self._options.set_options_intl(None) # This envcall supports passing NULL to reset the options return True @override def _set_core_options_display(self, options: TypedPointer[retro_core_option_display]) -> bool: if self._options is None: return False if options: opt_display = options[0] if opt_display.key: self._options.set_display(opt_display.key, opt_display.visible) # This envcall supports passing NULL to query for support return True @property def preferred_hw_render(self) -> HardwareContext | None: """ Return the preferred :class:`.HardwareContext` exposed to the core, or ``None`` if unset. .. seealso:: :attr:`.EnvironmentCall.GET_PREFERRED_HW_RENDER` The environment call that queries this value. """ return self._preferred_hw @preferred_hw_render.setter def preferred_hw_render(self, value: HardwareContext) -> None: if not isinstance(value, (int, HardwareContext)): raise TypeError(f"Expected HardwareContext, got {type(value).__name__}") self._preferred_hw = HardwareContext(value) @preferred_hw_render.deleter def preferred_hw_render(self) -> None: self._preferred_hw = None @override def _get_preferred_hw_render(self, preferred: TypedPointer[retro_hw_context_type]) -> bool: if self._preferred_hw is None: return False if not preferred: raise ValueError("RETRO_ENVIRONMENT_GET_PREFERRED_HW_RENDER doesn't accept NULL") preferred[0] = self._preferred_hw # This envcall returns True if the frontend supports the call # *and* the frontend can switch video drivers return bool(self._driver_switch_enable) # TODO: Move RETRO_ENVIRONMENT_GET_PREFERRED_HW_RENDER to the VideoDriver and _driver_switch_enable to the VideoDriver @override def _get_disk_control_interface_version(self, version: TypedPointer[c_uint]) -> bool: return False # TODO: Implement @override def _set_disk_control_ext_interface( self, interface: TypedPointer[retro_disk_control_ext_callback] ) -> bool: return False # TODO: Implement @override def _get_message_interface_version(self, version: TypedPointer[c_uint]) -> bool: if self._message is None: return False if not version: raise ValueError("RETRO_ENVIRONMENT_GET_MESSAGE_INTERFACE_VERSION doesn't accept NULL") version[0] = self._message.version return True @override def _set_message_ext(self, message_ext: TypedPointer[retro_message_ext]) -> bool: if self._message is None: return False if not message_ext: raise ValueError("RETRO_ENVIRONMENT_SET_MESSAGE_EXT doesn't accept NULL") return self._message.set_message(message_ext[0]) @override def _get_input_max_users(self, max_users: TypedPointer[c_uint]) -> bool: if not max_users: raise ValueError("RETRO_ENVIRONMENT_GET_INPUT_MAX_USERS doesn't accept NULL") max_supported_users = self._input.max_users if max_supported_users is None: return False max_users[0] = max_supported_users return True @override def _set_audio_buffer_status_callback( self, callback: TypedPointer[retro_audio_buffer_status_callback] ) -> bool: if callback: self._audio.buffer_status = deepcopy(callback[0]) else: self._audio.buffer_status = None return True @override def _set_minimum_audio_latency(self, latency: TypedPointer[c_uint]) -> bool: if latency: self._audio.minimum_latency = latency[0] return True @override def _set_fastforwarding_override( self, override: TypedPointer[retro_fastforwarding_override] ) -> bool: if self._timing is None: return False if override: self._timing.fastforwarding_override = deepcopy(override[0]) # This envcall supports passing NULL to query for support return True @override def _set_content_info_override( self, override: TypedPointer[retro_system_content_info_override] ) -> bool: if self._content is None: return False if not override: # This envcall supports passing NULL to query for support return True self._content.overrides = tuple(deepcopy(o) for o in from_zero_terminated(override)) return True @override def _get_game_info_ext(self, info: TypedPointer[TypedPointer[retro_game_info_ext]]) -> bool: if self._content is None: return False if not info: return False info_ext = self._content.game_info_ext if info_ext is None: return False last_element = info_ext[-1] if not is_zeroed(last_element): raise ValueError( "The last element of game_info_ext must be zeroed out, or else the core may read out-of-bounds memory" ) info_ptr = cast(pointer(info_ext), TypedPointer[retro_game_info_ext]) info[0] = info_ptr return True @override def _set_core_options_v2(self, options: TypedPointer[retro_core_options_v2]) -> bool: if self._options is None: return False if self._options.version < 2: return False if options: self._options.set_options_v2(options[0]) else: self._options.set_options_v2(None) return self._options.supports_categories @override def _set_core_options_v2_intl(self, options: TypedPointer[retro_core_options_v2_intl]) -> bool: if self._options is None: return False if self._options.version < 2: return False if options: self._options.set_options_v2_intl(options[0]) else: self._options.set_options_v2_intl(None) return self._options.supports_categories @override def _set_core_options_update_display_callback( self, callback: TypedPointer[retro_core_options_update_display_callback] ) -> bool: if self._options is None: return False if callback: self._options.update_display_callback = callback[0] return True @override def _set_variable(self, variable: TypedPointer[retro_variable]) -> bool: if self._options is None: return False if not variable: # This envcall supports passing NULL to query for support return True var = variable[0] if not var.key: raise ValueError("Variable key cannot be empty") if not var.value: raise ValueError("Variable value cannot be empty") return self._options.set_variable(var.key, var.value) @override def _get_throttle_state(self, state: TypedPointer[retro_throttle_state]) -> bool: if self._timing is None or not self._timing.throttle_state: return False if not state: raise ValueError("RETRO_ENVIRONMENT_GET_THROTTLE_STATE doesn't accept NULL") state[0] = deepcopy(self._timing.throttle_state) return True @property def savestate_context(self) -> SavestateContext | None: """ Return the :class:`.SavestateContext` the frontend exposes to the core, or ``None`` if unset. .. seealso:: :attr:`.EnvironmentCall.GET_SAVESTATE_CONTEXT` The environment call that queries this value. """ return self._savestate_context @savestate_context.setter def savestate_context(self, value: SavestateContext) -> None: if not isinstance(value, (int, SavestateContext)): raise TypeError(f"Expected SavestateContext, got {type(value).__name__}") self._savestate_context = SavestateContext(value) @savestate_context.deleter def savestate_context(self) -> None: self._savestate_context = None @override def _get_savestate_context(self, context: TypedPointer[retro_savestate_context]) -> bool: if self._savestate_context is None: return False if context: context[0] = self._savestate_context # This envcall supports passing NULL to query for support return True @override def _get_hw_render_context_negotiation_interface_support( self, support: TypedPointer[retro_hw_render_context_negotiation_interface] ) -> bool: return False # TODO: Implement @property def jit_capable(self) -> bool | None: """ Return whether the frontend permits JIT compilation, or ``None`` if unset. .. seealso:: :attr:`.EnvironmentCall.GET_JIT_CAPABLE` The environment call that queries this value. """ return self._jit_capable @jit_capable.setter def jit_capable(self, value: bool) -> None: self._jit_capable = bool(value) @jit_capable.deleter def jit_capable(self) -> None: self._jit_capable = None @override def _get_jit_capable(self, capable: TypedPointer[c_bool]) -> bool: if self._jit_capable is None: return False if not capable: raise ValueError("RETRO_ENVIRONMENT_GET_JIT_CAPABLE doesn't accept NULL") capable[0] = self._jit_capable return True @property def mic(self) -> _Mic: """Return the :class:`.MicrophoneDriver` supplied at construction time, or ``None`` if absent.""" return self._mic @override def _get_microphone_interface( self, interface: TypedPointer[retro_microphone_interface] ) -> bool: if self._mic is None: return False if self._mic_interface is None: self._mic_interface = retro_microphone_interface( interface_version=self._mic.version, open_mic=retro_open_mic_t(self._open_mic), close_mic=retro_close_mic_t(self._close_mic), get_params=retro_get_mic_params_t(self._get_mic_params), set_mic_state=retro_set_mic_state_t(self._set_mic_state), get_mic_state=retro_get_mic_state_t(self._get_mic_state), read_mic=retro_read_mic_t(self._read_mic), ) if interface: target = interface[0] if target.interface_version != self._mic.version: return False target.open_mic = self._mic_interface.open_mic target.close_mic = self._mic_interface.close_mic target.get_params = self._mic_interface.get_params target.set_mic_state = self._mic_interface.set_mic_state target.get_mic_state = self._mic_interface.get_mic_state target.read_mic = self._mic_interface.read_mic # This envcall supports passing NULL to query for support return True @_return_on_raise(0) def _open_mic(self, params: TypedPointer[retro_microphone_params]): if self._mic is None: return 0 mic = self._mic.open_mic(params[0] if params else None) if not mic: return 0 return cast(pointer(mic), c_void_p).value or 0 @_return_on_raise(None) def _close_mic(self, mic: TypedPointer[retro_microphone]) -> None: if self._mic is not None and mic: self._mic.close_mic(mic[0]) @_return_on_raise(False) def _get_mic_params( self, mic: TypedPointer[retro_microphone], params: TypedPointer[retro_microphone_params], ) -> bool: if self._mic is None or not mic or not params: return False returned_params = self._mic.get_mic_params(mic[0]) if not returned_params: return False params[0] = returned_params return True @_return_on_raise(False) def _set_mic_state(self, mic: TypedPointer[retro_microphone], state: bool) -> bool: if self._mic is None or not mic: return False self._mic.set_mic_state(mic[0], state) return True @_return_on_raise(False) def _get_mic_state(self, mic: TypedPointer[retro_microphone]) -> bool: if self._mic is None or not mic: return False return self._mic.get_mic_state(mic[0]) @_return_on_raise(-1) def _read_mic( self, mic: TypedPointer[retro_microphone], buffer: TypedPointer[c_int16], frames: int ) -> int: if self._mic is None or not mic or not buffer or frames < 0: return -1 returned_frames = self._mic.read_mic(mic[0], frames) if returned_frames is None: return -1 returned_buffer, _ = returned_frames.buffer_info() memmove(buffer, returned_buffer, len(returned_frames) * returned_frames.itemsize) return len(returned_frames) @property def power(self) -> _Power: """Return the :class:`.PowerDriver` supplied at construction time, or ``None`` if absent.""" return self._device_power @override def _get_device_power(self, power: TypedPointer[retro_device_power]) -> bool: if self._device_power is None: return False if power: power[0] = self._device_power.device_power # This envcall supports passing NULL to query for support return True @override def _set_netpacket_interface(self, interface: TypedPointer[retro_netpacket_callback]) -> bool: return False # TODO: Implement @override def _get_playlist_directory(self, dir: TypedPointer[c_char_p]) -> bool: if self._path is None or self._path.playlist_dir is None: return False if not dir: raise ValueError("RETRO_ENVIRONMENT_GET_PLAYLIST_DIRECTORY doesn't accept NULL") dir[0] = self._path.playlist_dir return True
__all__ = ["CompositeEnvironmentDriver"]