mirror of
https://github.com/283375/arcaea-offline-pyside-ui.git
synced 2025-11-07 13:02:17 +00:00
369 lines
11 KiB
Python
369 lines
11 KiB
Python
import dataclasses
|
|
import logging
|
|
from enum import StrEnum
|
|
from pathlib import Path
|
|
|
|
from arcaea_offline.models import (
|
|
CalculatedPotential,
|
|
Chart,
|
|
ConfigBase,
|
|
ScoreBest,
|
|
ScoreCalculated,
|
|
ScoresBase,
|
|
ScoresViewBase,
|
|
SongsBase,
|
|
SongsViewBase,
|
|
)
|
|
from arcaea_offline.models import (
|
|
Property as AoProperty,
|
|
)
|
|
from PySide6.QtCore import Property, QObject, QUrl, Signal, Slot
|
|
from PySide6.QtQml import QmlElement
|
|
from sqlalchemy import inspect, select
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.orm import Session
|
|
|
|
from core.database import create_engine, db_path_to_sqlite_url, sqlite_url_to_db_path
|
|
from core.settings import SettingsKeys, settings
|
|
from core.settings.values import GeneralDatabaseType
|
|
|
|
from .common import VM_QML_IMPORT_NAME
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Module-level QML registration metadata; PySide6's ``@QmlElement`` decorator
# looks these names up in the module that defines the decorated class.
QML_IMPORT_NAME = VM_QML_IMPORT_NAME
QML_IMPORT_MAJOR_VERSION = 1
QML_IMPORT_MINOR_VERSION = 0
|
|
|
|
|
|
class _FmwiwsDatabase:
    """Fuck Me Why I Wrote Singleton Database

    Small non-singleton helper that owns one engine for a single database URL
    and exposes the schema operations this module needs: creating the schema,
    checking whether it exists, and reading the stored schema version.
    """

    def __init__(self, url: str):
        """Create the engine for ``url`` using ``create_engine``."""
        self.engine = create_engine(url)

    def init(self, *, checkfirst: bool = True):
        """Create all tables and views, then store the schema version ``"4"``.

        Args:
            checkfirst: Forwarded to ``create_all`` for the songs, scores and
                config metadata. When true, the view metadata is dropped
                first so the views can be re-created without clashing.
        """
        # create tables & views
        if checkfirst:
            # > https://github.com/kvesteri/sqlalchemy-utils/issues/396
            # > view.create_view() causes DuplicateTableError on
            # > Base.metadata.create_all(checkfirst=True)
            # so if `checkfirst` is True, drop these views before creating
            SongsViewBase.metadata.drop_all(self.engine)
            ScoresViewBase.metadata.drop_all(self.engine)

        SongsBase.metadata.create_all(self.engine, checkfirst=checkfirst)
        SongsViewBase.metadata.create_all(self.engine)
        ScoresBase.metadata.create_all(self.engine, checkfirst=checkfirst)
        ScoresViewBase.metadata.create_all(self.engine)
        ConfigBase.metadata.create_all(self.engine, checkfirst=checkfirst)

        # merge() inserts the "version" property or overwrites an existing one.
        version_property = AoProperty(key="version", value="4")
        with Session(self.engine) as session:
            session.merge(version_property)
            session.commit()

    def is_initialized(self):
        """Return whether every expected table and view exists in the database."""
        expected_tables = [
            *SongsBase.metadata.tables,
            *ScoresBase.metadata.tables,
            *ConfigBase.metadata.tables,
            Chart.__tablename__,
            ScoreCalculated.__tablename__,
            ScoreBest.__tablename__,
            CalculatedPotential.__tablename__,
        ]

        # Build the inspector once instead of once per table name.
        inspector = inspect(self.engine)
        return all(inspector.has_table(name) for name in expected_tables)

    def version(self):
        """Return the stored "version" property as an int, or None if absent."""
        with Session(self.engine) as session:
            stmt = select(AoProperty.value).where(AoProperty.key == "version")
            result = session.scalar(stmt)

        return None if result is None else int(result)
|
|
|
|
|
|
class _UiMode(StrEnum):
|
|
SELECT = "select"
|
|
CREATE = "create"
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class DatabaseInfo:
|
|
url: str
|
|
error: Exception | None = None
|
|
initialized: bool | None = None
|
|
version: int | None = None
|
|
|
|
@property
|
|
def error_dict(self):
|
|
e = self.error
|
|
|
|
if e is None:
|
|
return None
|
|
|
|
return {
|
|
"title": e.__class__.__name__,
|
|
"message": str(e),
|
|
}
|
|
|
|
def to_qml_dict(self):
|
|
return {
|
|
"url": self.url,
|
|
"error": self.error_dict,
|
|
"initialized": self.initialized,
|
|
"version": self.version,
|
|
}
|
|
|
|
|
|
@QmlElement
class DatabaseInitViewModel(QObject):
    """View model, exposed to QML via ``@QmlElement``, for choosing a database.

    In ``select`` mode the database URL is derived from ``selectFileUrl``;
    otherwise it is derived from ``directoryUrl`` joined with ``filename``.
    Whenever the URL changes, the database is inspected and the result is
    published through ``databaseInfo``, ``canContinue`` and
    ``canConfirmSilently``.
    """

    # Not referenced anywhere else in this file.
    _void = Signal()
    uiModeChanged = Signal()
    selectFileUrlChanged = Signal()
    directoryUrlChanged = Signal()
    filenameChanged = Signal()
    databaseUrlChanged = Signal()
    databaseInfoChanged = Signal()
    canContinueChanged = Signal()
    canConfirmSilentlyChanged = Signal()

    def __init__(self):
        """Set default state, wire the change signals and load saved settings."""
        super().__init__()

        self._selectFileUrl: QUrl | None = None
        self._directoryUrl: QUrl = QUrl()
        self._filename: str = ""
        self._databaseInfo: DatabaseInfo | None = None

        self._uiMode = _UiMode.SELECT

        # Every input of getDatabaseUrl() except the UI mode re-announces the URL.
        # NOTE(review): uiModeChanged also affects getDatabaseUrl() but is not
        # forwarded to databaseUrlChanged - confirm whether that is intended.
        self.directoryUrlChanged.connect(lambda: self.databaseUrlChanged.emit())
        self.filenameChanged.connect(lambda: self.databaseUrlChanged.emit())
        self.selectFileUrlChanged.connect(self.databaseUrlChanged)
        self.databaseInfoChanged.connect(self.canContinueChanged)

        # canConfirmSilently depends on the database info and on the UI mode;
        # without these connections its notify signal was never emitted.
        self.databaseInfoChanged.connect(self.canConfirmSilentlyChanged)
        self.uiModeChanged.connect(self.canConfirmSilentlyChanged)

        self.databaseUrlChanged.connect(self.onDatabaseUrlChanged)

        self._loadSettings()

    def onDatabaseUrlChanged(self):
        """Reload the database info whenever the database URL changes."""
        self.loadDatabaseInfo()

    @property
    def _settingsDatabaseFile(self) -> Path | None:
        """Return the database file stored in settings, or None.

        None is returned when the stored database type is not ``FILE``, when
        no connection string is stored, or when the stored file does not exist.
        """
        # TODO: process database type when available
        if (
            settings.stringValue(SettingsKeys.General.DatabaseType)
            != GeneralDatabaseType.FILE
        ):
            return None

        file = settings.stringValue(SettingsKeys.General.DatabaseConn)

        if not file:
            logger.debug("No database file specified in settings")
            return None

        filepath = Path(file)
        if not filepath.exists():
            logger.warning("Cannot find database file: %s", file)
            return None

        return filepath

    def _loadSettings(self) -> None:
        """Pre-select the database file stored in settings, if there is one."""
        filepath = self._settingsDatabaseFile
        if filepath is None:
            return
        logger.info("Loading database from settings: %s", filepath)

        self.setUiMode(_UiMode.SELECT)
        self.setSelectFileUrl(filepath)
        # NOTE(review): setSelectFileUrl() already reaches loadDatabaseInfo()
        # through the signal chain wired in __init__, so this call looks
        # redundant - kept to preserve the original behavior.
        self.loadDatabaseInfo()

    def _makeDatabaseInfo(self, dbUrl: str):
        """Inspect the database at ``dbUrl`` and return a ``DatabaseInfo``.

        A missing file or an ``SQLAlchemyError`` raised while inspecting is
        recorded in ``DatabaseInfo.error`` instead of being raised.
        """
        info = DatabaseInfo(url=dbUrl)

        path = sqlite_url_to_db_path(dbUrl)
        if not path.exists():
            # Pass the message to the constructor: an OSError created without
            # arguments stringifies to "" even if ``strerror`` is assigned
            # afterwards, which left the reported error message empty.
            message = f"{path} does not exist"
            notFound = FileNotFoundError(message)
            notFound.strerror = message
            info.error = notFound
            return info

        db = None
        try:
            db = _FmwiwsDatabase(dbUrl)
            info.initialized = db.is_initialized()
            info.version = db.version()
        except SQLAlchemyError as e:
            logger.exception("Error loading database info")
            info.error = e
        finally:
            # This engine is only used for the inspection above; release its
            # pooled connections instead of waiting for garbage collection.
            if db is not None:
                db.engine.dispose()

        logger.debug("Database info for %r: %r", dbUrl, info)
        return info

    @Slot()
    def loadDatabaseInfo(self):
        """Refresh ``databaseInfo`` from the current database URL.

        Does nothing except log a warning when no URL is available.
        """
        dbUrl = self.getDatabaseUrl()
        logger.info("Loading database info: %s", dbUrl)
        if dbUrl is None:
            logger.warning("Database URL is None")
            return

        self._databaseInfo = self._makeDatabaseInfo(dbUrl.toString())
        self.databaseInfoChanged.emit()

    @Slot(str)
    def createFile(self, dbUrl: str):
        """Create an empty file at the path of ``dbUrl`` unless it already exists."""
        file = sqlite_url_to_db_path(dbUrl)

        if file.exists():
            logger.warning(
                "Attempted to create an existing file, check UI logic? (%s)", file
            )
            return

        file.touch(mode=0o644)
        logger.info("Created file %s", file)

    @Slot()
    def confirmCurrentConnection(self):
        """Persist the currently inspected database as the configured one."""
        dbInfo = self._databaseInfo
        if dbInfo is None:
            logger.warning("Current database info is None, ignoring")
            return

        settings.setValue(
            SettingsKeys.General.DatabaseType,
            GeneralDatabaseType.FILE.value,
        )
        settings.setValue(
            SettingsKeys.General.DatabaseConn,
            str(sqlite_url_to_db_path(dbInfo.url).resolve().as_posix()),
        )

    @Slot(str)
    def initialize(self, dbUrl: str):
        """Create the schema in the database at ``dbUrl``.

        An ``SQLAlchemyError`` is logged rather than raised.
        """
        db = None
        try:
            db = _FmwiwsDatabase(dbUrl)
            db.init()
        except SQLAlchemyError:
            logger.exception("Error initializing database %s", dbUrl)
        finally:
            # Release the pooled connections of this short-lived engine.
            if db is not None:
                db.engine.dispose()

    # region properties
    def getUiMode(self):
        """Return the current UI mode as its string value."""
        return self._uiMode.value

    def setUiMode(self, mode: str | _UiMode):
        """Set the UI mode from a ``_UiMode`` member or its string value.

        An invalid value is logged and ignored; ``uiModeChanged`` is emitted
        only when the value was accepted.
        """
        try:
            # _UiMode() accepts both a member and a member's string value.
            self._uiMode = _UiMode(mode)
        except ValueError:
            logger.warning("Invalid UI mode: %s", mode)
            return

        self.uiModeChanged.emit()

    def getSelectFileUrl(self):
        """Return the URL of the selected database file, or None."""
        return self._selectFileUrl

    def setSelectFileUrl(self, value: Path | QUrl | None):
        """Set the selected file; a ``Path`` is converted to a local-file URL."""
        if isinstance(value, Path):
            value = QUrl.fromLocalFile(value.as_posix())

        self._selectFileUrl = value
        self.selectFileUrlChanged.emit()

    def getDirectoryUrl(self):
        """Return the directory URL used when not in ``select`` mode."""
        return self._directoryUrl

    def setDirectoryUrl(self, value: QUrl | None):
        """Set the directory URL; a falsy value becomes an empty ``QUrl``."""
        self._directoryUrl = value or QUrl()
        self.directoryUrlChanged.emit()

    def getFilename(self):
        """Return the filename used when not in ``select`` mode."""
        return self._filename

    def setFilename(self, value: str | None):
        """Set the filename; a falsy value becomes the empty string."""
        self._filename = value or ""
        self.filenameChanged.emit()

    def getDatabaseUrl(self):
        """Return the database URL for the current mode.

        In ``select`` mode this is built from the selected file and is None
        when no file is selected; otherwise it is built from the directory
        joined with the filename.
        """
        if self._uiMode == _UiMode.SELECT:
            fileUrl = self.getSelectFileUrl()
            if fileUrl is None:
                return None
            return db_path_to_sqlite_url(Path(fileUrl.toLocalFile()))

        directoryUrl = self.getDirectoryUrl()
        filename = self.getFilename()

        databasePath = Path(directoryUrl.toLocalFile()) / filename
        databaseUrl = db_path_to_sqlite_url(databasePath)

        return databaseUrl

    def getDatabaseInfo(self):
        """Return the database info as a dict; all-empty when none is loaded."""
        if self._databaseInfo is None:
            return {
                "url": "",
                "initialized": None,
                "version": None,
                "error": None,
            }

        return self._databaseInfo.to_qml_dict()

    def getCanContinue(self):
        """Whether the database is error-free, initialized and at version 4."""
        info = self._databaseInfo
        # bool() keeps the result a real bool for the ``bool`` property even if
        # ``initialized`` is None.
        return bool(
            info is not None
            and info.error is None
            # 4 corresponds to the "4" that _FmwiwsDatabase.init() stores.
            and info.version == 4  # noqa: PLR2004
            and info.initialized
        )

    def getCanConfirmSilently(self):
        """Whether the user can confirm database connection without a dialog popping up"""
        if self.getUiMode() != _UiMode.SELECT:
            return False

        filepath = self._settingsDatabaseFile
        if filepath is None:
            return False

        # Silent confirmation needs the current URL to be the one already
        # stored in settings, and the inspection to have succeeded.
        return (
            self._databaseInfo is not None
            and self._databaseInfo.error is None
            and self.getDatabaseUrl() == db_path_to_sqlite_url(filepath)
        )

    uiMode = Property(str, getUiMode, setUiMode, notify=uiModeChanged)
    selectFileUrl = Property(
        QUrl,
        getSelectFileUrl,
        setSelectFileUrl,
        notify=selectFileUrlChanged,
    )
    directoryUrl = Property(
        QUrl,
        getDirectoryUrl,
        setDirectoryUrl,
        notify=directoryUrlChanged,
    )
    filename = Property(str, getFilename, setFilename, notify=filenameChanged)
    databaseUrl = Property(QUrl, getDatabaseUrl, None, notify=databaseUrlChanged)
    databaseInfo = Property(dict, getDatabaseInfo, None, notify=databaseInfoChanged)
    canContinue = Property(bool, getCanContinue, None, notify=canContinueChanged)
    canConfirmSilently = Property(
        bool,
        getCanConfirmSilently,
        None,
        notify=canConfirmSilentlyChanged,
    )
    # endregion
|