Files
hotspot-api/app/main.py
2026-01-27 13:52:44 +01:00

241 lines
6.5 KiB
Python

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import json
import os
from pathlib import Path
# Application object that every route and middleware below is registered on.
# The title/description/version strings are passed to FastAPI as API metadata.
app = FastAPI(
    title="Hotspot API",
    description="API per riconoscimento testo e matching hotspot europei",
    version="1.0.0"
)
# CORS configuration so the frontend can call this API.
# NOTE(review): wildcard origins together with allow_credentials=True is a very
# permissive combination - confirm it is intended and restrict allow_origins
# to the real frontend domains before production use.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, list the allowed domains explicitly
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Path of the JSON database file (mounted as a volume in Docker); it can be
# overridden through the DATA_PATH environment variable.
DATA_PATH = os.getenv("DATA_PATH", "/data/hotspots.json")
# In-memory cache of the database, filled by load_database().
# hotspots_db is iterated as hotspot id -> hotspot record.
hotspots_db: dict = {}
# video_mapping is looked up by hotspot id to fill the `video_ids` fields.
video_mapping: dict = {}
def load_database():
    """Load the hotspot database from ``DATA_PATH`` into the module caches.

    Two JSON layouts are accepted:

    * ``{"hotspots": {...}, "video_mapping": {...}}`` - both values are used
      as they are (``video_mapping`` is optional and defaults to ``{}``).
    * ``{id: name, ...}`` - every entry is expanded to
      ``{"id": id, "name": name}`` and the video mapping is empty.

    ``hotspots_db`` and ``video_mapping`` are always replaced together, so a
    reload never keeps video ids that belong to a previously loaded file.
    A missing file, invalid JSON, or a top-level value that is not a JSON
    object leaves both caches empty; a message is printed and nothing is
    raised for those cases.
    """
    global hotspots_db, video_mapping

    # Build the new state locally and publish it in one step at the end.
    new_hotspots: dict = {}
    new_video_mapping: dict = {}

    try:
        with open(DATA_PATH, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"⚠️ File {DATA_PATH} non trovato, uso database vuoto")
    except json.JSONDecodeError as e:
        print(f"❌ Errore parsing JSON: {e}")
    else:
        if isinstance(data, dict):
            if "hotspots" in data:
                new_hotspots = data["hotspots"]
                new_video_mapping = data.get("video_mapping", {})
            else:
                # Simple layout: {id: name}
                new_hotspots = {
                    k: {"id": k, "name": v} for k, v in data.items()
                }
            print(f"✅ Database caricato: {len(new_hotspots)} hotspots")
        else:
            # Neither supported layout: do not report a successful load.
            print(f"⚠️ Formato non supportato in {DATA_PATH}, uso database vuoto")

    hotspots_db = new_hotspots
    video_mapping = new_video_mapping
@app.on_event("startup")
async def startup_event() -> None:
    """Fill the in-memory database when the application starts."""
    load_database()
# === MODELS ===
class MatchRequest(BaseModel):
    # Request body of POST /match.
    # (Documented with comments, not a docstring, so the generated schema
    # description is unchanged.)

    # Text to look up; the handler rejects it as "not found" when blank.
    text: str
    # Language code, "en" by default. No code in this file reads it.
    language: Optional[str] = "en"
class MatchResponse(BaseModel):
    # Response body of POST /match.
    # (Documented with comments, not a docstring, so the generated schema
    # description is unchanged.)

    # True when a hotspot was matched; the other fields stay None otherwise.
    found: bool
    # Key of the matched hotspot in the database.
    hotspot_id: Optional[str] = None
    # Name of the matched hotspot as returned by find_best_match (lower-cased).
    name: Optional[str] = None
    # Match confidence, rounded to two decimals by the /match handler.
    confidence: Optional[float] = None
    # Entry of video_mapping for the matched hotspot id, if any.
    video_ids: Optional[list] = None
class HotspotResponse(BaseModel):
    # One hotspot as returned by GET /hotspots and GET /hotspots/{hotspot_id}.
    # (Documented with comments, not a docstring, so the generated schema
    # description is unchanged.)

    # Key of the hotspot in the database.
    id: str
    # Hotspot name.
    name: str
    # Entry of video_mapping for this hotspot id, if any.
    video_ids: Optional[list] = None
# === UTILITIES ===
def calculate_similarity(str1: str, str2: str) -> float:
    """Return a case-insensitive similarity ratio between two strings.

    The ratio is ``(max_len - distance) / max_len``, where ``distance`` is the
    Levenshtein edit distance between the lower-cased inputs and ``max_len``
    is the length of the longer one.

    Args:
        str1: First string.
        str2: Second string.

    Returns:
        A value in ``[0.0, 1.0]``: ``1.0`` when the strings are equal
        ignoring case, ``0.0`` when either one is empty.
    """
    if not str1 or not str2:
        return 0.0

    str1, str2 = str1.lower(), str2.lower()
    if str1 == str2:
        return 1.0

    # The distance is symmetric, so keep the shorter string on the inner axis:
    # only two rows of len(shorter) + 1 cells are alive at any time.
    if len(str1) < len(str2):
        str1, str2 = str2, str1

    previous = list(range(len(str2) + 1))
    for i, char1 in enumerate(str1, start=1):
        current = [i]
        for j, char2 in enumerate(str2, start=1):
            cost = 0 if char1 == char2 else 1
            current.append(min(
                previous[j] + 1,         # deletion
                current[j - 1] + 1,      # insertion
                previous[j - 1] + cost,  # substitution (free on equal chars)
            ))
        previous = current

    distance = previous[-1]
    max_len = len(str1)  # str1 is the longer string after the swap above
    return (max_len - distance) / max_len
def clean_text(text: str) -> str:
    """Normalise OCR text before matching.

    The input is lower-cased and trimmed, each of the symbol characters in
    the pattern below is replaced by a space, whitespace runs are collapsed
    to a single space and the result is trimmed again.
    """
    import re

    symbol_pattern = re.compile(r'[{}§\[\]()@#$%^&*+=|\\<>?/~`!]')
    whitespace_run = re.compile(r'\s+')

    lowered = text.lower().strip()
    without_symbols = symbol_pattern.sub(' ', lowered)
    single_spaced = whitespace_run.sub(' ', without_symbols)
    return single_spaced.strip()
def find_best_match(text: str, threshold: float = 0.65) -> tuple:
    """Find the hotspot whose name best matches ``text``.

    The text is normalised with ``clean_text`` and compared with the
    lower-cased name of every hotspot. Match kinds, strongest first:

    1. the whole cleaned text equals the name           -> confidence 1.0
    2. the name contains the text, or the text the name -> confidence 0.95
    3. a word of the text equals a word of the name     -> confidence 0.9
    4. highest ``calculate_similarity`` between any word of the text and any
       word of a name, accepted only when it reaches ``threshold``

    Words of the text shorter than two characters are ignored for kinds 3
    and 4. A stronger kind always wins, whatever the order of the hotspots in
    the database; within kinds 2 and 3 the first hotspot found is kept.
    Text that is empty after cleaning matches nothing, and hotspots without a
    name are skipped.

    Args:
        text: Raw text to look up.
        threshold: Minimum similarity required for a kind-4 (fuzzy) match.

    Returns:
        ``(hotspot_id, name, confidence)`` with the lower-cased name, or
        ``(None, None, 0.0)`` when nothing matches.
    """
    cleaned = clean_text(text)
    if not cleaned:
        # "" is a substring of every name; without this guard symbol-only
        # input would match the first hotspot.
        return None, None, 0.0

    words = [word for word in cleaned.split() if len(word) >= 2]

    substring_hit = None
    word_hit = None
    fuzzy_hit = None
    fuzzy_score = 0.0

    for hotspot_id, hotspot in hotspots_db.items():
        if isinstance(hotspot, dict):
            # `or ""` also covers an explicit null name in the JSON file.
            name = str(hotspot.get("name") or "").lower()
        else:
            name = str(hotspot).lower()
        if not name:
            # An empty name is a substring of any text; never match it.
            continue

        # Exact match: nothing can beat it, stop right away.
        if name == cleaned:
            return hotspot_id, name, 1.0

        # Substring match
        if substring_hit is None and (name in cleaned or cleaned in name):
            substring_hit = (hotspot_id, name)
        if substring_hit is not None:
            # Only an exact match on a later hotspot can still win.
            continue

        # Word match
        name_words = name.split()
        if word_hit is None:
            known_words = set(name_words)
            if any(word in known_words for word in words):
                word_hit = (hotspot_id, name)
        if word_hit is not None:
            # Fuzzy scores can no longer win; skip the expensive part.
            continue

        # Fuzzy match: remember the best-scoring pair seen so far.
        for word in words:
            for name_word in name_words:
                similarity = calculate_similarity(word, name_word)
                if similarity > fuzzy_score:
                    fuzzy_score = similarity
                    fuzzy_hit = (hotspot_id, name)

    if substring_hit is not None:
        return substring_hit[0], substring_hit[1], 0.95
    if word_hit is not None:
        return word_hit[0], word_hit[1], 0.9
    if fuzzy_hit is not None and fuzzy_score >= threshold:
        return fuzzy_hit[0], fuzzy_hit[1], fuzzy_score
    return None, None, 0.0
# === ENDPOINTS ===
@app.get("/")
async def root():
    # Service banner: fixed name and version plus the number of hotspots
    # currently held in memory.
    # (Kept as a comment, not a docstring, so the published OpenAPI
    # description of this route stays unchanged.)
    loaded_count = len(hotspots_db)
    banner = {"service": "Hotspot API", "version": "1.0.0"}
    banner["hotspots_loaded"] = loaded_count
    return banner
@app.get("/health")
async def health():
    # Health probe: always reports "healthy"; `database_loaded` tells whether
    # at least one hotspot is held in memory.
    # (Kept as a comment, not a docstring, so the published OpenAPI
    # description of this route stays unchanged.)
    has_hotspots = len(hotspots_db) > 0
    return {"status": "healthy", "database_loaded": has_hotspots}
@app.get("/hotspots", response_model=list[HotspotResponse])
async def get_hotspots():
    """Restituisce tutti gli hotspot"""
    # Returns every hotspot held in memory as a HotspotResponse, together with
    # the video ids mapped to its id (None when there is no mapping).
    # The docstring above is kept verbatim because FastAPI publishes it as the
    # operation description in the OpenAPI schema.
    result = []
    for hotspot_id, hotspot in hotspots_db.items():
        if isinstance(hotspot, dict):
            # A record without "name" gets an empty name, the same fallback
            # find_best_match uses; falling back to the record itself would
            # fail the `name: str` validation of HotspotResponse.
            name = hotspot.get("name", "")
        else:
            name = str(hotspot)
        result.append(HotspotResponse(
            id=hotspot_id,
            name=name,
            video_ids=video_mapping.get(hotspot_id),
        ))
    return result
@app.get("/hotspots/{hotspot_id}", response_model=HotspotResponse)
async def get_hotspot(hotspot_id: str):
    """Restituisce un singolo hotspot"""
    # Returns the hotspot stored under `hotspot_id` together with the video
    # ids mapped to it (None when there is no mapping).
    # Raises HTTPException(404) when the id is not in the database.
    # The docstring above is kept verbatim because FastAPI publishes it as the
    # operation description in the OpenAPI schema.
    if hotspot_id not in hotspots_db:
        raise HTTPException(status_code=404, detail="Hotspot non trovato")

    hotspot = hotspots_db[hotspot_id]
    if isinstance(hotspot, dict):
        # A record without "name" gets an empty name, the same fallback
        # find_best_match uses; falling back to the record itself would fail
        # the `name: str` validation of HotspotResponse.
        name = hotspot.get("name", "")
    else:
        name = str(hotspot)

    return HotspotResponse(
        id=hotspot_id,
        name=name,
        video_ids=video_mapping.get(hotspot_id),
    )
@app.post("/match", response_model=MatchResponse)
async def match_text(request: MatchRequest):
    """Cerca corrispondenza per il testo fornito"""
    # Looks up the submitted text with find_best_match and reports the hit,
    # its confidence rounded to two decimals and the mapped video ids.
    # Blank text and unmatched text both yield MatchResponse(found=False).
    # The docstring above is kept verbatim because FastAPI publishes it as the
    # operation description in the OpenAPI schema.
    submitted = request.text
    if not submitted.strip():
        return MatchResponse(found=False)

    hotspot_id, name, confidence = find_best_match(submitted)
    if not hotspot_id:
        return MatchResponse(found=False)

    rounded_confidence = round(confidence, 2)
    videos = video_mapping.get(hotspot_id)
    return MatchResponse(
        found=True,
        hotspot_id=hotspot_id,
        name=name,
        confidence=rounded_confidence,
        video_ids=videos,
    )
@app.post("/reload")
async def reload_database():
    """Ricarica il database dal file JSON"""
    # Re-reads the JSON database through load_database() and reports how many
    # hotspots are held in memory afterwards.
    # The docstring above is kept verbatim because FastAPI publishes it as the
    # operation description in the OpenAPI schema.
    load_database()
    hotspots_count = len(hotspots_db)
    return {"status": "reloaded", "hotspots_count": hotspots_count}