38 Commits

Author SHA1 Message Date
c1f83eff55 feat(db): get_score_best 2023-09-23 17:51:50 +08:00
3349620b3a impr(db): better recommend_charts 2023-09-23 17:51:32 +08:00
3867b273c7 impr(calc): negative result of play_rating_from_step 2023-09-21 11:36:00 +08:00
0c292fc3be chore(calc)!: api changes
- `PlayResults` -> `PlayResult`
- remove expose of `world_step` from `calculate.__init__`
2023-09-20 00:20:32 +08:00
2c73570a65 fix(calc): default value of PartnerBonus.step_bonus 2023-09-18 20:50:18 +08:00
ec9926993c feat(db): recommend charts based on score 2023-09-18 20:41:57 +08:00
bed9e14368 chore(calc)!: potential -> play_rating 2023-09-18 14:05:28 +08:00
f72410d0bc feat(calc): calculate constants from potential 2023-09-18 14:02:33 +08:00
64fd328ff8 feat(calc): calculate potential from world step 2023-09-18 14:02:02 +08:00
a97031e717 wip(calc): world step calculation 2023-09-18 09:25:15 +08:00
c6014bd1b6 chore(calc): split calculate module 2023-09-18 07:58:35 +08:00
c909886902 feat(db): get_songs_by_pack_id 2023-09-17 00:27:20 +08:00
589826004b fix(db): minor fixes 2023-09-17 00:26:56 +08:00
4899e0383f fix(external): SonglistDifficultiesParser file reading 2023-09-16 20:31:38 +08:00
5796d9a80f impr(external): ArcaeaParser file reading 2023-09-16 02:40:56 +08:00
6258a55ba4 fix(eternal): fix Andreal score timestamp 2023-09-08 18:16:58 +08:00
d763896c0c feat(db): Score related modify methods 2023-09-06 01:10:47 +08:00
c2a171a6d3 chore(db)!: Database method rename 2023-09-06 00:30:24 +08:00
d36a858464 feat(db): new methods 2023-09-06 00:01:18 +08:00
66a4cc8cff impr(external): Andreal B30 best30_overflow field 2023-09-02 18:15:22 +08:00
00255e5b74 feat(external): AndrealImageGenerator 2023-09-01 02:37:39 +08:00
844568db1a feat(db): export methods 2023-08-31 22:18:11 +08:00
167f72f9bb refactor(models)!: Score & ChartInfo column changed 2023-08-31 22:12:02 +08:00
01bfd0f350 feat(db): COUNT related methods 2023-08-31 21:51:49 +08:00
0d882fa138 feat(external): arcsong.json generator 2023-08-31 21:25:50 +08:00
daf2a46632 feat(db): Score related Database methods 2023-08-31 21:03:10 +08:00
35e6fde664 fix: add static single pack in PacklistParser 2023-08-31 05:05:33 +08:00
6b8a3e1565 fix(eternal): omit difficulties that have "rating": 0 2023-08-29 01:33:37 +08:00
ca9576160f fix(db): convert results to list before closing session 2023-08-29 01:32:26 +08:00
e948b6abea chore(db)!: Database methods 2023-08-28 22:54:37 +08:00
1282993810 impr: module exporting 2023-08-28 22:46:28 +08:00
b561cc51e0 refactor(db)!: new charts view & models refactor 2023-08-28 21:36:17 +08:00
316b02cd1b wip: searcher 2023-08-28 20:28:40 +08:00
b180976284 feat: handy methods under Database 2023-08-28 00:15:34 +08:00
54851549d5 feat(utils): bring utils back 2023-08-27 23:59:27 +08:00
262495a580 fix: singleton Database behavior 2023-08-27 19:44:20 +08:00
a6c1e594c4 feat(db): Database.check_init() 2023-08-27 00:31:44 +08:00
d979b6cd10 add LICENSE 2023-08-26 22:46:51 +08:00
30 changed files with 1369 additions and 143 deletions

7
LICENSE Normal file
View File

@ -0,0 +1,7 @@
Copyright 2023-now Lin He
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -13,6 +13,7 @@ dependencies = [
"beautifulsoup4==4.12.2",
"SQLAlchemy==2.0.20",
"SQLAlchemy-Utils==0.41.1",
"Whoosh==2.7.4",
]
classifiers = [
"Development Status :: 3 - Alpha",

View File

@ -1,3 +1,4 @@
beautifulsoup4==4.12.2
SQLAlchemy==2.0.20
SQLAlchemy-Utils==0.41.1
Whoosh==2.7.4

View File

@ -1,48 +0,0 @@
from decimal import Decimal
from math import floor
from typing import Dict, List
from .models.scores import Calculated
def calculate_score_range(notes: int, pure: int, far: int):
single_note_score = 10000000 / Decimal(notes)
actual_score = floor(
single_note_score * pure + single_note_score * Decimal(0.5) * far
)
return (actual_score, actual_score + pure)
def calculate_potential(_constant: float, score: int) -> Decimal:
constant = Decimal(_constant)
if score >= 10000000:
return constant + 2
elif score >= 9800000:
return constant + 1 + (Decimal(score - 9800000) / 200000)
else:
return max(Decimal(0), constant + (Decimal(score - 9500000) / 300000))
def calculate_shiny_pure(notes: int, score: int, pure: int, far: int) -> int:
single_note_score = 10000000 / Decimal(notes)
actual_score = single_note_score * pure + single_note_score * Decimal(0.5) * far
return score - floor(actual_score)
def get_b30_calculated_list(calculated_list: List[Calculated]) -> List[Calculated]:
best_scores: Dict[str, Calculated] = {}
for calculated in calculated_list:
key = f"{calculated.song_id}_{calculated.rating_class}"
stored = best_scores.get(key)
if stored and stored.score < calculated.score or not stored:
best_scores[key] = calculated
ret_list = list(best_scores.values())
ret_list = sorted(ret_list, key=lambda c: c.potential, reverse=True)[:30]
return ret_list
def calculate_b30(calculated_list: List[Calculated]) -> Decimal:
ptt_list = [Decimal(c.potential) for c in get_b30_calculated_list(calculated_list)]
sum_ptt_list = sum(ptt_list)
return (sum_ptt_list / len(ptt_list)) if sum_ptt_list else Decimal("0.0")

View File

@ -0,0 +1,8 @@
from .b30 import calculate_b30, get_b30_calculated_list
from .score import (
calculate_constants_from_play_rating,
calculate_play_rating,
calculate_score_modifier,
calculate_score_range,
calculate_shiny_pure,
)

View File

@ -0,0 +1,24 @@
from decimal import Decimal
from typing import Dict, List
from ..models.scores import ScoreCalculated
def get_b30_calculated_list(
calculated_list: List[ScoreCalculated],
) -> List[ScoreCalculated]:
best_scores: Dict[str, ScoreCalculated] = {}
for calculated in calculated_list:
key = f"{calculated.song_id}_{calculated.rating_class}"
stored = best_scores.get(key)
if stored and stored.score < calculated.score or not stored:
best_scores[key] = calculated
ret_list = list(best_scores.values())
ret_list = sorted(ret_list, key=lambda c: c.potential, reverse=True)[:30]
return ret_list
def calculate_b30(calculated_list: List[ScoreCalculated]) -> Decimal:
ptt_list = [Decimal(c.potential) for c in get_b30_calculated_list(calculated_list)]
sum_ptt_list = sum(ptt_list)
return (sum_ptt_list / len(ptt_list)) if sum_ptt_list else Decimal("0.0")

View File

@ -0,0 +1,67 @@
from dataclasses import dataclass
from decimal import Decimal
from math import floor
from typing import Tuple, Union
def calculate_score_range(notes: int, pure: int, far: int):
single_note_score = 10000000 / Decimal(notes)
actual_score = floor(
single_note_score * pure + single_note_score * Decimal(0.5) * far
)
return (actual_score, actual_score + pure)
def calculate_score_modifier(score: int) -> Decimal:
if score >= 10000000:
return Decimal(2)
elif score >= 9800000:
return Decimal(1) + (Decimal(score - 9800000) / 200000)
else:
return Decimal(score - 9500000) / 300000
def calculate_play_rating(
constant: Union[Decimal, str, float, int], score: int
) -> Decimal:
constant = Decimal(constant)
score_modifier = calculate_score_modifier(score)
return max(Decimal(0), constant + score_modifier)
def calculate_shiny_pure(notes: int, score: int, pure: int, far: int) -> int:
single_note_score = 10000000 / Decimal(notes)
actual_score = single_note_score * pure + single_note_score * Decimal(0.5) * far
return score - floor(actual_score)
@dataclass
class ConstantsFromPlayRatingResult:
EXPlus: Tuple[Decimal, Decimal]
EX: Tuple[Decimal, Decimal]
AA: Tuple[Decimal, Decimal]
A: Tuple[Decimal, Decimal]
B: Tuple[Decimal, Decimal]
C: Tuple[Decimal, Decimal]
def calculate_constants_from_play_rating(play_rating: Union[Decimal, str, float, int]):
play_rating = Decimal(play_rating)
ranges = []
for upperScore, lowerScore in [
(10000000, 9900000),
(9899999, 9800000),
(9799999, 9500000),
(9499999, 9200000),
(9199999, 8900000),
(8899999, 8600000),
]:
upperScoreModifier = calculate_score_modifier(upperScore)
lowerScoreModifier = calculate_score_modifier(lowerScore)
ranges.append(
(play_rating - upperScoreModifier, play_rating - lowerScoreModifier)
)
return ConstantsFromPlayRatingResult(*ranges)

View File

@ -0,0 +1,175 @@
from decimal import Decimal
from typing import Literal, Optional, Union
class PlayResult:
def __init__(
self,
*,
play_rating: Union[Decimal, str, float, int],
partner_step: Union[Decimal, str, float, int],
):
self.__play_rating = play_rating
self.__partner_step = partner_step
@property
def play_rating(self):
return Decimal(self.__play_rating)
@property
def partner_step(self):
return Decimal(self.__partner_step)
class PartnerBonus:
def __init__(
self,
*,
step_bonus: Union[Decimal, str, float, int] = Decimal("0.0"),
final_multiplier: Union[Decimal, str, float, int] = Decimal("1.0"),
):
self.__step_bonus = step_bonus
self.__final_multiplier = final_multiplier
@property
def step_bonus(self):
return Decimal(self.__step_bonus)
@property
def final_multiplier(self):
return Decimal(self.__final_multiplier)
AwakenedIlithPartnerBonus = PartnerBonus(step_bonus="6.0")
AwakenedEtoPartnerBonus = PartnerBonus(step_bonus="7.0")
AwakenedLunaPartnerBonus = PartnerBonus(step_bonus="7.0")
class AwakenedAyuPartnerBonus(PartnerBonus):
def __init__(self, step_bonus: Union[Decimal, str, float, int]):
super().__init__(step_bonus=step_bonus)
AmaneBelowExPartnerBonus = PartnerBonus(final_multiplier="0.5")
class MithraTerceraPartnerBonus(PartnerBonus):
def __init__(self, step_bonus: int):
super().__init__(step_bonus=step_bonus)
MayaPartnerBonus = PartnerBonus(final_multiplier="2.0")
class StepBooster:
def final_value(self) -> Decimal:
raise NotImplementedError()
class LegacyMapStepBooster(StepBooster):
def __init__(
self,
stamina: Literal[2, 4, 6],
fragments: Literal[100, 250, 500, None],
):
self.stamina = stamina
self.fragments = fragments
@property
def stamina(self):
return self.__stamina
@stamina.setter
def stamina(self, value: Literal[2, 4, 6]):
if value not in [2, 4, 6]:
raise ValueError("stamina can only be one of [2, 4, 6]")
self.__stamina = value
@property
def fragments(self):
return self.__fragments
@fragments.setter
def fragments(self, value: Literal[100, 250, 500, None]):
if value not in [100, 250, 500, None]:
raise ValueError("fragments can only be one of [100, 250, 500, None]")
self.__fragments = value
def final_value(self) -> Decimal:
stamina_multiplier = Decimal(self.stamina)
if self.fragments is None:
fragments_multiplier = Decimal(1)
elif self.fragments == 100:
fragments_multiplier = Decimal("1.1")
elif self.fragments == 250:
fragments_multiplier = Decimal("1.25")
elif self.fragments == 500:
fragments_multiplier = Decimal("1.5")
return stamina_multiplier * fragments_multiplier
class MemoriesStepBooster(StepBooster):
def final_value(self) -> Decimal:
return Decimal("4.0")
def calculate_step_original(
play_result: PlayResult,
*,
partner_bonus: Optional[PartnerBonus] = None,
step_booster: Optional[StepBooster] = None,
):
ptt = play_result.play_rating
step = play_result.partner_step
if partner_bonus:
partner_bonus_step = partner_bonus.step_bonus
partner_bonus_multiplier = partner_bonus.final_multiplier
else:
partner_bonus_step = Decimal("0")
partner_bonus_multiplier = Decimal("1.0")
play_result = (Decimal("2.45") * ptt.sqrt() + Decimal("2.5")) * (step / 50)
play_result += partner_bonus_step
play_result *= partner_bonus_multiplier
if step_booster:
play_result *= step_booster.final_value()
return play_result
def calculate_step(
play_result: PlayResult,
*,
partner_bonus: Optional[PartnerBonus] = None,
step_booster: Optional[StepBooster] = None,
):
play_result_original = calculate_step_original(
play_result, partner_bonus=partner_bonus, step_booster=step_booster
)
return round(play_result_original, 1)
def calculate_play_rating_from_step(
step: Union[Decimal, str, int, float],
partner_step_value: Union[Decimal, str, int, float],
*,
partner_bonus: Optional[PartnerBonus] = None,
step_booster: Optional[StepBooster] = None,
):
step = Decimal(step)
partner_step_value = Decimal(partner_step_value)
# get original play result
if partner_bonus and partner_bonus.final_multiplier:
step /= partner_bonus.final_multiplier
if step_booster:
step /= step_booster.final_value()
if partner_bonus and partner_bonus.step_bonus:
step -= partner_bonus.step_bonus
play_rating_sqrt = (Decimal(50) * step - Decimal("2.5") * partner_step_value) / (
Decimal("2.45") * partner_step_value
)
return play_rating_sqrt**2 if play_rating_sqrt >= 0 else -(play_rating_sqrt**2)

View File

@ -1,19 +1,47 @@
from sqlalchemy import Engine, select
from sqlalchemy.orm import sessionmaker
import logging
import math
from typing import Iterable, List, Optional, Type, Union
from sqlalchemy import Engine, func, inspect, select
from sqlalchemy.orm import DeclarativeBase, InstrumentedAttribute, sessionmaker
from .calculate import calculate_score_modifier
from .external.arcsong.arcsong_json import ArcSongJsonBuilder
from .external.exports import ScoreExport, exporters
from .models.config import *
from .models.scores import *
from .models.songs import *
from .singleton import Singleton
logger = logging.getLogger(__name__)
class Database(metaclass=Singleton):
def __init__(self, engine: Engine):
self.engine = engine
def __init__(self, engine: Optional[Engine]):
try:
self.__engine
except AttributeError:
self.__engine = None
if engine is None:
if isinstance(self.engine, Engine):
return
raise ValueError("No sqlalchemy.Engine instance specified before.")
elif isinstance(engine, Engine):
if isinstance(self.engine, Engine):
logger.warning(
f"A sqlalchemy.Engine instance {self.engine} has been specified "
f"and will be replaced to {engine}"
)
self.engine = engine
else:
raise ValueError(
f"A sqlalchemy.Engine instance expected, not {repr(engine)}"
)
@property
def engine(self):
return self.__engine
def engine(self) -> Engine:
return self.__engine # type: ignore
@engine.setter
def engine(self, value: Engine):
@ -26,15 +54,19 @@ class Database(metaclass=Singleton):
def sessionmaker(self):
return self.__sessionmaker
# region init
def init(self, checkfirst: bool = True):
# create tables & views
if checkfirst:
# > https://github.com/kvesteri/sqlalchemy-utils/issues/396
# > view.create_view() causes DuplicateTableError on Base.metadata.create_all(checkfirst=True)
# so if `checkfirst` is True, drop these views before creating
SongsViewBase.metadata.drop_all(self.engine)
ScoresViewBase.metadata.drop_all(self.engine)
SongsBase.metadata.create_all(self.engine, checkfirst=checkfirst)
SongsViewBase.metadata.create_all(self.engine)
ScoresBase.metadata.create_all(self.engine, checkfirst=checkfirst)
ScoresViewBase.metadata.create_all(self.engine)
ConfigBase.metadata.create_all(self.engine, checkfirst=checkfirst)
@ -44,5 +76,322 @@ class Database(metaclass=Singleton):
stmt = select(Property.value).where(Property.key == "version")
result = session.execute(stmt).fetchone()
if not checkfirst or not result:
session.add(Property(key="version", value="2"))
session.add(Property(key="version", value="4"))
session.commit()
def check_init(self) -> bool:
# check table exists
expect_tables = (
list(SongsBase.metadata.tables.keys())
+ list(ScoresBase.metadata.tables.keys())
+ list(ConfigBase.metadata.tables.keys())
+ [
Chart.__tablename__,
ScoreCalculated.__tablename__,
ScoreBest.__tablename__,
CalculatedPotential.__tablename__,
]
)
return all(inspect(self.engine).has_table(t) for t in expect_tables)
# endregion
def version(self) -> Union[int, None]:
stmt = select(Property).where(Property.key == "version")
with self.sessionmaker() as session:
result = session.scalar(stmt)
return None if result is None else int(result.value)
# region Pack
def get_packs(self):
stmt = select(Pack)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_pack(self, pack_id: str):
stmt = select(Pack).where(Pack.id == pack_id)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
def get_pack_localized(self, pack_id: str):
stmt = select(PackLocalized).where(PackLocalized.id == pack_id)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
# endregion
# region Song
def get_songs(self):
stmt = select(Song)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_songs_by_pack_id(self, pack_id: str):
stmt = select(Song).where(Song.set == pack_id)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_song(self, song_id: str):
stmt = select(Song).where(Song.id == song_id)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
def get_song_localized(self, song_id: str):
stmt = select(SongLocalized).where(SongLocalized.id == song_id)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
# endregion
# region Difficulty
def get_difficulties(self):
stmt = select(Difficulty)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_difficulties_by_song_id(self, song_id: str):
stmt = select(Difficulty).where(Difficulty.song_id == song_id)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_difficulties_localized_by_song_id(self, song_id: str):
stmt = select(DifficultyLocalized).where(DifficultyLocalized.song_id == song_id)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_difficulty(self, song_id: str, rating_class: int):
stmt = select(Difficulty).where(
(Difficulty.song_id == song_id) & (Difficulty.rating_class == rating_class)
)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
def get_difficulty_localized(self, song_id: str, rating_class: int):
stmt = select(DifficultyLocalized).where(
(DifficultyLocalized.song_id == song_id)
& (DifficultyLocalized.rating_class == rating_class)
)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
# endregion
# region ChartInfo
def get_chart_infos(self):
stmt = select(ChartInfo)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_chart_infos_by_song_id(self, song_id: str):
stmt = select(ChartInfo).where(ChartInfo.song_id == song_id)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_chart_info(self, song_id: str, rating_class: int):
stmt = select(ChartInfo).where(
(ChartInfo.song_id == song_id) & (ChartInfo.rating_class == rating_class)
)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
# endregion
# region Chart
def get_charts_by_pack_id(self, pack_id: str):
stmt = select(Chart).where(Chart.set == pack_id)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_charts_by_song_id(self, song_id: str):
stmt = select(Chart).where(Chart.song_id == song_id)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_charts_by_constant(self, constant: int):
stmt = select(Chart).where(Chart.constant == constant)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_chart(self, song_id: str, rating_class: int):
stmt = select(Chart).where(
(Chart.song_id == song_id) & (Chart.rating_class == rating_class)
)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
# endregion
# region Score
def get_scores(self):
stmt = select(Score)
with self.sessionmaker() as session:
results = list(session.scalars(stmt))
return results
def get_score(self, score_id: int):
stmt = select(Score).where(Score.id == score_id)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
def get_score_best(self, song_id: str, rating_class: int):
stmt = select(ScoreBest).where(
(ScoreBest.song_id == song_id) & (ScoreBest.rating_class == rating_class)
)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
def insert_score(self, score: Score):
with self.sessionmaker() as session:
session.add(score)
session.commit()
def insert_scores(self, scores: Iterable[Score]):
with self.sessionmaker() as session:
session.add_all(scores)
session.commit()
def update_score(self, score: Score):
if score.id is None:
raise ValueError(
"Cannot determine which score to update, please specify `score.id`"
)
with self.sessionmaker() as session:
session.merge(score)
session.commit()
def delete_score(self, score: Score):
with self.sessionmaker() as session:
session.delete(score)
session.commit()
def recommend_charts(self, play_result: float, bounds: float = 0.1):
base_constant = math.ceil(play_result * 10)
results = []
results_id = []
with self.sessionmaker() as session:
for constant in range(base_constant - 20, base_constant + 1):
# from Pure Memory(EX+) to AA
score_modifier = (play_result * 10 - constant) / 10
if score_modifier >= 2.0:
min_score = 10000000
elif score_modifier >= 1.0:
min_score = 200000 * (score_modifier - 1) + 9800000
else:
min_score = 300000 * score_modifier + 9500000
min_score = int(min_score)
charts = self.get_charts_by_constant(constant)
for chart in charts:
score_best_stmt = select(ScoreBest).where(
(ScoreBest.song_id == chart.song_id)
& (ScoreBest.rating_class == chart.rating_class)
& (ScoreBest.score >= min_score)
& (play_result - bounds < ScoreBest.potential)
& (ScoreBest.potential < play_result + bounds)
)
if session.scalar(score_best_stmt):
chart_id = f"{chart.song_id},{chart.rating_class}"
if chart_id not in results_id:
results.append(chart)
results_id.append(chart_id)
return results
# endregion
def get_b30(self):
stmt = select(CalculatedPotential.b30).select_from(CalculatedPotential)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result
# region COUNT
def __count_table(self, base: Type[DeclarativeBase]):
stmt = select(func.count()).select_from(base)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result or 0
def __count_column(self, column: InstrumentedAttribute):
stmt = select(func.count(column))
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result or 0
def count_packs(self):
return self.__count_column(Pack.id)
def count_songs(self):
return self.__count_column(Song.id)
def count_difficulties(self):
return self.__count_table(Difficulty)
def count_chart_infos(self):
return self.__count_table(ChartInfo)
def count_complete_chart_infos(self):
stmt = (
select(func.count())
.select_from(ChartInfo)
.where((ChartInfo.constant != None) & (ChartInfo.notes != None))
)
with self.sessionmaker() as session:
result = session.scalar(stmt)
return result or 0
def count_charts(self):
return self.__count_table(Chart)
def count_scores(self):
return self.__count_column(Score.id)
def count_scores_calculated(self):
return self.__count_table(ScoreCalculated)
def count_scores_best(self):
return self.__count_table(ScoreBest)
# endregion
# region export
def export_scores(self) -> List[ScoreExport]:
scores = self.get_scores()
return [exporters.score(score) for score in scores]
def generate_arcsong(self):
with self.sessionmaker() as session:
arcsong = ArcSongJsonBuilder(session).generate_arcsong_json()
return arcsong
# endregion

View File

@ -0,0 +1 @@
from .api_data import AndrealImageGeneratorApiDataConverter

View File

@ -0,0 +1,14 @@
class AndrealImageGeneratorAccount:
def __init__(
self,
name: str = "Player",
code: int = 123456789,
rating: int = -1,
character: int = 5,
character_uncapped: bool = False,
):
self.name = name
self.code = code
self.rating = rating
self.character = character
self.character_uncapped = character_uncapped

View File

@ -0,0 +1,94 @@
from typing import Optional, Union
from sqlalchemy import select
from sqlalchemy.orm import Session
from ...models import CalculatedPotential, ScoreBest, ScoreCalculated
from .account import AndrealImageGeneratorAccount
class AndrealImageGeneratorApiDataConverter:
def __init__(
self,
session: Session,
account: AndrealImageGeneratorAccount = AndrealImageGeneratorAccount(),
):
self.session = session
self.account = account
def account_info(self):
return {
"code": self.account.code,
"name": self.account.name,
"is_char_uncapped": self.account.character_uncapped,
"rating": self.account.rating,
"character": self.account.character,
}
def score(self, score: Union[ScoreCalculated, ScoreBest]):
return {
"score": score.score,
"health": 75,
"rating": score.potential,
"song_id": score.song_id,
"modifier": score.modifier or 0,
"difficulty": score.rating_class,
"clear_type": score.clear_type or 1,
"best_clear_type": score.clear_type or 1,
"time_played": score.date * 1000 if score.date else 0,
"near_count": score.far,
"miss_count": score.lost,
"perfect_count": score.pure,
"shiny_perfect_count": score.shiny_pure,
}
def user_info(self, score: Optional[ScoreCalculated] = None):
if not score:
score = self.session.scalar(
select(ScoreCalculated).order_by(ScoreCalculated.date.desc()).limit(1)
)
if not score:
raise ValueError("No score available.")
return {
"content": {
"account_info": self.account_info(),
"recent_score": [self.score(score)],
}
}
def user_best(self, song_id: str, rating_class: int):
score = self.session.scalar(
select(ScoreBest).where(
(ScoreBest.song_id == song_id)
& (ScoreBest.rating_class == rating_class)
)
)
if not score:
raise ValueError("No score available.")
return {
"content": {
"account_info": self.account_info(),
"record": self.score(score),
}
}
def user_best30(self):
scores = list(
self.session.scalars(
select(ScoreBest).order_by(ScoreBest.potential.desc()).limit(40)
)
)
if not scores:
raise ValueError("No score available.")
best30_avg = self.session.scalar(select(CalculatedPotential.b30))
return {
"content": {
"account_info": self.account_info(),
"best30_avg": best30_avg,
"best30_list": [self.score(score) for score in scores[:30]],
"best30_overflow": [self.score(score) for score in scores[-10:]],
}
}

View File

@ -1,3 +1,4 @@
import contextlib
import json
from os import PathLike
from typing import Any, List, Optional, Union
@ -43,6 +44,25 @@ class ArcaeaParser:
def __init__(self, filepath: Union[str, bytes, PathLike]):
self.filepath = filepath
def read_file_text(self):
file_handle = None
with contextlib.suppress(TypeError):
# original open
file_handle = open(self.filepath, "r", encoding="utf-8")
if file_handle is None:
try:
# or maybe a `pathlib.Path` subset
# or an `importlib.resources.abc.Traversable` like object
# e.g. `zipfile.Path`
file_handle = self.filepath.open(mode="r", encoding="utf-8")
except Exception as e:
raise ValueError("Invalid `filepath`.") from e
with file_handle:
return file_handle.read()
def parse(self) -> List[DeclarativeBase]:
...

View File

@ -10,11 +10,12 @@ class PacklistParser(ArcaeaParser):
super().__init__(filepath)
def parse(self) -> List[Union[Pack, PackLocalized]]:
with open(self.filepath, "r", encoding="utf-8") as pl_f:
packlist_json_root = json.loads(pl_f.read())
packlist_json_root = json.loads(self.read_file_text())
packlist_json = packlist_json_root["packs"]
results = []
results: List[Union[Pack, PackLocalized]] = [
Pack(id="single", name="Memory Archive")
]
for item in packlist_json:
pack = Pack()
pack.id = item["id"]

View File

@ -1,7 +1,7 @@
import json
from typing import List, Union
from ...models.songs import Chart, ChartLocalized, Song, SongLocalized
from ...models.songs import Difficulty, DifficultyLocalized, Song, SongLocalized
from .common import ArcaeaParser, is_localized, set_model_localized_attrs, to_db_value
@ -9,9 +9,10 @@ class SonglistParser(ArcaeaParser):
def __init__(self, filepath):
super().__init__(filepath)
def parse(self) -> List[Union[Song, SongLocalized, Chart, ChartLocalized]]:
with open(self.filepath, "r", encoding="utf-8") as sl_f:
songlist_json_root = json.loads(sl_f.read())
def parse(
self,
) -> List[Union[Song, SongLocalized, Difficulty, DifficultyLocalized]]:
songlist_json_root = json.loads(self.read_file_text())
songlist_json = songlist_json_root["songs"]
results = []
@ -63,9 +64,8 @@ class SonglistDifficultiesParser(ArcaeaParser):
def __init__(self, filepath):
self.filepath = filepath
def parse(self) -> List[Union[Chart, ChartLocalized]]:
with open(self.filepath, "r", encoding="utf-8") as sl_f:
songlist_json_root = json.loads(sl_f.read())
def parse(self) -> List[Union[Difficulty, DifficultyLocalized]]:
songlist_json_root = json.loads(self.read_file_text())
songlist_json = songlist_json_root["songs"]
results = []
@ -74,7 +74,10 @@ class SonglistDifficultiesParser(ArcaeaParser):
continue
for item in song_item["difficulties"]:
chart = Chart(song_id=song_item["id"])
if item["rating"] == 0:
continue
chart = Difficulty(song_id=song_item["id"])
chart.rating_class = item["ratingClass"]
chart.rating = item["rating"]
chart.rating_plus = item.get("ratingPlus") or False
@ -94,7 +97,7 @@ class SonglistDifficultiesParser(ArcaeaParser):
results.append(chart)
if is_localized(item, "title") or is_localized(item, "artist"):
chart_localized = ChartLocalized(
chart_localized = DifficultyLocalized(
song_id=chart.song_id, rating_class=chart.rating_class
)
set_model_localized_attrs(chart_localized, item, "title")

View File

@ -12,8 +12,6 @@ logger = logging.getLogger(__name__)
class St3ScoreParser(ArcaeaParser):
CLEAR_TYPES_MAP = {0: -1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1}
def __init__(self, filepath):
super().__init__(filepath)
@ -22,14 +20,22 @@ class St3ScoreParser(ArcaeaParser):
with sqlite3.connect(self.filepath) as st3_conn:
cursor = st3_conn.cursor()
db_scores = cursor.execute(
"SELECT songId, songDifficulty, score, perfectCount, nearCount, missCount, date FROM scores"
"SELECT songId, songDifficulty, score, perfectCount, nearCount, missCount, date, modifier FROM scores"
).fetchall()
for song_id, rating_class, score, pure, far, lost, date in db_scores:
db_clear_type = cursor.execute(
for (
song_id,
rating_class,
score,
pure,
far,
lost,
date,
modifier,
) in db_scores:
clear_type = cursor.execute(
"SELECT clearType FROM cleartypes WHERE songId = ? AND songDifficulty = ?",
(song_id, rating_class),
).fetchone()[0]
r10_clear_type = self.CLEAR_TYPES_MAP[db_clear_type]
date_str = str(date)
date = None if len(date_str) < 7 else int(date_str.ljust(10, "0"))
@ -43,7 +49,9 @@ class St3ScoreParser(ArcaeaParser):
far=far,
lost=lost,
date=date,
r10_clear_type=r10_clear_type,
modifier=modifier,
clear_type=clear_type,
comment="Imported from st3",
)
)

View File

@ -22,7 +22,7 @@ class ArcsongDbParser:
song_id=result[0],
rating_class=result[1],
constant=result[2],
note=result[3] or None,
notes=result[3] or None,
)
results.append(chart)

View File

@ -0,0 +1,155 @@
import logging
import re
from typing import List, Optional, TypedDict
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from ...models import (
ChartInfo,
Difficulty,
DifficultyLocalized,
Pack,
Song,
SongLocalized,
)
logger = logging.getLogger(__name__)
class TArcSongJsonDifficultyItem(TypedDict):
name_en: str
name_jp: str
artist: str
bpm: str
bpm_base: float
set: str
set_friendly: str
time: int
side: int
world_unlock: bool
remote_download: bool
bg: str
date: int
version: str
difficulty: int
rating: int
note: int
chart_designer: str
jacket_designer: str
jacket_override: bool
audio_override: bool
class TArcSongJsonSongItem(TypedDict):
song_id: str
difficulties: List[TArcSongJsonDifficultyItem]
alias: List[str]
class TArcSongJson(TypedDict):
songs: List[TArcSongJsonSongItem]
class ArcSongJsonBuilder:
def __init__(self, session: Session):
self.session = session
def get_difficulty_item(
self,
difficulty: Difficulty,
song: Song,
pack: Pack,
song_localized: Optional[SongLocalized],
) -> TArcSongJsonDifficultyItem:
if "_append_" in pack.id:
base_pack = self.session.scalar(
select(Pack).where(Pack.id == re.sub(r"_append_.*$", "", pack.id))
)
else:
base_pack = None
difficulty_localized = self.session.scalar(
select(DifficultyLocalized).where(
(DifficultyLocalized.song_id == difficulty.song_id)
& (DifficultyLocalized.rating_class == difficulty.rating_class)
)
)
chart_info = self.session.scalar(
select(ChartInfo).where(
(ChartInfo.song_id == difficulty.song_id)
& (ChartInfo.rating_class == difficulty.rating_class)
)
)
if difficulty_localized:
name_jp = difficulty_localized.title_ja or ""
elif song_localized:
name_jp = song_localized.title_ja or ""
else:
name_jp = ""
return {
"name_en": difficulty.title or song.title,
"name_jp": name_jp,
"artist": difficulty.artist or song.artist,
"bpm": difficulty.bpm or song.bpm or "",
"bpm_base": difficulty.bpm_base or song.bpm_base or 0.0,
"set": song.set,
"set_friendly": f"{base_pack.name} - {pack.name}"
if base_pack
else pack.name,
"time": 0,
"side": song.side or 0,
"world_unlock": False,
"remote_download": False,
"bg": difficulty.bg or song.bg or "",
"date": difficulty.date or song.date or 0,
"version": difficulty.version or song.version or "",
"difficulty": difficulty.rating * 2 + int(difficulty.rating_plus),
"rating": chart_info.constant or 0 if chart_info else 0,
"note": chart_info.notes or 0 if chart_info else 0,
"chart_designer": difficulty.chart_designer or "",
"jacket_designer": difficulty.jacket_desginer or "",
"jacket_override": difficulty.jacket_override,
"audio_override": difficulty.audio_override,
}
def get_song_item(self, song: Song) -> TArcSongJsonSongItem:
difficulties = self.session.scalars(
select(Difficulty).where(Difficulty.song_id == song.id)
)
pack = self.session.scalar(select(Pack).where(Pack.id == song.set))
if not pack:
logger.warning(f'Cannot find pack "{song.set}", using placeholder instead.')
pack = Pack(id="unknown", name="Unknown", description="__PLACEHOLDER__")
song_localized = self.session.scalar(
select(SongLocalized).where(SongLocalized.id == song.id)
)
return {
"song_id": song.id,
"difficulties": [
self.get_difficulty_item(difficulty, song, pack, song_localized)
for difficulty in difficulties
],
"alias": [],
}
def generate_arcsong_json(self) -> TArcSongJson:
songs = self.session.scalars(select(Song))
arcsong_songs = []
for song in songs:
proceed = self.session.scalar(
select(func.count(Difficulty.rating_class)).where(
Difficulty.song_id == song.id
)
)
if not proceed:
continue
arcsong_songs.append(self.get_song_item(song))
return {"songs": arcsong_songs}

View File

@ -0,0 +1,2 @@
from . import exporters
from .types import ScoreExport

View File

@ -0,0 +1,19 @@
from ...models import Score
from .types import ScoreExport
def score(score: Score) -> ScoreExport:
return {
"id": score.id,
"song_id": score.song_id,
"rating_class": score.rating_class,
"score": score.score,
"pure": score.pure,
"far": score.far,
"lost": score.lost,
"date": score.date,
"max_recall": score.max_recall,
"modifier": score.modifier,
"clear_type": score.clear_type,
"comment": score.comment,
}

View File

@ -0,0 +1,16 @@
from typing import Optional, TypedDict
class ScoreExport(TypedDict):
id: int
song_id: str
rating_class: int
score: int
pure: Optional[int]
far: Optional[int]
lost: Optional[int]
date: Optional[int]
max_recall: Optional[int]
modifier: Optional[int]
clear_type: Optional[int]
comment: Optional[str]

View File

@ -0,0 +1,21 @@
from .config import ConfigBase, Property
from .scores import (
CalculatedPotential,
Score,
ScoreBest,
ScoreCalculated,
ScoresBase,
ScoresViewBase,
)
from .songs import (
Chart,
ChartInfo,
Difficulty,
DifficultyLocalized,
Pack,
PackLocalized,
Song,
SongLocalized,
SongsBase,
SongsViewBase,
)

View File

@ -14,7 +14,7 @@ class ConfigBase(DeclarativeBase, ReprHelper):
class Property(ConfigBase):
__tablename__ = "property"
__tablename__ = "properties"
key: Mapped[str] = mapped_column(TEXT(), primary_key=True)
value: Mapped[str] = mapped_column(TEXT())

View File

@ -5,14 +5,14 @@ from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy_utils import create_view
from .common import ReprHelper
from .songs import Chart, ChartInfo
from .songs import ChartInfo, Difficulty
__all__ = [
"ScoresBase",
"Score",
"ScoresViewBase",
"Calculated",
"Best",
"ScoreCalculated",
"ScoreBest",
"CalculatedPotential",
]
@ -22,7 +22,7 @@ class ScoresBase(DeclarativeBase, ReprHelper):
class Score(ScoresBase):
__tablename__ = "score"
__tablename__ = "scores"
id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True)
song_id: Mapped[str] = mapped_column(TEXT())
@ -33,9 +33,13 @@ class Score(ScoresBase):
lost: Mapped[Optional[int]]
date: Mapped[Optional[int]]
max_recall: Mapped[Optional[int]]
r10_clear_type: Mapped[Optional[int]] = mapped_column(
comment="0: LOST, 1: COMPLETE, 2: HARD_LOST"
modifier: Mapped[Optional[int]] = mapped_column(
comment="0: NORMAL, 1: EASY, 2: HARD"
)
clear_type: Mapped[Optional[int]] = mapped_column(
comment="0: TRACK LOST, 1: NORMAL CLEAR, 2: FULL RECALL, 3: PURE MEMORY, 4: EASY CLEAR, 5: HARD CLEAR"
)
comment: Mapped[Optional[str]] = mapped_column(TEXT())
# How to create an SQL View with SQLAlchemy?
@ -47,40 +51,45 @@ class ScoresViewBase(DeclarativeBase, ReprHelper):
pass
class Calculated(ScoresViewBase):
score_id: Mapped[str]
class ScoreCalculated(ScoresViewBase):
__tablename__ = "scores_calculated"
id: Mapped[int]
song_id: Mapped[str]
rating_class: Mapped[int]
score: Mapped[int]
pure: Mapped[Optional[int]]
shiny_pure: Mapped[Optional[int]]
far: Mapped[Optional[int]]
lost: Mapped[Optional[int]]
date: Mapped[Optional[int]]
max_recall: Mapped[Optional[int]]
r10_clear_type: Mapped[Optional[int]]
shiny_pure: Mapped[Optional[int]]
modifier: Mapped[Optional[int]]
clear_type: Mapped[Optional[int]]
potential: Mapped[float]
comment: Mapped[Optional[str]]
__table__ = create_view(
name="calculated",
name=__tablename__,
selectable=select(
Score.id.label("score_id"),
Chart.song_id,
Chart.rating_class,
Score.id,
Difficulty.song_id,
Difficulty.rating_class,
Score.score,
Score.pure,
(
Score.score
- func.floor(
(Score.pure * 10000000.0 / ChartInfo.notes)
+ (Score.far * 0.5 * 10000000.0 / ChartInfo.notes)
)
).label("shiny_pure"),
Score.far,
Score.lost,
Score.date,
Score.max_recall,
Score.r10_clear_type,
(
Score.score
- func.floor(
(Score.pure * 10000000.0 / ChartInfo.note)
+ (Score.far * 0.5 * 10000000.0 / ChartInfo.note)
)
).label("shiny_pure"),
Score.modifier,
Score.clear_type,
case(
(Score.score >= 10000000, ChartInfo.constant / 10.0 + 2),
(
@ -92,62 +101,73 @@ class Calculated(ScoresViewBase):
0,
),
).label("potential"),
Score.comment,
)
.select_from(Chart)
.select_from(Difficulty)
.join(
ChartInfo,
(Chart.song_id == ChartInfo.song_id)
& (Chart.rating_class == ChartInfo.rating_class),
(Difficulty.song_id == ChartInfo.song_id)
& (Difficulty.rating_class == ChartInfo.rating_class),
)
.join(
Score,
(Chart.song_id == Score.song_id)
& (Chart.rating_class == Score.rating_class),
(Difficulty.song_id == Score.song_id)
& (Difficulty.rating_class == Score.rating_class),
),
metadata=ScoresViewBase.metadata,
cascade_on_drop=False,
)
class Best(ScoresViewBase):
score_id: Mapped[str]
class ScoreBest(ScoresViewBase):
__tablename__ = "scores_best"
id: Mapped[int]
song_id: Mapped[str]
rating_class: Mapped[int]
score: Mapped[int]
pure: Mapped[Optional[int]]
shiny_pure: Mapped[Optional[int]]
far: Mapped[Optional[int]]
lost: Mapped[Optional[int]]
date: Mapped[Optional[int]]
max_recall: Mapped[Optional[int]]
r10_clear_type: Mapped[Optional[int]]
shiny_pure: Mapped[Optional[int]]
modifier: Mapped[Optional[int]]
clear_type: Mapped[Optional[int]]
potential: Mapped[float]
comment: Mapped[Optional[str]]
__table__ = create_view(
name="best",
name=__tablename__,
selectable=select(
*[col for col in inspect(Calculated).columns if col.name != "potential"],
func.max(Calculated.potential).label("potential"),
*[
col
for col in inspect(ScoreCalculated).columns
if col.name != "potential"
],
func.max(ScoreCalculated.potential).label("potential"),
)
.select_from(Calculated)
.group_by(Calculated.song_id, Calculated.rating_class)
.order_by(Calculated.potential.desc()),
.select_from(ScoreCalculated)
.group_by(ScoreCalculated.song_id, ScoreCalculated.rating_class)
.order_by(ScoreCalculated.potential.desc()),
metadata=ScoresViewBase.metadata,
cascade_on_drop=False,
)
class CalculatedPotential(ScoresViewBase):
__tablename__ = "calculated_potential"
b30: Mapped[float]
_select_bests_subquery = (
select(Best.potential.label("b30_sum"))
.order_by(Best.potential.desc())
select(ScoreBest.potential.label("b30_sum"))
.order_by(ScoreBest.potential.desc())
.limit(30)
.subquery()
)
__table__ = create_view(
name="calculated_potential",
name=__tablename__,
selectable=select(func.avg(_select_bests_subquery.c.b30_sum).label("b30")),
metadata=ScoresViewBase.metadata,
cascade_on_drop=False,

View File

@ -1,7 +1,8 @@
from typing import Optional
from sqlalchemy import TEXT, ForeignKey
from sqlalchemy import TEXT, ForeignKey, func, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy_utils import create_view
from .common import ReprHelper
@ -11,9 +12,11 @@ __all__ = [
"PackLocalized",
"Song",
"SongLocalized",
"Chart",
"ChartLocalized",
"Difficulty",
"DifficultyLocalized",
"ChartInfo",
"SongsViewBase",
"Chart",
]
@ -22,7 +25,7 @@ class SongsBase(DeclarativeBase, ReprHelper):
class Pack(SongsBase):
__tablename__ = "pack"
__tablename__ = "packs"
id: Mapped[str] = mapped_column(TEXT(), primary_key=True)
name: Mapped[str] = mapped_column(TEXT())
@ -30,9 +33,9 @@ class Pack(SongsBase):
class PackLocalized(SongsBase):
__tablename__ = "pack_localized"
__tablename__ = "packs_localized"
id: Mapped[str] = mapped_column(ForeignKey("pack.id"), primary_key=True)
id: Mapped[str] = mapped_column(ForeignKey("packs.id"), primary_key=True)
name_ja: Mapped[Optional[str]] = mapped_column(TEXT())
name_ko: Mapped[Optional[str]] = mapped_column(TEXT())
name_zh_hans: Mapped[Optional[str]] = mapped_column(TEXT())
@ -44,7 +47,7 @@ class PackLocalized(SongsBase):
class Song(SongsBase):
__tablename__ = "song"
__tablename__ = "songs"
idx: Mapped[int]
id: Mapped[str] = mapped_column(TEXT(), primary_key=True)
@ -67,29 +70,41 @@ class Song(SongsBase):
class SongLocalized(SongsBase):
__tablename__ = "song_localized"
__tablename__ = "songs_localized"
id: Mapped[str] = mapped_column(ForeignKey("song.id"), primary_key=True)
id: Mapped[str] = mapped_column(ForeignKey("songs.id"), primary_key=True)
title_ja: Mapped[Optional[str]] = mapped_column(TEXT())
title_ko: Mapped[Optional[str]] = mapped_column(TEXT())
title_zh_hans: Mapped[Optional[str]] = mapped_column(TEXT())
title_zh_hant: Mapped[Optional[str]] = mapped_column(TEXT())
search_title_ja: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
search_title_ko: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
search_title_zh_hans: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
search_title_zh_hant: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
search_artist_ja: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
search_artist_ko: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
search_artist_zh_hans: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
search_artist_zh_hant: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
search_title_ja: Mapped[Optional[str]] = mapped_column(TEXT(), comment="JSON array")
search_title_ko: Mapped[Optional[str]] = mapped_column(TEXT(), comment="JSON array")
search_title_zh_hans: Mapped[Optional[str]] = mapped_column(
TEXT(), comment="JSON array"
)
search_title_zh_hant: Mapped[Optional[str]] = mapped_column(
TEXT(), comment="JSON array"
)
search_artist_ja: Mapped[Optional[str]] = mapped_column(
TEXT(), comment="JSON array"
)
search_artist_ko: Mapped[Optional[str]] = mapped_column(
TEXT(), comment="JSON array"
)
search_artist_zh_hans: Mapped[Optional[str]] = mapped_column(
TEXT(), comment="JSON array"
)
search_artist_zh_hant: Mapped[Optional[str]] = mapped_column(
TEXT(), comment="JSON array"
)
source_ja: Mapped[Optional[str]] = mapped_column(TEXT())
source_ko: Mapped[Optional[str]] = mapped_column(TEXT())
source_zh_hans: Mapped[Optional[str]] = mapped_column(TEXT())
source_zh_hant: Mapped[Optional[str]] = mapped_column(TEXT())
class Chart(SongsBase):
__tablename__ = "chart"
class Difficulty(SongsBase):
__tablename__ = "difficulties"
song_id: Mapped[str] = mapped_column(TEXT(), primary_key=True)
rating_class: Mapped[int] = mapped_column(primary_key=True)
@ -110,12 +125,14 @@ class Chart(SongsBase):
date: Mapped[Optional[int]]
class ChartLocalized(SongsBase):
__tablename__ = "chart_localized"
class DifficultyLocalized(SongsBase):
__tablename__ = "difficulties_localized"
song_id: Mapped[str] = mapped_column(ForeignKey("chart.song_id"), primary_key=True)
song_id: Mapped[str] = mapped_column(
ForeignKey("difficulties.song_id"), primary_key=True
)
rating_class: Mapped[str] = mapped_column(
ForeignKey("chart.rating_class"), primary_key=True
ForeignKey("difficulties.rating_class"), primary_key=True
)
title_ja: Mapped[Optional[str]] = mapped_column(TEXT())
title_ko: Mapped[Optional[str]] = mapped_column(TEXT())
@ -128,13 +145,95 @@ class ChartLocalized(SongsBase):
class ChartInfo(SongsBase):
__tablename__ = "chart_info"
__tablename__ = "charts_info"
song_id: Mapped[str] = mapped_column(ForeignKey("chart.song_id"), primary_key=True)
song_id: Mapped[str] = mapped_column(
ForeignKey("difficulties.song_id"), primary_key=True
)
rating_class: Mapped[str] = mapped_column(
ForeignKey("chart.rating_class"), primary_key=True
ForeignKey("difficulties.rating_class"), primary_key=True
)
constant: Mapped[int] = mapped_column(
comment="real_constant * 10. For example, Crimson Throne [FTR] is 10.4, then store 104 here."
)
note: Mapped[Optional[int]]
notes: Mapped[Optional[int]]
class SongsViewBase(DeclarativeBase, ReprHelper):
pass
class Chart(SongsViewBase):
__tablename__ = "charts"
song_idx: Mapped[int]
song_id: Mapped[str]
rating_class: Mapped[int]
rating: Mapped[int]
rating_plus: Mapped[bool]
title: Mapped[str]
artist: Mapped[str]
set: Mapped[str]
bpm: Mapped[Optional[str]]
bpm_base: Mapped[Optional[float]]
audio_preview: Mapped[Optional[int]]
audio_preview_end: Mapped[Optional[int]]
side: Mapped[Optional[int]]
version: Mapped[Optional[str]]
date: Mapped[Optional[int]]
bg: Mapped[Optional[str]]
bg_inverse: Mapped[Optional[str]]
bg_day: Mapped[Optional[str]]
bg_night: Mapped[Optional[str]]
source: Mapped[Optional[str]]
source_copyright: Mapped[Optional[str]]
chart_designer: Mapped[Optional[str]]
jacket_desginer: Mapped[Optional[str]]
audio_override: Mapped[bool]
jacket_override: Mapped[bool]
jacket_night: Mapped[Optional[str]]
constant: Mapped[int]
notes: Mapped[Optional[int]]
__table__ = create_view(
name=__tablename__,
selectable=select(
Song.idx.label("song_idx"),
Difficulty.song_id,
Difficulty.rating_class,
Difficulty.rating,
Difficulty.rating_plus,
func.coalesce(Difficulty.title, Song.title).label("title"),
func.coalesce(Difficulty.artist, Song.artist).label("artist"),
Song.set,
func.coalesce(Difficulty.bpm, Song.bpm).label("bpm"),
func.coalesce(Difficulty.bpm_base, Song.bpm_base).label("bpm_base"),
Song.audio_preview,
Song.audio_preview_end,
Song.side,
func.coalesce(Difficulty.version, Song.version).label("version"),
func.coalesce(Difficulty.date, Song.date).label("date"),
func.coalesce(Difficulty.bg, Song.bg).label("bg"),
func.coalesce(Difficulty.bg_inverse, Song.bg_inverse).label("bg_inverse"),
Song.bg_day,
Song.bg_night,
Song.source,
Song.source_copyright,
Difficulty.chart_designer,
Difficulty.jacket_desginer,
Difficulty.audio_override,
Difficulty.jacket_override,
Difficulty.jacket_night,
ChartInfo.constant,
ChartInfo.notes,
)
.select_from(Difficulty)
.join(
ChartInfo,
(Difficulty.song_id == ChartInfo.song_id)
& (Difficulty.rating_class == ChartInfo.rating_class),
)
.join(Song, Difficulty.song_id == Song.id),
metadata=SongsViewBase.metadata,
cascade_on_drop=False,
)

View File

@ -0,0 +1,111 @@
from typing import List, Union
from sqlalchemy import select
from sqlalchemy.orm import Session
from whoosh.analysis import LowercaseFilter, RegexTokenizer
from whoosh.fields import ID, KEYWORD, TEXT, Schema
from whoosh.filedb.filestore import RamStorage
from whoosh.qparser import FuzzyTermPlugin, MultifieldParser, OrGroup
from .models.songs import Song, SongLocalized
from .utils.search_title import recover_search_title
class Searcher:
def __init__(self):
self.text_analyzer = RegexTokenizer() | LowercaseFilter()
self.song_schema = Schema(
song_id=ID(stored=True, unique=True),
title=TEXT(analyzer=self.text_analyzer, spelling=True),
artist=TEXT(analyzer=self.text_analyzer, spelling=True),
source=TEXT(analyzer=self.text_analyzer, spelling=True),
keywords=KEYWORD(lowercase=True, stored=True, scorable=True),
)
self.storage = RamStorage()
self.index = self.storage.create_index(self.song_schema)
self.default_query_parser = MultifieldParser(
["song_id", "title", "artist", "source", "keywords"],
self.song_schema,
group=OrGroup,
)
self.default_query_parser.add_plugin(FuzzyTermPlugin())
def import_songs(self, session: Session):
writer = self.index.writer()
songs = list(session.scalars(select(Song)))
song_localize_stmt = select(SongLocalized)
for song in songs:
stmt = song_localize_stmt.where(SongLocalized.id == song.id)
sl = session.scalar(stmt)
song_id = song.id
possible_titles: List[Union[str, None]] = [song.title]
possible_artists: List[Union[str, None]] = [song.artist]
possible_sources: List[Union[str, None]] = [song.source]
if sl:
possible_titles.extend(
[sl.title_ja, sl.title_ko, sl.title_zh_hans, sl.title_zh_hant]
)
possible_titles.extend(
recover_search_title(sl.search_title_ja)
+ recover_search_title(sl.search_title_ko)
+ recover_search_title(sl.search_title_zh_hans)
+ recover_search_title(sl.search_title_zh_hant)
)
possible_artists.extend(
recover_search_title(sl.search_artist_ja)
+ recover_search_title(sl.search_artist_ko)
+ recover_search_title(sl.search_artist_zh_hans)
+ recover_search_title(sl.search_artist_zh_hant)
)
possible_sources.extend(
[
sl.source_ja,
sl.source_ko,
sl.source_zh_hans,
sl.source_zh_hant,
]
)
# remove empty items in list
titles = [t for t in possible_titles if t != "" and t is not None]
artists = [t for t in possible_artists if t != "" and t is not None]
sources = [t for t in possible_sources if t != "" and t is not None]
writer.update_document(
song_id=song_id,
title=" ".join(titles),
artist=" ".join(artists),
source=" ".join(sources),
keywords=" ".join([song_id] + titles + artists + sources),
)
writer.commit()
def did_you_mean(self, string: str):
results = set()
with self.index.searcher() as searcher:
corrector_keywords = searcher.corrector("keywords") # type: ignore
corrector_song_id = searcher.corrector("song_id") # type: ignore
corrector_title = searcher.corrector("title") # type: ignore
corrector_artist = searcher.corrector("artist") # type: ignore
corrector_source = searcher.corrector("source") # type: ignore
results.update(corrector_keywords.suggest(string))
results.update(corrector_song_id.suggest(string))
results.update(corrector_title.suggest(string))
results.update(corrector_artist.suggest(string))
results.update(corrector_source.suggest(string))
if string in results:
results.remove(string)
return list(results)
def search(self, string: str, *, limit: int = 10, fuzzy_distance: int = 10):
query_string = f"{string}"
query = self.default_query_parser.parse(query_string)
with self.index.searcher() as searcher:
results = list(searcher.search(query, limit=limit))
return results

View File

View File

@ -0,0 +1,23 @@
from typing import Optional
RATING_CLASS_TEXT_MAP = {
0: "Past",
1: "Present",
2: "Future",
3: "Beyond",
}
RATING_CLASS_SHORT_TEXT_MAP = {
0: "PST",
1: "PRS",
2: "FTR",
3: "BYD",
}
def rating_class_to_text(rating_class: int) -> Optional[str]:
return RATING_CLASS_TEXT_MAP.get(rating_class)
def rating_class_to_short_text(rating_class: int) -> Optional[str]:
return RATING_CLASS_SHORT_TEXT_MAP.get(rating_class)

View File

@ -0,0 +1,29 @@
from typing import Any, Sequence
SCORE_GRADE_FLOOR = [9900000, 9800000, 9500000, 9200000, 8900000, 8600000, 0]
SCORE_GRADE_TEXTS = ["EX+", "EX", "AA", "A", "B", "C", "D"]
def zip_score_grade(score: int, __seq: Sequence, default: Any = "__PRESERVE__"):
"""
zip_score_grade is a simple wrapper that equals to:
```py
for score_floor, val in zip(SCORE_GRADE_FLOOR, __seq):
if score >= score_floor:
return val
return seq[-1] if default == "__PRESERVE__" else default
```
Could be useful in specific cases.
"""
return next(
(
val
for score_floor, val in zip(SCORE_GRADE_FLOOR, __seq)
if score >= score_floor
),
__seq[-1] if default == "__PRESERVE__" else default,
)
def score_to_grade_text(score: int) -> str:
return zip_score_grade(score, SCORE_GRADE_TEXTS)

View File

@ -0,0 +1,6 @@
import json
from typing import List, Optional
def recover_search_title(db_value: Optional[str]) -> List[str]:
return json.loads(db_value) if db_value else []