16 Commits

25 changed files with 720 additions and 205 deletions

View File

@ -0,0 +1,48 @@
name: "Build and draft a release"

on:
  workflow_dispatch:
  push:
    tags:
      - "v[0-9]+.[0-9]+.[0-9]+"

permissions:
  contents: write
  discussions: write

jobs:
  build-and-draft-release:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          # Full history so release notes can be generated from prior tags.
          fetch-depth: 0

      - name: Set up Python environment
        uses: actions/setup-python@v5
        with:
          python-version: "3.x"

      - name: Build package
        run: |
          pip install build
          python -m build

      # Strip the leading "v" from the tag name (e.g. v1.2.3 -> 1.2.3) so the
      # file globs below match the sdist/wheel version.
      - name: Remove `v` in tag name
        uses: mad9000/actions-find-and-replace-string@5
        id: tagNameReplaced
        with:
          source: ${{ github.ref_name }}
          find: "v"
          replace: ""

      - name: Draft a release
        uses: softprops/action-gh-release@v2
        with:
          discussion_category_name: New releases
          draft: true
          generate_release_notes: true
          files: |
            dist/arcaea_offline_ocr-${{ steps.tagNameReplaced.outputs.value }}*.whl
            dist/arcaea-offline-ocr-${{ steps.tagNameReplaced.outputs.value }}.tar.gz

View File

@ -4,11 +4,10 @@ repos:
hooks:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black
rev: 23.1.0
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.11.13
hooks:
- id: black
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
- id: ruff
args: ["--fix"]
- id: ruff-format

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "arcaea-offline-ocr"
version = "0.0.97"
version = "0.0.99"
authors = [{ name = "283375", email = "log_283375@163.com" }]
description = "Extract your Arcaea play result from screenshot."
readme = "README.md"
@ -16,8 +16,8 @@ classifiers = [
]
[project.urls]
"Homepage" = "https://github.com/283375/arcaea-offline-ocr"
"Bug Tracker" = "https://github.com/283375/arcaea-offline-ocr/issues"
"Homepage" = "https://github.com/ArcaeaOffline/core-ocr"
"Bug Tracker" = "https://github.com/ArcaeaOffline/core-ocr/issues"
[tool.isort]
profile = "black"
@ -25,3 +25,14 @@ src_paths = ["src/arcaea_offline_ocr"]
[tool.pyright]
ignore = ["**/__debug*.*"]
[tool.pylint.main]
# extension-pkg-allow-list = ["cv2"]
generated-members = ["cv2.*"]
[tool.pylint.logging]
disable = [
"missing-module-docstring",
"missing-class-docstring",
"missing-function-docstring"
]

View File

@ -1,3 +1,2 @@
attrs==23.1.0
numpy==1.26.1
opencv-python==4.8.1.78
numpy~=2.3
opencv-python~=4.11

View File

@ -1,4 +1,3 @@
from .crop import *
from .device import *
from .ocr import *
from .utils import *

View File

@ -1,52 +1,42 @@
from math import floor
from typing import List, Optional, Tuple
import cv2
import numpy as np
from ....crop import crop_xywh
from ....ocr import (
FixRects,
ocr_digits_by_contour_knn,
preprocess_hog,
resize_fill_square,
)
from ....phash_db import ImagePhashDatabase
from ....types import Mat
from ....utils import construct_int_xywh_rect
from ...shared import B30OcrResultItem
from .colors import *
from .colors import (
BYD_MAX_HSV,
BYD_MIN_HSV,
FAR_BG_MAX_HSV,
FAR_BG_MIN_HSV,
FTR_MAX_HSV,
FTR_MIN_HSV,
LOST_BG_MAX_HSV,
LOST_BG_MIN_HSV,
PRS_MAX_HSV,
PRS_MIN_HSV,
PURE_BG_MAX_HSV,
PURE_BG_MIN_HSV,
)
from .rois import ChieriBotV4Rois
from ....providers.knn import OcrKNearestTextProvider
class ChieriBotV4Ocr:
def __init__(
self,
score_knn: cv2.ml.KNearest,
pfl_knn: cv2.ml.KNearest,
score_knn_provider: OcrKNearestTextProvider,
pfl_knn_provider: OcrKNearestTextProvider,
phash_db: ImagePhashDatabase,
factor: Optional[float] = 1.0,
factor: float = 1.0,
):
self.__score_knn = score_knn
self.__pfl_knn = pfl_knn
self.__phash_db = phash_db
self.__rois = ChieriBotV4Rois(factor)
@property
def score_knn(self):
return self.__score_knn
@score_knn.setter
def score_knn(self, knn_digits_model: cv2.ml.KNearest):
self.__score_knn = knn_digits_model
@property
def pfl_knn(self):
return self.__pfl_knn
@pfl_knn.setter
def pfl_knn(self, knn_digits_model: cv2.ml.KNearest):
self.__pfl_knn = knn_digits_model
self.pfl_knn_provider = pfl_knn_provider
self.score_knn_provider = score_knn_provider
@property
def phash_db(self):
@ -72,9 +62,8 @@ class ChieriBotV4Ocr:
self.factor = img.shape[0] / 4400
def ocr_component_rating_class(self, component_bgr: Mat) -> int:
rating_class_rect = construct_int_xywh_rect(
self.rois.component_rois.rating_class_rect
)
rating_class_rect = self.rois.component_rois.rating_class_rect.rounded()
rating_class_roi = crop_xywh(component_bgr, rating_class_rect)
rating_class_roi = cv2.cvtColor(rating_class_roi, cv2.COLOR_BGR2HSV)
rating_class_masks = [
@ -89,9 +78,7 @@ class ChieriBotV4Ocr:
return max(enumerate(rating_class_results), key=lambda i: i[1])[0] + 1
def ocr_component_song_id(self, component_bgr: Mat):
jacket_rect = construct_int_xywh_rect(
self.rois.component_rois.jacket_rect, floor
)
jacket_rect = self.rois.component_rois.jacket_rect.floored()
jacket_roi = cv2.cvtColor(
crop_xywh(component_bgr, jacket_rect), cv2.COLOR_BGR2GRAY
)
@ -99,7 +86,7 @@ class ChieriBotV4Ocr:
def ocr_component_score_knn(self, component_bgr: Mat) -> int:
# sourcery skip: inline-immediately-returned-variable
score_rect = construct_int_xywh_rect(self.rois.component_rois.score_rect)
score_rect = self.rois.component_rois.score_rect.rounded()
score_roi = cv2.cvtColor(
crop_xywh(component_bgr, score_rect), cv2.COLOR_BGR2GRAY
)
@ -117,9 +104,13 @@ class ChieriBotV4Ocr:
if rect[3] > score_roi.shape[0] * 0.5:
continue
score_roi = cv2.fillPoly(score_roi, [contour], 0)
return ocr_digits_by_contour_knn(score_roi, self.score_knn)
def find_pfl_rects(self, component_pfl_processed: Mat) -> List[List[int]]:
ocr_result = self.score_knn_provider.result(score_roi)
return int(ocr_result) if ocr_result else 0
def find_pfl_rects(
self, component_pfl_processed: Mat
) -> List[Tuple[int, int, int, int]]:
# sourcery skip: inline-immediately-returned-variable
pfl_roi_find = cv2.morphologyEx(
component_pfl_processed,
@ -146,7 +137,7 @@ class ChieriBotV4Ocr:
return pfl_rects_adjusted
def preprocess_component_pfl(self, component_bgr: Mat) -> Mat:
pfl_rect = construct_int_xywh_rect(self.rois.component_rois.pfl_rect)
pfl_rect = self.rois.component_rois.pfl_rect.rounded()
pfl_roi = crop_xywh(component_bgr, pfl_rect)
pfl_roi_hsv = cv2.cvtColor(pfl_roi, cv2.COLOR_BGR2HSV)
@ -193,25 +184,9 @@ class ChieriBotV4Ocr:
pure_far_lost = []
for pfl_roi_rect in pfl_rects:
roi = crop_xywh(pfl_roi, pfl_roi_rect)
digit_contours, _ = cv2.findContours(
roi, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
)
digit_rects = [cv2.boundingRect(c) for c in digit_contours]
digit_rects = FixRects.connect_broken(
digit_rects, roi.shape[1], roi.shape[0]
)
digit_rects = FixRects.split_connected(roi, digit_rects)
digit_rects = sorted(digit_rects, key=lambda r: r[0])
digits = []
for digit_rect in digit_rects:
digit = crop_xywh(roi, digit_rect)
digit = resize_fill_square(digit, 20)
digits.append(digit)
samples = preprocess_hog(digits)
result = self.pfl_knn_provider.result(roi)
pure_far_lost.append(int(result) if result else None)
_, results, _, _ = self.pfl_knn.findNearest(samples, 4)
results = [str(int(i)) for i in results.ravel()]
pure_far_lost.append(int("".join(results)))
return tuple(pure_far_lost)
except Exception:
return (None, None, None)

View File

@ -1,12 +1,12 @@
from typing import List, Optional
from typing import List
from ....crop import crop_xywh
from ....types import Mat, XYWHRect
from ....utils import apply_factor, construct_int_xywh_rect
from ....utils import apply_factor
class ChieriBotV4ComponentRois:
def __init__(self, factor: Optional[float] = 1.0):
def __init__(self, factor: float = 1.0):
self.__factor = factor
@property
@ -19,11 +19,11 @@ class ChieriBotV4ComponentRois:
@property
def top_font_color_detect(self):
return apply_factor((35, 10, 120, 100), self.factor)
return apply_factor(XYWHRect(35, 10, 120, 100), self.factor)
@property
def bottom_font_color_detect(self):
return apply_factor((30, 125, 175, 110), self.factor)
return apply_factor(XYWHRect(30, 125, 175, 110), self.factor)
@property
def bg_point(self):
@ -31,31 +31,31 @@ class ChieriBotV4ComponentRois:
@property
def rating_class_rect(self):
return apply_factor((21, 40, 7, 20), self.factor)
return apply_factor(XYWHRect(21, 40, 7, 20), self.factor)
@property
def title_rect(self):
return apply_factor((35, 10, 430, 50), self.factor)
return apply_factor(XYWHRect(35, 10, 430, 50), self.factor)
@property
def jacket_rect(self):
return apply_factor((263, 0, 239, 239), self.factor)
return apply_factor(XYWHRect(263, 0, 239, 239), self.factor)
@property
def score_rect(self):
return apply_factor((30, 60, 270, 55), self.factor)
return apply_factor(XYWHRect(30, 60, 270, 55), self.factor)
@property
def pfl_rect(self):
return apply_factor((50, 125, 80, 100), self.factor)
return apply_factor(XYWHRect(50, 125, 80, 100), self.factor)
@property
def date_rect(self):
return apply_factor((205, 200, 225, 25), self.factor)
return apply_factor(XYWHRect(205, 200, 225, 25), self.factor)
class ChieriBotV4Rois:
def __init__(self, factor: Optional[float] = 1.0):
def __init__(self, factor: float = 1.0):
self.__factor = factor
self.__component_rois = ChieriBotV4ComponentRois(factor)
@ -100,9 +100,7 @@ class ChieriBotV4Rois:
def horizontal_items(self):
return 3
@property
def vertical_items(self):
return 10
vertical_items = 10
@property
def b33_vertical_gap(self):
@ -112,16 +110,17 @@ class ChieriBotV4Rois:
first_rect = XYWHRect(x=self.left, y=self.top, w=self.width, h=self.height)
results = []
last_rect = first_rect
for vi in range(self.vertical_items):
rect = XYWHRect(*first_rect)
rect += (0, (self.vertical_gap + self.height) * vi, 0, 0)
for hi in range(self.horizontal_items):
if hi > 0:
rect += ((self.width + self.horizontal_gap), 0, 0, 0)
int_rect = construct_int_xywh_rect(rect)
results.append(crop_xywh(img_bgr, int_rect))
results.append(crop_xywh(img_bgr, rect.rounded()))
last_rect = rect
rect += (
last_rect += (
-(self.width + self.horizontal_gap) * 2,
self.height + self.b33_vertical_gap,
0,
@ -129,8 +128,7 @@ class ChieriBotV4Rois:
)
for hi in range(self.horizontal_items):
if hi > 0:
rect += ((self.width + self.horizontal_gap), 0, 0, 0)
int_rect = construct_int_xywh_rect(rect)
results.append(crop_xywh(img_bgr, int_rect))
last_rect += ((self.width + self.horizontal_gap), 0, 0, 0)
results.append(crop_xywh(img_bgr, last_rect.rounded()))
return results

View File

@ -1,10 +1,9 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Union
import attrs
from typing import Optional
@attrs.define
@dataclass
class B30OcrResultItem:
rating_class: int
score: int

View File

@ -0,0 +1,6 @@
"""Public interface of the image-hash database builders package."""

from .ihdb import ImageHashDatabaseBuildTask, ImageHashesDatabaseBuilder

# Names re-exported for `from ... import *` consumers.
__all__ = [
"ImageHashDatabaseBuildTask",
"ImageHashesDatabaseBuilder",
]

View File

@ -0,0 +1,112 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Callable, List
import cv2
from arcaea_offline_ocr.core import hashers
from arcaea_offline_ocr.providers import ImageCategory
from arcaea_offline_ocr.providers.ihdb import (
PROP_KEY_BUILT_AT,
PROP_KEY_HASH_SIZE,
PROP_KEY_HIGH_FREQ_FACTOR,
ImageHashDatabaseIdProvider,
ImageHashType,
)
if TYPE_CHECKING:
from sqlite3 import Connection
from arcaea_offline_ocr.types import Mat
def _default_imread_gray(image_path: str):
    """Read the image at ``image_path`` and return it as a grayscale matrix.

    Reads in BGR color first, then converts, matching OpenCV's default
    channel ordering.
    """
    bgr = cv2.imread(image_path, cv2.IMREAD_COLOR)
    return cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY)
@dataclass
class ImageHashDatabaseBuildTask:
    """One image to be hashed and written into the hash database.

    ``imread_function`` is called with ``image_path`` by the builder to obtain
    the grayscale matrix that gets hashed.
    """

    image_path: str
    image_id: str
    category: ImageCategory
    # Defaults to a BGR read followed by a grayscale conversion.
    imread_function: Callable[[str], "Mat"] = _default_imread_gray
@dataclass
class _ImageHash:
    """Internal row representation for the ``hashes`` table."""

    image_id: str
    category: ImageCategory
    image_hash_type: ImageHashType
    hash: bytes
class ImageHashesDatabaseBuilder:
    """Builds the SQLite image-hash database consumed by
    ``ImageHashDatabaseIdProvider``."""

    @staticmethod
    def __insert_property(conn: "Connection", key: str, value: str):
        # Properties are plain key/value rows; the caller commits.
        return conn.execute(
            "INSERT INTO properties (key, value) VALUES (?, ?)",
            (key, value),
        )

    @classmethod
    def build(
        cls,
        conn: "Connection",
        tasks: List[ImageHashDatabaseBuildTask],
        *,
        hash_size: int = 16,
        high_freq_factor: int = 4,
    ):
        """Hash every task image with all three hashers, then create the
        schema and insert properties and hash rows into ``conn``.

        :param conn: target SQLite connection; committed at the end.
        :param tasks: images to hash.
        :param hash_size: side length of each hash matrix.
        :param high_freq_factor: oversampling factor for the DCT hash.
        """
        rows: List[_ImageHash] = []

        for task in tasks:
            img_gray = task.imread_function(task.image_path)

            hash_variants = [
                (ImageHashType.AVERAGE, hashers.average(img_gray, hash_size)),
                (ImageHashType.DCT, hashers.dct(img_gray, hash_size, high_freq_factor)),
                (ImageHashType.DIFFERENCE, hashers.difference(img_gray, hash_size)),
            ]
            for hash_type, hash_mat in hash_variants:
                rows.append(
                    _ImageHash(
                        image_id=task.image_id,
                        image_hash_type=hash_type,
                        category=task.category,
                        hash=ImageHashDatabaseIdProvider.hash_mat_to_bytes(hash_mat),
                    )
                )

        conn.execute("CREATE TABLE properties (`key` VARCHAR, `value` VARCHAR)")
        conn.execute(
            """CREATE TABLE hashes (
                `id` VARCHAR,
                `category` INTEGER,
                `hash_type` INTEGER,
                `hash` BLOB
            )"""
        )

        # built_at is stored as milliseconds since the Unix epoch, UTC.
        now = datetime.now(tz=timezone.utc)
        timestamp = int(now.timestamp() * 1000)

        cls.__insert_property(conn, PROP_KEY_HASH_SIZE, str(hash_size))
        cls.__insert_property(conn, PROP_KEY_HIGH_FREQ_FACTOR, str(high_freq_factor))
        cls.__insert_property(conn, PROP_KEY_BUILT_AT, str(timestamp))

        conn.executemany(
            "INSERT INTO hashes (`id`, `category`, `hash_type`, `hash`) VALUES (?, ?, ?, ?)",
            [
                (it.image_id, it.category.value, it.image_hash_type.value, it.hash)
                for it in rows
            ],
        )
        conn.commit()

View File

View File

@ -0,0 +1,3 @@
"""Convenience re-exports for the perceptual image hashers."""

from .index import average, dct, difference

# Public API of the hashers package.
__all__ = ["average", "dct", "difference"]

View File

@ -0,0 +1,7 @@
import cv2
from arcaea_offline_ocr.types import Mat
def _resize_image(src: Mat, dsize: "tuple[int, int]") -> Mat:
    """Resize ``src`` to ``dsize`` (width, height) with INTER_AREA.

    The original annotated ``dsize`` as ``...``; a (width, height) pair is
    what ``cv2.resize`` expects here. INTER_AREA suits the hashers, which
    always shrink their inputs.
    """
    return cv2.resize(src, dsize, fx=0, fy=0, interpolation=cv2.INTER_AREA)

View File

@ -0,0 +1,35 @@
import cv2
import numpy as np
from arcaea_offline_ocr.types import Mat
from ._common import _resize_image
def average(img_gray: Mat, hash_size: int) -> Mat:
    """Average hash: resize to ``hash_size`` square, threshold by the mean.

    Returns a flattened boolean array of ``hash_size ** 2`` elements.
    """
    resized = _resize_image(img_gray, (hash_size, hash_size))
    return (resized > resized.mean()).flatten()
def difference(img_gray: Mat, hash_size: int) -> Mat:
    """Difference hash: compare each pixel with its right-hand neighbour.

    Resizes to (hash_size + 1, hash_size) so each row yields ``hash_size``
    comparisons; returns a flattened boolean array.
    """
    resized = _resize_image(img_gray, (hash_size + 1, hash_size))
    left = resized[:, :-1]
    right = resized[:, 1:]
    return (left > right).flatten()
def dct(img_gray: Mat, hash_size: int = 16, high_freq_factor: int = 4) -> Mat:
    """DCT (perceptual) hash: threshold the low-frequency DCT block by mean.

    NOTE(review): original carried "TODO: consistency?" — unlike ``average``
    and ``difference`` this returns a 2-D boolean matrix (not flattened) and
    takes default arguments. Behavior preserved; confirm before unifying.
    """
    side = hash_size * high_freq_factor
    resized = _resize_image(img_gray, (side, side)).astype(np.float32)
    dct_mat = cv2.dct(resized)
    low_freq = dct_mat[:hash_size, :hash_size]
    return low_freq > low_freq.mean()

View File

@ -1,15 +1,14 @@
from dataclasses import dataclass
from typing import Optional
import attrs
@attrs.define
@dataclass
class DeviceOcrResult:
rating_class: int
pure: int
far: int
lost: int
score: int
pure: Optional[int] = None
far: Optional[int] = None
lost: Optional[int] = None
max_recall: Optional[int] = None
song_id: Optional[str] = None
song_id_possibility: Optional[float] = None

View File

@ -1,15 +1,8 @@
import cv2
import numpy as np
from ..crop import crop_xywh
from ..ocr import (
FixRects,
ocr_digit_samples_knn,
ocr_digits_by_contour_knn,
preprocess_hog,
resize_fill_square,
)
from ..phash_db import ImagePhashDatabase
from ..providers.knn import OcrKNearestTextProvider
from ..types import Mat
from .common import DeviceOcrResult
from .rois.extractor import DeviceRoisExtractor
@ -21,38 +14,37 @@ class DeviceOcr:
self,
extractor: DeviceRoisExtractor,
masker: DeviceRoisMasker,
knn_model: cv2.ml.KNearest,
knn_provider: OcrKNearestTextProvider,
phash_db: ImagePhashDatabase,
):
self.extractor = extractor
self.masker = masker
self.knn_model = knn_model
self.knn_provider = knn_provider
self.phash_db = phash_db
def pfl(self, roi_gray: Mat, factor: float = 1.25):
contours, _ = cv2.findContours(
roi_gray, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE
)
filtered_contours = [c for c in contours if cv2.contourArea(c) >= 5 * factor]
rects = [cv2.boundingRect(c) for c in filtered_contours]
rects = FixRects.connect_broken(rects, roi_gray.shape[1], roi_gray.shape[0])
def contour_filter(cnt):
return cv2.contourArea(cnt) >= 5 * factor
filtered_rects = [r for r in rects if r[2] >= 5 * factor and r[3] >= 6 * factor]
filtered_rects = FixRects.split_connected(roi_gray, filtered_rects)
filtered_rects = sorted(filtered_rects, key=lambda r: r[0])
contours = self.knn_provider.contours(roi_gray)
contours_filtered = self.knn_provider.contours(
roi_gray, contours_filter=contour_filter
)
roi_ocr = roi_gray.copy()
filtered_contours_flattened = {tuple(c.flatten()) for c in filtered_contours}
contours_filtered_flattened = {tuple(c.flatten()) for c in contours_filtered}
for contour in contours:
if tuple(contour.flatten()) in filtered_contours_flattened:
if tuple(contour.flatten()) in contours_filtered_flattened:
continue
roi_ocr = cv2.fillPoly(roi_ocr, [contour], [0])
digit_rois = [
resize_fill_square(crop_xywh(roi_ocr, r), 20) for r in filtered_rects
]
samples = preprocess_hog(digit_rois)
return ocr_digit_samples_knn(samples, self.knn_model)
ocr_result = self.knn_provider.result(
roi_ocr,
contours_filter=lambda cnt: cv2.contourArea(cnt) >= 5 * factor,
rects_filter=lambda rect: rect[2] >= 5 * factor and rect[3] >= 6 * factor,
)
return int(ocr_result) if ocr_result else 0
def pure(self):
return self.pfl(self.masker.pure(self.extractor.pure))
@ -65,12 +57,14 @@ class DeviceOcr:
def score(self):
roi = self.masker.score(self.extractor.score)
contours, _ = cv2.findContours(roi, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
contours = self.knn_provider.contours(roi)
for contour in contours:
x, y, w, h = cv2.boundingRect(contour)
if h < roi.shape[0] * 0.6:
if (
cv2.boundingRect(contour)[3] < roi.shape[0] * 0.6
): # h < score_component_h * 0.6
roi = cv2.fillPoly(roi, [contour], [0])
return ocr_digits_by_contour_knn(roi, self.knn_model)
ocr_result = self.knn_provider.result(roi)
return int(ocr_result) if ocr_result else 0
def rating_class(self):
roi = self.extractor.rating_class
@ -79,13 +73,15 @@ class DeviceOcr:
self.masker.rating_class_prs(roi),
self.masker.rating_class_ftr(roi),
self.masker.rating_class_byd(roi),
self.masker.rating_class_etr(roi),
]
return max(enumerate(results), key=lambda i: np.count_nonzero(i[1]))[0]
def max_recall(self):
return ocr_digits_by_contour_knn(
self.masker.max_recall(self.extractor.max_recall), self.knn_model
ocr_result = self.knn_provider.result(
self.masker.max_recall(self.extractor.max_recall)
)
return int(ocr_result) if ocr_result else None
def clear_status(self):
roi = self.extractor.clear_status
@ -108,7 +104,7 @@ class DeviceOcr:
@staticmethod
def preprocess_char_icon(img_gray: Mat):
h, w = img_gray.shape[:2]
img = cv2.copyMakeBorder(img_gray, w - h, 0, 0, 0, cv2.BORDER_REPLICATE)
img = cv2.copyMakeBorder(img_gray, max(w - h, 0), 0, 0, 0, cv2.BORDER_REPLICATE)
h, w = img.shape[:2]
img = cv2.fillPoly(
img,

View File

@ -6,6 +6,8 @@ from .common import DeviceRoisMasker
class DeviceRoisMaskerAuto(DeviceRoisMasker):
# pylint: disable=abstract-method
@staticmethod
def mask_bgr_in_hsv(roi_bgr: Mat, hsv_lower: Mat, hsv_upper: Mat):
return cv2.inRange(
@ -32,6 +34,9 @@ class DeviceRoisMaskerAutoT1(DeviceRoisMaskerAuto):
BYD_HSV_MIN = np.array([170, 50, 50], np.uint8)
BYD_HSV_MAX = np.array([179, 210, 198], np.uint8)
ETR_HSV_MIN = np.array([130, 60, 80], np.uint8)
ETR_HSV_MAX = np.array([140, 145, 180], np.uint8)
TRACK_LOST_HSV_MIN = np.array([170, 75, 90], np.uint8)
TRACK_LOST_HSV_MAX = np.array([175, 170, 160], np.uint8)
@ -85,6 +90,10 @@ class DeviceRoisMaskerAutoT1(DeviceRoisMaskerAuto):
def rating_class_byd(cls, roi_bgr: Mat) -> Mat:
return cls.mask_bgr_in_hsv(roi_bgr, cls.BYD_HSV_MIN, cls.BYD_HSV_MAX)
@classmethod
def rating_class_etr(cls, roi_bgr: Mat) -> Mat:
return cls.mask_bgr_in_hsv(roi_bgr, cls.ETR_HSV_MIN, cls.ETR_HSV_MAX)
@classmethod
def max_recall(cls, roi_bgr: Mat) -> Mat:
return cls.gray(roi_bgr)
@ -116,7 +125,7 @@ class DeviceRoisMaskerAutoT1(DeviceRoisMaskerAuto):
class DeviceRoisMaskerAutoT2(DeviceRoisMaskerAuto):
PFL_HSV_MIN = np.array([0, 0, 248], np.uint8)
PFL_HSV_MAX = np.array([179, 10, 255], np.uint8)
PFL_HSV_MAX = np.array([179, 40, 255], np.uint8)
SCORE_HSV_MIN = np.array([0, 0, 180], np.uint8)
SCORE_HSV_MAX = np.array([179, 255, 255], np.uint8)
@ -133,6 +142,9 @@ class DeviceRoisMaskerAutoT2(DeviceRoisMaskerAuto):
BYD_HSV_MIN = np.array([170, 50, 50], np.uint8)
BYD_HSV_MAX = np.array([179, 210, 198], np.uint8)
ETR_HSV_MIN = np.array([130, 60, 80], np.uint8)
ETR_HSV_MAX = np.array([140, 145, 180], np.uint8)
MAX_RECALL_HSV_MIN = np.array([125, 0, 0], np.uint8)
MAX_RECALL_HSV_MAX = np.array([145, 100, 150], np.uint8)
@ -184,6 +196,10 @@ class DeviceRoisMaskerAutoT2(DeviceRoisMaskerAuto):
def rating_class_byd(cls, roi_bgr: Mat) -> Mat:
return cls.mask_bgr_in_hsv(roi_bgr, cls.BYD_HSV_MIN, cls.BYD_HSV_MAX)
@classmethod
def rating_class_etr(cls, roi_bgr: Mat) -> Mat:
return cls.mask_bgr_in_hsv(roi_bgr, cls.ETR_HSV_MIN, cls.ETR_HSV_MAX)
@classmethod
def max_recall(cls, roi_bgr: Mat) -> Mat:
return cls.mask_bgr_in_hsv(

View File

@ -34,6 +34,10 @@ class DeviceRoisMasker:
def rating_class_byd(cls, roi_bgr: Mat) -> Mat:
raise NotImplementedError()
@classmethod
def rating_class_etr(cls, roi_bgr: Mat) -> Mat:
raise NotImplementedError()
@classmethod
def max_recall(cls, roi_bgr: Mat) -> Mat:
raise NotImplementedError()

View File

@ -12,7 +12,8 @@ def phash_opencv(img_gray, hash_size=8, highfreq_factor=4):
"""
Perceptual Hash computation.
Implementation follows http://www.hackerfactor.com/blog/index.php?/archives/432-Looks-Like-It.html
Implementation follows
http://www.hackerfactor.com/blog/index.php?/archives/432-Looks-Like-It.html
Adapted from `imagehash.phash`, pure opencv implementation
@ -69,14 +70,14 @@ class ImagePhashDatabase:
self.partner_icon_ids: List[str] = []
self.partner_icon_hashes = []
for id, hash in zip(self.ids, self.hashes):
id_splitted = id.split("||")
for _id, _hash in zip(self.ids, self.hashes):
id_splitted = _id.split("||")
if len(id_splitted) > 1 and id_splitted[0] == "partner_icon":
self.partner_icon_ids.append(id_splitted[1])
self.partner_icon_hashes.append(hash)
self.partner_icon_hashes.append(_hash)
else:
self.jacket_ids.append(id)
self.jacket_hashes.append(hash)
self.jacket_ids.append(_id)
self.jacket_hashes.append(_hash)
def calculate_phash(self, img_gray: Mat):
return phash_opencv(

View File

@ -0,0 +1,12 @@
"""Provider implementations: OCR text extraction and image identification."""

from .base import ImageCategory, ImageIdProvider, ImageIdProviderResult, OcrTextProvider
from .ihdb import ImageHashDatabaseIdProvider
from .knn import OcrKNearestTextProvider

# Names re-exported as the package's public API.
__all__ = [
"ImageCategory",
"ImageHashDatabaseIdProvider",
"OcrKNearestTextProvider",
"ImageIdProvider",
"OcrTextProvider",
"ImageIdProviderResult",
]

View File

@ -0,0 +1,38 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import IntEnum
from typing import TYPE_CHECKING, Any, Sequence, Optional
if TYPE_CHECKING:
from ..types import Mat
class OcrTextProvider(ABC):
    """Interface for providers that extract a text string from an image."""

    @abstractmethod
    def result_raw(self, img: "Mat", /, *args, **kwargs) -> Any:
        """Return the implementation-specific raw recognition output."""

    @abstractmethod
    def result(self, img: "Mat", /, *args, **kwargs) -> Optional[str]:
        """Return the recognized text, or ``None`` when unavailable."""
class ImageCategory(IntEnum):
    """Kind of image stored in / looked up against the hash database."""

    JACKET = 0
    PARTNER_ICON = 1
@dataclass(kw_only=True)
class ImageIdProviderResult:
image_id: str
category: ImageCategory
confidence: float
class ImageIdProvider(ABC):
    """Interface for providers that map an image to a known image id."""

    @abstractmethod
    def result(
        self, img: "Mat", category: "ImageCategory", /, *args, **kwargs
    ) -> "ImageIdProviderResult":
        """Return a single identification result for ``img``."""

    @abstractmethod
    def results(
        self, img: "Mat", category: "ImageCategory", /, *args, **kwargs
    ) -> Sequence["ImageIdProviderResult"]:
        """Return all identification candidates for ``img``."""

View File

@ -0,0 +1,194 @@
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import IntEnum
from typing import TYPE_CHECKING, Any, Callable, List, Optional, TypeVar
from arcaea_offline_ocr.core import hashers
from .base import ImageCategory, ImageIdProvider, ImageIdProviderResult
if TYPE_CHECKING:
from arcaea_offline_ocr.types import Mat
T = TypeVar("T")
# Keys of the `properties` table in the image-hash database.
PROP_KEY_HASH_SIZE = "hash_size"
PROP_KEY_HIGH_FREQ_FACTOR = "high_freq_factor"
PROP_KEY_BUILT_AT = "built_at"


def _sql_hamming_distance(hash1: bytes, hash2: bytes):
    """Count positions where two equal-length hashes differ (SQL UDF).

    Distance is per *byte*; stored hashes encode one hash bit per byte
    (0x00/0xFF, see ``hash_mat_to_bytes``), so this equals the bitwise
    Hamming distance for those inputs.
    """
    assert len(hash1) == len(hash2), "hash size does not match!"
    return sum(b1 != b2 for b1, b2 in zip(hash1, hash2))
class ImageHashType(IntEnum):
    """Hash algorithm used to produce a stored image hash."""

    AVERAGE = 0
    DIFFERENCE = 1
    DCT = 2
@dataclass(kw_only=True)
class ImageHashDatabaseIdProviderResult(ImageIdProviderResult):
    """``ImageIdProviderResult`` extended with the hash algorithm that matched."""

    image_hash_type: ImageHashType
class MissingPropertiesError(Exception):
    """Raised when required keys are absent from the database ``properties`` table."""

    # The property keys that were not found.
    keys: List[str]

    def __init__(self, keys, *args):
        super().__init__(*args)
        self.keys = keys
class ImageHashDatabaseIdProvider(ImageIdProvider):
    """Identifies images by hash lookups against a prebuilt SQLite database."""

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        # Expose hamming distance to SQL so lookups can ORDER BY distance.
        self.conn.create_function("HAMMING_DISTANCE", 2, _sql_hamming_distance)

        self.properties = {
            PROP_KEY_HASH_SIZE: -1,
            PROP_KEY_HIGH_FREQ_FACTOR: -1,
            PROP_KEY_BUILT_AT: None,
        }
        self._hashes_count = {
            ImageCategory.JACKET: 0,
            ImageCategory.PARTNER_ICON: 0,
        }
        self._hash_length: int = -1

        self._initialize()

    @property
    def hash_size(self) -> int:
        """Side length of the stored hash matrices."""
        return self.properties[PROP_KEY_HASH_SIZE]

    @property
    def high_freq_factor(self) -> int:
        """Oversampling factor used by the DCT hasher."""
        return self.properties[PROP_KEY_HIGH_FREQ_FACTOR]

    @property
    def built_at(self) -> Optional[datetime]:
        """When the database was built (UTC), if recorded."""
        return self.properties.get(PROP_KEY_BUILT_AT)

    @property
    def hash_length(self):
        """Total number of bits per hash (``hash_size ** 2``)."""
        return self._hash_length

    def _initialize(self):
        """Load properties and per-category counts; raise if required keys miss."""

        def get_property(key, converter: Callable[[Any], T]) -> Optional[T]:
            result = self.conn.execute(
                "SELECT value FROM properties WHERE key = ?",
                (key,),
            ).fetchone()
            return converter(result[0]) if result is not None else None

        def set_hashes_count(category: ImageCategory):
            self._hashes_count[category] = self.conn.execute(
                "SELECT COUNT(DISTINCT `id`) FROM hashes WHERE category = ?",
                (category.value,),
            ).fetchone()[0]

        properties_converter_map = {
            PROP_KEY_HASH_SIZE: lambda x: int(x),
            PROP_KEY_HIGH_FREQ_FACTOR: lambda x: int(x),
            # built_at is stored as milliseconds since the Unix epoch.
            PROP_KEY_BUILT_AT: lambda ts: datetime.fromtimestamp(
                int(ts) / 1000, tz=timezone.utc
            ),
        }
        required_properties = [PROP_KEY_HASH_SIZE, PROP_KEY_HIGH_FREQ_FACTOR]

        missing_properties = []
        for property_key, converter in properties_converter_map.items():
            value = get_property(property_key, converter)
            if value is None:
                if property_key in required_properties:
                    missing_properties.append(property_key)
                continue
            self.properties[property_key] = value

        if missing_properties:
            raise MissingPropertiesError(keys=missing_properties)

        set_hashes_count(ImageCategory.JACKET)
        set_hashes_count(ImageCategory.PARTNER_ICON)

        self._hash_length = self.hash_size**2

    def lookup_hash(
        self, category: ImageCategory, hash_type: ImageHashType, hash: bytes
    ) -> List[ImageHashDatabaseIdProviderResult]:
        """Return up to 10 closest database entries for ``hash``, best first."""
        cursor = self.conn.execute(
            """
            SELECT
                `id`,
                HAMMING_DISTANCE(hash, ?) AS distance
            FROM hashes
            WHERE category = ? AND hash_type = ?
            ORDER BY distance ASC LIMIT 10""",
            (hash, category.value, hash_type.value),
        )
        return [
            ImageHashDatabaseIdProviderResult(
                image_id=row_id,
                category=category,
                # A perfect match (distance 0) yields confidence 1.0.
                confidence=(self.hash_length - distance) / self.hash_length,
                image_hash_type=hash_type,
            )
            for row_id, distance in cursor.fetchall()
        ]

    @staticmethod
    def hash_mat_to_bytes(hash: "Mat") -> bytes:
        """Encode a boolean hash matrix as one byte (0xFF/0x00) per bit."""
        return bytes(255 if b else 0 for b in hash.flatten())

    def results(self, img: "Mat", category: ImageCategory, /):
        """Hash ``img`` with all three algorithms and combine the lookups."""
        hash_variants = [
            (ImageHashType.AVERAGE, hashers.average(img, self.hash_size)),
            (ImageHashType.DIFFERENCE, hashers.difference(img, self.hash_size)),
            (ImageHashType.DCT, hashers.dct(img, self.hash_size, self.high_freq_factor)),
        ]
        results: List[ImageHashDatabaseIdProviderResult] = []
        for hash_type, hash_mat in hash_variants:
            results.extend(
                self.lookup_hash(category, hash_type, self.hash_mat_to_bytes(hash_mat))
            )
        return results

    def result(
        self,
        img: "Mat",
        category: ImageCategory,
        /,
        *,
        hash_type: ImageHashType = ImageHashType.DCT,
    ):
        """Return the first candidate produced by the selected hash algorithm.

        Raises ``IndexError`` when no candidate of ``hash_type`` was found,
        matching the original ``[...][0]`` behavior.
        """
        matches = [
            it for it in self.results(img, category) if it.image_hash_type == hash_type
        ]
        return matches[0]

View File

@ -1,18 +1,19 @@
import logging
import math
from typing import Optional, Sequence, Tuple
from typing import TYPE_CHECKING, Callable, Optional, Sequence, Tuple
import cv2
import numpy as np
from .crop import crop_xywh
from .types import Mat
from ..crop import crop_xywh
from .base import OcrTextProvider
__all__ = [
"FixRects",
"preprocess_hog",
"ocr_digits_by_contour_get_samples",
"ocr_digits_by_contour_knn",
]
if TYPE_CHECKING:
from cv2.ml import KNearest
from ..types import Mat
logger = logging.getLogger(__name__)
class FixRects:
@ -36,7 +37,7 @@ class FixRects:
if rect in consumed_rects:
continue
x, y, w, h = rect
x, _, w, h = rect
# grab those small rects
if not img_height * 0.1 <= h <= img_height * 0.6:
continue
@ -46,7 +47,7 @@ class FixRects:
for other_rect in rects:
if rect == other_rect:
continue
ox, oy, ow, oh = other_rect
ox, _, ow, _ = other_rect
if abs(x - ox) < tolerance and abs((x + w) - (ox + ow)) < tolerance:
group.append(other_rect)
@ -68,7 +69,7 @@ class FixRects:
@staticmethod
def split_connected(
img_masked: Mat,
img_masked: "Mat",
rects: Sequence[Tuple[int, int, int, int]],
rect_wh_ratio: float = 1.05,
width_range_ratio: float = 0.1,
@ -118,7 +119,7 @@ class FixRects:
return return_rects
def resize_fill_square(img: Mat, target: int = 20):
def resize_fill_square(img: "Mat", target: int = 20):
h, w = img.shape[:2]
if h > w:
new_h = target
@ -152,29 +153,88 @@ def preprocess_hog(digit_rois):
def ocr_digit_samples_knn(__samples, knn_model: cv2.ml.KNearest, k: int = 4):
_, results, _, _ = knn_model.findNearest(__samples, k)
result_list = [int(r) for r in results.ravel()]
result_str = "".join(str(r) for r in result_list if r > -1)
return int(result_str) if result_str else 0
return [int(r) for r in results.ravel()]
def ocr_digits_by_contour_get_samples(__roi_gray: Mat, size: int):
roi = __roi_gray.copy()
contours, _ = cv2.findContours(roi, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
rects = [cv2.boundingRect(c) for c in contours]
rects = FixRects.connect_broken(rects, roi.shape[1], roi.shape[0])
rects = FixRects.split_connected(roi, rects)
rects = sorted(rects, key=lambda r: r[0])
# digit_rois = [cv2.resize(crop_xywh(roi, rect), size) for rect in rects]
digit_rois = [resize_fill_square(crop_xywh(roi, rect), size) for rect in rects]
return preprocess_hog(digit_rois)
class OcrKNearestTextProvider(OcrTextProvider):
_ContourFilter = Callable[["Mat"], bool]
_RectsFilter = Callable[[Sequence[int]], bool]
def __init__(self, model: "KNearest"):
self.model = model
def ocr_digits_by_contour_knn(
__roi_gray: Mat,
knn_model: cv2.ml.KNearest,
*,
k=4,
size: int = 20,
) -> int:
samples = ocr_digits_by_contour_get_samples(__roi_gray, size)
return ocr_digit_samples_knn(samples, knn_model, k)
def contours(
self, img: "Mat", /, *, contours_filter: Optional[_ContourFilter] = None
):
cnts, _ = cv2.findContours(img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
if contours_filter:
cnts = list(filter(contours_filter, cnts))
return cnts
def result_raw(
self,
img: "Mat",
/,
*,
fix_rects: bool = True,
contours_filter: Optional[_ContourFilter] = None,
rects_filter: Optional[_RectsFilter] = None,
):
"""
:param img: grayscaled roi
"""
try:
cnts, _ = cv2.findContours(img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if contours_filter:
cnts = list(filter(contours_filter, cnts))
rects = [cv2.boundingRect(cnt) for cnt in cnts]
if fix_rects and rects_filter:
rects = FixRects.connect_broken(rects, img.shape[1], img.shape[0]) # type: ignore
rects = list(filter(rects_filter, rects))
rects = FixRects.split_connected(img, rects)
elif fix_rects:
rects = FixRects.connect_broken(rects, img.shape[1], img.shape[0]) # type: ignore
rects = FixRects.split_connected(img, rects)
elif rects_filter:
rects = list(filter(rects_filter, rects))
rects = sorted(rects, key=lambda r: r[0])
digits = []
for rect in rects:
digit = crop_xywh(img, rect)
digit = resize_fill_square(digit, 20)
digits.append(digit)
samples = preprocess_hog(digits)
return ocr_digit_samples_knn(samples, self.model)
except Exception:
logger.exception("Error occurred during KNearest OCR")
return None
def result(
self,
img: "Mat",
/,
*,
fix_rects: bool = True,
contours_filter: Optional[_ContourFilter] = None,
rects_filter: Optional[_RectsFilter] = None,
):
"""
:param img: grayscaled roi
"""
raw = self.result_raw(
img,
fix_rects=fix_rects,
contours_filter=contours_filter,
rects_filter=rects_filter,
)
return (
"".join(["".join(str(r) for r in raw if r > -1)])
if raw is not None
else None
)

View File

@ -1,25 +1,36 @@
from collections.abc import Iterable
from typing import NamedTuple, Tuple, Union
from math import floor
from typing import Callable, NamedTuple, Union
import numpy as np
Mat = np.ndarray
_IntOrFloat = Union[int, float]
class XYWHRect(NamedTuple):
x: int
y: int
w: int
h: int
x: _IntOrFloat
y: _IntOrFloat
w: _IntOrFloat
h: _IntOrFloat
def __add__(self, other: Union["XYWHRect", Tuple[int, int, int, int]]):
if not isinstance(other, Iterable) or len(other) != 4:
raise ValueError()
def _to_int(self, func: Callable[[_IntOrFloat], int]):
return (func(self.x), func(self.y), func(self.w), func(self.h))
def rounded(self):
return self._to_int(round)
def floored(self):
return self._to_int(floor)
def __add__(self, other):
if not isinstance(other, (list, tuple)) or len(other) != 4:
raise TypeError()
return self.__class__(*[a + b for a, b in zip(self, other)])
def __sub__(self, other: Union["XYWHRect", Tuple[int, int, int, int]]):
if not isinstance(other, Iterable) or len(other) != 4:
raise ValueError()
def __sub__(self, other):
if not isinstance(other, (list, tuple)) or len(other) != 4:
raise TypeError()
return self.__class__(*[a - b for a, b in zip(self, other)])

View File

@ -1,5 +1,5 @@
from collections.abc import Iterable
from typing import Callable, TypeVar, Union, overload
from typing import TypeVar, overload
import cv2
import numpy as np
@ -15,32 +15,25 @@ def imread_unicode(filepath: str, flags: int = cv2.IMREAD_UNCHANGED):
return cv2.imdecode(np.fromfile(filepath, dtype=np.uint8), flags)
def construct_int_xywh_rect(
rect: XYWHRect, func: Callable[[Union[int, float]], int] = round
):
return XYWHRect(*[func(num) for num in rect])
@overload
def apply_factor(item: int, factor: float) -> float: ...
@overload
def apply_factor(item: int, factor: float) -> float:
...
@overload
def apply_factor(item: float, factor: float) -> float:
...
def apply_factor(item: float, factor: float) -> float: ...
T = TypeVar("T", bound=Iterable)
@overload
def apply_factor(item: T, factor: float) -> T:
...
def apply_factor(item: T, factor: float) -> T: ...
def apply_factor(item, factor: float):
if isinstance(item, (int, float)):
return item * factor
elif isinstance(item, Iterable):
if isinstance(item, XYWHRect):
return item.__class__(*[i * factor for i in item])
if isinstance(item, Iterable):
return item.__class__([i * factor for i in item])