chore!: remove legacy external arcaea parsers

This commit is contained in:
283375 2024-09-27 23:12:58 +08:00
parent bfa1472b5c
commit 5e996d35d2
Signed by: 283375
SSH Key Fingerprint: SHA256:UcX0qg6ZOSDOeieKPGokA5h7soykG61nz2uxuQgVLSk
7 changed files with 10 additions and 386 deletions

View File

@ -1,12 +0,0 @@
from .online import ArcaeaOnlineParser
from .packlist import PacklistParser
from .songlist import SonglistDifficultiesParser, SonglistParser
from .st3 import St3ScoreParser
__all__ = [
"ArcaeaOnlineParser",
"PacklistParser",
"SonglistDifficultiesParser",
"SonglistParser",
"St3ScoreParser",
]

View File

@ -1,99 +0,0 @@
import contextlib
import json
import math
import time
from os import PathLike
from typing import Any, List, Optional, Union
from sqlalchemy.orm import DeclarativeBase, Session
def fix_timestamp(timestamp: int) -> Union[int, None]:
"""
Some of the `date` column in st3 are strangely truncated. For example,
a `1670283375` may be truncated to `167028`, even `1`. Yes, a single `1`.
To properly handle this situation, we check the timestamp's digits.
If `digits < 5`, we treat this timestamp as a `None`. Otherwise, we try to
fix the timestamp.
:param timestamp: a POSIX timestamp
:return: `None` if the timestamp's digits < 5, otherwise a fixed POSIX timestamp
"""
# find digit length from https://stackoverflow.com/a/2189827/16484891
# CC BY-SA 2.5
# this might give incorrect result when timestamp > 999999999999997,
# see https://stackoverflow.com/a/28883802/16484891 (CC BY-SA 4.0).
# but that's way too later than 9999-12-31 23:59:59, 253402271999,
# I don't think Arcaea would still be an active updated game by then.
# so don't mind those small issues, just use this.
digits = int(math.log10(abs(timestamp))) + 1 if timestamp != 0 else 1
if digits < 5:
return None
timestamp_str = str(timestamp)
current_timestamp_digits = int(math.log10(int(time.time()))) + 1
timestamp_str = timestamp_str.ljust(current_timestamp_digits, "0")
return int(timestamp_str, 10)
def to_db_value(val: Any) -> Any:
if not val:
return None
return json.dumps(val, ensure_ascii=False) if isinstance(val, list) else val
def is_localized(item: dict, key: str, append_localized: bool = True):
item_key = f"{key}_localized" if append_localized else key
subitem: Optional[dict] = item.get(item_key)
return subitem and (
subitem.get("ja")
or subitem.get("ko")
or subitem.get("zh-Hant")
or subitem.get("zh-Hans")
)
def set_model_localized_attrs(
model: DeclarativeBase, item: dict, model_key: str, item_key: Optional[str] = None
):
if item_key is None:
item_key = f"{model_key}_localized"
subitem: dict = item.get(item_key, {})
if not subitem:
return
setattr(model, f"{model_key}_ja", to_db_value(subitem.get("ja")))
setattr(model, f"{model_key}_ko", to_db_value(subitem.get("ko")))
setattr(model, f"{model_key}_zh_hans", to_db_value(subitem.get("zh-Hans")))
setattr(model, f"{model_key}_zh_hant", to_db_value(subitem.get("zh-Hant")))
class ArcaeaParser:
def __init__(self, filepath: Union[str, bytes, PathLike]):
self.filepath = filepath
def read_file_text(self):
file_handle = None
with contextlib.suppress(TypeError):
# original open
file_handle = open(self.filepath, "r", encoding="utf-8")
if file_handle is None:
try:
# or maybe a `pathlib.Path` subset
# or an `importlib.resources.abc.Traversable` like object
# e.g. `zipfile.Path`
file_handle = self.filepath.open(mode="r", encoding="utf-8") # type: ignore
except Exception as e:
raise ValueError("Invalid `filepath`.") from e
with file_handle:
return file_handle.read()
def parse(self) -> List[DeclarativeBase]:
raise NotImplementedError()
def write_database(self, session: Session):
results = self.parse()
for result in results:
session.merge(result)

View File

@ -1,72 +0,0 @@
import json
import logging
from datetime import datetime
from typing import Dict, List, Literal, Optional, TypedDict
from ...models import Score
from .common import ArcaeaParser, fix_timestamp
logger = logging.getLogger(__name__)
class TWebApiRatingMeScoreItem(TypedDict):
song_id: str
difficulty: int
modifier: int
rating: float
score: int
perfect_count: int
near_count: int
miss_count: int
clear_type: int
title: Dict[Literal["ja", "en"], str]
artist: str
time_played: int
bg: str
class TWebApiRatingMeValue(TypedDict):
best_rated_scores: List[TWebApiRatingMeScoreItem]
recent_rated_scores: List[TWebApiRatingMeScoreItem]
class TWebApiRatingMeResult(TypedDict):
success: bool
error_code: Optional[int]
value: Optional[TWebApiRatingMeValue]
class ArcaeaOnlineParser(ArcaeaParser):
def parse(self) -> List[Score]:
api_result_root: TWebApiRatingMeResult = json.loads(self.read_file_text())
api_result_value = api_result_root.get("value")
if not api_result_value:
error_code = api_result_root.get("error_code")
raise ValueError(f"Cannot parse API result, error code {error_code}")
best30_score_items = api_result_value.get("best_rated_scores", [])
recent_score_items = api_result_value.get("recent_rated_scores", [])
score_items = best30_score_items + recent_score_items
date_text = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
results: List[Score] = []
for score_item in score_items:
score = Score()
score.song_id = score_item["song_id"]
score.rating_class = score_item["difficulty"]
score.score = score_item["score"]
score.pure = score_item["perfect_count"]
score.far = score_item["near_count"]
score.lost = score_item["miss_count"]
score.date = fix_timestamp(int(score_item["time_played"] / 1000))
score.modifier = score_item["modifier"]
score.clear_type = score_item["clear_type"]
if score.lost == 0:
score.max_recall = score.pure + score.far
score.comment = f"Parsed from web API at {date_text}"
results.append(score)
return results

View File

@ -1,29 +0,0 @@
import json
from typing import List, Union
from ...models.songs import Pack, PackLocalized
from .common import ArcaeaParser, is_localized, set_model_localized_attrs
class PacklistParser(ArcaeaParser):
def parse(self) -> List[Union[Pack, PackLocalized]]:
packlist_json_root = json.loads(self.read_file_text())
packlist_json = packlist_json_root["packs"]
results: List[Union[Pack, PackLocalized]] = [
Pack(id="single", name="Memory Archive")
]
for item in packlist_json:
pack = Pack()
pack.id = item["id"]
pack.name = item["name_localized"]["en"]
pack.description = item["description_localized"]["en"] or None
results.append(pack)
if is_localized(item, "name") or is_localized(item, "description"):
pack_localized = PackLocalized(id=pack.id)
set_model_localized_attrs(pack_localized, item, "name")
set_model_localized_attrs(pack_localized, item, "description")
results.append(pack_localized)
return results

View File

@ -1,101 +0,0 @@
import json
from typing import List, Union
from ...models.songs import Difficulty, DifficultyLocalized, Song, SongLocalized
from .common import ArcaeaParser, is_localized, set_model_localized_attrs, to_db_value
class SonglistParser(ArcaeaParser):
def parse(
self,
) -> List[Union[Song, SongLocalized, Difficulty, DifficultyLocalized]]:
songlist_json_root = json.loads(self.read_file_text())
songlist_json = songlist_json_root["songs"]
results = []
for item in songlist_json:
song = Song()
song.idx = item["idx"]
song.id = item["id"]
song.title = item["title_localized"]["en"]
song.artist = item["artist"]
song.bpm = item["bpm"]
song.bpm_base = item["bpm_base"]
song.set = item["set"]
song.audio_preview = item["audioPreview"]
song.audio_preview_end = item["audioPreviewEnd"]
song.side = item["side"]
song.version = item["version"]
song.date = item["date"]
song.bg = to_db_value(item.get("bg"))
song.bg_inverse = to_db_value(item.get("bg_inverse"))
if item.get("bg_daynight"):
song.bg_day = to_db_value(item["bg_daynight"].get("day"))
song.bg_night = to_db_value(item["bg_daynight"].get("night"))
if item.get("source_localized"):
song.source = item["source_localized"]["en"]
song.source_copyright = to_db_value(item.get("source_copyright"))
results.append(song)
if (
is_localized(item, "title")
or is_localized(item, "search_title", append_localized=False)
or is_localized(item, "search_artist", append_localized=False)
or is_localized(item, "source")
):
song_localized = SongLocalized(id=song.id)
set_model_localized_attrs(song_localized, item, "title")
set_model_localized_attrs(
song_localized, item, "search_title", "search_title"
)
set_model_localized_attrs(
song_localized, item, "search_artist", "search_artist"
)
set_model_localized_attrs(song_localized, item, "source")
results.append(song_localized)
return results
class SonglistDifficultiesParser(ArcaeaParser):
def parse(self) -> List[Union[Difficulty, DifficultyLocalized]]:
songlist_json_root = json.loads(self.read_file_text())
songlist_json = songlist_json_root["songs"]
results = []
for song_item in songlist_json:
if not song_item.get("difficulties"):
continue
for item in song_item["difficulties"]:
if item["rating"] == 0:
continue
chart = Difficulty(song_id=song_item["id"])
chart.rating_class = item["ratingClass"]
chart.rating = item["rating"]
chart.rating_plus = item.get("ratingPlus") or False
chart.chart_designer = item["chartDesigner"]
chart.jacket_desginer = item.get("jacketDesigner") or None
chart.audio_override = item.get("audioOverride") or False
chart.jacket_override = item.get("jacketOverride") or False
chart.jacket_night = item.get("jacketNight") or None
chart.title = item.get("title_localized", {}).get("en") or None
chart.artist = item.get("artist") or None
chart.bg = item.get("bg") or None
chart.bg_inverse = item.get("bg_inverse")
chart.bpm = item.get("bpm") or None
chart.bpm_base = item.get("bpm_base") or None
chart.version = item.get("version") or None
chart.date = item.get("date") or None
results.append(chart)
if is_localized(item, "title") or is_localized(item, "artist"):
chart_localized = DifficultyLocalized(
song_id=chart.song_id, rating_class=chart.rating_class
)
set_model_localized_attrs(chart_localized, item, "title")
set_model_localized_attrs(chart_localized, item, "artist")
results.append(chart_localized)
return results

View File

@ -1,73 +0,0 @@
import logging
import sqlite3
from typing import List
from sqlalchemy import select
from sqlalchemy.orm import Session
from ...models.scores import Score
from .common import ArcaeaParser, fix_timestamp
logger = logging.getLogger(__name__)
class St3ScoreParser(ArcaeaParser):
def parse(self) -> List[Score]:
items = []
with sqlite3.connect(self.filepath) as st3_conn:
cursor = st3_conn.cursor()
db_scores = cursor.execute(
"SELECT songId, songDifficulty, score, perfectCount, nearCount, missCount, "
"date, modifier FROM scores"
).fetchall()
for (
song_id,
rating_class,
score,
pure,
far,
lost,
date,
modifier,
) in db_scores:
clear_type = cursor.execute(
"SELECT clearType FROM cleartypes WHERE songId = ? AND songDifficulty = ?",
(song_id, rating_class),
).fetchone()[0]
items.append(
Score(
song_id=song_id,
rating_class=rating_class,
score=score,
pure=pure,
far=far,
lost=lost,
date=fix_timestamp(date),
modifier=modifier,
clear_type=clear_type,
comment="Parsed from st3",
)
)
return items
def write_database(self, session: Session, *, skip_duplicate=True):
parsed_scores = self.parse()
for parsed_score in parsed_scores:
query_score = session.scalar(
select(Score).where(
(Score.song_id == parsed_score.song_id)
& (Score.rating_class == parsed_score.rating_class)
& (Score.score == parsed_score.score)
)
)
if query_score and skip_duplicate:
logger.info(
"%r skipped because potential duplicate item %r found.",
parsed_score,
query_score,
)
continue
session.add(parsed_score)

View File

@ -0,0 +1,10 @@
from .lists import ArcaeaPacklistParser, ArcaeaSonglistParser
from .online import ArcaeaOnlineApiParser
from .st3 import ArcaeaSt3Parser
__all__ = [
"ArcaeaPacklistParser",
"ArcaeaSonglistParser",
"ArcaeaOnlineApiParser",
"ArcaeaSt3Parser",
]