"""
Default :class:`.FileSystemDriver` implementation backed by :mod:`os` and :mod:`io`.
.. seealso::
:class:`.FileSystemDriver`
The protocol this driver implements.
"""
from __future__ import annotations
import os
import stat
from io import FileIO
from os import DirEntry
from typing import Literal, override
from libretro.api.vfs import (
VfsFileAccess,
VfsFileAccessHint,
VfsMkdirResult,
VfsSeekPosition,
VfsStat,
retro_vfs_dir_handle,
retro_vfs_file_handle,
)
from .driver import DirectoryHandle, FileHandle, FileSystemDriver
[docs]
class StandardFileHandle(FileHandle):
"""Default :class:`.FileHandle` implementation backed by :class:`io.FileIO`."""
[docs]
@override
def __init__(self, path: bytes, mode: VfsFileAccess, hints: VfsFileAccessHint):
"""
Open the file at ``path`` using :class:`io.FileIO`.
:param path: Path of the file to open, encoded as :class:`bytes`.
:param mode: Access mode flags controlling read/write behavior.
:param hints: Hints describing the intended access pattern.
:raises ValueError: If ``path`` is empty.
:raises OSError: If the file cannot be opened.
"""
self._file: FileIO | None = None
if not path:
raise ValueError("Expected a non-empty path")
self._path = path
self._file = FileIO(path, mode.open_flag)
self._handle = retro_vfs_file_handle(self._file.fileno(), path, mode, hints)
[docs]
def __del__(self):
"""Close the underlying file when this handle is garbage-collected."""
self.close()
[docs]
@override
def close(self) -> bool:
if self._file is not None:
self._file.close()
self._file = None
return True
@property
@override
def path(self) -> bytes:
if not self._file:
raise IOError("File is closed")
return self._path
@property
@override
def size(self) -> int:
if not self._file:
raise IOError("File is closed")
stat = os.stat(self._file.fileno())
return stat.st_size
[docs]
@override
def tell(self) -> int:
if not self._file:
raise IOError("File is closed")
return self._file.tell()
[docs]
@override
def seek(self, offset: int, whence: VfsSeekPosition) -> int:
if not self._file:
raise IOError("File is closed")
return self._file.seek(offset, whence)
[docs]
@override
def read(self, buffer: bytearray | memoryview) -> int:
if not self._file:
raise IOError("File is closed")
return self._file.readinto(buffer)
[docs]
@override
def write(self, buffer: bytes | bytearray | memoryview) -> int:
if not self._file:
raise IOError("File is closed")
return self._file.write(buffer)
[docs]
@override
def flush(self) -> bool:
if not self._file:
raise IOError("File is closed")
self._file.flush()
os.fsync(self._file.fileno())
return True
[docs]
@override
def truncate(self, length: int) -> bool:
if not self._file:
raise IOError("File is closed")
self._file.truncate(length)
return True
@property
def fileno(self) -> int:
"""
Return the OS-level file descriptor backing this handle.
:return: The integer file descriptor of the underlying :class:`io.FileIO`.
:raises IOError: If the file is closed.
"""
if not self._file:
raise IOError("File is closed")
return self._file.fileno()
@property
def vfs_handle(self) -> retro_vfs_file_handle:
"""
Return the :class:`.retro_vfs_file_handle` that represents this file in the VFS.
:return: The opaque VFS handle associated with this file.
"""
return self._handle
[docs]
class StandardDirectoryHandle(DirectoryHandle):
"""Default :class:`.DirectoryHandle` implementation backed by :func:`os.scandir`."""
[docs]
@override
def __init__(self, path: bytes, include_hidden: bool):
"""
Open the directory at ``path`` for iteration via :func:`os.scandir`.
:param path: Path of the directory to open, encoded as :class:`bytes`.
:param include_hidden: Whether hidden entries should be returned during iteration.
:raises OSError: If the directory cannot be opened.
"""
self._scandir = os.scandir(path)
self._dirent: DirEntry[bytes] | None = None
self._include_hidden = include_hidden
[docs]
def __del__(self):
"""Close the underlying scan iterator when this handle is garbage-collected."""
self.closedir()
[docs]
@override
def readdir(self) -> bool:
if not self._scandir:
raise IOError("Directory is closed")
# TODO: If include_hidden is False,
# keep iterating until we find a non-hidden entry or reach the end of the directory
self._dirent = next(self._scandir, None)
return self._dirent is not None
@property
@override
def dirent_name(self) -> bytes | None:
if not self._scandir:
raise IOError("Directory is closed")
if not self._dirent:
return None
return self._dirent.name
@property
@override
def dirent_is_dir(self) -> bool:
if not self._scandir:
raise IOError("Directory is closed")
if not self._dirent:
raise ValueError("No directory entry available")
return self._dirent.is_dir()
[docs]
@override
def closedir(self) -> bool:
if self._scandir:
self._scandir.close()
del self._scandir
self._scandir = None
return True
[docs]
class DefaultFileSystemDriver(FileSystemDriver):
"""
Default :class:`.FileSystemDriver` implementation backed by :mod:`os` and :mod:`io`.
Open files and directories are tracked internally by their VFS handle ID so
that they can be looked up when cores invoke the corresponding VFS callbacks.
"""
_file_handles: dict[int, StandardFileHandle]
_dir_handles: dict[int, StandardDirectoryHandle]
[docs]
def __init__(self, version: Literal[1, 2, 3] = 3):
"""
Initialize the driver and declare which VFS interface version it advertises.
:param version: The VFS interface version to report via :attr:`version`;
must be 1, 2, or 3. Defaults to 3.
"""
self._version = version
self._file_handles = {}
self._dir_handles = {}
[docs]
@override
def get_path(self, stream: retro_vfs_file_handle) -> bytes | None:
fileno = stream.id
file = self._file_handles.get(fileno, None)
if not file:
return None
return file.path
[docs]
@override
def open(
self, path: bytes, mode: VfsFileAccess, hints: VfsFileAccessHint
) -> retro_vfs_file_handle | None:
try:
file = StandardFileHandle(path, mode, hints)
self._file_handles[file.fileno] = file
return file.vfs_handle
except FileNotFoundError:
return None
[docs]
@override
def close(self, stream: retro_vfs_file_handle) -> bool:
fileno = stream.id
file = self._file_handles.pop(fileno, None)
if not file:
return False
return file.close()
[docs]
@override
def size(self, stream: retro_vfs_file_handle) -> int:
fileno = stream.id
file = self._file_handles.get(fileno, None)
if not file:
return -1
return file.size
[docs]
@override
def truncate(self, stream: retro_vfs_file_handle, length: int) -> bool:
handle = stream.id
file = self._file_handles.get(handle)
if not file:
return False
return file.truncate(length)
[docs]
@override
def tell(self, stream: retro_vfs_file_handle) -> int:
handle = stream.id
file = self._file_handles.get(handle)
if not file:
return -1
return file.tell()
[docs]
@override
def seek(self, stream: retro_vfs_file_handle, offset: int, whence: VfsSeekPosition) -> int:
handle = stream.id
file = self._file_handles.get(handle)
if not file:
return -1
return file.seek(offset, whence)
[docs]
@override
def read(self, stream: retro_vfs_file_handle, buffer: memoryview[int]) -> int:
handle = stream.id
file = self._file_handles.get(handle)
if not file:
return -1
return file.read(buffer)
[docs]
@override
def write(self, stream: retro_vfs_file_handle, buffer: memoryview[int]) -> int:
handle = stream.id
file = self._file_handles.get(handle)
if not file:
return -1
return file.write(buffer)
[docs]
@override
def flush(self, stream: retro_vfs_file_handle) -> bool:
handle = stream.id
file = self._file_handles.get(handle)
if not file:
return False
return file.flush()
[docs]
@override
def remove(self, path: bytes) -> bool:
os.remove(path)
return True
[docs]
@override
def rename(self, old_path: bytes, new_path: bytes) -> bool:
os.rename(old_path, new_path)
return True
[docs]
@override
def stat(self, path: bytes) -> tuple[VfsStat, int] | None:
try:
filestat = os.stat(path)
flags = VfsStat(0)
if stat.S_ISREG(filestat.st_mode):
flags |= VfsStat.IS_VALID
if stat.S_ISDIR(filestat.st_mode):
flags |= VfsStat.IS_DIRECTORY
if stat.S_ISCHR(filestat.st_mode):
flags |= VfsStat.IS_CHARACTER_SPECIAL
return flags, filestat.st_size
except FileNotFoundError:
return None
[docs]
@override
def mkdir(self, path: bytes) -> VfsMkdirResult:
try:
os.mkdir(path)
return VfsMkdirResult.SUCCESS
except FileExistsError:
return VfsMkdirResult.ALREADY_EXISTS
except OSError:
return VfsMkdirResult.ERROR
[docs]
@override
def opendir(self, path: bytes, include_hidden: bool) -> retro_vfs_dir_handle | None:
dir_handle = StandardDirectoryHandle(path, include_hidden)
handle = id(dir_handle)
self._dir_handles[handle] = dir_handle
return retro_vfs_dir_handle(handle, path, include_hidden)
[docs]
@override
def readdir(self, dir: retro_vfs_dir_handle) -> bool:
handle = dir.id
dir_handle = self._dir_handles.get(handle)
if not dir_handle:
return False
return dir_handle.readdir()
[docs]
@override
def dirent_get_name(self, dir: retro_vfs_dir_handle) -> bytes | None:
handle = dir.id
dir_handle = self._dir_handles.get(handle)
if not dir_handle:
return None
return dir_handle.dirent_name
[docs]
@override
def dirent_is_dir(self, dir: retro_vfs_dir_handle) -> bool:
handle = dir.id
dir_handle = self._dir_handles.get(handle)
if not dir_handle:
return False
return dir_handle.dirent_is_dir
[docs]
@override
def closedir(self, dir: retro_vfs_dir_handle) -> bool:
handle = dir.id
dir_handle = self._dir_handles.pop(handle, None)
if not dir_handle:
return False
return dir_handle.closedir()
@property
@override
def version(self) -> int:
return self._version
__all__ = [
"StandardFileHandle",
"StandardDirectoryHandle",
"DefaultFileSystemDriver",
]