mirror of
https://github.com/283375/arcaea-offline-pyside-ui.git
synced 2025-11-07 04:52:15 +00:00
wip: database checker
This commit is contained in:
2
ui/viewmodels/__init__.py
Normal file
2
ui/viewmodels/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
from .databaseInit import DatabaseInitViewModel
|
||||
from .overview import OverviewViewModel
|
||||
1
ui/viewmodels/common.py
Normal file
1
ui/viewmodels/common.py
Normal file
@ -0,0 +1 @@
|
||||
VM_QML_IMPORT_NAME = "internal.ui.vm"
|
||||
368
ui/viewmodels/databaseInit.py
Normal file
368
ui/viewmodels/databaseInit.py
Normal file
@ -0,0 +1,368 @@
|
||||
import dataclasses
|
||||
import logging
|
||||
from enum import StrEnum
|
||||
from pathlib import Path
|
||||
|
||||
from arcaea_offline.models import (
|
||||
CalculatedPotential,
|
||||
Chart,
|
||||
ConfigBase,
|
||||
ScoreBest,
|
||||
ScoreCalculated,
|
||||
ScoresBase,
|
||||
ScoresViewBase,
|
||||
SongsBase,
|
||||
SongsViewBase,
|
||||
)
|
||||
from arcaea_offline.models import (
|
||||
Property as AoProperty,
|
||||
)
|
||||
from PySide6.QtCore import Property, QObject, QUrl, Signal, Slot
|
||||
from PySide6.QtQml import QmlElement
|
||||
from sqlalchemy import inspect, select
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.database import create_engine, db_path_to_sqlite_url, sqlite_url_to_db_path
|
||||
from core.settings import SettingsKeys, settings
|
||||
from core.settings.values import GeneralDatabaseType
|
||||
|
||||
from .common import VM_QML_IMPORT_NAME
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
QML_IMPORT_NAME = VM_QML_IMPORT_NAME
|
||||
QML_IMPORT_MAJOR_VERSION = 1
|
||||
QML_IMPORT_MINOR_VERSION = 0
|
||||
|
||||
|
||||
class _FmwiwsDatabase:
|
||||
"""Fuck Me Why I Wrote Singleton Database"""
|
||||
|
||||
def __init__(self, url: str):
|
||||
self.engine = create_engine(url)
|
||||
|
||||
def init(self, *, checkfirst: bool = True):
|
||||
# create tables & views
|
||||
if checkfirst:
|
||||
# > https://github.com/kvesteri/sqlalchemy-utils/issues/396
|
||||
# > view.create_view() causes DuplicateTableError on
|
||||
# > Base.metadata.create_all(checkfirst=True)
|
||||
# so if `checkfirst` is True, drop these views before creating
|
||||
SongsViewBase.metadata.drop_all(self.engine)
|
||||
ScoresViewBase.metadata.drop_all(self.engine)
|
||||
|
||||
SongsBase.metadata.create_all(self.engine, checkfirst=checkfirst)
|
||||
SongsViewBase.metadata.create_all(self.engine)
|
||||
ScoresBase.metadata.create_all(self.engine, checkfirst=checkfirst)
|
||||
ScoresViewBase.metadata.create_all(self.engine)
|
||||
ConfigBase.metadata.create_all(self.engine, checkfirst=checkfirst)
|
||||
|
||||
version_property = AoProperty(key="version", value="4")
|
||||
with Session(self.engine) as session:
|
||||
session.merge(version_property)
|
||||
session.commit()
|
||||
|
||||
def is_initialized(self):
|
||||
expect_tables = (
|
||||
list(SongsBase.metadata.tables.keys())
|
||||
+ list(ScoresBase.metadata.tables.keys())
|
||||
+ list(ConfigBase.metadata.tables.keys())
|
||||
+ [
|
||||
Chart.__tablename__,
|
||||
ScoreCalculated.__tablename__,
|
||||
ScoreBest.__tablename__,
|
||||
CalculatedPotential.__tablename__,
|
||||
]
|
||||
)
|
||||
|
||||
return all(inspect(self.engine).has_table(t) for t in expect_tables)
|
||||
|
||||
def version(self):
|
||||
with Session(self.engine) as session:
|
||||
stmt = select(AoProperty.value).where(AoProperty.key == "version")
|
||||
result = session.scalar(stmt)
|
||||
|
||||
return None if result is None else int(result)
|
||||
|
||||
|
||||
class _UiMode(StrEnum):
|
||||
SELECT = "select"
|
||||
CREATE = "create"
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class DatabaseInfo:
|
||||
url: str
|
||||
error: Exception | None = None
|
||||
initialized: bool | None = None
|
||||
version: int | None = None
|
||||
|
||||
@property
|
||||
def error_dict(self):
|
||||
e = self.error
|
||||
|
||||
if e is None:
|
||||
return None
|
||||
|
||||
return {
|
||||
"title": e.__class__.__name__,
|
||||
"message": str(e),
|
||||
}
|
||||
|
||||
def to_qml_dict(self):
|
||||
return {
|
||||
"url": self.url,
|
||||
"error": self.error_dict,
|
||||
"initialized": self.initialized,
|
||||
"version": self.version,
|
||||
}
|
||||
|
||||
|
||||
@QmlElement
|
||||
class DatabaseInitViewModel(QObject):
|
||||
_void = Signal()
|
||||
uiModeChanged = Signal()
|
||||
selectFileUrlChanged = Signal()
|
||||
directoryUrlChanged = Signal()
|
||||
filenameChanged = Signal()
|
||||
databaseUrlChanged = Signal()
|
||||
databaseInfoChanged = Signal()
|
||||
canContinueChanged = Signal()
|
||||
canConfirmSilentlyChanged = Signal()
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
self._selectFileUrl: QUrl | None = None
|
||||
self._directoryUrl: QUrl = QUrl()
|
||||
self._filename: str = ""
|
||||
self._databaseInfo: DatabaseInfo | None = None
|
||||
|
||||
self._uiMode = _UiMode.SELECT
|
||||
|
||||
self.directoryUrlChanged.connect(lambda: self.databaseUrlChanged.emit())
|
||||
self.filenameChanged.connect(lambda: self.databaseUrlChanged.emit())
|
||||
self.selectFileUrlChanged.connect(self.databaseUrlChanged)
|
||||
self.databaseInfoChanged.connect(self.canContinueChanged)
|
||||
|
||||
self.databaseUrlChanged.connect(self.onDatabaseUrlChanged)
|
||||
|
||||
self._loadSettings()
|
||||
|
||||
def onDatabaseUrlChanged(self):
|
||||
self.loadDatabaseInfo()
|
||||
|
||||
@property
|
||||
def _settingsDatabaseFile(self) -> Path | None:
|
||||
# TODO: process database type when available
|
||||
if (
|
||||
settings.stringValue(SettingsKeys.General.DatabaseType)
|
||||
!= GeneralDatabaseType.FILE
|
||||
):
|
||||
return None
|
||||
|
||||
file = settings.stringValue(SettingsKeys.General.DatabaseConn)
|
||||
|
||||
if not file:
|
||||
logger.debug("No database file specified in settings")
|
||||
return
|
||||
|
||||
filepath = Path(file)
|
||||
if not filepath.exists():
|
||||
logger.warning("Cannot find database file: %s", file)
|
||||
return
|
||||
|
||||
return filepath
|
||||
|
||||
def _loadSettings(self) -> None:
|
||||
fileUrl = self._settingsDatabaseFile
|
||||
if fileUrl is None:
|
||||
return
|
||||
logger.info("Loading database from settings: %s", fileUrl)
|
||||
|
||||
self.setUiMode(_UiMode.SELECT)
|
||||
self.setSelectFileUrl(fileUrl)
|
||||
self.loadDatabaseInfo()
|
||||
|
||||
def _makeDatabaseInfo(self, dbUrl: str):
|
||||
info = DatabaseInfo(url=dbUrl)
|
||||
|
||||
path = sqlite_url_to_db_path(dbUrl)
|
||||
if not path.exists():
|
||||
e = FileNotFoundError()
|
||||
e.strerror = f"{path} does not exist"
|
||||
info.error = e
|
||||
return info
|
||||
|
||||
try:
|
||||
db = _FmwiwsDatabase(dbUrl)
|
||||
info.initialized = db.is_initialized()
|
||||
info.version = db.version()
|
||||
except SQLAlchemyError as e:
|
||||
logger.exception("Error loading database info")
|
||||
info.error = e
|
||||
|
||||
logger.debug("Database info for %r: %r", dbUrl, info)
|
||||
return info
|
||||
|
||||
@Slot()
|
||||
def loadDatabaseInfo(self):
|
||||
dbUrl = self.getDatabaseUrl()
|
||||
logger.info("Loading database info: %s", dbUrl)
|
||||
if dbUrl is None:
|
||||
logger.warning("Database URL is None")
|
||||
return
|
||||
|
||||
self._databaseInfo = self._makeDatabaseInfo(dbUrl.toString())
|
||||
self.databaseInfoChanged.emit()
|
||||
|
||||
@Slot(str)
|
||||
def createFile(self, dbUrl: str):
|
||||
file = sqlite_url_to_db_path(dbUrl)
|
||||
|
||||
if file.exists():
|
||||
logger.warning(
|
||||
"Attempted to create an existing file, check UI logic? (%s)", file
|
||||
)
|
||||
return
|
||||
|
||||
file.touch(mode=0o644)
|
||||
logger.info("Created file %s", file)
|
||||
|
||||
@Slot()
|
||||
def confirmCurrentConnection(self):
|
||||
dbInfo = self._databaseInfo
|
||||
if dbInfo is None:
|
||||
logger.warning("Current database info is None, ignoring")
|
||||
return
|
||||
|
||||
settings.setValue(
|
||||
SettingsKeys.General.DatabaseType,
|
||||
GeneralDatabaseType.FILE.value,
|
||||
)
|
||||
settings.setValue(
|
||||
SettingsKeys.General.DatabaseConn,
|
||||
str(sqlite_url_to_db_path(dbInfo.url).resolve().as_posix()),
|
||||
)
|
||||
|
||||
@Slot(str)
|
||||
def initialize(self, dbUrl: str):
|
||||
try:
|
||||
db = _FmwiwsDatabase(dbUrl)
|
||||
db.init()
|
||||
except SQLAlchemyError:
|
||||
logger.exception("Error initializing database %s", dbUrl)
|
||||
|
||||
# region properties
|
||||
def getUiMode(self):
|
||||
return self._uiMode.value
|
||||
|
||||
def setUiMode(self, mode: str | _UiMode):
|
||||
if isinstance(mode, _UiMode):
|
||||
self._uiMode = mode
|
||||
elif isinstance(mode, str):
|
||||
try:
|
||||
self._uiMode = _UiMode(mode)
|
||||
except ValueError:
|
||||
logger.warning("Invalid UI mode: %s", mode)
|
||||
|
||||
self.uiModeChanged.emit()
|
||||
|
||||
def getSelectFileUrl(self):
|
||||
return self._selectFileUrl
|
||||
|
||||
def setSelectFileUrl(self, value: Path | QUrl | None):
|
||||
if isinstance(value, Path):
|
||||
value = QUrl.fromLocalFile(value.as_posix())
|
||||
|
||||
self._selectFileUrl = value
|
||||
self.selectFileUrlChanged.emit()
|
||||
|
||||
def getDirectoryUrl(self):
|
||||
return self._directoryUrl
|
||||
|
||||
def setDirectoryUrl(self, value: QUrl | None):
|
||||
self._directoryUrl = value or QUrl()
|
||||
self.directoryUrlChanged.emit()
|
||||
|
||||
def getFilename(self):
|
||||
return self._filename
|
||||
|
||||
def setFilename(self, value: str | None):
|
||||
self._filename = value or ""
|
||||
self.filenameChanged.emit()
|
||||
|
||||
def getDatabaseUrl(self):
|
||||
if self._uiMode == _UiMode.SELECT:
|
||||
fileUrl = self.getSelectFileUrl()
|
||||
if fileUrl is None:
|
||||
return None
|
||||
return db_path_to_sqlite_url(Path(fileUrl.toLocalFile()))
|
||||
|
||||
directoryUrl = self.getDirectoryUrl()
|
||||
filename = self.getFilename()
|
||||
|
||||
databasePath = Path(directoryUrl.toLocalFile()) / filename
|
||||
databaseUrl = db_path_to_sqlite_url(databasePath)
|
||||
|
||||
return databaseUrl
|
||||
|
||||
def getDatabaseInfo(self):
|
||||
if self._databaseInfo is None:
|
||||
return {
|
||||
"url": "",
|
||||
"initialized": None,
|
||||
"version": None,
|
||||
"error": None,
|
||||
}
|
||||
|
||||
return self._databaseInfo.to_qml_dict()
|
||||
|
||||
def getCanContinue(self):
|
||||
return (
|
||||
self._databaseInfo is not None
|
||||
and self._databaseInfo.error is None
|
||||
and self._databaseInfo.version == 4 # noqa: PLR2004
|
||||
and self._databaseInfo.initialized
|
||||
)
|
||||
|
||||
def getCanConfirmSilently(self):
|
||||
"""Whether the user can confirm database connection without a dialog popping up"""
|
||||
if self.getUiMode() != _UiMode.SELECT:
|
||||
return False
|
||||
|
||||
filepath = self._settingsDatabaseFile
|
||||
if filepath is None:
|
||||
return False
|
||||
|
||||
return (
|
||||
self._databaseInfo is not None
|
||||
and self._databaseInfo.error is None
|
||||
and self.getDatabaseUrl() == db_path_to_sqlite_url(filepath)
|
||||
)
|
||||
|
||||
uiMode = Property(str, getUiMode, setUiMode, notify=uiModeChanged)
|
||||
selectFileUrl = Property(
|
||||
QUrl,
|
||||
getSelectFileUrl,
|
||||
setSelectFileUrl,
|
||||
notify=selectFileUrlChanged,
|
||||
)
|
||||
directoryUrl = Property(
|
||||
QUrl,
|
||||
getDirectoryUrl,
|
||||
setDirectoryUrl,
|
||||
notify=directoryUrlChanged,
|
||||
)
|
||||
filename = Property(str, getFilename, setFilename, notify=filenameChanged)
|
||||
databaseUrl = Property(QUrl, getDatabaseUrl, None, notify=databaseUrlChanged)
|
||||
databaseInfo = Property(dict, getDatabaseInfo, None, notify=databaseInfoChanged)
|
||||
canContinue = Property(bool, getCanContinue, None, notify=canContinueChanged)
|
||||
canConfirmSilently = Property(
|
||||
bool,
|
||||
getCanConfirmSilently,
|
||||
None,
|
||||
notify=canConfirmSilentlyChanged,
|
||||
)
|
||||
# endregion
|
||||
21
ui/viewmodels/overview.py
Normal file
21
ui/viewmodels/overview.py
Normal file
@ -0,0 +1,21 @@
|
||||
from PySide6.QtCore import Property, QObject, Signal
|
||||
from PySide6.QtQml import QmlElement
|
||||
|
||||
from .common import VM_QML_IMPORT_NAME
|
||||
|
||||
QML_IMPORT_NAME = VM_QML_IMPORT_NAME
|
||||
QML_IMPORT_MAJOR_VERSION = 1
|
||||
QML_IMPORT_MINOR_VERSION = 0
|
||||
|
||||
|
||||
@QmlElement
|
||||
class OverviewViewModel(QObject):
|
||||
_void = Signal()
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def get_b30(self):
|
||||
return 0
|
||||
|
||||
b30 = Property(float, get_b30, None, notify=_void)
|
||||
Reference in New Issue
Block a user