mirror of
https://github.com/283375/arcaea-offline.git
synced 2025-07-01 12:16:26 +00:00
Compare commits
38 Commits
v0.1.0
...
c1f83eff55
Author | SHA1 | Date | |
---|---|---|---|
c1f83eff55
|
|||
3349620b3a
|
|||
3867b273c7
|
|||
0c292fc3be
|
|||
2c73570a65
|
|||
ec9926993c
|
|||
bed9e14368
|
|||
f72410d0bc
|
|||
64fd328ff8
|
|||
a97031e717
|
|||
c6014bd1b6
|
|||
c909886902
|
|||
589826004b
|
|||
4899e0383f
|
|||
5796d9a80f
|
|||
6258a55ba4
|
|||
d763896c0c
|
|||
c2a171a6d3
|
|||
d36a858464
|
|||
66a4cc8cff
|
|||
00255e5b74
|
|||
844568db1a
|
|||
167f72f9bb
|
|||
01bfd0f350
|
|||
0d882fa138
|
|||
daf2a46632
|
|||
35e6fde664
|
|||
6b8a3e1565
|
|||
ca9576160f
|
|||
e948b6abea
|
|||
1282993810
|
|||
b561cc51e0
|
|||
316b02cd1b
|
|||
b180976284
|
|||
54851549d5
|
|||
262495a580
|
|||
a6c1e594c4
|
|||
d979b6cd10
|
7
LICENSE
Normal file
7
LICENSE
Normal file
@ -0,0 +1,7 @@
|
||||
Copyright 2023-now Lin He
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
@ -13,6 +13,7 @@ dependencies = [
|
||||
"beautifulsoup4==4.12.2",
|
||||
"SQLAlchemy==2.0.20",
|
||||
"SQLAlchemy-Utils==0.41.1",
|
||||
"Whoosh==2.7.4",
|
||||
]
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
|
@ -1,3 +1,4 @@
|
||||
beautifulsoup4==4.12.2
|
||||
SQLAlchemy==2.0.20
|
||||
SQLAlchemy-Utils==0.41.1
|
||||
Whoosh==2.7.4
|
||||
|
@ -1,48 +0,0 @@
|
||||
from decimal import Decimal
|
||||
from math import floor
|
||||
from typing import Dict, List
|
||||
|
||||
from .models.scores import Calculated
|
||||
|
||||
|
||||
def calculate_score_range(notes: int, pure: int, far: int):
|
||||
single_note_score = 10000000 / Decimal(notes)
|
||||
|
||||
actual_score = floor(
|
||||
single_note_score * pure + single_note_score * Decimal(0.5) * far
|
||||
)
|
||||
return (actual_score, actual_score + pure)
|
||||
|
||||
|
||||
def calculate_potential(_constant: float, score: int) -> Decimal:
|
||||
constant = Decimal(_constant)
|
||||
if score >= 10000000:
|
||||
return constant + 2
|
||||
elif score >= 9800000:
|
||||
return constant + 1 + (Decimal(score - 9800000) / 200000)
|
||||
else:
|
||||
return max(Decimal(0), constant + (Decimal(score - 9500000) / 300000))
|
||||
|
||||
|
||||
def calculate_shiny_pure(notes: int, score: int, pure: int, far: int) -> int:
|
||||
single_note_score = 10000000 / Decimal(notes)
|
||||
actual_score = single_note_score * pure + single_note_score * Decimal(0.5) * far
|
||||
return score - floor(actual_score)
|
||||
|
||||
|
||||
def get_b30_calculated_list(calculated_list: List[Calculated]) -> List[Calculated]:
|
||||
best_scores: Dict[str, Calculated] = {}
|
||||
for calculated in calculated_list:
|
||||
key = f"{calculated.song_id}_{calculated.rating_class}"
|
||||
stored = best_scores.get(key)
|
||||
if stored and stored.score < calculated.score or not stored:
|
||||
best_scores[key] = calculated
|
||||
ret_list = list(best_scores.values())
|
||||
ret_list = sorted(ret_list, key=lambda c: c.potential, reverse=True)[:30]
|
||||
return ret_list
|
||||
|
||||
|
||||
def calculate_b30(calculated_list: List[Calculated]) -> Decimal:
|
||||
ptt_list = [Decimal(c.potential) for c in get_b30_calculated_list(calculated_list)]
|
||||
sum_ptt_list = sum(ptt_list)
|
||||
return (sum_ptt_list / len(ptt_list)) if sum_ptt_list else Decimal("0.0")
|
8
src/arcaea_offline/calculate/__init__.py
Normal file
8
src/arcaea_offline/calculate/__init__.py
Normal file
@ -0,0 +1,8 @@
|
||||
from .b30 import calculate_b30, get_b30_calculated_list
|
||||
from .score import (
|
||||
calculate_constants_from_play_rating,
|
||||
calculate_play_rating,
|
||||
calculate_score_modifier,
|
||||
calculate_score_range,
|
||||
calculate_shiny_pure,
|
||||
)
|
24
src/arcaea_offline/calculate/b30.py
Normal file
24
src/arcaea_offline/calculate/b30.py
Normal file
@ -0,0 +1,24 @@
|
||||
from decimal import Decimal
|
||||
from typing import Dict, List
|
||||
|
||||
from ..models.scores import ScoreCalculated
|
||||
|
||||
|
||||
def get_b30_calculated_list(
|
||||
calculated_list: List[ScoreCalculated],
|
||||
) -> List[ScoreCalculated]:
|
||||
best_scores: Dict[str, ScoreCalculated] = {}
|
||||
for calculated in calculated_list:
|
||||
key = f"{calculated.song_id}_{calculated.rating_class}"
|
||||
stored = best_scores.get(key)
|
||||
if stored and stored.score < calculated.score or not stored:
|
||||
best_scores[key] = calculated
|
||||
ret_list = list(best_scores.values())
|
||||
ret_list = sorted(ret_list, key=lambda c: c.potential, reverse=True)[:30]
|
||||
return ret_list
|
||||
|
||||
|
||||
def calculate_b30(calculated_list: List[ScoreCalculated]) -> Decimal:
|
||||
ptt_list = [Decimal(c.potential) for c in get_b30_calculated_list(calculated_list)]
|
||||
sum_ptt_list = sum(ptt_list)
|
||||
return (sum_ptt_list / len(ptt_list)) if sum_ptt_list else Decimal("0.0")
|
67
src/arcaea_offline/calculate/score.py
Normal file
67
src/arcaea_offline/calculate/score.py
Normal file
@ -0,0 +1,67 @@
|
||||
from dataclasses import dataclass
|
||||
from decimal import Decimal
|
||||
from math import floor
|
||||
from typing import Tuple, Union
|
||||
|
||||
|
||||
def calculate_score_range(notes: int, pure: int, far: int):
|
||||
single_note_score = 10000000 / Decimal(notes)
|
||||
|
||||
actual_score = floor(
|
||||
single_note_score * pure + single_note_score * Decimal(0.5) * far
|
||||
)
|
||||
return (actual_score, actual_score + pure)
|
||||
|
||||
|
||||
def calculate_score_modifier(score: int) -> Decimal:
|
||||
if score >= 10000000:
|
||||
return Decimal(2)
|
||||
elif score >= 9800000:
|
||||
return Decimal(1) + (Decimal(score - 9800000) / 200000)
|
||||
else:
|
||||
return Decimal(score - 9500000) / 300000
|
||||
|
||||
|
||||
def calculate_play_rating(
|
||||
constant: Union[Decimal, str, float, int], score: int
|
||||
) -> Decimal:
|
||||
constant = Decimal(constant)
|
||||
score_modifier = calculate_score_modifier(score)
|
||||
return max(Decimal(0), constant + score_modifier)
|
||||
|
||||
|
||||
def calculate_shiny_pure(notes: int, score: int, pure: int, far: int) -> int:
|
||||
single_note_score = 10000000 / Decimal(notes)
|
||||
actual_score = single_note_score * pure + single_note_score * Decimal(0.5) * far
|
||||
return score - floor(actual_score)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConstantsFromPlayRatingResult:
|
||||
EXPlus: Tuple[Decimal, Decimal]
|
||||
EX: Tuple[Decimal, Decimal]
|
||||
AA: Tuple[Decimal, Decimal]
|
||||
A: Tuple[Decimal, Decimal]
|
||||
B: Tuple[Decimal, Decimal]
|
||||
C: Tuple[Decimal, Decimal]
|
||||
|
||||
|
||||
def calculate_constants_from_play_rating(play_rating: Union[Decimal, str, float, int]):
|
||||
play_rating = Decimal(play_rating)
|
||||
|
||||
ranges = []
|
||||
for upperScore, lowerScore in [
|
||||
(10000000, 9900000),
|
||||
(9899999, 9800000),
|
||||
(9799999, 9500000),
|
||||
(9499999, 9200000),
|
||||
(9199999, 8900000),
|
||||
(8899999, 8600000),
|
||||
]:
|
||||
upperScoreModifier = calculate_score_modifier(upperScore)
|
||||
lowerScoreModifier = calculate_score_modifier(lowerScore)
|
||||
ranges.append(
|
||||
(play_rating - upperScoreModifier, play_rating - lowerScoreModifier)
|
||||
)
|
||||
|
||||
return ConstantsFromPlayRatingResult(*ranges)
|
175
src/arcaea_offline/calculate/world_step.py
Normal file
175
src/arcaea_offline/calculate/world_step.py
Normal file
@ -0,0 +1,175 @@
|
||||
from decimal import Decimal
|
||||
from typing import Literal, Optional, Union
|
||||
|
||||
|
||||
class PlayResult:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
play_rating: Union[Decimal, str, float, int],
|
||||
partner_step: Union[Decimal, str, float, int],
|
||||
):
|
||||
self.__play_rating = play_rating
|
||||
self.__partner_step = partner_step
|
||||
|
||||
@property
|
||||
def play_rating(self):
|
||||
return Decimal(self.__play_rating)
|
||||
|
||||
@property
|
||||
def partner_step(self):
|
||||
return Decimal(self.__partner_step)
|
||||
|
||||
|
||||
class PartnerBonus:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
step_bonus: Union[Decimal, str, float, int] = Decimal("0.0"),
|
||||
final_multiplier: Union[Decimal, str, float, int] = Decimal("1.0"),
|
||||
):
|
||||
self.__step_bonus = step_bonus
|
||||
self.__final_multiplier = final_multiplier
|
||||
|
||||
@property
|
||||
def step_bonus(self):
|
||||
return Decimal(self.__step_bonus)
|
||||
|
||||
@property
|
||||
def final_multiplier(self):
|
||||
return Decimal(self.__final_multiplier)
|
||||
|
||||
|
||||
AwakenedIlithPartnerBonus = PartnerBonus(step_bonus="6.0")
|
||||
AwakenedEtoPartnerBonus = PartnerBonus(step_bonus="7.0")
|
||||
AwakenedLunaPartnerBonus = PartnerBonus(step_bonus="7.0")
|
||||
|
||||
|
||||
class AwakenedAyuPartnerBonus(PartnerBonus):
|
||||
def __init__(self, step_bonus: Union[Decimal, str, float, int]):
|
||||
super().__init__(step_bonus=step_bonus)
|
||||
|
||||
|
||||
AmaneBelowExPartnerBonus = PartnerBonus(final_multiplier="0.5")
|
||||
|
||||
|
||||
class MithraTerceraPartnerBonus(PartnerBonus):
|
||||
def __init__(self, step_bonus: int):
|
||||
super().__init__(step_bonus=step_bonus)
|
||||
|
||||
|
||||
MayaPartnerBonus = PartnerBonus(final_multiplier="2.0")
|
||||
|
||||
|
||||
class StepBooster:
|
||||
def final_value(self) -> Decimal:
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class LegacyMapStepBooster(StepBooster):
|
||||
def __init__(
|
||||
self,
|
||||
stamina: Literal[2, 4, 6],
|
||||
fragments: Literal[100, 250, 500, None],
|
||||
):
|
||||
self.stamina = stamina
|
||||
self.fragments = fragments
|
||||
|
||||
@property
|
||||
def stamina(self):
|
||||
return self.__stamina
|
||||
|
||||
@stamina.setter
|
||||
def stamina(self, value: Literal[2, 4, 6]):
|
||||
if value not in [2, 4, 6]:
|
||||
raise ValueError("stamina can only be one of [2, 4, 6]")
|
||||
self.__stamina = value
|
||||
|
||||
@property
|
||||
def fragments(self):
|
||||
return self.__fragments
|
||||
|
||||
@fragments.setter
|
||||
def fragments(self, value: Literal[100, 250, 500, None]):
|
||||
if value not in [100, 250, 500, None]:
|
||||
raise ValueError("fragments can only be one of [100, 250, 500, None]")
|
||||
self.__fragments = value
|
||||
|
||||
def final_value(self) -> Decimal:
|
||||
stamina_multiplier = Decimal(self.stamina)
|
||||
if self.fragments is None:
|
||||
fragments_multiplier = Decimal(1)
|
||||
elif self.fragments == 100:
|
||||
fragments_multiplier = Decimal("1.1")
|
||||
elif self.fragments == 250:
|
||||
fragments_multiplier = Decimal("1.25")
|
||||
elif self.fragments == 500:
|
||||
fragments_multiplier = Decimal("1.5")
|
||||
return stamina_multiplier * fragments_multiplier
|
||||
|
||||
|
||||
class MemoriesStepBooster(StepBooster):
|
||||
def final_value(self) -> Decimal:
|
||||
return Decimal("4.0")
|
||||
|
||||
|
||||
def calculate_step_original(
|
||||
play_result: PlayResult,
|
||||
*,
|
||||
partner_bonus: Optional[PartnerBonus] = None,
|
||||
step_booster: Optional[StepBooster] = None,
|
||||
):
|
||||
ptt = play_result.play_rating
|
||||
step = play_result.partner_step
|
||||
if partner_bonus:
|
||||
partner_bonus_step = partner_bonus.step_bonus
|
||||
partner_bonus_multiplier = partner_bonus.final_multiplier
|
||||
else:
|
||||
partner_bonus_step = Decimal("0")
|
||||
partner_bonus_multiplier = Decimal("1.0")
|
||||
|
||||
play_result = (Decimal("2.45") * ptt.sqrt() + Decimal("2.5")) * (step / 50)
|
||||
play_result += partner_bonus_step
|
||||
play_result *= partner_bonus_multiplier
|
||||
if step_booster:
|
||||
play_result *= step_booster.final_value()
|
||||
|
||||
return play_result
|
||||
|
||||
|
||||
def calculate_step(
|
||||
play_result: PlayResult,
|
||||
*,
|
||||
partner_bonus: Optional[PartnerBonus] = None,
|
||||
step_booster: Optional[StepBooster] = None,
|
||||
):
|
||||
play_result_original = calculate_step_original(
|
||||
play_result, partner_bonus=partner_bonus, step_booster=step_booster
|
||||
)
|
||||
|
||||
return round(play_result_original, 1)
|
||||
|
||||
|
||||
def calculate_play_rating_from_step(
|
||||
step: Union[Decimal, str, int, float],
|
||||
partner_step_value: Union[Decimal, str, int, float],
|
||||
*,
|
||||
partner_bonus: Optional[PartnerBonus] = None,
|
||||
step_booster: Optional[StepBooster] = None,
|
||||
):
|
||||
step = Decimal(step)
|
||||
partner_step_value = Decimal(partner_step_value)
|
||||
|
||||
# get original play result
|
||||
if partner_bonus and partner_bonus.final_multiplier:
|
||||
step /= partner_bonus.final_multiplier
|
||||
if step_booster:
|
||||
step /= step_booster.final_value()
|
||||
|
||||
if partner_bonus and partner_bonus.step_bonus:
|
||||
step -= partner_bonus.step_bonus
|
||||
|
||||
play_rating_sqrt = (Decimal(50) * step - Decimal("2.5") * partner_step_value) / (
|
||||
Decimal("2.45") * partner_step_value
|
||||
)
|
||||
return play_rating_sqrt**2 if play_rating_sqrt >= 0 else -(play_rating_sqrt**2)
|
@ -1,19 +1,47 @@
|
||||
from sqlalchemy import Engine, select
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
import logging
|
||||
import math
|
||||
from typing import Iterable, List, Optional, Type, Union
|
||||
|
||||
from sqlalchemy import Engine, func, inspect, select
|
||||
from sqlalchemy.orm import DeclarativeBase, InstrumentedAttribute, sessionmaker
|
||||
|
||||
from .calculate import calculate_score_modifier
|
||||
from .external.arcsong.arcsong_json import ArcSongJsonBuilder
|
||||
from .external.exports import ScoreExport, exporters
|
||||
from .models.config import *
|
||||
from .models.scores import *
|
||||
from .models.songs import *
|
||||
from .singleton import Singleton
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Database(metaclass=Singleton):
|
||||
def __init__(self, engine: Engine):
|
||||
self.engine = engine
|
||||
def __init__(self, engine: Optional[Engine]):
|
||||
try:
|
||||
self.__engine
|
||||
except AttributeError:
|
||||
self.__engine = None
|
||||
|
||||
if engine is None:
|
||||
if isinstance(self.engine, Engine):
|
||||
return
|
||||
raise ValueError("No sqlalchemy.Engine instance specified before.")
|
||||
elif isinstance(engine, Engine):
|
||||
if isinstance(self.engine, Engine):
|
||||
logger.warning(
|
||||
f"A sqlalchemy.Engine instance {self.engine} has been specified "
|
||||
f"and will be replaced to {engine}"
|
||||
)
|
||||
self.engine = engine
|
||||
else:
|
||||
raise ValueError(
|
||||
f"A sqlalchemy.Engine instance expected, not {repr(engine)}"
|
||||
)
|
||||
|
||||
@property
|
||||
def engine(self):
|
||||
return self.__engine
|
||||
def engine(self) -> Engine:
|
||||
return self.__engine # type: ignore
|
||||
|
||||
@engine.setter
|
||||
def engine(self, value: Engine):
|
||||
@ -26,15 +54,19 @@ class Database(metaclass=Singleton):
|
||||
def sessionmaker(self):
|
||||
return self.__sessionmaker
|
||||
|
||||
# region init
|
||||
|
||||
def init(self, checkfirst: bool = True):
|
||||
# create tables & views
|
||||
if checkfirst:
|
||||
# > https://github.com/kvesteri/sqlalchemy-utils/issues/396
|
||||
# > view.create_view() causes DuplicateTableError on Base.metadata.create_all(checkfirst=True)
|
||||
# so if `checkfirst` is True, drop these views before creating
|
||||
SongsViewBase.metadata.drop_all(self.engine)
|
||||
ScoresViewBase.metadata.drop_all(self.engine)
|
||||
|
||||
SongsBase.metadata.create_all(self.engine, checkfirst=checkfirst)
|
||||
SongsViewBase.metadata.create_all(self.engine)
|
||||
ScoresBase.metadata.create_all(self.engine, checkfirst=checkfirst)
|
||||
ScoresViewBase.metadata.create_all(self.engine)
|
||||
ConfigBase.metadata.create_all(self.engine, checkfirst=checkfirst)
|
||||
@ -44,5 +76,322 @@ class Database(metaclass=Singleton):
|
||||
stmt = select(Property.value).where(Property.key == "version")
|
||||
result = session.execute(stmt).fetchone()
|
||||
if not checkfirst or not result:
|
||||
session.add(Property(key="version", value="2"))
|
||||
session.add(Property(key="version", value="4"))
|
||||
session.commit()
|
||||
|
||||
def check_init(self) -> bool:
|
||||
# check table exists
|
||||
expect_tables = (
|
||||
list(SongsBase.metadata.tables.keys())
|
||||
+ list(ScoresBase.metadata.tables.keys())
|
||||
+ list(ConfigBase.metadata.tables.keys())
|
||||
+ [
|
||||
Chart.__tablename__,
|
||||
ScoreCalculated.__tablename__,
|
||||
ScoreBest.__tablename__,
|
||||
CalculatedPotential.__tablename__,
|
||||
]
|
||||
)
|
||||
return all(inspect(self.engine).has_table(t) for t in expect_tables)
|
||||
|
||||
# endregion
|
||||
|
||||
def version(self) -> Union[int, None]:
|
||||
stmt = select(Property).where(Property.key == "version")
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return None if result is None else int(result.value)
|
||||
|
||||
# region Pack
|
||||
|
||||
def get_packs(self):
|
||||
stmt = select(Pack)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_pack(self, pack_id: str):
|
||||
stmt = select(Pack).where(Pack.id == pack_id)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
def get_pack_localized(self, pack_id: str):
|
||||
stmt = select(PackLocalized).where(PackLocalized.id == pack_id)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
# endregion
|
||||
|
||||
# region Song
|
||||
|
||||
def get_songs(self):
|
||||
stmt = select(Song)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_songs_by_pack_id(self, pack_id: str):
|
||||
stmt = select(Song).where(Song.set == pack_id)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_song(self, song_id: str):
|
||||
stmt = select(Song).where(Song.id == song_id)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
def get_song_localized(self, song_id: str):
|
||||
stmt = select(SongLocalized).where(SongLocalized.id == song_id)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
# endregion
|
||||
|
||||
# region Difficulty
|
||||
|
||||
def get_difficulties(self):
|
||||
stmt = select(Difficulty)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_difficulties_by_song_id(self, song_id: str):
|
||||
stmt = select(Difficulty).where(Difficulty.song_id == song_id)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_difficulties_localized_by_song_id(self, song_id: str):
|
||||
stmt = select(DifficultyLocalized).where(DifficultyLocalized.song_id == song_id)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_difficulty(self, song_id: str, rating_class: int):
|
||||
stmt = select(Difficulty).where(
|
||||
(Difficulty.song_id == song_id) & (Difficulty.rating_class == rating_class)
|
||||
)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
def get_difficulty_localized(self, song_id: str, rating_class: int):
|
||||
stmt = select(DifficultyLocalized).where(
|
||||
(DifficultyLocalized.song_id == song_id)
|
||||
& (DifficultyLocalized.rating_class == rating_class)
|
||||
)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
# endregion
|
||||
|
||||
# region ChartInfo
|
||||
|
||||
def get_chart_infos(self):
|
||||
stmt = select(ChartInfo)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_chart_infos_by_song_id(self, song_id: str):
|
||||
stmt = select(ChartInfo).where(ChartInfo.song_id == song_id)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_chart_info(self, song_id: str, rating_class: int):
|
||||
stmt = select(ChartInfo).where(
|
||||
(ChartInfo.song_id == song_id) & (ChartInfo.rating_class == rating_class)
|
||||
)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
# endregion
|
||||
|
||||
# region Chart
|
||||
|
||||
def get_charts_by_pack_id(self, pack_id: str):
|
||||
stmt = select(Chart).where(Chart.set == pack_id)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_charts_by_song_id(self, song_id: str):
|
||||
stmt = select(Chart).where(Chart.song_id == song_id)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_charts_by_constant(self, constant: int):
|
||||
stmt = select(Chart).where(Chart.constant == constant)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_chart(self, song_id: str, rating_class: int):
|
||||
stmt = select(Chart).where(
|
||||
(Chart.song_id == song_id) & (Chart.rating_class == rating_class)
|
||||
)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
# endregion
|
||||
|
||||
# region Score
|
||||
|
||||
def get_scores(self):
|
||||
stmt = select(Score)
|
||||
with self.sessionmaker() as session:
|
||||
results = list(session.scalars(stmt))
|
||||
return results
|
||||
|
||||
def get_score(self, score_id: int):
|
||||
stmt = select(Score).where(Score.id == score_id)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
def get_score_best(self, song_id: str, rating_class: int):
|
||||
stmt = select(ScoreBest).where(
|
||||
(ScoreBest.song_id == song_id) & (ScoreBest.rating_class == rating_class)
|
||||
)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
def insert_score(self, score: Score):
|
||||
with self.sessionmaker() as session:
|
||||
session.add(score)
|
||||
session.commit()
|
||||
|
||||
def insert_scores(self, scores: Iterable[Score]):
|
||||
with self.sessionmaker() as session:
|
||||
session.add_all(scores)
|
||||
session.commit()
|
||||
|
||||
def update_score(self, score: Score):
|
||||
if score.id is None:
|
||||
raise ValueError(
|
||||
"Cannot determine which score to update, please specify `score.id`"
|
||||
)
|
||||
with self.sessionmaker() as session:
|
||||
session.merge(score)
|
||||
session.commit()
|
||||
|
||||
def delete_score(self, score: Score):
|
||||
with self.sessionmaker() as session:
|
||||
session.delete(score)
|
||||
session.commit()
|
||||
|
||||
def recommend_charts(self, play_result: float, bounds: float = 0.1):
|
||||
base_constant = math.ceil(play_result * 10)
|
||||
|
||||
results = []
|
||||
results_id = []
|
||||
with self.sessionmaker() as session:
|
||||
for constant in range(base_constant - 20, base_constant + 1):
|
||||
# from Pure Memory(EX+) to AA
|
||||
score_modifier = (play_result * 10 - constant) / 10
|
||||
if score_modifier >= 2.0:
|
||||
min_score = 10000000
|
||||
elif score_modifier >= 1.0:
|
||||
min_score = 200000 * (score_modifier - 1) + 9800000
|
||||
else:
|
||||
min_score = 300000 * score_modifier + 9500000
|
||||
min_score = int(min_score)
|
||||
|
||||
charts = self.get_charts_by_constant(constant)
|
||||
for chart in charts:
|
||||
score_best_stmt = select(ScoreBest).where(
|
||||
(ScoreBest.song_id == chart.song_id)
|
||||
& (ScoreBest.rating_class == chart.rating_class)
|
||||
& (ScoreBest.score >= min_score)
|
||||
& (play_result - bounds < ScoreBest.potential)
|
||||
& (ScoreBest.potential < play_result + bounds)
|
||||
)
|
||||
if session.scalar(score_best_stmt):
|
||||
chart_id = f"{chart.song_id},{chart.rating_class}"
|
||||
if chart_id not in results_id:
|
||||
results.append(chart)
|
||||
results_id.append(chart_id)
|
||||
|
||||
return results
|
||||
|
||||
# endregion
|
||||
|
||||
def get_b30(self):
|
||||
stmt = select(CalculatedPotential.b30).select_from(CalculatedPotential)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result
|
||||
|
||||
# region COUNT
|
||||
|
||||
def __count_table(self, base: Type[DeclarativeBase]):
|
||||
stmt = select(func.count()).select_from(base)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result or 0
|
||||
|
||||
def __count_column(self, column: InstrumentedAttribute):
|
||||
stmt = select(func.count(column))
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result or 0
|
||||
|
||||
def count_packs(self):
|
||||
return self.__count_column(Pack.id)
|
||||
|
||||
def count_songs(self):
|
||||
return self.__count_column(Song.id)
|
||||
|
||||
def count_difficulties(self):
|
||||
return self.__count_table(Difficulty)
|
||||
|
||||
def count_chart_infos(self):
|
||||
return self.__count_table(ChartInfo)
|
||||
|
||||
def count_complete_chart_infos(self):
|
||||
stmt = (
|
||||
select(func.count())
|
||||
.select_from(ChartInfo)
|
||||
.where((ChartInfo.constant != None) & (ChartInfo.notes != None))
|
||||
)
|
||||
with self.sessionmaker() as session:
|
||||
result = session.scalar(stmt)
|
||||
return result or 0
|
||||
|
||||
def count_charts(self):
|
||||
return self.__count_table(Chart)
|
||||
|
||||
def count_scores(self):
|
||||
return self.__count_column(Score.id)
|
||||
|
||||
def count_scores_calculated(self):
|
||||
return self.__count_table(ScoreCalculated)
|
||||
|
||||
def count_scores_best(self):
|
||||
return self.__count_table(ScoreBest)
|
||||
|
||||
# endregion
|
||||
|
||||
# region export
|
||||
|
||||
def export_scores(self) -> List[ScoreExport]:
|
||||
scores = self.get_scores()
|
||||
return [exporters.score(score) for score in scores]
|
||||
|
||||
def generate_arcsong(self):
|
||||
with self.sessionmaker() as session:
|
||||
arcsong = ArcSongJsonBuilder(session).generate_arcsong_json()
|
||||
return arcsong
|
||||
|
||||
# endregion
|
||||
|
1
src/arcaea_offline/external/andreal/__init__.py
vendored
Normal file
1
src/arcaea_offline/external/andreal/__init__.py
vendored
Normal file
@ -0,0 +1 @@
|
||||
from .api_data import AndrealImageGeneratorApiDataConverter
|
14
src/arcaea_offline/external/andreal/account.py
vendored
Normal file
14
src/arcaea_offline/external/andreal/account.py
vendored
Normal file
@ -0,0 +1,14 @@
|
||||
class AndrealImageGeneratorAccount:
|
||||
def __init__(
|
||||
self,
|
||||
name: str = "Player",
|
||||
code: int = 123456789,
|
||||
rating: int = -1,
|
||||
character: int = 5,
|
||||
character_uncapped: bool = False,
|
||||
):
|
||||
self.name = name
|
||||
self.code = code
|
||||
self.rating = rating
|
||||
self.character = character
|
||||
self.character_uncapped = character_uncapped
|
94
src/arcaea_offline/external/andreal/api_data.py
vendored
Normal file
94
src/arcaea_offline/external/andreal/api_data.py
vendored
Normal file
@ -0,0 +1,94 @@
|
||||
from typing import Optional, Union
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from ...models import CalculatedPotential, ScoreBest, ScoreCalculated
|
||||
from .account import AndrealImageGeneratorAccount
|
||||
|
||||
|
||||
class AndrealImageGeneratorApiDataConverter:
|
||||
def __init__(
|
||||
self,
|
||||
session: Session,
|
||||
account: AndrealImageGeneratorAccount = AndrealImageGeneratorAccount(),
|
||||
):
|
||||
self.session = session
|
||||
self.account = account
|
||||
|
||||
def account_info(self):
|
||||
return {
|
||||
"code": self.account.code,
|
||||
"name": self.account.name,
|
||||
"is_char_uncapped": self.account.character_uncapped,
|
||||
"rating": self.account.rating,
|
||||
"character": self.account.character,
|
||||
}
|
||||
|
||||
def score(self, score: Union[ScoreCalculated, ScoreBest]):
|
||||
return {
|
||||
"score": score.score,
|
||||
"health": 75,
|
||||
"rating": score.potential,
|
||||
"song_id": score.song_id,
|
||||
"modifier": score.modifier or 0,
|
||||
"difficulty": score.rating_class,
|
||||
"clear_type": score.clear_type or 1,
|
||||
"best_clear_type": score.clear_type or 1,
|
||||
"time_played": score.date * 1000 if score.date else 0,
|
||||
"near_count": score.far,
|
||||
"miss_count": score.lost,
|
||||
"perfect_count": score.pure,
|
||||
"shiny_perfect_count": score.shiny_pure,
|
||||
}
|
||||
|
||||
def user_info(self, score: Optional[ScoreCalculated] = None):
|
||||
if not score:
|
||||
score = self.session.scalar(
|
||||
select(ScoreCalculated).order_by(ScoreCalculated.date.desc()).limit(1)
|
||||
)
|
||||
if not score:
|
||||
raise ValueError("No score available.")
|
||||
|
||||
return {
|
||||
"content": {
|
||||
"account_info": self.account_info(),
|
||||
"recent_score": [self.score(score)],
|
||||
}
|
||||
}
|
||||
|
||||
def user_best(self, song_id: str, rating_class: int):
|
||||
score = self.session.scalar(
|
||||
select(ScoreBest).where(
|
||||
(ScoreBest.song_id == song_id)
|
||||
& (ScoreBest.rating_class == rating_class)
|
||||
)
|
||||
)
|
||||
if not score:
|
||||
raise ValueError("No score available.")
|
||||
|
||||
return {
|
||||
"content": {
|
||||
"account_info": self.account_info(),
|
||||
"record": self.score(score),
|
||||
}
|
||||
}
|
||||
|
||||
def user_best30(self):
|
||||
scores = list(
|
||||
self.session.scalars(
|
||||
select(ScoreBest).order_by(ScoreBest.potential.desc()).limit(40)
|
||||
)
|
||||
)
|
||||
if not scores:
|
||||
raise ValueError("No score available.")
|
||||
best30_avg = self.session.scalar(select(CalculatedPotential.b30))
|
||||
|
||||
return {
|
||||
"content": {
|
||||
"account_info": self.account_info(),
|
||||
"best30_avg": best30_avg,
|
||||
"best30_list": [self.score(score) for score in scores[:30]],
|
||||
"best30_overflow": [self.score(score) for score in scores[-10:]],
|
||||
}
|
||||
}
|
20
src/arcaea_offline/external/arcaea/common.py
vendored
20
src/arcaea_offline/external/arcaea/common.py
vendored
@ -1,3 +1,4 @@
|
||||
import contextlib
|
||||
import json
|
||||
from os import PathLike
|
||||
from typing import Any, List, Optional, Union
|
||||
@ -43,6 +44,25 @@ class ArcaeaParser:
|
||||
def __init__(self, filepath: Union[str, bytes, PathLike]):
|
||||
self.filepath = filepath
|
||||
|
||||
def read_file_text(self):
|
||||
file_handle = None
|
||||
|
||||
with contextlib.suppress(TypeError):
|
||||
# original open
|
||||
file_handle = open(self.filepath, "r", encoding="utf-8")
|
||||
|
||||
if file_handle is None:
|
||||
try:
|
||||
# or maybe a `pathlib.Path` subset
|
||||
# or an `importlib.resources.abc.Traversable` like object
|
||||
# e.g. `zipfile.Path`
|
||||
file_handle = self.filepath.open(mode="r", encoding="utf-8")
|
||||
except Exception as e:
|
||||
raise ValueError("Invalid `filepath`.") from e
|
||||
|
||||
with file_handle:
|
||||
return file_handle.read()
|
||||
|
||||
def parse(self) -> List[DeclarativeBase]:
|
||||
...
|
||||
|
||||
|
@ -10,11 +10,12 @@ class PacklistParser(ArcaeaParser):
|
||||
super().__init__(filepath)
|
||||
|
||||
def parse(self) -> List[Union[Pack, PackLocalized]]:
|
||||
with open(self.filepath, "r", encoding="utf-8") as pl_f:
|
||||
packlist_json_root = json.loads(pl_f.read())
|
||||
packlist_json_root = json.loads(self.read_file_text())
|
||||
|
||||
packlist_json = packlist_json_root["packs"]
|
||||
results = []
|
||||
results: List[Union[Pack, PackLocalized]] = [
|
||||
Pack(id="single", name="Memory Archive")
|
||||
]
|
||||
for item in packlist_json:
|
||||
pack = Pack()
|
||||
pack.id = item["id"]
|
||||
|
21
src/arcaea_offline/external/arcaea/songlist.py
vendored
21
src/arcaea_offline/external/arcaea/songlist.py
vendored
@ -1,7 +1,7 @@
|
||||
import json
|
||||
from typing import List, Union
|
||||
|
||||
from ...models.songs import Chart, ChartLocalized, Song, SongLocalized
|
||||
from ...models.songs import Difficulty, DifficultyLocalized, Song, SongLocalized
|
||||
from .common import ArcaeaParser, is_localized, set_model_localized_attrs, to_db_value
|
||||
|
||||
|
||||
@ -9,9 +9,10 @@ class SonglistParser(ArcaeaParser):
|
||||
def __init__(self, filepath):
|
||||
super().__init__(filepath)
|
||||
|
||||
def parse(self) -> List[Union[Song, SongLocalized, Chart, ChartLocalized]]:
|
||||
with open(self.filepath, "r", encoding="utf-8") as sl_f:
|
||||
songlist_json_root = json.loads(sl_f.read())
|
||||
def parse(
|
||||
self,
|
||||
) -> List[Union[Song, SongLocalized, Difficulty, DifficultyLocalized]]:
|
||||
songlist_json_root = json.loads(self.read_file_text())
|
||||
|
||||
songlist_json = songlist_json_root["songs"]
|
||||
results = []
|
||||
@ -63,9 +64,8 @@ class SonglistDifficultiesParser(ArcaeaParser):
|
||||
def __init__(self, filepath):
|
||||
self.filepath = filepath
|
||||
|
||||
def parse(self) -> List[Union[Chart, ChartLocalized]]:
|
||||
with open(self.filepath, "r", encoding="utf-8") as sl_f:
|
||||
songlist_json_root = json.loads(sl_f.read())
|
||||
def parse(self) -> List[Union[Difficulty, DifficultyLocalized]]:
|
||||
songlist_json_root = json.loads(self.read_file_text())
|
||||
|
||||
songlist_json = songlist_json_root["songs"]
|
||||
results = []
|
||||
@ -74,7 +74,10 @@ class SonglistDifficultiesParser(ArcaeaParser):
|
||||
continue
|
||||
|
||||
for item in song_item["difficulties"]:
|
||||
chart = Chart(song_id=song_item["id"])
|
||||
if item["rating"] == 0:
|
||||
continue
|
||||
|
||||
chart = Difficulty(song_id=song_item["id"])
|
||||
chart.rating_class = item["ratingClass"]
|
||||
chart.rating = item["rating"]
|
||||
chart.rating_plus = item.get("ratingPlus") or False
|
||||
@ -94,7 +97,7 @@ class SonglistDifficultiesParser(ArcaeaParser):
|
||||
results.append(chart)
|
||||
|
||||
if is_localized(item, "title") or is_localized(item, "artist"):
|
||||
chart_localized = ChartLocalized(
|
||||
chart_localized = DifficultyLocalized(
|
||||
song_id=chart.song_id, rating_class=chart.rating_class
|
||||
)
|
||||
set_model_localized_attrs(chart_localized, item, "title")
|
||||
|
22
src/arcaea_offline/external/arcaea/st3.py
vendored
22
src/arcaea_offline/external/arcaea/st3.py
vendored
@ -12,8 +12,6 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class St3ScoreParser(ArcaeaParser):
|
||||
CLEAR_TYPES_MAP = {0: -1, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1}
|
||||
|
||||
def __init__(self, filepath):
|
||||
super().__init__(filepath)
|
||||
|
||||
@ -22,14 +20,22 @@ class St3ScoreParser(ArcaeaParser):
|
||||
with sqlite3.connect(self.filepath) as st3_conn:
|
||||
cursor = st3_conn.cursor()
|
||||
db_scores = cursor.execute(
|
||||
"SELECT songId, songDifficulty, score, perfectCount, nearCount, missCount, date FROM scores"
|
||||
"SELECT songId, songDifficulty, score, perfectCount, nearCount, missCount, date, modifier FROM scores"
|
||||
).fetchall()
|
||||
for song_id, rating_class, score, pure, far, lost, date in db_scores:
|
||||
db_clear_type = cursor.execute(
|
||||
for (
|
||||
song_id,
|
||||
rating_class,
|
||||
score,
|
||||
pure,
|
||||
far,
|
||||
lost,
|
||||
date,
|
||||
modifier,
|
||||
) in db_scores:
|
||||
clear_type = cursor.execute(
|
||||
"SELECT clearType FROM cleartypes WHERE songId = ? AND songDifficulty = ?",
|
||||
(song_id, rating_class),
|
||||
).fetchone()[0]
|
||||
r10_clear_type = self.CLEAR_TYPES_MAP[db_clear_type]
|
||||
|
||||
date_str = str(date)
|
||||
date = None if len(date_str) < 7 else int(date_str.ljust(10, "0"))
|
||||
@ -43,7 +49,9 @@ class St3ScoreParser(ArcaeaParser):
|
||||
far=far,
|
||||
lost=lost,
|
||||
date=date,
|
||||
r10_clear_type=r10_clear_type,
|
||||
modifier=modifier,
|
||||
clear_type=clear_type,
|
||||
comment="Imported from st3",
|
||||
)
|
||||
)
|
||||
|
||||
|
@ -22,7 +22,7 @@ class ArcsongDbParser:
|
||||
song_id=result[0],
|
||||
rating_class=result[1],
|
||||
constant=result[2],
|
||||
note=result[3] or None,
|
||||
notes=result[3] or None,
|
||||
)
|
||||
results.append(chart)
|
||||
|
||||
|
155
src/arcaea_offline/external/arcsong/arcsong_json.py
vendored
Normal file
155
src/arcaea_offline/external/arcsong/arcsong_json.py
vendored
Normal file
@ -0,0 +1,155 @@
|
||||
import logging
|
||||
import re
|
||||
from typing import List, Optional, TypedDict
|
||||
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from ...models import (
|
||||
ChartInfo,
|
||||
Difficulty,
|
||||
DifficultyLocalized,
|
||||
Pack,
|
||||
Song,
|
||||
SongLocalized,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TArcSongJsonDifficultyItem(TypedDict):
|
||||
name_en: str
|
||||
name_jp: str
|
||||
artist: str
|
||||
bpm: str
|
||||
bpm_base: float
|
||||
set: str
|
||||
set_friendly: str
|
||||
time: int
|
||||
side: int
|
||||
world_unlock: bool
|
||||
remote_download: bool
|
||||
bg: str
|
||||
date: int
|
||||
version: str
|
||||
difficulty: int
|
||||
rating: int
|
||||
note: int
|
||||
chart_designer: str
|
||||
jacket_designer: str
|
||||
jacket_override: bool
|
||||
audio_override: bool
|
||||
|
||||
|
||||
class TArcSongJsonSongItem(TypedDict):
|
||||
song_id: str
|
||||
difficulties: List[TArcSongJsonDifficultyItem]
|
||||
alias: List[str]
|
||||
|
||||
|
||||
class TArcSongJson(TypedDict):
|
||||
songs: List[TArcSongJsonSongItem]
|
||||
|
||||
|
||||
class ArcSongJsonBuilder:
|
||||
def __init__(self, session: Session):
|
||||
self.session = session
|
||||
|
||||
def get_difficulty_item(
|
||||
self,
|
||||
difficulty: Difficulty,
|
||||
song: Song,
|
||||
pack: Pack,
|
||||
song_localized: Optional[SongLocalized],
|
||||
) -> TArcSongJsonDifficultyItem:
|
||||
if "_append_" in pack.id:
|
||||
base_pack = self.session.scalar(
|
||||
select(Pack).where(Pack.id == re.sub(r"_append_.*$", "", pack.id))
|
||||
)
|
||||
else:
|
||||
base_pack = None
|
||||
|
||||
difficulty_localized = self.session.scalar(
|
||||
select(DifficultyLocalized).where(
|
||||
(DifficultyLocalized.song_id == difficulty.song_id)
|
||||
& (DifficultyLocalized.rating_class == difficulty.rating_class)
|
||||
)
|
||||
)
|
||||
chart_info = self.session.scalar(
|
||||
select(ChartInfo).where(
|
||||
(ChartInfo.song_id == difficulty.song_id)
|
||||
& (ChartInfo.rating_class == difficulty.rating_class)
|
||||
)
|
||||
)
|
||||
|
||||
if difficulty_localized:
|
||||
name_jp = difficulty_localized.title_ja or ""
|
||||
elif song_localized:
|
||||
name_jp = song_localized.title_ja or ""
|
||||
else:
|
||||
name_jp = ""
|
||||
|
||||
return {
|
||||
"name_en": difficulty.title or song.title,
|
||||
"name_jp": name_jp,
|
||||
"artist": difficulty.artist or song.artist,
|
||||
"bpm": difficulty.bpm or song.bpm or "",
|
||||
"bpm_base": difficulty.bpm_base or song.bpm_base or 0.0,
|
||||
"set": song.set,
|
||||
"set_friendly": f"{base_pack.name} - {pack.name}"
|
||||
if base_pack
|
||||
else pack.name,
|
||||
"time": 0,
|
||||
"side": song.side or 0,
|
||||
"world_unlock": False,
|
||||
"remote_download": False,
|
||||
"bg": difficulty.bg or song.bg or "",
|
||||
"date": difficulty.date or song.date or 0,
|
||||
"version": difficulty.version or song.version or "",
|
||||
"difficulty": difficulty.rating * 2 + int(difficulty.rating_plus),
|
||||
"rating": chart_info.constant or 0 if chart_info else 0,
|
||||
"note": chart_info.notes or 0 if chart_info else 0,
|
||||
"chart_designer": difficulty.chart_designer or "",
|
||||
"jacket_designer": difficulty.jacket_desginer or "",
|
||||
"jacket_override": difficulty.jacket_override,
|
||||
"audio_override": difficulty.audio_override,
|
||||
}
|
||||
|
||||
def get_song_item(self, song: Song) -> TArcSongJsonSongItem:
|
||||
difficulties = self.session.scalars(
|
||||
select(Difficulty).where(Difficulty.song_id == song.id)
|
||||
)
|
||||
|
||||
pack = self.session.scalar(select(Pack).where(Pack.id == song.set))
|
||||
if not pack:
|
||||
logger.warning(f'Cannot find pack "{song.set}", using placeholder instead.')
|
||||
pack = Pack(id="unknown", name="Unknown", description="__PLACEHOLDER__")
|
||||
song_localized = self.session.scalar(
|
||||
select(SongLocalized).where(SongLocalized.id == song.id)
|
||||
)
|
||||
|
||||
return {
|
||||
"song_id": song.id,
|
||||
"difficulties": [
|
||||
self.get_difficulty_item(difficulty, song, pack, song_localized)
|
||||
for difficulty in difficulties
|
||||
],
|
||||
"alias": [],
|
||||
}
|
||||
|
||||
def generate_arcsong_json(self) -> TArcSongJson:
|
||||
songs = self.session.scalars(select(Song))
|
||||
arcsong_songs = []
|
||||
for song in songs:
|
||||
proceed = self.session.scalar(
|
||||
select(func.count(Difficulty.rating_class)).where(
|
||||
Difficulty.song_id == song.id
|
||||
)
|
||||
)
|
||||
|
||||
if not proceed:
|
||||
continue
|
||||
|
||||
arcsong_songs.append(self.get_song_item(song))
|
||||
|
||||
return {"songs": arcsong_songs}
|
2
src/arcaea_offline/external/exports/__init__.py
vendored
Normal file
2
src/arcaea_offline/external/exports/__init__.py
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
from . import exporters
|
||||
from .types import ScoreExport
|
19
src/arcaea_offline/external/exports/exporters.py
vendored
Normal file
19
src/arcaea_offline/external/exports/exporters.py
vendored
Normal file
@ -0,0 +1,19 @@
|
||||
from ...models import Score
|
||||
from .types import ScoreExport
|
||||
|
||||
|
||||
def score(score: Score) -> ScoreExport:
|
||||
return {
|
||||
"id": score.id,
|
||||
"song_id": score.song_id,
|
||||
"rating_class": score.rating_class,
|
||||
"score": score.score,
|
||||
"pure": score.pure,
|
||||
"far": score.far,
|
||||
"lost": score.lost,
|
||||
"date": score.date,
|
||||
"max_recall": score.max_recall,
|
||||
"modifier": score.modifier,
|
||||
"clear_type": score.clear_type,
|
||||
"comment": score.comment,
|
||||
}
|
16
src/arcaea_offline/external/exports/types.py
vendored
Normal file
16
src/arcaea_offline/external/exports/types.py
vendored
Normal file
@ -0,0 +1,16 @@
|
||||
from typing import Optional, TypedDict
|
||||
|
||||
|
||||
class ScoreExport(TypedDict):
|
||||
id: int
|
||||
song_id: str
|
||||
rating_class: int
|
||||
score: int
|
||||
pure: Optional[int]
|
||||
far: Optional[int]
|
||||
lost: Optional[int]
|
||||
date: Optional[int]
|
||||
max_recall: Optional[int]
|
||||
modifier: Optional[int]
|
||||
clear_type: Optional[int]
|
||||
comment: Optional[str]
|
@ -0,0 +1,21 @@
|
||||
from .config import ConfigBase, Property
|
||||
from .scores import (
|
||||
CalculatedPotential,
|
||||
Score,
|
||||
ScoreBest,
|
||||
ScoreCalculated,
|
||||
ScoresBase,
|
||||
ScoresViewBase,
|
||||
)
|
||||
from .songs import (
|
||||
Chart,
|
||||
ChartInfo,
|
||||
Difficulty,
|
||||
DifficultyLocalized,
|
||||
Pack,
|
||||
PackLocalized,
|
||||
Song,
|
||||
SongLocalized,
|
||||
SongsBase,
|
||||
SongsViewBase,
|
||||
)
|
||||
|
@ -14,7 +14,7 @@ class ConfigBase(DeclarativeBase, ReprHelper):
|
||||
|
||||
|
||||
class Property(ConfigBase):
|
||||
__tablename__ = "property"
|
||||
__tablename__ = "properties"
|
||||
|
||||
key: Mapped[str] = mapped_column(TEXT(), primary_key=True)
|
||||
value: Mapped[str] = mapped_column(TEXT())
|
||||
|
@ -5,14 +5,14 @@ from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
|
||||
from sqlalchemy_utils import create_view
|
||||
|
||||
from .common import ReprHelper
|
||||
from .songs import Chart, ChartInfo
|
||||
from .songs import ChartInfo, Difficulty
|
||||
|
||||
__all__ = [
|
||||
"ScoresBase",
|
||||
"Score",
|
||||
"ScoresViewBase",
|
||||
"Calculated",
|
||||
"Best",
|
||||
"ScoreCalculated",
|
||||
"ScoreBest",
|
||||
"CalculatedPotential",
|
||||
]
|
||||
|
||||
@ -22,7 +22,7 @@ class ScoresBase(DeclarativeBase, ReprHelper):
|
||||
|
||||
|
||||
class Score(ScoresBase):
|
||||
__tablename__ = "score"
|
||||
__tablename__ = "scores"
|
||||
|
||||
id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True)
|
||||
song_id: Mapped[str] = mapped_column(TEXT())
|
||||
@ -33,9 +33,13 @@ class Score(ScoresBase):
|
||||
lost: Mapped[Optional[int]]
|
||||
date: Mapped[Optional[int]]
|
||||
max_recall: Mapped[Optional[int]]
|
||||
r10_clear_type: Mapped[Optional[int]] = mapped_column(
|
||||
comment="0: LOST, 1: COMPLETE, 2: HARD_LOST"
|
||||
modifier: Mapped[Optional[int]] = mapped_column(
|
||||
comment="0: NORMAL, 1: EASY, 2: HARD"
|
||||
)
|
||||
clear_type: Mapped[Optional[int]] = mapped_column(
|
||||
comment="0: TRACK LOST, 1: NORMAL CLEAR, 2: FULL RECALL, 3: PURE MEMORY, 4: EASY CLEAR, 5: HARD CLEAR"
|
||||
)
|
||||
comment: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
|
||||
|
||||
# How to create an SQL View with SQLAlchemy?
|
||||
@ -47,40 +51,45 @@ class ScoresViewBase(DeclarativeBase, ReprHelper):
|
||||
pass
|
||||
|
||||
|
||||
class Calculated(ScoresViewBase):
|
||||
score_id: Mapped[str]
|
||||
class ScoreCalculated(ScoresViewBase):
|
||||
__tablename__ = "scores_calculated"
|
||||
|
||||
id: Mapped[int]
|
||||
song_id: Mapped[str]
|
||||
rating_class: Mapped[int]
|
||||
score: Mapped[int]
|
||||
pure: Mapped[Optional[int]]
|
||||
shiny_pure: Mapped[Optional[int]]
|
||||
far: Mapped[Optional[int]]
|
||||
lost: Mapped[Optional[int]]
|
||||
date: Mapped[Optional[int]]
|
||||
max_recall: Mapped[Optional[int]]
|
||||
r10_clear_type: Mapped[Optional[int]]
|
||||
shiny_pure: Mapped[Optional[int]]
|
||||
modifier: Mapped[Optional[int]]
|
||||
clear_type: Mapped[Optional[int]]
|
||||
potential: Mapped[float]
|
||||
comment: Mapped[Optional[str]]
|
||||
|
||||
__table__ = create_view(
|
||||
name="calculated",
|
||||
name=__tablename__,
|
||||
selectable=select(
|
||||
Score.id.label("score_id"),
|
||||
Chart.song_id,
|
||||
Chart.rating_class,
|
||||
Score.id,
|
||||
Difficulty.song_id,
|
||||
Difficulty.rating_class,
|
||||
Score.score,
|
||||
Score.pure,
|
||||
(
|
||||
Score.score
|
||||
- func.floor(
|
||||
(Score.pure * 10000000.0 / ChartInfo.notes)
|
||||
+ (Score.far * 0.5 * 10000000.0 / ChartInfo.notes)
|
||||
)
|
||||
).label("shiny_pure"),
|
||||
Score.far,
|
||||
Score.lost,
|
||||
Score.date,
|
||||
Score.max_recall,
|
||||
Score.r10_clear_type,
|
||||
(
|
||||
Score.score
|
||||
- func.floor(
|
||||
(Score.pure * 10000000.0 / ChartInfo.note)
|
||||
+ (Score.far * 0.5 * 10000000.0 / ChartInfo.note)
|
||||
)
|
||||
).label("shiny_pure"),
|
||||
Score.modifier,
|
||||
Score.clear_type,
|
||||
case(
|
||||
(Score.score >= 10000000, ChartInfo.constant / 10.0 + 2),
|
||||
(
|
||||
@ -92,62 +101,73 @@ class Calculated(ScoresViewBase):
|
||||
0,
|
||||
),
|
||||
).label("potential"),
|
||||
Score.comment,
|
||||
)
|
||||
.select_from(Chart)
|
||||
.select_from(Difficulty)
|
||||
.join(
|
||||
ChartInfo,
|
||||
(Chart.song_id == ChartInfo.song_id)
|
||||
& (Chart.rating_class == ChartInfo.rating_class),
|
||||
(Difficulty.song_id == ChartInfo.song_id)
|
||||
& (Difficulty.rating_class == ChartInfo.rating_class),
|
||||
)
|
||||
.join(
|
||||
Score,
|
||||
(Chart.song_id == Score.song_id)
|
||||
& (Chart.rating_class == Score.rating_class),
|
||||
(Difficulty.song_id == Score.song_id)
|
||||
& (Difficulty.rating_class == Score.rating_class),
|
||||
),
|
||||
metadata=ScoresViewBase.metadata,
|
||||
cascade_on_drop=False,
|
||||
)
|
||||
|
||||
|
||||
class Best(ScoresViewBase):
|
||||
score_id: Mapped[str]
|
||||
class ScoreBest(ScoresViewBase):
|
||||
__tablename__ = "scores_best"
|
||||
|
||||
id: Mapped[int]
|
||||
song_id: Mapped[str]
|
||||
rating_class: Mapped[int]
|
||||
score: Mapped[int]
|
||||
pure: Mapped[Optional[int]]
|
||||
shiny_pure: Mapped[Optional[int]]
|
||||
far: Mapped[Optional[int]]
|
||||
lost: Mapped[Optional[int]]
|
||||
date: Mapped[Optional[int]]
|
||||
max_recall: Mapped[Optional[int]]
|
||||
r10_clear_type: Mapped[Optional[int]]
|
||||
shiny_pure: Mapped[Optional[int]]
|
||||
modifier: Mapped[Optional[int]]
|
||||
clear_type: Mapped[Optional[int]]
|
||||
potential: Mapped[float]
|
||||
comment: Mapped[Optional[str]]
|
||||
|
||||
__table__ = create_view(
|
||||
name="best",
|
||||
name=__tablename__,
|
||||
selectable=select(
|
||||
*[col for col in inspect(Calculated).columns if col.name != "potential"],
|
||||
func.max(Calculated.potential).label("potential"),
|
||||
*[
|
||||
col
|
||||
for col in inspect(ScoreCalculated).columns
|
||||
if col.name != "potential"
|
||||
],
|
||||
func.max(ScoreCalculated.potential).label("potential"),
|
||||
)
|
||||
.select_from(Calculated)
|
||||
.group_by(Calculated.song_id, Calculated.rating_class)
|
||||
.order_by(Calculated.potential.desc()),
|
||||
.select_from(ScoreCalculated)
|
||||
.group_by(ScoreCalculated.song_id, ScoreCalculated.rating_class)
|
||||
.order_by(ScoreCalculated.potential.desc()),
|
||||
metadata=ScoresViewBase.metadata,
|
||||
cascade_on_drop=False,
|
||||
)
|
||||
|
||||
|
||||
class CalculatedPotential(ScoresViewBase):
|
||||
__tablename__ = "calculated_potential"
|
||||
|
||||
b30: Mapped[float]
|
||||
|
||||
_select_bests_subquery = (
|
||||
select(Best.potential.label("b30_sum"))
|
||||
.order_by(Best.potential.desc())
|
||||
select(ScoreBest.potential.label("b30_sum"))
|
||||
.order_by(ScoreBest.potential.desc())
|
||||
.limit(30)
|
||||
.subquery()
|
||||
)
|
||||
__table__ = create_view(
|
||||
name="calculated_potential",
|
||||
name=__tablename__,
|
||||
selectable=select(func.avg(_select_bests_subquery.c.b30_sum).label("b30")),
|
||||
metadata=ScoresViewBase.metadata,
|
||||
cascade_on_drop=False,
|
||||
|
@ -1,7 +1,8 @@
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import TEXT, ForeignKey
|
||||
from sqlalchemy import TEXT, ForeignKey, func, select
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
|
||||
from sqlalchemy_utils import create_view
|
||||
|
||||
from .common import ReprHelper
|
||||
|
||||
@ -11,9 +12,11 @@ __all__ = [
|
||||
"PackLocalized",
|
||||
"Song",
|
||||
"SongLocalized",
|
||||
"Chart",
|
||||
"ChartLocalized",
|
||||
"Difficulty",
|
||||
"DifficultyLocalized",
|
||||
"ChartInfo",
|
||||
"SongsViewBase",
|
||||
"Chart",
|
||||
]
|
||||
|
||||
|
||||
@ -22,7 +25,7 @@ class SongsBase(DeclarativeBase, ReprHelper):
|
||||
|
||||
|
||||
class Pack(SongsBase):
|
||||
__tablename__ = "pack"
|
||||
__tablename__ = "packs"
|
||||
|
||||
id: Mapped[str] = mapped_column(TEXT(), primary_key=True)
|
||||
name: Mapped[str] = mapped_column(TEXT())
|
||||
@ -30,9 +33,9 @@ class Pack(SongsBase):
|
||||
|
||||
|
||||
class PackLocalized(SongsBase):
|
||||
__tablename__ = "pack_localized"
|
||||
__tablename__ = "packs_localized"
|
||||
|
||||
id: Mapped[str] = mapped_column(ForeignKey("pack.id"), primary_key=True)
|
||||
id: Mapped[str] = mapped_column(ForeignKey("packs.id"), primary_key=True)
|
||||
name_ja: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
name_ko: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
name_zh_hans: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
@ -44,7 +47,7 @@ class PackLocalized(SongsBase):
|
||||
|
||||
|
||||
class Song(SongsBase):
|
||||
__tablename__ = "song"
|
||||
__tablename__ = "songs"
|
||||
|
||||
idx: Mapped[int]
|
||||
id: Mapped[str] = mapped_column(TEXT(), primary_key=True)
|
||||
@ -67,29 +70,41 @@ class Song(SongsBase):
|
||||
|
||||
|
||||
class SongLocalized(SongsBase):
|
||||
__tablename__ = "song_localized"
|
||||
__tablename__ = "songs_localized"
|
||||
|
||||
id: Mapped[str] = mapped_column(ForeignKey("song.id"), primary_key=True)
|
||||
id: Mapped[str] = mapped_column(ForeignKey("songs.id"), primary_key=True)
|
||||
title_ja: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
title_ko: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
title_zh_hans: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
title_zh_hant: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
search_title_ja: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
|
||||
search_title_ko: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
|
||||
search_title_zh_hans: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
|
||||
search_title_zh_hant: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
|
||||
search_artist_ja: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
|
||||
search_artist_ko: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
|
||||
search_artist_zh_hans: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
|
||||
search_artist_zh_hant: Mapped[Optional[str]] = mapped_column(TEXT(), comment="json")
|
||||
search_title_ja: Mapped[Optional[str]] = mapped_column(TEXT(), comment="JSON array")
|
||||
search_title_ko: Mapped[Optional[str]] = mapped_column(TEXT(), comment="JSON array")
|
||||
search_title_zh_hans: Mapped[Optional[str]] = mapped_column(
|
||||
TEXT(), comment="JSON array"
|
||||
)
|
||||
search_title_zh_hant: Mapped[Optional[str]] = mapped_column(
|
||||
TEXT(), comment="JSON array"
|
||||
)
|
||||
search_artist_ja: Mapped[Optional[str]] = mapped_column(
|
||||
TEXT(), comment="JSON array"
|
||||
)
|
||||
search_artist_ko: Mapped[Optional[str]] = mapped_column(
|
||||
TEXT(), comment="JSON array"
|
||||
)
|
||||
search_artist_zh_hans: Mapped[Optional[str]] = mapped_column(
|
||||
TEXT(), comment="JSON array"
|
||||
)
|
||||
search_artist_zh_hant: Mapped[Optional[str]] = mapped_column(
|
||||
TEXT(), comment="JSON array"
|
||||
)
|
||||
source_ja: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
source_ko: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
source_zh_hans: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
source_zh_hant: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
|
||||
|
||||
class Chart(SongsBase):
|
||||
__tablename__ = "chart"
|
||||
class Difficulty(SongsBase):
|
||||
__tablename__ = "difficulties"
|
||||
|
||||
song_id: Mapped[str] = mapped_column(TEXT(), primary_key=True)
|
||||
rating_class: Mapped[int] = mapped_column(primary_key=True)
|
||||
@ -110,12 +125,14 @@ class Chart(SongsBase):
|
||||
date: Mapped[Optional[int]]
|
||||
|
||||
|
||||
class ChartLocalized(SongsBase):
|
||||
__tablename__ = "chart_localized"
|
||||
class DifficultyLocalized(SongsBase):
|
||||
__tablename__ = "difficulties_localized"
|
||||
|
||||
song_id: Mapped[str] = mapped_column(ForeignKey("chart.song_id"), primary_key=True)
|
||||
song_id: Mapped[str] = mapped_column(
|
||||
ForeignKey("difficulties.song_id"), primary_key=True
|
||||
)
|
||||
rating_class: Mapped[str] = mapped_column(
|
||||
ForeignKey("chart.rating_class"), primary_key=True
|
||||
ForeignKey("difficulties.rating_class"), primary_key=True
|
||||
)
|
||||
title_ja: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
title_ko: Mapped[Optional[str]] = mapped_column(TEXT())
|
||||
@ -128,13 +145,95 @@ class ChartLocalized(SongsBase):
|
||||
|
||||
|
||||
class ChartInfo(SongsBase):
|
||||
__tablename__ = "chart_info"
|
||||
__tablename__ = "charts_info"
|
||||
|
||||
song_id: Mapped[str] = mapped_column(ForeignKey("chart.song_id"), primary_key=True)
|
||||
song_id: Mapped[str] = mapped_column(
|
||||
ForeignKey("difficulties.song_id"), primary_key=True
|
||||
)
|
||||
rating_class: Mapped[str] = mapped_column(
|
||||
ForeignKey("chart.rating_class"), primary_key=True
|
||||
ForeignKey("difficulties.rating_class"), primary_key=True
|
||||
)
|
||||
constant: Mapped[int] = mapped_column(
|
||||
comment="real_constant * 10. For example, Crimson Throne [FTR] is 10.4, then store 104 here."
|
||||
)
|
||||
note: Mapped[Optional[int]]
|
||||
notes: Mapped[Optional[int]]
|
||||
|
||||
|
||||
class SongsViewBase(DeclarativeBase, ReprHelper):
|
||||
pass
|
||||
|
||||
|
||||
class Chart(SongsViewBase):
|
||||
__tablename__ = "charts"
|
||||
|
||||
song_idx: Mapped[int]
|
||||
song_id: Mapped[str]
|
||||
rating_class: Mapped[int]
|
||||
rating: Mapped[int]
|
||||
rating_plus: Mapped[bool]
|
||||
title: Mapped[str]
|
||||
artist: Mapped[str]
|
||||
set: Mapped[str]
|
||||
bpm: Mapped[Optional[str]]
|
||||
bpm_base: Mapped[Optional[float]]
|
||||
audio_preview: Mapped[Optional[int]]
|
||||
audio_preview_end: Mapped[Optional[int]]
|
||||
side: Mapped[Optional[int]]
|
||||
version: Mapped[Optional[str]]
|
||||
date: Mapped[Optional[int]]
|
||||
bg: Mapped[Optional[str]]
|
||||
bg_inverse: Mapped[Optional[str]]
|
||||
bg_day: Mapped[Optional[str]]
|
||||
bg_night: Mapped[Optional[str]]
|
||||
source: Mapped[Optional[str]]
|
||||
source_copyright: Mapped[Optional[str]]
|
||||
chart_designer: Mapped[Optional[str]]
|
||||
jacket_desginer: Mapped[Optional[str]]
|
||||
audio_override: Mapped[bool]
|
||||
jacket_override: Mapped[bool]
|
||||
jacket_night: Mapped[Optional[str]]
|
||||
constant: Mapped[int]
|
||||
notes: Mapped[Optional[int]]
|
||||
|
||||
__table__ = create_view(
|
||||
name=__tablename__,
|
||||
selectable=select(
|
||||
Song.idx.label("song_idx"),
|
||||
Difficulty.song_id,
|
||||
Difficulty.rating_class,
|
||||
Difficulty.rating,
|
||||
Difficulty.rating_plus,
|
||||
func.coalesce(Difficulty.title, Song.title).label("title"),
|
||||
func.coalesce(Difficulty.artist, Song.artist).label("artist"),
|
||||
Song.set,
|
||||
func.coalesce(Difficulty.bpm, Song.bpm).label("bpm"),
|
||||
func.coalesce(Difficulty.bpm_base, Song.bpm_base).label("bpm_base"),
|
||||
Song.audio_preview,
|
||||
Song.audio_preview_end,
|
||||
Song.side,
|
||||
func.coalesce(Difficulty.version, Song.version).label("version"),
|
||||
func.coalesce(Difficulty.date, Song.date).label("date"),
|
||||
func.coalesce(Difficulty.bg, Song.bg).label("bg"),
|
||||
func.coalesce(Difficulty.bg_inverse, Song.bg_inverse).label("bg_inverse"),
|
||||
Song.bg_day,
|
||||
Song.bg_night,
|
||||
Song.source,
|
||||
Song.source_copyright,
|
||||
Difficulty.chart_designer,
|
||||
Difficulty.jacket_desginer,
|
||||
Difficulty.audio_override,
|
||||
Difficulty.jacket_override,
|
||||
Difficulty.jacket_night,
|
||||
ChartInfo.constant,
|
||||
ChartInfo.notes,
|
||||
)
|
||||
.select_from(Difficulty)
|
||||
.join(
|
||||
ChartInfo,
|
||||
(Difficulty.song_id == ChartInfo.song_id)
|
||||
& (Difficulty.rating_class == ChartInfo.rating_class),
|
||||
)
|
||||
.join(Song, Difficulty.song_id == Song.id),
|
||||
metadata=SongsViewBase.metadata,
|
||||
cascade_on_drop=False,
|
||||
)
|
||||
|
111
src/arcaea_offline/searcher.py
Normal file
111
src/arcaea_offline/searcher.py
Normal file
@ -0,0 +1,111 @@
|
||||
from typing import List, Union
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
from whoosh.analysis import LowercaseFilter, RegexTokenizer
|
||||
from whoosh.fields import ID, KEYWORD, TEXT, Schema
|
||||
from whoosh.filedb.filestore import RamStorage
|
||||
from whoosh.qparser import FuzzyTermPlugin, MultifieldParser, OrGroup
|
||||
|
||||
from .models.songs import Song, SongLocalized
|
||||
from .utils.search_title import recover_search_title
|
||||
|
||||
|
||||
class Searcher:
|
||||
def __init__(self):
|
||||
self.text_analyzer = RegexTokenizer() | LowercaseFilter()
|
||||
self.song_schema = Schema(
|
||||
song_id=ID(stored=True, unique=True),
|
||||
title=TEXT(analyzer=self.text_analyzer, spelling=True),
|
||||
artist=TEXT(analyzer=self.text_analyzer, spelling=True),
|
||||
source=TEXT(analyzer=self.text_analyzer, spelling=True),
|
||||
keywords=KEYWORD(lowercase=True, stored=True, scorable=True),
|
||||
)
|
||||
self.storage = RamStorage()
|
||||
self.index = self.storage.create_index(self.song_schema)
|
||||
|
||||
self.default_query_parser = MultifieldParser(
|
||||
["song_id", "title", "artist", "source", "keywords"],
|
||||
self.song_schema,
|
||||
group=OrGroup,
|
||||
)
|
||||
self.default_query_parser.add_plugin(FuzzyTermPlugin())
|
||||
|
||||
def import_songs(self, session: Session):
|
||||
writer = self.index.writer()
|
||||
songs = list(session.scalars(select(Song)))
|
||||
song_localize_stmt = select(SongLocalized)
|
||||
for song in songs:
|
||||
stmt = song_localize_stmt.where(SongLocalized.id == song.id)
|
||||
sl = session.scalar(stmt)
|
||||
song_id = song.id
|
||||
possible_titles: List[Union[str, None]] = [song.title]
|
||||
possible_artists: List[Union[str, None]] = [song.artist]
|
||||
possible_sources: List[Union[str, None]] = [song.source]
|
||||
if sl:
|
||||
possible_titles.extend(
|
||||
[sl.title_ja, sl.title_ko, sl.title_zh_hans, sl.title_zh_hant]
|
||||
)
|
||||
possible_titles.extend(
|
||||
recover_search_title(sl.search_title_ja)
|
||||
+ recover_search_title(sl.search_title_ko)
|
||||
+ recover_search_title(sl.search_title_zh_hans)
|
||||
+ recover_search_title(sl.search_title_zh_hant)
|
||||
)
|
||||
possible_artists.extend(
|
||||
recover_search_title(sl.search_artist_ja)
|
||||
+ recover_search_title(sl.search_artist_ko)
|
||||
+ recover_search_title(sl.search_artist_zh_hans)
|
||||
+ recover_search_title(sl.search_artist_zh_hant)
|
||||
)
|
||||
possible_sources.extend(
|
||||
[
|
||||
sl.source_ja,
|
||||
sl.source_ko,
|
||||
sl.source_zh_hans,
|
||||
sl.source_zh_hant,
|
||||
]
|
||||
)
|
||||
|
||||
# remove empty items in list
|
||||
titles = [t for t in possible_titles if t != "" and t is not None]
|
||||
artists = [t for t in possible_artists if t != "" and t is not None]
|
||||
sources = [t for t in possible_sources if t != "" and t is not None]
|
||||
|
||||
writer.update_document(
|
||||
song_id=song_id,
|
||||
title=" ".join(titles),
|
||||
artist=" ".join(artists),
|
||||
source=" ".join(sources),
|
||||
keywords=" ".join([song_id] + titles + artists + sources),
|
||||
)
|
||||
|
||||
writer.commit()
|
||||
|
||||
def did_you_mean(self, string: str):
|
||||
results = set()
|
||||
|
||||
with self.index.searcher() as searcher:
|
||||
corrector_keywords = searcher.corrector("keywords") # type: ignore
|
||||
corrector_song_id = searcher.corrector("song_id") # type: ignore
|
||||
corrector_title = searcher.corrector("title") # type: ignore
|
||||
corrector_artist = searcher.corrector("artist") # type: ignore
|
||||
corrector_source = searcher.corrector("source") # type: ignore
|
||||
|
||||
results.update(corrector_keywords.suggest(string))
|
||||
results.update(corrector_song_id.suggest(string))
|
||||
results.update(corrector_title.suggest(string))
|
||||
results.update(corrector_artist.suggest(string))
|
||||
results.update(corrector_source.suggest(string))
|
||||
|
||||
if string in results:
|
||||
results.remove(string)
|
||||
|
||||
return list(results)
|
||||
|
||||
def search(self, string: str, *, limit: int = 10, fuzzy_distance: int = 10):
|
||||
query_string = f"{string}"
|
||||
query = self.default_query_parser.parse(query_string)
|
||||
with self.index.searcher() as searcher:
|
||||
results = list(searcher.search(query, limit=limit))
|
||||
return results
|
0
src/arcaea_offline/utils/__init__.py
Normal file
0
src/arcaea_offline/utils/__init__.py
Normal file
23
src/arcaea_offline/utils/rating.py
Normal file
23
src/arcaea_offline/utils/rating.py
Normal file
@ -0,0 +1,23 @@
|
||||
from typing import Optional
|
||||
|
||||
RATING_CLASS_TEXT_MAP = {
|
||||
0: "Past",
|
||||
1: "Present",
|
||||
2: "Future",
|
||||
3: "Beyond",
|
||||
}
|
||||
|
||||
RATING_CLASS_SHORT_TEXT_MAP = {
|
||||
0: "PST",
|
||||
1: "PRS",
|
||||
2: "FTR",
|
||||
3: "BYD",
|
||||
}
|
||||
|
||||
|
||||
def rating_class_to_text(rating_class: int) -> Optional[str]:
|
||||
return RATING_CLASS_TEXT_MAP.get(rating_class)
|
||||
|
||||
|
||||
def rating_class_to_short_text(rating_class: int) -> Optional[str]:
|
||||
return RATING_CLASS_SHORT_TEXT_MAP.get(rating_class)
|
29
src/arcaea_offline/utils/score.py
Normal file
29
src/arcaea_offline/utils/score.py
Normal file
@ -0,0 +1,29 @@
|
||||
from typing import Any, Sequence
|
||||
|
||||
SCORE_GRADE_FLOOR = [9900000, 9800000, 9500000, 9200000, 8900000, 8600000, 0]
|
||||
SCORE_GRADE_TEXTS = ["EX+", "EX", "AA", "A", "B", "C", "D"]
|
||||
|
||||
|
||||
def zip_score_grade(score: int, __seq: Sequence, default: Any = "__PRESERVE__"):
|
||||
"""
|
||||
zip_score_grade is a simple wrapper that equals to:
|
||||
```py
|
||||
for score_floor, val in zip(SCORE_GRADE_FLOOR, __seq):
|
||||
if score >= score_floor:
|
||||
return val
|
||||
return seq[-1] if default == "__PRESERVE__" else default
|
||||
```
|
||||
Could be useful in specific cases.
|
||||
"""
|
||||
return next(
|
||||
(
|
||||
val
|
||||
for score_floor, val in zip(SCORE_GRADE_FLOOR, __seq)
|
||||
if score >= score_floor
|
||||
),
|
||||
__seq[-1] if default == "__PRESERVE__" else default,
|
||||
)
|
||||
|
||||
|
||||
def score_to_grade_text(score: int) -> str:
|
||||
return zip_score_grade(score, SCORE_GRADE_TEXTS)
|
6
src/arcaea_offline/utils/search_title.py
Normal file
6
src/arcaea_offline/utils/search_title.py
Normal file
@ -0,0 +1,6 @@
|
||||
import json
|
||||
from typing import List, Optional
|
||||
|
||||
|
||||
def recover_search_title(db_value: Optional[str]) -> List[str]:
|
||||
return json.loads(db_value) if db_value else []
|
Reference in New Issue
Block a user