"""Hotspot API: OCR text clean-up and fuzzy matching against a JSON hotspot database.

The database is read from ``DATA_PATH`` at startup (and on ``POST /reload``)
and cached in the module-level ``hotspots_db`` / ``video_mapping`` dicts.
"""

import json
import os
import re
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

app = FastAPI(
    title="Hotspot API",
    description="API per riconoscimento testo e matching hotspot europei",
    version="1.0.0"
)

# CORS so that the frontend can call the API.
# NOTE(review): wildcard origins together with allow_credentials=True is
# discouraged by the FastAPI CORS documentation -- confirm before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, list the allowed domains explicitly
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Path of the JSON file (mounted as a volume in Docker)
DATA_PATH = os.getenv("DATA_PATH", "/data/hotspots.json")

# In-memory cache of the database
hotspots_db: dict = {}
video_mapping: dict = {}

# Patterns used by clean_text(), compiled once at import time.
_OCR_NOISE_RE = re.compile(r'[{}§\[\]()@#$%^&*+=|\\<>?/~`!]')
_WHITESPACE_RE = re.compile(r'\s+')

# Match tiers used by find_best_match(); a higher tier always wins.
_TIER_NONE = 0
_TIER_FUZZY = 1
_TIER_WORD = 2
_TIER_SUBSTRING = 3


def _parse_database(data) -> tuple[dict, dict]:
    """Split decoded JSON into ``(hotspots, video_mapping)``.

    Two layouts are accepted:

    * complex: ``{"hotspots": {...}, "video_mapping": {...}}``
    * simple: ``{id: name}``, expanded to ``{id: {"id": id, "name": name}}``

    Raises:
        ValueError: if the root (or the ``"hotspots"`` entry) is not a
            JSON object.
    """
    if not isinstance(data, dict):
        raise ValueError("la radice deve essere un oggetto JSON")
    if "hotspots" not in data:
        # Simple format: {id: name}; it carries no video mapping.
        return {k: {"id": k, "name": v} for k, v in data.items()}, {}
    hotspots = data["hotspots"]
    if not isinstance(hotspots, dict):
        raise ValueError('"hotspots" deve essere un oggetto JSON')
    mapping = data.get("video_mapping")
    # A missing, null or non-object mapping is treated as "no videos".
    return hotspots, (mapping if isinstance(mapping, dict) else {})


def load_database():
    """Load the JSON database from ``DATA_PATH`` into the module caches.

    Both ``hotspots_db`` and ``video_mapping`` are always replaced together,
    so a failed or simple-format reload never leaves a stale video mapping
    behind. On any read or format error the caches are emptied and a message
    is printed; no exception propagates to the caller.
    """
    global hotspots_db, video_mapping

    hotspots: dict = {}
    mapping: dict = {}
    try:
        with open(DATA_PATH, 'r', encoding='utf-8') as f:
            data = json.load(f)
        hotspots, mapping = _parse_database(data)
    except FileNotFoundError:
        print(f"⚠️ File {DATA_PATH} non trovato, uso database vuoto")
    except json.JSONDecodeError as e:
        print(f"❌ Errore parsing JSON: {e}")
    except OSError as e:
        # e.g. the path is a directory or is not readable
        print(f"❌ Errore lettura {DATA_PATH}: {e}")
    except ValueError as e:
        print(f"❌ Formato database non valido: {e}")
    else:
        print(f"✅ Database caricato: {len(hotspots)} hotspots")

    hotspots_db, video_mapping = hotspots, mapping


@app.on_event("startup")
async def startup_event():
    """Populate the in-memory database when the application starts."""
    load_database()


# === MODELS ===

class MatchRequest(BaseModel):
    """Request body of ``POST /match``."""

    text: str
    language: Optional[str] = "en"


class MatchResponse(BaseModel):
    """Result of a match attempt; only ``found`` is set when nothing matched."""

    found: bool
    hotspot_id: Optional[str] = None
    name: Optional[str] = None
    confidence: Optional[float] = None
    video_ids: Optional[list] = None


class HotspotResponse(BaseModel):
    """Public representation of a single hotspot."""

    id: str
    name: str
    video_ids: Optional[list] = None


# === UTILITIES ===

def calculate_similarity(str1: str, str2: str) -> float:
    """Return the case-insensitive normalized Levenshtein similarity.

    The result is ``(max_len - distance) / max_len``, i.e. 1.0 for equal
    strings and 0.0 for completely different ones. An empty or ``None``
    argument yields 0.0.
    """
    if not str1 or not str2:
        return 0.0

    str1, str2 = str1.lower(), str2.lower()
    if str1 == str2:
        return 1.0

    # Two-row dynamic programming: only the previous row is ever read.
    previous = list(range(len(str2) + 1))
    for i, ch1 in enumerate(str1, start=1):
        current = [i]
        for j, ch2 in enumerate(str2, start=1):
            cost = 0 if ch1 == ch2 else 1
            current.append(min(
                previous[j] + 1,         # deletion
                current[j - 1] + 1,      # insertion
                previous[j - 1] + cost,  # substitution
            ))
        previous = current

    distance = previous[-1]
    max_len = max(len(str1), len(str2))
    return (max_len - distance) / max_len


def clean_text(text: str) -> str:
    """Normalize OCR text: lowercase, drop noise symbols, collapse whitespace."""
    cleaned = text.lower().strip()
    cleaned = _OCR_NOISE_RE.sub(' ', cleaned)
    cleaned = _WHITESPACE_RE.sub(' ', cleaned)
    return cleaned.strip()


def _display_name(hotspot_id: str, hotspot) -> str:
    """Return the name to expose for a hotspot entry.

    Falls back to the hotspot id when a dict entry has no ``"name"``, so the
    response models always receive a string.
    """
    if isinstance(hotspot, dict):
        name = hotspot.get("name")
        return hotspot_id if name is None else str(name)
    return str(hotspot)


def _match_name(hotspot) -> str:
    """Return the lowercased name used for matching ('' when there is none)."""
    raw = hotspot.get("name") if isinstance(hotspot, dict) else hotspot
    return "" if raw is None else str(raw).lower()


def _score_words(words: list, name_words: list, threshold: float) -> tuple:
    """Rank a hotspot name against the input words as ``(tier, score)``.

    An identical word gives ``(_TIER_WORD, 0.9)``. Otherwise the best
    pairwise similarity is returned as ``(_TIER_FUZZY, score)`` when it
    reaches ``threshold``, and ``(_TIER_NONE, 0.0)`` when it does not.
    """
    best = 0.0
    for word in words:
        for name_word in name_words:
            if name_word == word:
                return _TIER_WORD, 0.9
            best = max(best, calculate_similarity(word, name_word))
    if best > 0.0 and best >= threshold:
        return _TIER_FUZZY, best
    return _TIER_NONE, 0.0


def find_best_match(
    text: str, threshold: float = 0.65
) -> tuple[Optional[str], Optional[str], float]:
    """Find the hotspot in ``hotspots_db`` that best matches ``text``.

    Every hotspot is ranked and the best one wins, so the result does not
    depend on the order of the database. Tiers, from strongest to weakest:

    1. exact match with the cleaned text (confidence 1.0)
    2. one string contains the other (0.95)
    3. an input word equals a word of the name (0.9)
    4. best fuzzy word similarity, if it reaches ``threshold``

    Within a tier the higher score wins; on a full tie the first hotspot
    encountered is kept.

    Returns:
        ``(hotspot_id, lowercased_name, confidence)``, or
        ``(None, None, 0.0)`` when nothing matches or the text is empty
        after cleaning.
    """
    cleaned = clean_text(text)
    if not cleaned:
        # Without this guard "" would be a substring of every name.
        return None, None, 0.0

    # Single-character tokens are too noisy to match on.
    words = [word for word in cleaned.split() if len(word) >= 2]

    best_rank = (_TIER_NONE, 0.0)
    best_match = None

    for hotspot_id, hotspot in hotspots_db.items():
        name = _match_name(hotspot)
        if not name.strip():
            # A blank name would be a substring of any text.
            continue

        if name == cleaned:
            return hotspot_id, name, 1.0

        if name in cleaned or cleaned in name:
            candidate = (_TIER_SUBSTRING, 0.95)
        else:
            candidate = _score_words(words, name.split(), threshold)

        if candidate > best_rank:
            best_rank = candidate
            best_match = (hotspot_id, name)

    if best_match is None:
        return None, None, 0.0
    return best_match[0], best_match[1], best_rank[1]


# === ENDPOINTS ===

@app.get("/")
async def root():
    """Return service metadata and the number of loaded hotspots."""
    return {
        "service": "Hotspot API",
        "version": "1.0.0",
        "hotspots_loaded": len(hotspots_db)
    }


@app.get("/health")
async def health():
    """Report liveness and whether the database holds at least one hotspot."""
    return {"status": "healthy", "database_loaded": len(hotspots_db) > 0}


@app.get("/hotspots", response_model=list[HotspotResponse])
async def get_hotspots():
    """Return all hotspots."""
    return [
        HotspotResponse(
            id=hotspot_id,
            name=_display_name(hotspot_id, hotspot),
            video_ids=video_mapping.get(hotspot_id)
        )
        for hotspot_id, hotspot in hotspots_db.items()
    ]


@app.get("/hotspots/{hotspot_id}", response_model=HotspotResponse)
async def get_hotspot(hotspot_id: str):
    """Return a single hotspot, or 404 if the id is unknown."""
    if hotspot_id not in hotspots_db:
        raise HTTPException(status_code=404, detail="Hotspot non trovato")

    return HotspotResponse(
        id=hotspot_id,
        name=_display_name(hotspot_id, hotspots_db[hotspot_id]),
        video_ids=video_mapping.get(hotspot_id)
    )


@app.post("/match", response_model=MatchResponse)
async def match_text(request: MatchRequest):
    """Look up the hotspot that matches the supplied text."""
    if not request.text.strip():
        return MatchResponse(found=False)

    hotspot_id, name, confidence = find_best_match(request.text)

    # Compare against None so that a falsy id is still reported as found.
    if hotspot_id is not None:
        return MatchResponse(
            found=True,
            hotspot_id=hotspot_id,
            name=name,
            confidence=round(confidence, 2),
            video_ids=video_mapping.get(hotspot_id)
        )

    return MatchResponse(found=False)


@app.post("/reload")
async def reload_database():
    """Reload the database from the JSON file."""
    load_database()
    return {"status": "reloaded", "hotspots_count": len(hotspots_db)}