"""
The standard content driver that loads game files from disk or memory.
.. seealso::
:mod:`libretro.api.content`
Defines the game info structures and content loading types this driver handles.
"""
from __future__ import annotations
import os
from collections.abc import Buffer, Generator, Sequence
from contextlib import AbstractContextManager, ExitStack, contextmanager
from ctypes import Array, c_void_p
from os import PathLike
from tempfile import TemporaryDirectory
from typing import override
from zipfile import Path as ZipPath
from libretro.api import (
Content,
ContentInfoOverrides,
SubsystemContent,
Subsystems,
get_extension,
retro_game_info,
retro_game_info_ext,
retro_subsystem_info,
retro_subsystem_rom_info,
retro_system_content_info_override,
retro_system_info,
)
from libretro.api._utils import addressof_buffer, mmap_file
from .driver import (
ContentAttributes,
ContentDriver,
ContentError,
LoadedContent,
LoadedContentFile,
)
# TODO: Make this a context manager
class _PersistentBuffer:
def __init__(
self, ptr: c_void_p | None, backing: AbstractContextManager[memoryview[int]] | None
):
self.ptr = ptr
self.backing = backing
def __del__(self):
self.close()
def close(self) -> None:
if self.backing:
self.backing.__exit__(None, None, None)
del self.backing
self.backing = None
if self.ptr:
del self.ptr
self.ptr = None
[docs]
class StandardContentDriver(ContentDriver):
"""A :class:`.ContentDriver` that loads content from files on disk or from memory buffers."""
[docs]
def __init__(self, enable_extended_info: bool = True):
"""
:param enable_extended_info: If :obj:`True`, the driver will provide
extended content info to the core via ``RETRO_ENVIRONMENT_GET_GAME_INFO_EXT``.
"""
self._subsystems: Subsystems | None = None
self._overrides: ContentInfoOverrides | None = None
self._content: Sequence[retro_game_info] | None = None
self._content_ext: Array[retro_game_info_ext] | None = None
self._system_info: retro_system_info | None = None
self._support_no_game: bool | None = None
self._enable_extended_info = bool(enable_extended_info)
self._persistent_buffers: set[_PersistentBuffer] = set()
[docs]
def __del__(self):
"""Clean up any persistent buffers that haven't already been cleaned up."""
self._persistent_buffers.clear()
@property
@override
def enable_extended_info(self) -> bool:
return self._enable_extended_info
@enable_extended_info.setter
def enable_extended_info(self, value: bool) -> None:
self._enable_extended_info = bool(value)
@property
@override
def game_info_ext(self) -> Array[retro_game_info_ext] | None:
if not self._enable_extended_info:
return None
if not self._content:
return None
return self._content_ext
[docs]
@contextmanager
@override
def load(self, content: Content | SubsystemContent | None) -> Generator[LoadedContent]:
if not self._system_info:
raise RuntimeError("System info not set")
with ExitStack() as stack:
# We may be loading several files, each of which needs its own context manager
# So we use ExitStack to manage all of their lives at once
match content:
case SubsystemContent() if not self.subsystem_info:
raise RuntimeError(
"Subsystem content was provided, but the core didn't register support for subsystems"
)
case SubsystemContent(info=info):
subsystem = self.__get_subsystem(content)
if len(info) != len(subsystem):
raise ValueError(
f"Subsystem {subsystem.ident!r} needs exactly {len(subsystem)} ROMs, got {len(info)}"
)
loaded_content = [
stack.enter_context(self._load(c, (subsystem, subsystem[i])))
for (i, c) in enumerate(info)
]
case (
ZipPath()
| str()
| PathLike()
| bytes()
| bytearray()
| Buffer()
| retro_game_info()
):
subsystem = None
loaded_content = [stack.enter_context(self._load(content))]
case None if self.support_no_game:
subsystem = None
loaded_content = []
case None:
raise ValueError(
"No content provided and core did not register support for no-content mode"
)
case _:
raise TypeError(
f"Expected a content path, data buffer, SubsystemContent, or retro_game_info; got {type(content).__name__}"
)
# Now that we've loaded all the content, let's create the extended info array
content_ext_type = retro_game_info_ext * len(loaded_content)
self._content_ext = content_ext_type()
for i, lc in enumerate(loaded_content):
self._content_ext[i] = lc.info_ext or retro_game_info_ext()
# Now we hand off the loaded content to retro_load_game...
yield subsystem, loaded_content
# ...and now that retro_load_game has finished, let's clean up the loaded content
# (but persistent buffers will be kept open in self._persistent_buffers)
del self._content_ext
del loaded_content
@contextmanager
def _load(
self,
content: Content,
selected_subsystem: tuple[retro_subsystem_info, retro_subsystem_rom_info] | None = None,
) -> Generator[LoadedContentFile]:
"""
:param content: The content to load
:param subsystem: The subsystem we're using for this session, if any
:param subsysrom: The descriptor for the ROM type we're using, if any
"""
assert self._system_info is not None
if selected_subsystem and not self._subsystems:
raise RuntimeError("Subsystem info was provided, but the core didn't register support")
_, subsysrom = selected_subsystem or (None, None)
ext = get_extension(content)
if ext is not None:
if not selected_subsystem and ext not in self._system_info.extensions:
raise ContentError(f"Content extension '{ext!r}' is not supported by the system")
if selected_subsystem and ext not in selected_subsystem[1].extensions:
# If we're trying to load an inapplicable subsystem ROM...
desc = selected_subsystem[1].desc or "unknown"
ident = selected_subsystem[0].ident or "unknown"
raise ContentError(
f"Content extension '{ext!r}' is not supported by the {desc!r} ROM of subsystem {ident!r}"
)
attributes = ContentAttributes(
block_extract=self.__should_block_extract(ext, subsysrom),
persistent_data=self.__is_data_persistent(ext),
need_fullpath=self.__needs_fullpath(ext, subsysrom),
required=self.__is_required(ext, subsysrom),
)
def _make_game_info_ext(info: retro_game_info) -> retro_game_info_ext:
# The frontend can lie to the core and say it extracted the content from an archive
path, sep, archive_file = (
info.path.partition(b"#") if info.path else (None, None, None)
)
# path will be the content path if content is not in an archive
# path will be the archive path if content *is* in an archive
file_in_archive = bool(sep and archive_file)
dir = os.path.dirname(path) if path else None
name = os.path.basename(path) if path else None
return retro_game_info_ext(
full_path=info.path if not file_in_archive else None,
archive_path=path or None, # Will be None if file_in_archive is False
archive_file=archive_file or None, # Will be None if file_in_archive is False
dir=dir, # Will be the content dirname if not an archive
name=name,
ext=ext.lower() if ext else None,
meta=info.meta,
data=info.data,
size=info.size,
file_in_archive=file_in_archive,
persistent_data=attributes.persistent_data,
)
loaded_info: retro_game_info | None = None
loaded_info_ext: retro_game_info_ext | None = None
match content, attributes:
# For test cases that create a retro_game_info manually.
case retro_game_info(path=None), ContentAttributes(need_fullpath=True):
# If trying to use a manually-created game info that needs a full path, but didn't give one...
raise ValueError("Core needs a full path, but none was provided")
case retro_game_info(data=None), ContentAttributes(need_fullpath=False):
# If trying to use a manually-created game info that doesn't need a full path, but didn't give data...
raise ValueError(
"Core needs retro_game_info to include data, but none was provided"
)
case retro_game_info(path=None, data=None), _:
raise ValueError("Core needs a full path or data, but neither was provided")
case retro_game_info() as info, ContentAttributes(persistent_data=persistent_data):
loaded_info = info
loaded_info_ext = _make_game_info_ext(info)
if persistent_data:
self._persistent_buffers.add(_PersistentBuffer(info.data, None))
# Give the loaded content to the environment
yield LoadedContentFile(loaded_info, loaded_info_ext)
case ZipPath() as zippath, ContentAttributes(
need_fullpath=True, block_extract=False, persistent_data=False
):
# If the core needs a full path...
with TemporaryDirectory() as tmp:
tmpfile = zippath.root.extract(zippath.at, tmp)
loaded_info = retro_game_info(os.fsencode(tmpfile), None, 0, None)
loaded_info_ext = _make_game_info_ext(loaded_info)
yield LoadedContentFile(loaded_info, loaded_info_ext)
case ZipPath() as zippath, ContentAttributes(need_fullpath=True, block_extract=True):
# If the core needs a full path and we're blocking extraction...
raise ContentError(
f"Cannot extract {zippath}; core requires a full path, but block_extract is enabled"
)
case ZipPath() as zippath, ContentAttributes(
need_fullpath=False
): # TODO: Is block_extract significant here?
path = f"{zippath.filename}#{zippath.name}".encode()
data = zippath.read_bytes()
loaded_info = retro_game_info(path, addressof_buffer(data), len(data), None)
loaded_info_ext = _make_game_info_ext(loaded_info)
if attributes.persistent_data:
self._persistent_buffers.add(_PersistentBuffer(loaded_info.data, None))
yield LoadedContentFile(loaded_info, loaded_info_ext)
# For test cases that provide content by path
case (str() | PathLike()) as path, ContentAttributes(need_fullpath=True):
loaded_info = retro_game_info(os.fsencode(path), None, 0, None)
loaded_info_ext = _make_game_info_ext(loaded_info)
yield LoadedContentFile(loaded_info, loaded_info_ext)
# There's no data to persist, so no cleanup needed
case (str() | PathLike()) as path, ContentAttributes(persistent_data=False):
with mmap_file(path) as view:
loaded_info = retro_game_info(
os.fsencode(path), addressof_buffer(view), len(view), None
)
loaded_info_ext = _make_game_info_ext(loaded_info)
yield LoadedContentFile(loaded_info, loaded_info_ext)
# Content is not persistent, so just let the with statement clean up the view
case (str() | PathLike()) as path, ContentAttributes(persistent_data=True):
context = mmap_file(path)
view = context.__enter__()
loaded_info = retro_game_info(
os.fsencode(path), addressof_buffer(view), len(view), None
)
loaded_info_ext = _make_game_info_ext(loaded_info)
self._persistent_buffers.add(
_PersistentBuffer(c_void_p(addressof_buffer(view)), context)
)
yield LoadedContentFile(loaded_info, loaded_info_ext)
# Content is persistent, so the view (and backing file) will be cleaned up in __del__ later
# For test cases that provide ROM data directly
case bytes() | bytearray() | memoryview() | Buffer(), ContentAttributes(
need_fullpath=True
):
raise ValueError("Core requires a full path, but only raw data was provided")
case Buffer() | bytes() | bytearray() | memoryview(), ContentAttributes(
persistent_data=persistent_data
):
buffer = memoryview(content)
loaded_info = retro_game_info(None, addressof_buffer(buffer), buffer.nbytes, None)
loaded_info_ext = _make_game_info_ext(loaded_info)
if persistent_data:
self._persistent_buffers.add(_PersistentBuffer(loaded_info.data, None))
yield LoadedContentFile(loaded_info, loaded_info_ext)
# For test cases that provide no content for certain subsystems
case None, ContentAttributes(required=False):
# Optional subsystem content that isn't provided should be repesented as zeroed-out retro_game_infos
# (Optional *regular* content is handled up in load())
yield LoadedContentFile(retro_game_info(), retro_game_info_ext())
case None, _:
raise ContentError(
"No content provided and core did not indicate support for no game."
)
case e, _:
raise TypeError(f"Unexpected content type: {type(e).__name__}")
if not attributes.persistent_data:
if loaded_info:
loaded_info.data = None
if loaded_info_ext:
loaded_info_ext.data = None
@property
@override
def system_info(self) -> retro_system_info | None:
return self._system_info
@system_info.setter
@override
def system_info(self, info: retro_system_info | None) -> None:
self._system_info = info
@property
@override
def subsystem_info(self) -> Subsystems | None:
return self._subsystems
@subsystem_info.setter
@override
def subsystem_info(self, subsystems: Sequence[retro_subsystem_info] | None) -> None:
self._subsystems = Subsystems(subsystems) if subsystems else None
@property
@override
def support_no_game(self) -> bool | None:
return self._support_no_game
@support_no_game.setter
@override
def support_no_game(self, support: bool) -> None:
self._support_no_game = support
@property
@override
def overrides(self) -> ContentInfoOverrides | None:
return self._overrides
@overrides.setter
@override
def overrides(self, overrides: Sequence[retro_system_content_info_override] | None) -> None:
self._overrides = ContentInfoOverrides(overrides) if overrides else None
def __needs_fullpath(
self, ext: bytes | None, subsysrom: retro_subsystem_rom_info | None = None
) -> bool:
assert self._system_info is not None
assert ext is None or isinstance(ext, bytes)
assert subsysrom is None or isinstance(subsysrom, retro_subsystem_rom_info)
# These params should've been validated by the caller
match ext, subsysrom, self._overrides:
case None, _, _:
# If loading any content from in-memory...
return False
case bytes(), _, ContentInfoOverrides() as overrides if ext in overrides:
# If loading a content file with a specially-treated extension...
overrides: ContentInfoOverrides
return overrides[ext].need_fullpath
case bytes(), None, _ if ext.removeprefix(b".") in self._system_info.extensions:
# If loading a regular content file with no relevant overrides...
return self._system_info.need_fullpath
case bytes(), None, _:
# No overrides were found, and the extension's not in the system info.
raise ValueError(
f"Can't determine if regular content extension '{ext!r}' needs a full path (it's not in the overrides or system info)"
)
case bytes(), retro_subsystem_rom_info(), _:
# If loading subsystem content with no relevant overrides, and the active subsystem is given directly...
return subsysrom.need_fullpath
case _, _, _:
raise ValueError(
f"Can't determine if subsystem content extension '{ext!r}' needs a full path"
)
def __is_data_persistent(self, ext: bytes | None) -> bool:
assert self._system_info is not None
assert ext is None or isinstance(ext, bytes)
# These params should've been validated by the caller
match ext, self._overrides:
case None, _:
# If loading any content from in-memory...
return True
case bytes(ext), ContentInfoOverrides() as overrides if ext in overrides:
# If loading a content file with a specially-treated extension...
return overrides[ext].persistent_data
case bytes(ext), _ if ext in self._system_info.extensions:
# If loading a regular content file with no relevant overrides...
return False # Regular content is not guaranteed to be persistent
case _, _:
raise ValueError(
f"Can't determine if subsystem content extension '{ext!r}' has persistent data"
)
def __should_block_extract(
self, ext: bytes | None, subsysrom: retro_subsystem_rom_info | None = None
) -> bool:
assert self._system_info is not None
assert ext is None or isinstance(ext, bytes)
assert subsysrom is None or isinstance(subsysrom, retro_subsystem_rom_info)
# These params should've been validated by the caller
# NOTE: retro_content_info_override does not have block_extract
match ext, subsysrom:
case (None, _) | (_, None):
# If loading any content from in-memory or if not using a subsystem...
return self._system_info.block_extract
case bytes(), retro_subsystem_rom_info():
# If loading a subsystem ROM...
return subsysrom.block_extract
case _, _:
raise ValueError(
f"Can't determine if subsystem content extension '{ext!r}' should block extraction"
)
def __is_required(
self, ext: bytes | None, subsysrom: retro_subsystem_rom_info | None = None
) -> bool:
assert self._system_info is not None
assert ext is None or isinstance(ext, bytes)
assert subsysrom is None or isinstance(subsysrom, retro_subsystem_rom_info)
# These params should've been validated by the caller
# NOTE: retro_content_info_override and retro_system_info do not have required,
# but retro_system_info does use RETRO_ENVIRONMENT_SET_SUPPORT_NO_GAME
match ext, subsysrom:
case (None, _) | (_, None):
# If loading any content from in-memory or if not using a subsystem...
return not self._support_no_game
case bytes(), retro_subsystem_rom_info():
# If loading a subsystem ROM...
return subsysrom.required
case _, _:
raise ValueError(
f"Can't determine if subsystem content extension '{ext!r}' is required"
)
def __get_subsystem(self, content: SubsystemContent) -> retro_subsystem_info:
if not isinstance(content, SubsystemContent):
raise ValueError(f"Expected a SubsystemContent, got {type(content)}")
if not self.subsystem_info:
raise RuntimeError(
"Subsystem content was provided, but core did not register subsystems"
)
match content.game_type:
case int(id):
for s in self.subsystem_info:
if s.id == id:
return s
raise ValueError(f"Core did not register a subsystem with a numeric ID of {id}")
case str(ident) | bytes(ident):
return self.subsystem_info[ident]
case e:
raise TypeError(
f"Expected a subsystem identifier of types int, str, or bytes; got {type(e).__name__}"
)
__all__ = [
"StandardContentDriver",
]