Source code for libretro.drivers.vfs.default

"""
Default :class:`.FileSystemDriver` implementation backed by :mod:`os` and :mod:`io`.

.. seealso::

    :class:`.FileSystemDriver`
        The protocol this driver implements.
"""

from __future__ import annotations

import os
import stat
from io import FileIO
from os import DirEntry
from typing import Literal, override

from libretro.api.vfs import (
    VfsFileAccess,
    VfsFileAccessHint,
    VfsMkdirResult,
    VfsSeekPosition,
    VfsStat,
    retro_vfs_dir_handle,
    retro_vfs_file_handle,
)

from .driver import DirectoryHandle, FileHandle, FileSystemDriver


[docs] class StandardFileHandle(FileHandle): """Default :class:`.FileHandle` implementation backed by :class:`io.FileIO`."""
[docs] @override def __init__(self, path: bytes, mode: VfsFileAccess, hints: VfsFileAccessHint): """ Open the file at ``path`` using :class:`io.FileIO`. :param path: Path of the file to open, encoded as :class:`bytes`. :param mode: Access mode flags controlling read/write behavior. :param hints: Hints describing the intended access pattern. :raises ValueError: If ``path`` is empty. :raises OSError: If the file cannot be opened. """ self._file: FileIO | None = None if not path: raise ValueError("Expected a non-empty path") self._path = path self._file = FileIO(path, mode.open_flag) self._handle = retro_vfs_file_handle(self._file.fileno(), path, mode, hints)
[docs] def __del__(self): """Close the underlying file when this handle is garbage-collected.""" self.close()
[docs] @override def close(self) -> bool: if self._file is not None: self._file.close() self._file = None return True
@property @override def path(self) -> bytes: if not self._file: raise IOError("File is closed") return self._path @property @override def size(self) -> int: if not self._file: raise IOError("File is closed") stat = os.stat(self._file.fileno()) return stat.st_size
[docs] @override def tell(self) -> int: if not self._file: raise IOError("File is closed") return self._file.tell()
[docs] @override def seek(self, offset: int, whence: VfsSeekPosition) -> int: if not self._file: raise IOError("File is closed") return self._file.seek(offset, whence)
[docs] @override def read(self, buffer: bytearray | memoryview) -> int: if not self._file: raise IOError("File is closed") return self._file.readinto(buffer)
[docs] @override def write(self, buffer: bytes | bytearray | memoryview) -> int: if not self._file: raise IOError("File is closed") return self._file.write(buffer)
[docs] @override def flush(self) -> bool: if not self._file: raise IOError("File is closed") self._file.flush() os.fsync(self._file.fileno()) return True
[docs] @override def truncate(self, length: int) -> bool: if not self._file: raise IOError("File is closed") self._file.truncate(length) return True
@property def fileno(self) -> int: """ Return the OS-level file descriptor backing this handle. :return: The integer file descriptor of the underlying :class:`io.FileIO`. :raises IOError: If the file is closed. """ if not self._file: raise IOError("File is closed") return self._file.fileno() @property def vfs_handle(self) -> retro_vfs_file_handle: """ Return the :class:`.retro_vfs_file_handle` that represents this file in the VFS. :return: The opaque VFS handle associated with this file. """ return self._handle
[docs] class StandardDirectoryHandle(DirectoryHandle): """Default :class:`.DirectoryHandle` implementation backed by :func:`os.scandir`."""
[docs] @override def __init__(self, path: bytes, include_hidden: bool): """ Open the directory at ``path`` for iteration via :func:`os.scandir`. :param path: Path of the directory to open, encoded as :class:`bytes`. :param include_hidden: Whether hidden entries should be returned during iteration. :raises OSError: If the directory cannot be opened. """ self._scandir = os.scandir(path) self._dirent: DirEntry[bytes] | None = None self._include_hidden = include_hidden
[docs] def __del__(self): """Close the underlying scan iterator when this handle is garbage-collected.""" self.closedir()
[docs] @override def readdir(self) -> bool: if not self._scandir: raise IOError("Directory is closed") # TODO: If include_hidden is False, # keep iterating until we find a non-hidden entry or reach the end of the directory self._dirent = next(self._scandir, None) return self._dirent is not None
@property @override def dirent_name(self) -> bytes | None: if not self._scandir: raise IOError("Directory is closed") if not self._dirent: return None return self._dirent.name @property @override def dirent_is_dir(self) -> bool: if not self._scandir: raise IOError("Directory is closed") if not self._dirent: raise ValueError("No directory entry available") return self._dirent.is_dir()
[docs] @override def closedir(self) -> bool: if self._scandir: self._scandir.close() del self._scandir self._scandir = None return True
[docs] class DefaultFileSystemDriver(FileSystemDriver): """ Default :class:`.FileSystemDriver` implementation backed by :mod:`os` and :mod:`io`. Open files and directories are tracked internally by their VFS handle ID so that they can be looked up when cores invoke the corresponding VFS callbacks. """ _file_handles: dict[int, StandardFileHandle] _dir_handles: dict[int, StandardDirectoryHandle]
[docs] def __init__(self, version: Literal[1, 2, 3] = 3): """ Initialize the driver and declare which VFS interface version it advertises. :param version: The VFS interface version to report via :attr:`version`; must be 1, 2, or 3. Defaults to 3. """ self._version = version self._file_handles = {} self._dir_handles = {}
[docs] @override def get_path(self, stream: retro_vfs_file_handle) -> bytes | None: fileno = stream.id file = self._file_handles.get(fileno, None) if not file: return None return file.path
[docs] @override def open( self, path: bytes, mode: VfsFileAccess, hints: VfsFileAccessHint ) -> retro_vfs_file_handle | None: try: file = StandardFileHandle(path, mode, hints) self._file_handles[file.fileno] = file return file.vfs_handle except FileNotFoundError: return None
[docs] @override def close(self, stream: retro_vfs_file_handle) -> bool: fileno = stream.id file = self._file_handles.pop(fileno, None) if not file: return False return file.close()
[docs] @override def size(self, stream: retro_vfs_file_handle) -> int: fileno = stream.id file = self._file_handles.get(fileno, None) if not file: return -1 return file.size
[docs] @override def truncate(self, stream: retro_vfs_file_handle, length: int) -> bool: handle = stream.id file = self._file_handles.get(handle) if not file: return False return file.truncate(length)
[docs] @override def tell(self, stream: retro_vfs_file_handle) -> int: handle = stream.id file = self._file_handles.get(handle) if not file: return -1 return file.tell()
[docs] @override def seek(self, stream: retro_vfs_file_handle, offset: int, whence: VfsSeekPosition) -> int: handle = stream.id file = self._file_handles.get(handle) if not file: return -1 return file.seek(offset, whence)
[docs] @override def read(self, stream: retro_vfs_file_handle, buffer: memoryview[int]) -> int: handle = stream.id file = self._file_handles.get(handle) if not file: return -1 return file.read(buffer)
[docs] @override def write(self, stream: retro_vfs_file_handle, buffer: memoryview[int]) -> int: handle = stream.id file = self._file_handles.get(handle) if not file: return -1 return file.write(buffer)
[docs] @override def flush(self, stream: retro_vfs_file_handle) -> bool: handle = stream.id file = self._file_handles.get(handle) if not file: return False return file.flush()
[docs] @override def remove(self, path: bytes) -> bool: os.remove(path) return True
[docs] @override def rename(self, old_path: bytes, new_path: bytes) -> bool: os.rename(old_path, new_path) return True
[docs] @override def stat(self, path: bytes) -> tuple[VfsStat, int] | None: try: filestat = os.stat(path) flags = VfsStat(0) if stat.S_ISREG(filestat.st_mode): flags |= VfsStat.IS_VALID if stat.S_ISDIR(filestat.st_mode): flags |= VfsStat.IS_DIRECTORY if stat.S_ISCHR(filestat.st_mode): flags |= VfsStat.IS_CHARACTER_SPECIAL return flags, filestat.st_size except FileNotFoundError: return None
[docs] @override def mkdir(self, path: bytes) -> VfsMkdirResult: try: os.mkdir(path) return VfsMkdirResult.SUCCESS except FileExistsError: return VfsMkdirResult.ALREADY_EXISTS except OSError: return VfsMkdirResult.ERROR
[docs] @override def opendir(self, path: bytes, include_hidden: bool) -> retro_vfs_dir_handle | None: dir_handle = StandardDirectoryHandle(path, include_hidden) handle = id(dir_handle) self._dir_handles[handle] = dir_handle return retro_vfs_dir_handle(handle, path, include_hidden)
[docs] @override def readdir(self, dir: retro_vfs_dir_handle) -> bool: handle = dir.id dir_handle = self._dir_handles.get(handle) if not dir_handle: return False return dir_handle.readdir()
[docs] @override def dirent_get_name(self, dir: retro_vfs_dir_handle) -> bytes | None: handle = dir.id dir_handle = self._dir_handles.get(handle) if not dir_handle: return None return dir_handle.dirent_name
[docs] @override def dirent_is_dir(self, dir: retro_vfs_dir_handle) -> bool: handle = dir.id dir_handle = self._dir_handles.get(handle) if not dir_handle: return False return dir_handle.dirent_is_dir
[docs] @override def closedir(self, dir: retro_vfs_dir_handle) -> bool: handle = dir.id dir_handle = self._dir_handles.pop(handle, None) if not dir_handle: return False return dir_handle.closedir()
@property @override def version(self) -> int: return self._version
__all__ = [ "StandardFileHandle", "StandardDirectoryHandle", "DefaultFileSystemDriver", ]