refactor: external sources update

This commit is contained in:
283375 2023-08-26 00:12:28 +08:00
parent 73fd563de6
commit de8efbeaf1
Signed by: 283375
SSH Key Fingerprint: SHA256:UcX0qg6ZOSDOeieKPGokA5h7soykG61nz2uxuQgVLSk
10 changed files with 274 additions and 63 deletions

View File

@ -0,0 +1,2 @@
from .packlist import PacklistParser
from .songlist import SonglistDifficultiesParser, SonglistParser

View File

@ -0,0 +1,55 @@
import json
from os import PathLike
from typing import Any, List, Optional, Union
from sqlalchemy import Engine
from sqlalchemy.orm import DeclarativeBase, Session
def to_db_value(val: Any) -> Any:
if not val:
return None
elif isinstance(val, list):
return json.dumps(val, ensure_ascii=False)
else:
return val
def is_localized(item: dict, key: str, append_localized: bool = True):
item_key = f"{key}_localized" if append_localized else key
subitem: Optional[dict] = item.get(item_key)
return subitem and (
subitem.get("ja")
or subitem.get("ko")
or subitem.get("zh-Hant")
or subitem.get("zh-Hans")
)
def set_model_localized_attrs(
model: DeclarativeBase, item: dict, model_key: str, item_key: Optional[str] = None
):
if item_key is None:
item_key = f"{model_key}_localized"
subitem: dict = item.get(item_key, {})
if not subitem:
return
setattr(model, f"{model_key}_ja", to_db_value(subitem.get("ja")))
setattr(model, f"{model_key}_ko", to_db_value(subitem.get("ko")))
setattr(model, f"{model_key}_zh_hans", to_db_value(subitem.get("zh-Hans")))
setattr(model, f"{model_key}_zh_hant", to_db_value(subitem.get("zh-Hant")))
class ArcaeaParser:
def __init__(self, filepath: Union[str, bytes, PathLike]):
self.filepath = filepath
def parse(self) -> List[DeclarativeBase]:
...
def write_database(self, engine: Engine):
with Session(engine) as session:
results = self.parse()
for result in results:
session.merge(result)
session.commit()

View File

@ -0,0 +1,31 @@
import json
from typing import List, Union
from ...models.songs import Pack, PackLocalized
from .common import ArcaeaParser, is_localized, set_model_localized_attrs
class PacklistParser(ArcaeaParser):
def __init__(self, filepath):
super().__init__(filepath)
def parse(self) -> List[Union[Pack, PackLocalized]]:
with open(self.filepath, "r", encoding="utf-8") as pl_f:
packlist_json_root = json.loads(pl_f.read())
packlist_json = packlist_json_root["packs"]
results = []
for item in packlist_json:
pack = Pack()
pack.id = item["id"]
pack.name = item["name_localized"]["en"]
pack.description = item["description_localized"]["en"] or None
results.append(pack)
if is_localized(item, "name") or is_localized(item, "description"):
pack_localized = PackLocalized(id=pack.id)
set_model_localized_attrs(pack_localized, item, "name")
set_model_localized_attrs(pack_localized, item, "description")
results.append(pack_localized)
return results

View File

@ -0,0 +1,104 @@
import json
from typing import List, Union
from ...models.songs import Chart, ChartLocalized, Song, SongLocalized
from .common import ArcaeaParser, is_localized, set_model_localized_attrs, to_db_value
class SonglistParser(ArcaeaParser):
def __init__(self, filepath):
super().__init__(filepath)
def parse(self) -> List[Union[Song, SongLocalized, Chart, ChartLocalized]]:
with open(self.filepath, "r", encoding="utf-8") as sl_f:
songlist_json_root = json.loads(sl_f.read())
songlist_json = songlist_json_root["songs"]
results = []
for item in songlist_json:
song = Song()
song.idx = item["idx"]
song.id = item["id"]
song.title = item["title_localized"]["en"]
song.artist = item["artist"]
song.bpm = item["bpm"]
song.bpm_base = item["bpm_base"]
song.set = item["set"]
song.audio_preview = item["audioPreview"]
song.audio_preview_end = item["audioPreviewEnd"]
song.side = item["side"]
song.version = item["version"]
song.date = item["date"]
song.bg = to_db_value(item.get("bg"))
song.bg_inverse = to_db_value(item.get("bg_inverse"))
if item.get("bg_daynight"):
song.bg_day = to_db_value(item["bg_daynight"].get("day"))
song.bg_night = to_db_value(item["bg_daynight"].get("night"))
if item.get("source_localized"):
song.source = item["source_localized"]["en"]
song.source_copyright = to_db_value(item.get("source_copyright"))
results.append(song)
if (
is_localized(item, "title")
or is_localized(item, "search_title", append_localized=False)
or is_localized(item, "search_artist", append_localized=False)
or is_localized(item, "source")
):
song_localized = SongLocalized(id=song.id)
set_model_localized_attrs(song_localized, item, "title")
set_model_localized_attrs(
song_localized, item, "search_title", "search_title"
)
set_model_localized_attrs(
song_localized, item, "search_artist", "search_artist"
)
set_model_localized_attrs(song_localized, item, "source")
results.append(song_localized)
return results
class SonglistDifficultiesParser(ArcaeaParser):
def __init__(self, filepath):
self.filepath = filepath
def parse(self) -> List[Union[Chart, ChartLocalized]]:
with open(self.filepath, "r", encoding="utf-8") as sl_f:
songlist_json_root = json.loads(sl_f.read())
songlist_json = songlist_json_root["songs"]
results = []
for song_item in songlist_json:
if not song_item.get("difficulties"):
continue
for item in song_item["difficulties"]:
chart = Chart(song_id=song_item["id"])
chart.rating_class = item["ratingClass"]
chart.rating = item["rating"]
chart.rating_plus = item.get("ratingPlus") or False
chart.chart_designer = item["chartDesigner"]
chart.jacket_desginer = item.get("jacketDesigner") or None
chart.audio_override = item.get("audioOverride") or False
chart.jacket_override = item.get("jacketOverride") or False
chart.jacket_night = item.get("jacketNight") or None
chart.title = item.get("title_localized", {}).get("en") or None
chart.artist = item.get("artist") or None
chart.bg = item.get("bg") or None
chart.bg_inverse = item.get("bg_inverse")
chart.bpm = item.get("bpm") or None
chart.bpm_base = item.get("bpm_base") or None
chart.version = item.get("version") or None
chart.date = item.get("date") or None
results.append(chart)
if is_localized(item, "title") or is_localized(item, "artist"):
chart_localized = ChartLocalized(
song_id=chart.song_id, rating_class=chart.rating_class
)
set_model_localized_attrs(chart_localized, item, "title")
set_model_localized_attrs(chart_localized, item, "artist")
results.append(chart_localized)
return results

View File

@ -0,0 +1,44 @@
import sqlite3
from typing import List
from ...models.scores import Score
from .common import ArcaeaParser
class St3ScoreParser(ArcaeaParser):
CLEAR_TYPES_MAP = {0: -1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1}
def __init__(self, filepath):
super().__init__(filepath)
def get_score_items(self) -> List[Score]:
items = []
with sqlite3.connect(self.filepath) as st3_conn:
cursor = st3_conn.cursor()
db_scores = cursor.execute(
"SELECT songId, songDifficulty, score, perfectCount, nearCount, missCount, date FROM scores"
).fetchall()
for song_id, rating_class, score, pure, far, lost, date in db_scores:
db_clear_type = cursor.execute(
"SELECT clearType FROM cleartypes WHERE songId = ? AND songDifficulty = ?",
(song_id, rating_class),
).fetchone()[0]
r10_clear_type = self.CLEAR_TYPES_MAP[db_clear_type]
date_str = str(date)
date = None if len(date_str) < 7 else int(date_str.ljust(10, "0"))
items.append(
Score(
song_id=song_id,
rating_class=rating_class,
score=score,
pure=pure,
far=far,
lost=lost,
date=date,
r10_clear_type=r10_clear_type,
)
)
return items

View File

@ -0,0 +1 @@
from .arcsong_db import ArcsongDbParser

View File

@ -0,0 +1,37 @@
import sqlite3
from typing import List
from sqlalchemy import Engine
from sqlalchemy.orm import Session
from ...models.songs import ChartInfo
class ArcsongDbParser:
def __init__(self, filepath):
self.filepath = filepath
def parse(self) -> List[ChartInfo]:
results = []
with sqlite3.connect(self.filepath) as conn:
cursor = conn.cursor()
arcsong_db_results = cursor.execute(
"SELECT song_id, rating_class, rating, note FROM charts"
)
for result in arcsong_db_results:
chart = ChartInfo(
song_id=result[0],
rating_class=result[1],
constant=result[2],
note=result[3] or None,
)
results.append(chart)
return results
def write_database(self, engine: Engine):
with Session(engine) as session:
results = self.parse()
for result in results:
session.merge(result)
session.commit()

View File

@ -1,20 +0,0 @@
import dataclasses
from typing import List
@dataclasses.dataclass
class ExternalScoreItem:
song_id: str
rating_class: int
score: int
pure: int = -1
far: int = -1
lost: int = -1
max_recall: int = -1
clear_type: int = -1
time: int = -1
class ExternalScoreSource:
def get_score_items(self) -> List[ExternalScoreItem]:
...

View File

@ -1,43 +0,0 @@
import sqlite3
from typing import Union
from .common import ExternalScoreItem, ExternalScoreSource
class St3ScoreSource(ExternalScoreSource):
db_path: Union[str, bytes]
CLEAR_TYPES_MAP = {0: -1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1}
def __init__(self, db_path: Union[str, bytes]):
self.db_path = db_path
def get_score_items(self):
items = []
with sqlite3.connect(self.db_path) as st3_conn:
cursor = st3_conn.cursor()
db_scores = cursor.execute(
"SELECT songId, songDifficulty, score, perfectCount, nearCount, missCount, date FROM scores"
).fetchall()
for song_id, rating_class, score, pure, far, lost, date in db_scores:
db_clear_type = cursor.execute(
"SELECT clearType FROM cleartypes WHERE songId = ? AND songDifficulty = ?",
(song_id, rating_class),
).fetchone()[0]
clear_type = self.CLEAR_TYPES_MAP[db_clear_type]
date_str = str(date)
date = None if len(date_str) < 7 else int(date_str.ljust(10, "0"))
kwargs = {
"song_id": song_id,
"rating_class": rating_class,
"score": score,
"pure": pure,
"far": far,
"lost": lost,
"clear_type": clear_type,
}
if date:
kwargs["time"] = date
items.append(ExternalScoreItem(**kwargs))
return items