"""Transcriptor API: audio/video transcription with optional speaker diarization.

The module wires together:

* a JSON-file API-key store and a JSON-file user store,
* an in-memory session store with a fixed TTL,
* an in-memory job table whose finished jobs are persisted as JSON files,
* the background transcription pipeline, and
* the FastAPI endpoints (auth, admin, jobs) plus the static frontend.
"""

import hashlib
import json
import logging
import os
import secrets
import shutil
import subprocess
import tempfile
import threading
import uuid
from datetime import datetime, timedelta
from typing import Literal, Optional
from urllib.parse import quote

import dotenv
from fastapi import BackgroundTasks, Depends, FastAPI, File, Form, Header, HTTPException, UploadFile
from fastapi.responses import PlainTextResponse, Response, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from core.combine import combine
from core.diarize import diarize
from core.formats import to_srt, to_txt
from core.transcribe import transcribe

dotenv.load_dotenv()

logger = logging.getLogger(__name__)

app = FastAPI(title="Transcriptor API", description="Audio/Video transcription and speaker diarization")

# ---------------------------------------------------------------------------
# API key store (direct REST access)
# ---------------------------------------------------------------------------

KEYS_FILE = os.path.join(os.path.dirname(__file__), "api_keys.json")


def _load_keys() -> dict:
    """Return the API-key table from KEYS_FILE, or ``{}`` when the file is absent."""
    if os.path.exists(KEYS_FILE):
        with open(KEYS_FILE, encoding="utf-8") as f:
            return json.load(f)
    return {}


def _save_keys(keys: dict) -> None:
    """Overwrite KEYS_FILE with *keys* (mapping of API key -> owner info)."""
    with open(KEYS_FILE, "w", encoding="utf-8") as f:
        json.dump(keys, f, indent=2)


def _drop_keys_for_email(keys: dict, email: str) -> None:
    """Remove, in place, every entry of *keys* whose owner email is *email*."""
    # Materialise the matching keys first: a dict must not change size while iterated.
    for key in [k for k, v in keys.items() if v["email"] == email]:
        del keys[key]


# ---------------------------------------------------------------------------
# User store (GUI accounts with passwords)
# ---------------------------------------------------------------------------

USERS_FILE = os.path.join(os.path.dirname(__file__), "users.json")
_users_lock = threading.Lock()


def _load_users() -> dict:
    """Return the user table from USERS_FILE, or ``{}`` when the file is absent."""
    if os.path.exists(USERS_FILE):
        # Read as UTF-8 to match _save_users, which writes non-ASCII text verbatim.
        with open(USERS_FILE, encoding="utf-8") as f:
            return json.load(f)
    return {}


def _save_users(users: dict) -> None:
    """Overwrite USERS_FILE with *users*; writers are serialised by ``_users_lock``."""
    with _users_lock:
        with open(USERS_FILE, "w", encoding="utf-8") as f:
            json.dump(users, f, indent=2, ensure_ascii=False)


def _hash_password(password: str, salt: Optional[str] = None) -> tuple[str, str]:
    """Derive a PBKDF2-HMAC-SHA256 hash of *password*.

    Returns ``(hex_hash, salt)``. A fresh random salt is generated when *salt*
    is ``None``; pass the stored salt to recompute a hash for verification.
    """
    if salt is None:
        salt = secrets.token_hex(16)
    key = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), 200_000)
    return key.hex(), salt


def _verify_password(password: str, stored_hash: str, salt: str) -> bool:
    """Return True when *password* hashes (with *salt*) to *stored_hash*."""
    computed, _ = _hash_password(password, salt)
    # Constant-time comparison of the two hex digests.
    return secrets.compare_digest(computed, stored_hash)


# ---------------------------------------------------------------------------
# Session store (in-memory, 8-hour TTL)
# ---------------------------------------------------------------------------

_sessions: dict = {}
_sessions_lock = threading.Lock()
SESSION_TTL_HOURS = 8


def _create_session(user: dict) -> str:
    """Register a new session for *user* and return its random token."""
    token = secrets.token_hex(32)
    expires_at = (datetime.now() + timedelta(hours=SESSION_TTL_HOURS)).isoformat()
    with _sessions_lock:
        _sessions[token] = {
            "user_id": user["id"],
            "email": user["email"],
            "name": user["name"],
            "role": user["role"],
            "expires_at": expires_at,
        }
    return token


def _get_session(token: str) -> Optional[dict]:
    """Return the session for *token*, or None if unknown or expired.

    An expired session is removed from the store as a side effect.
    """
    with _sessions_lock:
        session = _sessions.get(token)
    if not session:
        return None
    if datetime.now() > datetime.fromisoformat(session["expires_at"]):
        with _sessions_lock:
            _sessions.pop(token, None)
        return None
    return session


def _drop_sessions_for_user(user_id: str) -> None:
    """Invalidate every live session that belongs to *user_id*."""
    with _sessions_lock:
        for token in [t for t, s in _sessions.items() if s.get("user_id") == user_id]:
            del _sessions[token]


# ---------------------------------------------------------------------------
# Auth dependencies
# ---------------------------------------------------------------------------

def verify_api_key(x_api_key: str = Header(..., description="Your API key")) -> dict:
    """Dependency: return the key's owner info, or raise 401 for an unknown key."""
    keys = _load_keys()
    if x_api_key not in keys:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")
    return keys[x_api_key]


def verify_session(x_session_token: Optional[str] = Header(None)) -> dict:
    """Dependency: return the live session for the token header, or raise 401."""
    if not x_session_token:
        raise HTTPException(status_code=401, detail="Not authenticated")
    session = _get_session(x_session_token)
    if not session:
        raise HTTPException(status_code=401, detail="Invalid or expired session")
    return session


def require_admin(session: dict = Depends(verify_session)) -> dict:
    """Dependency: like ``verify_session`` but raise 403 unless the role is admin."""
    if session.get("role") != "admin":
        raise HTTPException(status_code=403, detail="Admin access required")
    return session


def verify_any_auth(
    x_api_key: Optional[str] = Header(None),
    x_session_token: Optional[str] = Header(None),
) -> dict:
    """Dependency: accept either a session token or an API key.

    A valid session wins. An API key yields a session-shaped dict with
    ``user_id`` None and role ``"user"``. Raises 401 when neither is valid.
    """
    if x_session_token:
        session = _get_session(x_session_token)
        if session:
            return session
    if x_api_key:
        keys = _load_keys()
        if x_api_key in keys:
            info = keys[x_api_key]
            return {"user_id": None, "email": info["email"], "name": info.get("name", ""), "role": "user"}
    raise HTTPException(status_code=401, detail="Authentication required")


# ---------------------------------------------------------------------------
# Persistent job store
# ---------------------------------------------------------------------------

_jobs: dict = {}
_jobs_lock = threading.Lock()
JOBS_DIR = os.path.join(tempfile.gettempdir(), "transcriptor_jobs")
PERSIST_DIR = os.path.join(os.path.dirname(__file__), "jobs")
os.makedirs(JOBS_DIR, exist_ok=True)
os.makedirs(PERSIST_DIR, exist_ok=True)


def _persist_job(job: dict) -> None:
    """Write *job* to ``PERSIST_DIR/<job_id>.json``."""
    path = os.path.join(PERSIST_DIR, f"{job['job_id']}.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(job, f, ensure_ascii=False, indent=2)


def _load_persisted_jobs() -> None:
    """Populate ``_jobs`` from the JSON files in PERSIST_DIR.

    Unreadable or malformed files are skipped with a warning so that one bad
    file cannot prevent the service from starting.
    """
    for fname in os.listdir(PERSIST_DIR):
        if not fname.endswith(".json"):
            continue
        path = os.path.join(PERSIST_DIR, fname)
        try:
            with open(path, encoding="utf-8") as f:
                job = json.load(f)
            _jobs[job["job_id"]] = job
        except (OSError, ValueError, KeyError, TypeError) as exc:
            # ValueError covers both JSON and Unicode decoding errors.
            logger.warning("Skipping unreadable job file %s: %s", path, exc)


def _update_job(job_id: str, **kwargs) -> None:
    """Merge *kwargs* into the job and persist it once it reaches a final state.

    Does nothing when the job no longer exists (for example it was deleted
    while the pipeline was still running).
    """
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job is None:
            return
        job.update(kwargs)
        snapshot = dict(job)
    # Disk I/O happens outside the lock, on a private copy.
    if snapshot.get("status") in ("completed", "failed"):
        _persist_job(snapshot)


def _job_summary(job: dict) -> dict:
    """Return *job* without its bulky payload, plus the list of available formats."""
    row = {k: v for k, v in job.items() if k not in ("segments", "results")}
    row["formats"] = list((job.get("results") or {}).keys())
    return row


def _can_access_job(job: dict, user: dict) -> bool:
    """Return True when *user* is an admin or the account that submitted *job*."""
    return user.get("role") == "admin" or job.get("submitted_by") == user.get("email")


def _get_accessible_job(job_id: str, user: dict) -> dict:
    """Return a snapshot of the job, or raise 404 if it is missing or not visible to *user*.

    A foreign job is reported as "not found" so job ids of other accounts are
    not confirmed to the caller.
    """
    with _jobs_lock:
        job = _jobs.get(job_id)
        snapshot = dict(job) if job is not None else None
    if snapshot is None or not _can_access_job(snapshot, user):
        raise HTTPException(status_code=404, detail="Job not found")
    return snapshot


_load_persisted_jobs()

# ---------------------------------------------------------------------------
# Pipeline helpers
# ---------------------------------------------------------------------------

def _video_to_audio(src: str, dest: str) -> None:
    """Convert *src* to 16 kHz PCM WAV at *dest* with ffmpeg.

    Raises ``subprocess.CalledProcessError`` when ffmpeg exits non-zero.
    """
    subprocess.run(
        ["ffmpeg", "-i", src, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-y", dest],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=True,
    )


def _run_pipeline(
    job_id: str,
    file_path: str,
    model: str,
    device: str,
    language: str,
    do_srt: bool,
    do_txt: bool,
    do_srt_nh: bool,
    do_txt_nh: bool,
    initial_prompt: str = "",
) -> None:
    """Run convert -> transcribe -> (diarize -> combine) for one job.

    Progress and the final outcome are recorded through ``_update_job``; any
    exception marks the job as failed. The job's upload directory is always
    removed at the end.
    """
    try:
        audio_path = file_path
        if not file_path.lower().endswith(".wav"):
            # splitext only looks at the final path component, so a dot in a
            # directory name cannot truncate the path.
            wav_path = os.path.splitext(file_path)[0] + "_converted.wav"
            _update_job(job_id, status="converting")
            _video_to_audio(file_path, wav_path)
            audio_path = wav_path

        _update_job(job_id, status="transcribing")
        segments, elapsed = transcribe(audio_path, model, device, language, initial_prompt=initial_prompt)
        if not segments:
            _update_job(job_id, status="failed", error="Transcription returned no segments")
            return

        results: dict = {}
        raw_segments_json = [
            {"start": s.start, "end": s.end, "text": s.text}
            for s in segments
        ]

        # The speaker-less formats only need the raw transcription.
        if do_txt_nh:
            results["txt_nh"] = to_txt(segments, with_speaker=False)
        if do_srt_nh:
            results["srt_nh"] = to_srt(segments, with_speaker=False)

        # Diarization runs only when a speaker-labelled format was requested.
        if do_srt or do_txt:
            _update_job(job_id, status="diarizing")
            diarization = diarize(audio_path)
            if diarization is None:
                _update_job(job_id, status="failed", error="Diarization failed")
                return

            _update_job(job_id, status="combining")
            final_segments = combine(segments, diarization)
            if do_srt:
                results["srt"] = to_srt(final_segments, with_speaker=True)
            if do_txt:
                results["txt"] = to_txt(final_segments, with_speaker=True)
            raw_segments_json = [
                {"start": s["start"], "end": s["end"], "text": s["text"], "speaker": s["speaker"]}
                for s in final_segments
            ]

        _update_job(
            job_id,
            status="completed",
            transcription_time=round(elapsed, 2),
            segments=raw_segments_json,
            results=results,
        )
    except Exception as exc:
        # Top-level boundary of the background task: record the failure on the job.
        logger.exception("Pipeline failed for job %s", job_id)
        _update_job(job_id, status="failed", error=str(exc))
    finally:
        job_dir = os.path.join(JOBS_DIR, job_id)
        if os.path.exists(job_dir):
            shutil.rmtree(job_dir, ignore_errors=True)


# ---------------------------------------------------------------------------
# Skill / usage guide
# ---------------------------------------------------------------------------

_SKILL_MD = """# Transcriptor — Usage Guide

## What is this?

Transcriptor is an audio and video transcription API with speaker diarization.
It converts speech to text (Whisper) and identifies who is speaking at each moment (PyAnnote).

---

## Authentication

### Web Interface
Log in with your **email** and **password**. Your session is saved in the browser.
On first login, you will be prompted to set a new password.

### REST API
Every request requires an `X-API-Key` header. API keys are managed by an administrator via the Admin Panel.

---

## Web Interface

Open the app at `http://<host>:8010/` and follow these steps:

### 1 — Upload
- Drag and drop any audio or video file onto the upload zone, or click to browse.
- Supported formats: MP3, MP4, WAV, OGG, M4A, WebM, MKV, and more.
- Files that are not WAV are automatically converted before processing.

### 2 — Settings

| Setting | Options | Default |
|---------|---------|---------|
| Language | es, en, pt, fr, de, it, ja, zh, auto | es |
| Model | large-v3, large-v2, medium, small, base | large-v3 |

**Output formats** (select one or more):

| Format | Description |
|--------|-------------|
| `.txt` | Plain text transcript with speaker labels |
| `.srt` | Subtitle file with timestamps and speaker labels |
| `.txt (no spk)` | Plain text without speaker labels |
| `.srt (no spk)` | Subtitle file without speaker labels |

> Formats with speaker labels trigger diarization (slower).
> No-speaker formats skip diarization and finish faster.

**Initial Prompt (optional)**

Expand the "Initial Prompt" field to provide Whisper with context before transcription begins.
Use it for proper nouns, acronyms, technical vocabulary, or speaker names that Whisper might otherwise misspell.
The prompt is not included in the output — it only guides the model.

Example: `"Participants: Dr. Ramírez and Lic. Ortega. Topic: quarterly budget review."`

### 3 — Processing

After submitting, the job moves through these stages:

```
pending → converting → transcribing → diarizing → combining → completed
```

- **converting**: video/audio is normalised to 16 kHz WAV
- **transcribing**: Whisper extracts word-level timestamps
- **diarizing**: PyAnnote identifies each speaker turn
- **combining**: words are aligned to speaker turns and merged

### 4 — Results

Once completed, download buttons appear for each requested format.
Click any button to download the file instantly.
You can also preview the first segments with timestamps and speaker labels directly on the page.

---

## Jobs History

The history table at the bottom of the page shows **only your own jobs**.

- Click **↻ Refresh** to reload from the server.
- Click any **completed** row to restore its results view.
- Use the format buttons in each row to re-download files from previous jobs.

Jobs are persisted on disk and survive server restarts.

---

## Admin Panel

The Admin Panel is accessible to admin users via the **Admin Panel** button in the top navigation.

### Users tab
- View all registered users with their roles, password status, and API key assignment.
- **Create User**: set email, name, default password, and role.
- **Reset Password**: assign a new default password (user is forced to change on next login).
- **Generate / Revoke API Key**: manage REST API access per user.
- **Delete User**: permanently remove an account.

### History tab
- Global view of **all jobs across all accounts**, with the submitting user shown per row.
- Columns: File, User, Status, Language, Model, Time, Created.
- Click **↻ Refresh** to reload.

### Metrics tab
- Overview of total jobs processed.
- Breakdown by status, model, language, and user.
- Average transcription time.

---

## REST API

All endpoints require `X-API-Key` header.

### Verify credentials
```
GET /auth/verify
```

### Submit a transcription job
```
POST /transcribe
Content-Type: multipart/form-data

file            — audio or video file (required)
language        — language code, default: es
model           — whisper model, default: large-v3
device          — cuda or cpu, default: cuda
txt             — true/false, default: false
srt             — true/false, default: false
txt_nh          — true/false, default: false
srt_nh          — true/false, default: false
initial_prompt  — text hint passed to Whisper before transcription, default: (empty)
```

Response: `{"job_id": "...", "status": "pending"}`

### Poll job status
```
GET /jobs/{job_id}
```
Returns the full job object including segments and results when completed.

### List your jobs
```
GET /jobs
```
Returns jobs submitted by the authenticated account only (no file content).

### Download a result file
```
GET /jobs/{job_id}/download/{fmt}
```
`fmt` is one of: `txt`, `srt`, `txt_nh`, `srt_nh`

### Delete a job
```
DELETE /jobs/{job_id}
```
Removes the job from memory and from disk.

### Admin — list all jobs (admin only)
```
GET /admin/jobs
```
Returns all jobs across all accounts, sorted by creation date descending.
Each entry includes `submitted_by` (email of the submitting account).

### This guide
```
GET /skill
```
Returns this markdown document.

---

## Tips

- Use `large-v3` for best accuracy. Use `small` or `base` for faster results on short clips.
- Set `language` explicitly — auto-detection adds latency.
- If you only need a transcript without identifying speakers, use `txt_nh` or `srt_nh`; it skips the diarization step and finishes much faster.
- Use `initial_prompt` when you know the topic, speaker names, or domain vocabulary upfront — it measurably reduces hallucinations and misspellings on proper nouns.
- The segments preview on the results page shows the first 5 segments. The full content is in the downloaded file.
"""


@app.get("/skill", response_class=PlainTextResponse)
def skill_guide():
    """Return the usage guide as a markdown document."""
    return PlainTextResponse(content=_SKILL_MD, media_type="text/markdown")


# ---------------------------------------------------------------------------
# Pydantic request models
# ---------------------------------------------------------------------------

class LoginRequest(BaseModel):
    email: str
    password: str


class ChangePasswordRequest(BaseModel):
    new_password: str
    current_password: Optional[str] = None


class CreateUserRequest(BaseModel):
    email: str
    name: str
    password: str
    role: str = "user"


class ResetPasswordRequest(BaseModel):
    new_password: str


# ---------------------------------------------------------------------------
# Auth endpoints
# ---------------------------------------------------------------------------

@app.get("/auth/verify")
def auth_verify(
    x_session_token: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None),
):
    """Describe the caller identified by a session token or API key (401 otherwise)."""
    if x_session_token:
        session = _get_session(x_session_token)
        if session:
            users = _load_users()
            user = users.get(session["user_id"])
            is_default = user.get("is_default_password", False) if user else False
            return {
                "email": session["email"],
                "name": session["name"],
                "role": session["role"],
                "is_default_password": is_default,
            }
    if x_api_key:
        keys = _load_keys()
        if x_api_key in keys:
            info = keys[x_api_key]
            return {"email": info["email"], "name": info.get("name", ""), "role": "user", "is_default_password": False}
    raise HTTPException(status_code=401, detail="Invalid credentials")


@app.post("/auth/login")
def auth_login(req: LoginRequest):
    """Check email/password and open a session; 401 on any mismatch."""
    users = _load_users()
    user = next((u for u in users.values() if u["email"] == req.email), None)
    if not user or not _verify_password(req.password, user["password_hash"], user["password_salt"]):
        raise HTTPException(status_code=401, detail="Invalid email or password")
    token = _create_session(user)
    return {
        "session_token": token,
        "user": {"email": user["email"], "name": user["name"], "role": user["role"]},
        "is_default_password": user.get("is_default_password", False),
    }


@app.post("/auth/logout")
def auth_logout(x_session_token: Optional[str] = Header(None)):
    """Drop the caller's session if one was supplied; always succeeds."""
    if x_session_token:
        with _sessions_lock:
            _sessions.pop(x_session_token, None)
    return {"message": "Logged out"}


@app.post("/auth/change-password")
def auth_change_password(req: ChangePasswordRequest, session: dict = Depends(verify_session)):
    """Set a new password for the logged-in user.

    The current password is required unless the account still carries its
    default (admin-assigned) password.
    """
    users = _load_users()
    user = users.get(session["user_id"])
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    if not user.get("is_default_password"):
        if not req.current_password:
            raise HTTPException(status_code=400, detail="current_password required")
        if not _verify_password(req.current_password, user["password_hash"], user["password_salt"]):
            raise HTTPException(status_code=401, detail="Wrong current password")
    if len(req.new_password) < 8:
        raise HTTPException(status_code=400, detail="Password must be at least 8 characters")
    pw_hash, pw_salt = _hash_password(req.new_password)
    user["password_hash"] = pw_hash
    user["password_salt"] = pw_salt
    user["is_default_password"] = False
    _save_users(users)
    return {"message": "Password changed"}


# ---------------------------------------------------------------------------
# Admin endpoints
# ---------------------------------------------------------------------------

@app.get("/admin/users")
def admin_list_users(_admin: dict = Depends(require_admin)):
    """List all users (without credentials) and whether each has an API key."""
    users = _load_users()
    keys = _load_keys()
    emails_with_key = {v["email"] for v in keys.values()}
    return [
        {
            "id": u["id"],
            "email": u["email"],
            "name": u["name"],
            "role": u["role"],
            "is_default_password": u.get("is_default_password", False),
            "created_at": u.get("created_at", ""),
            "has_api_key": u["email"] in emails_with_key,
        }
        for u in users.values()
    ]


@app.post("/admin/users", status_code=201)
def admin_create_user(req: CreateUserRequest, _admin: dict = Depends(require_admin)):
    """Create an account with a default password; 409 if the email is taken."""
    if req.role not in ("user", "admin"):
        raise HTTPException(status_code=400, detail="role must be 'user' or 'admin'")
    users = _load_users()
    if any(u["email"] == req.email for u in users.values()):
        raise HTTPException(status_code=409, detail="Email already exists")
    user_id = str(uuid.uuid4())
    pw_hash, pw_salt = _hash_password(req.password)
    users[user_id] = {
        "id": user_id,
        "email": req.email,
        "name": req.name,
        "role": req.role,
        "password_hash": pw_hash,
        "password_salt": pw_salt,
        "is_default_password": True,
        "created_at": datetime.now().isoformat(),
    }
    _save_users(users)
    return {"id": user_id, "email": req.email, "name": req.name, "role": req.role}


@app.delete("/admin/users/{user_id}", status_code=204)
def admin_delete_user(user_id: str, session: dict = Depends(require_admin)):
    """Delete an account and revoke its API keys and live sessions.

    An admin cannot delete their own account.
    """
    users = _load_users()
    if user_id not in users:
        raise HTTPException(status_code=404, detail="User not found")
    email = users[user_id]["email"]
    if email == session["email"]:
        raise HTTPException(status_code=400, detail="Cannot delete your own account")
    del users[user_id]
    _save_users(users)

    # A removed account must lose every remaining credential, not just its password.
    keys = _load_keys()
    key_count = len(keys)
    _drop_keys_for_email(keys, email)
    if len(keys) != key_count:
        _save_keys(keys)
    _drop_sessions_for_user(user_id)
    return Response(status_code=204)


@app.patch("/admin/users/{user_id}/reset-password")
def admin_reset_password(user_id: str, req: ResetPasswordRequest, _admin: dict = Depends(require_admin)):
    """Assign a new default password to the user and flag it as default."""
    users = _load_users()
    if user_id not in users:
        raise HTTPException(status_code=404, detail="User not found")
    pw_hash, pw_salt = _hash_password(req.new_password)
    users[user_id]["password_hash"] = pw_hash
    users[user_id]["password_salt"] = pw_salt
    users[user_id]["is_default_password"] = True
    _save_users(users)
    return {"message": "Password reset"}


@app.post("/admin/users/{user_id}/api-key")
def admin_generate_api_key(user_id: str, _admin: dict = Depends(require_admin)):
    """Issue a fresh API key for the user, replacing any existing one."""
    users = _load_users()
    if user_id not in users:
        raise HTTPException(status_code=404, detail="User not found")
    user = users[user_id]
    keys = _load_keys()
    _drop_keys_for_email(keys, user["email"])
    new_key = "tk_" + secrets.token_hex(24)
    keys[new_key] = {"email": user["email"], "name": user["name"], "created_at": datetime.now().isoformat()}
    _save_keys(keys)
    return {"api_key": new_key}


@app.delete("/admin/users/{user_id}/api-key", status_code=204)
def admin_revoke_api_key(user_id: str, _admin: dict = Depends(require_admin)):
    """Remove every API key that belongs to the user."""
    users = _load_users()
    if user_id not in users:
        raise HTTPException(status_code=404, detail="User not found")
    user = users[user_id]
    keys = _load_keys()
    _drop_keys_for_email(keys, user["email"])
    _save_keys(keys)
    return Response(status_code=204)


@app.get("/admin/metrics")
def admin_metrics(_admin: dict = Depends(require_admin)):
    """Return job counts by status, model, language and user, plus the mean transcription time."""
    with _jobs_lock:
        jobs = list(_jobs.values())
    by_status: dict = {}
    by_model: dict = {}
    by_language: dict = {}
    by_user: dict = {}
    times = []
    for j in jobs:
        s = j.get("status", "unknown")
        by_status[s] = by_status.get(s, 0) + 1
        m = j.get("model", "unknown")
        by_model[m] = by_model.get(m, 0) + 1
        lang = j.get("language", "unknown")
        by_language[lang] = by_language.get(lang, 0) + 1
        u = j.get("submitted_by", "api")
        by_user[u] = by_user.get(u, 0) + 1
        if j.get("transcription_time"):
            times.append(j["transcription_time"])
    return {
        "total_jobs": len(jobs),
        "by_status": by_status,
        "by_model": by_model,
        "by_language": by_language,
        "by_user": by_user,
        "avg_transcription_time": round(sum(times) / len(times), 1) if times else None,
    }


# ---------------------------------------------------------------------------
# Transcription endpoints (API key OR session token)
# ---------------------------------------------------------------------------

_UPLOAD_CHUNK_SIZE = 1024 * 1024


def _safe_filename(name: Optional[str]) -> str:
    """Reduce a client-supplied file name to a bare, non-empty base name.

    The name comes straight from the multipart request, so directory parts
    (with either slash style) and NUL bytes are stripped to keep the upload
    inside its job directory.
    """
    cleaned = (name or "").replace("\x00", "").replace("\\", "/")
    base = os.path.basename(cleaned)
    if base in ("", ".", ".."):
        return "upload"
    return base


def _content_disposition(filename: str) -> str:
    """Build an attachment Content-Disposition value that is safe for any file name.

    Names that need escaping (quotes, non-ASCII, ...) use the RFC 5987
    ``filename*`` form; plain names keep the simple quoted form.
    """
    encoded = quote(filename, safe="")
    if encoded != filename:
        return f"attachment; filename*=utf-8''{encoded}"
    return f'attachment; filename="{filename}"'


@app.post("/transcribe", status_code=202)
async def start_transcription(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    model: str = Form("large-v3"),
    device: str = Form("cuda"),
    language: str = Form("es"),
    srt: bool = Form(False),
    txt: bool = Form(False),
    srt_nh: bool = Form(False),
    txt_nh: bool = Form(False),
    initial_prompt: str = Form(""),
    user: dict = Depends(verify_any_auth),
):
    """Store the upload, register a pending job and schedule the pipeline."""
    job_id = str(uuid.uuid4())
    job_dir = os.path.join(JOBS_DIR, job_id)
    os.makedirs(job_dir)

    filename = _safe_filename(file.filename)
    file_path = os.path.join(job_dir, filename)
    try:
        # Stream in chunks so a large upload is never held in memory in full.
        with open(file_path, "wb") as out:
            while chunk := await file.read(_UPLOAD_CHUNK_SIZE):
                out.write(chunk)
    except Exception:
        # Do not leave a half-written upload behind; let the error propagate.
        shutil.rmtree(job_dir, ignore_errors=True)
        raise

    with _jobs_lock:
        _jobs[job_id] = {
            "job_id": job_id,
            "status": "pending",
            "filename": filename,
            "model": model,
            "language": language,
            "submitted_by": user.get("email", "unknown"),
            "created_at": datetime.now().isoformat(),
            "error": None,
            "segments": None,
            "results": {},
        }

    background_tasks.add_task(
        _run_pipeline,
        job_id=job_id,
        file_path=file_path,
        model=model,
        device=device,
        language=language,
        do_srt=srt,
        do_txt=txt,
        do_srt_nh=srt_nh,
        do_txt_nh=txt_nh,
        initial_prompt=initial_prompt,
    )
    return {"job_id": job_id, "status": "pending"}


@app.get("/admin/jobs")
def admin_list_jobs(_admin: dict = Depends(require_admin)):
    """List summaries of all jobs from every account, newest first."""
    with _jobs_lock:
        out = [_job_summary(job) for job in _jobs.values()]
    return sorted(out, key=lambda j: j.get("created_at", ""), reverse=True)


@app.get("/jobs")
def list_jobs(user: dict = Depends(verify_any_auth)):
    """List summaries of the jobs submitted by the authenticated account."""
    user_email = user.get("email", "")
    with _jobs_lock:
        return [
            _job_summary(job)
            for job in _jobs.values()
            if job.get("submitted_by") == user_email
        ]


@app.get("/jobs/{job_id}")
def get_job(job_id: str, user: dict = Depends(verify_any_auth)):
    """Return the full job (owner or admin only); 404 otherwise."""
    return _get_accessible_job(job_id, user)


@app.get("/jobs/{job_id}/download/{fmt}")
def download_result(
    job_id: str,
    fmt: Literal["srt", "txt", "srt_nh", "txt_nh"],
    user: dict = Depends(verify_any_auth),
):
    """Download one result format of a completed job as an attachment.

    Raises 404 for a missing/foreign job or a format that was not requested,
    and 400 when the job has not completed.
    """
    job = _get_accessible_job(job_id, user)
    if job["status"] != "completed":
        raise HTTPException(status_code=400, detail=f"Job is '{job['status']}', not completed")
    results = job.get("results") or {}
    if fmt not in results:
        raise HTTPException(status_code=404, detail=f"Format '{fmt}' was not requested for this job")
    ext = fmt.split("_")[0]
    # Jobs may carry no file name (upload without one); fall back to a fixed stem.
    stem = os.path.splitext(job.get("filename") or "upload")[0]
    filename = f"{stem}_{fmt}.{ext}"
    return PlainTextResponse(
        content=results[fmt],
        headers={"Content-Disposition": _content_disposition(filename)},
    )


@app.delete("/jobs/{job_id}", status_code=204)
def delete_job(job_id: str, user: dict = Depends(verify_any_auth)):
    """Remove a job from memory and disk (owner or admin only); 404 otherwise."""
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job is None or not _can_access_job(job, user):
            raise HTTPException(status_code=404, detail="Job not found")
        del _jobs[job_id]
    path = os.path.join(PERSIST_DIR, f"{job_id}.json")
    try:
        os.remove(path)
    except FileNotFoundError:
        # Jobs that never reached a final state have no persisted file.
        pass
    return Response(status_code=204)


# ---------------------------------------------------------------------------
# Static frontend — mounted last so API routes take precedence
# ---------------------------------------------------------------------------

STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")


@app.get("/")
def index():
    """Serve the single-page frontend."""
    return FileResponse(os.path.join(STATIC_DIR, "index.html"))


app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")