241 lines
6.5 KiB
Python
241 lines
6.5 KiB
Python
from fastapi import FastAPI, UploadFile, File, HTTPException
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from typing import Optional
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
|
|
app = FastAPI(
|
|
title="Hotspot API",
|
|
description="API per riconoscimento testo e matching hotspot europei",
|
|
version="1.0.0"
|
|
)
|
|
|
|
# CORS per permettere chiamate dal frontend
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"], # In produzione, specifica i domini
|
|
allow_credentials=True,
|
|
allow_methods=["*"],
|
|
allow_headers=["*"],
|
|
)
|
|
|
|
# Path del file JSON (montato come volume in Docker)
|
|
DATA_PATH = os.getenv("DATA_PATH", "/data/hotspots.json")
|
|
|
|
# Cache del database in memoria
|
|
hotspots_db: dict = {}
|
|
video_mapping: dict = {}
|
|
|
|
|
|
def load_database():
|
|
"""Carica il database JSON all'avvio"""
|
|
global hotspots_db, video_mapping
|
|
|
|
try:
|
|
with open(DATA_PATH, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
|
|
# Supporta sia formato semplice che complesso
|
|
if isinstance(data, dict):
|
|
if "hotspots" in data:
|
|
hotspots_db = data["hotspots"]
|
|
video_mapping = data.get("video_mapping", {})
|
|
else:
|
|
# Formato semplice: {id: nome}
|
|
hotspots_db = {k: {"id": k, "name": v} for k, v in data.items()}
|
|
|
|
print(f"✅ Database caricato: {len(hotspots_db)} hotspots")
|
|
except FileNotFoundError:
|
|
print(f"⚠️ File {DATA_PATH} non trovato, uso database vuoto")
|
|
hotspots_db = {}
|
|
except json.JSONDecodeError as e:
|
|
print(f"❌ Errore parsing JSON: {e}")
|
|
hotspots_db = {}
|
|
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
load_database()
|
|
|
|
|
|
# === MODELS ===
|
|
|
|
class MatchRequest(BaseModel):
|
|
text: str
|
|
language: Optional[str] = "en"
|
|
|
|
|
|
class MatchResponse(BaseModel):
|
|
found: bool
|
|
hotspot_id: Optional[str] = None
|
|
name: Optional[str] = None
|
|
confidence: Optional[float] = None
|
|
video_ids: Optional[list] = None
|
|
|
|
|
|
class HotspotResponse(BaseModel):
|
|
id: str
|
|
name: str
|
|
video_ids: Optional[list] = None
|
|
|
|
|
|
# === UTILITIES ===
|
|
|
|
def calculate_similarity(str1: str, str2: str) -> float:
|
|
"""Calcola similarità tra due stringhe (Levenshtein normalizzato)"""
|
|
if not str1 or not str2:
|
|
return 0.0
|
|
|
|
str1, str2 = str1.lower(), str2.lower()
|
|
|
|
if str1 == str2:
|
|
return 1.0
|
|
|
|
len1, len2 = len(str1), len(str2)
|
|
if len1 == 0 or len2 == 0:
|
|
return 0.0
|
|
|
|
# Matrice per Levenshtein
|
|
matrix = [[0] * (len2 + 1) for _ in range(len1 + 1)]
|
|
|
|
for i in range(len1 + 1):
|
|
matrix[i][0] = i
|
|
for j in range(len2 + 1):
|
|
matrix[0][j] = j
|
|
|
|
for i in range(1, len1 + 1):
|
|
for j in range(1, len2 + 1):
|
|
cost = 0 if str1[i-1] == str2[j-1] else 1
|
|
matrix[i][j] = min(
|
|
matrix[i-1][j] + 1,
|
|
matrix[i][j-1] + 1,
|
|
matrix[i-1][j-1] + cost
|
|
)
|
|
|
|
distance = matrix[len1][len2]
|
|
max_len = max(len1, len2)
|
|
return (max_len - distance) / max_len
|
|
|
|
|
|
def clean_text(text: str) -> str:
|
|
"""Pulisce il testo OCR"""
|
|
import re
|
|
cleaned = text.lower().strip()
|
|
cleaned = re.sub(r'[{}§\[\]()@#$%^&*+=|\\<>?/~`!]', ' ', cleaned)
|
|
cleaned = re.sub(r'\s+', ' ', cleaned)
|
|
return cleaned.strip()
|
|
|
|
|
|
def find_best_match(text: str, threshold: float = 0.65) -> tuple:
|
|
"""Trova il miglior match nel database"""
|
|
cleaned = clean_text(text)
|
|
words = cleaned.split()
|
|
|
|
best_match = None
|
|
best_score = 0.0
|
|
|
|
for hotspot_id, hotspot in hotspots_db.items():
|
|
name = hotspot.get("name", "").lower() if isinstance(hotspot, dict) else str(hotspot).lower()
|
|
|
|
# Match esatto
|
|
if name == cleaned:
|
|
return hotspot_id, name, 1.0
|
|
|
|
# Substring match
|
|
if name in cleaned or cleaned in name:
|
|
return hotspot_id, name, 0.95
|
|
|
|
# Word match
|
|
name_words = name.split()
|
|
for word in words:
|
|
if len(word) < 2:
|
|
continue
|
|
for name_word in name_words:
|
|
if name_word == word:
|
|
return hotspot_id, name, 0.9
|
|
|
|
similarity = calculate_similarity(word, name_word)
|
|
if similarity > best_score:
|
|
best_score = similarity
|
|
best_match = (hotspot_id, name)
|
|
|
|
if best_match and best_score >= threshold:
|
|
return best_match[0], best_match[1], best_score
|
|
|
|
return None, None, 0.0
|
|
|
|
|
|
# === ENDPOINTS ===
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
return {
|
|
"service": "Hotspot API",
|
|
"version": "1.0.0",
|
|
"hotspots_loaded": len(hotspots_db)
|
|
}
|
|
|
|
|
|
@app.get("/health")
|
|
async def health():
|
|
return {"status": "healthy", "database_loaded": len(hotspots_db) > 0}
|
|
|
|
|
|
@app.get("/hotspots", response_model=list[HotspotResponse])
|
|
async def get_hotspots():
|
|
"""Restituisce tutti gli hotspot"""
|
|
result = []
|
|
for hotspot_id, hotspot in hotspots_db.items():
|
|
name = hotspot.get("name", hotspot) if isinstance(hotspot, dict) else str(hotspot)
|
|
result.append(HotspotResponse(
|
|
id=hotspot_id,
|
|
name=name,
|
|
video_ids=video_mapping.get(hotspot_id)
|
|
))
|
|
return result
|
|
|
|
|
|
@app.get("/hotspots/{hotspot_id}", response_model=HotspotResponse)
|
|
async def get_hotspot(hotspot_id: str):
|
|
"""Restituisce un singolo hotspot"""
|
|
if hotspot_id not in hotspots_db:
|
|
raise HTTPException(status_code=404, detail="Hotspot non trovato")
|
|
|
|
hotspot = hotspots_db[hotspot_id]
|
|
name = hotspot.get("name", hotspot) if isinstance(hotspot, dict) else str(hotspot)
|
|
|
|
return HotspotResponse(
|
|
id=hotspot_id,
|
|
name=name,
|
|
video_ids=video_mapping.get(hotspot_id)
|
|
)
|
|
|
|
|
|
@app.post("/match", response_model=MatchResponse)
|
|
async def match_text(request: MatchRequest):
|
|
"""Cerca corrispondenza per il testo fornito"""
|
|
if not request.text.strip():
|
|
return MatchResponse(found=False)
|
|
|
|
hotspot_id, name, confidence = find_best_match(request.text)
|
|
|
|
if hotspot_id:
|
|
return MatchResponse(
|
|
found=True,
|
|
hotspot_id=hotspot_id,
|
|
name=name,
|
|
confidence=round(confidence, 2),
|
|
video_ids=video_mapping.get(hotspot_id)
|
|
)
|
|
|
|
return MatchResponse(found=False)
|
|
|
|
|
|
@app.post("/reload")
|
|
async def reload_database():
|
|
"""Ricarica il database dal file JSON"""
|
|
load_database()
|
|
return {"status": "reloaded", "hotspots_count": len(hotspots_db)}
|