| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850 |
- import hashlib
- import json
- import os
- import secrets
- import shutil
- import subprocess
- import tempfile
- import threading
- import uuid
- from datetime import datetime, timedelta
- from typing import Literal, Optional
- import dotenv
- from fastapi import BackgroundTasks, Depends, FastAPI, File, Form, Header, HTTPException, UploadFile
- from fastapi.responses import PlainTextResponse, Response, FileResponse
- from fastapi.staticfiles import StaticFiles
- from pydantic import BaseModel
- from core.combine import combine
- from core.diarize import diarize
- from core.formats import to_srt, to_txt
- from core.transcribe import transcribe
# Let dotenv populate the environment before anything below reads it.
dotenv.load_dotenv()

# The application object every route in this module registers on.
app = FastAPI(
    title="Transcriptor API",
    description="Audio/Video transcription and speaker diarization",
)
- # ---------------------------------------------------------------------------
- # API key store (direct REST access)
- # ---------------------------------------------------------------------------
KEYS_FILE = os.path.join(os.path.dirname(__file__), "api_keys.json")


def _load_keys() -> dict:
    """Return the API-key store parsed from ``KEYS_FILE``.

    An empty dict is returned when the file does not exist.
    """
    if not os.path.exists(KEYS_FILE):
        return {}
    with open(KEYS_FILE) as handle:
        return json.load(handle)
def _save_keys(keys: dict):
    """Persist the API-key store to ``KEYS_FILE`` atomically.

    The data is first written to a temporary file in the same directory and
    then moved over ``KEYS_FILE`` with ``os.replace``. Readers (which load the
    file on every request without locking) therefore see either the old or
    the new content, never a truncated file, and a failure while serialising
    leaves the previous store untouched.

    Args:
        keys: Mapping of API key -> metadata dict; must be JSON-serialisable.

    Raises:
        OSError: If the temporary file cannot be written or moved.
        TypeError / ValueError: If ``keys`` is not JSON-serialisable.
    """
    target_dir = os.path.dirname(KEYS_FILE) or "."
    # NOTE: mkstemp creates the file readable/writable by the owner only, so
    # the replaced key store ends up with owner-only permissions.
    fd, tmp_path = tempfile.mkstemp(dir=target_dir, prefix=".api_keys.", suffix=".tmp")
    try:
        # json.dump keeps its default ensure_ascii=True, so the output is plain
        # ASCII and stays readable whatever encoding the reader assumes.
        with os.fdopen(fd, "w") as f:
            json.dump(keys, f, indent=2)
        os.replace(tmp_path, KEYS_FILE)
    except BaseException:
        # Do not leave stray temp files behind on failure.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
- # ---------------------------------------------------------------------------
- # User store (GUI accounts with passwords)
- # ---------------------------------------------------------------------------
USERS_FILE = os.path.join(os.path.dirname(__file__), "users.json")
_users_lock = threading.Lock()


def _load_users() -> dict:
    """Return the user store parsed from ``USERS_FILE``.

    An empty dict is returned when the file does not exist.
    """
    if not os.path.exists(USERS_FILE):
        return {}
    with open(USERS_FILE) as handle:
        return json.load(handle)
def _save_users(users: dict):
    """Persist the user store to ``USERS_FILE`` atomically.

    Writers are serialised with ``_users_lock``. The data is written to a
    temporary file in the same directory and then moved into place with
    ``os.replace``, so readers (which do not take the lock) never observe a
    half-written file and a failed write cannot wipe the existing accounts.

    Args:
        users: Mapping of user id -> user record; must be JSON-serialisable.

    Raises:
        OSError: If the temporary file cannot be written or moved.
        TypeError / ValueError: If ``users`` is not JSON-serialisable.
    """
    target_dir = os.path.dirname(USERS_FILE) or "."
    with _users_lock:
        # NOTE: mkstemp creates the file with owner-only permissions.
        fd, tmp_path = tempfile.mkstemp(dir=target_dir, prefix=".users.", suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(users, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, USERS_FILE)
        except BaseException:
            # Do not leave stray temp files behind on failure.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
def _hash_password(password: str, salt: Optional[str] = None) -> tuple[str, str]:
    """Derive a PBKDF2-HMAC-SHA256 hash (200,000 iterations) for ``password``.

    Args:
        password: Plain-text password.
        salt: Salt string to reuse; a fresh 16-byte hex salt is generated
            when omitted.

    Returns:
        ``(hex_digest, salt)``.
    """
    effective_salt = secrets.token_hex(16) if salt is None else salt
    digest = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode(),
        effective_salt.encode(),
        200_000,
    )
    return digest.hex(), effective_salt
def _verify_password(password: str, stored_hash: str, salt: str) -> bool:
    """Return True when ``password`` hashed with ``salt`` equals ``stored_hash``.

    The comparison uses ``secrets.compare_digest``.
    """
    candidate = _hash_password(password, salt)[0]
    return secrets.compare_digest(candidate, stored_hash)
- # ---------------------------------------------------------------------------
- # Session store (in-memory, 8-hour TTL)
- # ---------------------------------------------------------------------------
_sessions: dict = {}
_sessions_lock = threading.Lock()
SESSION_TTL_HOURS = 8


def _create_session(user: dict) -> str:
    """Register a new in-memory session for ``user`` and return its token.

    The stored record copies the user's id, email, name and role and carries
    an ISO-formatted ``expires_at`` set ``SESSION_TTL_HOURS`` from now.
    """
    token = secrets.token_hex(32)
    lifetime = timedelta(hours=SESSION_TTL_HOURS)
    record = {
        "user_id": user["id"],
        "email": user["email"],
        "name": user["name"],
        "role": user["role"],
        "expires_at": (datetime.now() + lifetime).isoformat(),
    }
    with _sessions_lock:
        _sessions[token] = record
    return token
def _get_session(token: str) -> Optional[dict]:
    """Look up a live session by token.

    Returns the session record, or ``None`` when the token is unknown or the
    session has expired (expired sessions are evicted from the store).
    """
    with _sessions_lock:
        record = _sessions.get(token)
    if not record:
        return None

    now = datetime.now()
    if now > datetime.fromisoformat(record["expires_at"]):
        # Expired: drop it so the store does not keep dead tokens around.
        with _sessions_lock:
            _sessions.pop(token, None)
        return None
    return record
- # ---------------------------------------------------------------------------
- # Auth dependencies
- # ---------------------------------------------------------------------------
def verify_api_key(x_api_key: str = Header(..., description="Your API key")) -> dict:
    """Dependency: resolve the ``X-API-Key`` header to its stored key record.

    Raises:
        HTTPException: 401 when the key is not in the key store.
    """
    known_keys = _load_keys()
    if x_api_key in known_keys:
        return known_keys[x_api_key]
    raise HTTPException(status_code=401, detail="Invalid or missing API key")
def verify_session(x_session_token: Optional[str] = Header(None)) -> dict:
    """Dependency: resolve the ``X-Session-Token`` header to a live session.

    Raises:
        HTTPException: 401 when the header is absent, or when the token is
            unknown or expired.
    """
    if not x_session_token:
        raise HTTPException(status_code=401, detail="Not authenticated")

    current = _get_session(x_session_token)
    if current:
        return current
    raise HTTPException(status_code=401, detail="Invalid or expired session")
def require_admin(session: dict = Depends(verify_session)) -> dict:
    """Dependency: pass the session through only when its role is ``admin``.

    Raises:
        HTTPException: 403 for any other role.
    """
    if session.get("role") == "admin":
        return session
    raise HTTPException(status_code=403, detail="Admin access required")
def verify_any_auth(
    x_api_key: Optional[str] = Header(None),
    x_session_token: Optional[str] = Header(None),
) -> dict:
    """Dependency: authenticate by session token first, then by API key.

    Returns the session record for a valid session token; for a valid API key
    returns a session-shaped dict with ``user_id`` None and role ``user``.

    Raises:
        HTTPException: 401 when neither credential is valid.
    """
    session = _get_session(x_session_token) if x_session_token else None
    if session:
        return session

    if x_api_key:
        known_keys = _load_keys()
        if x_api_key in known_keys:
            owner = known_keys[x_api_key]
            return {
                "user_id": None,
                "email": owner["email"],
                "name": owner.get("name", ""),
                "role": "user",
            }

    raise HTTPException(status_code=401, detail="Authentication required")
- # ---------------------------------------------------------------------------
- # Persistent job store
- # ---------------------------------------------------------------------------
_jobs: dict = {}
_jobs_lock = threading.Lock()

# Scratch space for uploads lives under the system temp dir; finished job
# records are kept next to this module.
JOBS_DIR = os.path.join(tempfile.gettempdir(), "transcriptor_jobs")
PERSIST_DIR = os.path.join(os.path.dirname(__file__), "jobs")
os.makedirs(JOBS_DIR, exist_ok=True)
os.makedirs(PERSIST_DIR, exist_ok=True)


def _persist_job(job: dict):
    """Write ``job`` as ``<job_id>.json`` inside ``PERSIST_DIR``."""
    target = os.path.join(PERSIST_DIR, f"{job['job_id']}.json")
    with open(target, "w", encoding="utf-8") as out:
        json.dump(job, out, indent=2, ensure_ascii=False)
def _load_persisted_jobs():
    """Populate ``_jobs`` from every ``*.json`` file in ``PERSIST_DIR``.

    Loading is best-effort: a file that cannot be read, parsed, or that lacks
    a ``job_id`` is skipped so one bad record never prevents startup. Skipped
    files are reported through ``logging`` instead of being ignored silently.
    """
    import logging  # local import: this module has no file-level logging import

    log = logging.getLogger(__name__)
    for fname in os.listdir(PERSIST_DIR):
        if not fname.endswith(".json"):
            continue
        path = os.path.join(PERSIST_DIR, fname)
        try:
            with open(path, encoding="utf-8") as f:
                job = json.load(f)
            _jobs[job["job_id"]] = job
        except Exception as exc:
            # Deliberately broad: keep going, but leave a trace for operators.
            log.warning("Skipping unreadable persisted job %s: %s", path, exc)
def _update_job(job_id: str, **kwargs):
    """Merge ``kwargs`` into the in-memory job and persist terminal states.

    If the job no longer exists (for example it was deleted while its
    pipeline was still running) the call is a no-op. Previously this raised
    ``KeyError``, which then escaped from the pipeline's own error handler.

    Args:
        job_id: Id of the job to update.
        **kwargs: Fields to set on the job record.
    """
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is None:
            return
        record.update(kwargs)
        # Snapshot under the lock; the disk write happens outside it.
        job = dict(record)
    if job.get("status") in ("completed", "failed"):
        _persist_job(job)


# Restore previously finished jobs at import time.
_load_persisted_jobs()
- # ---------------------------------------------------------------------------
- # Pipeline helpers
- # ---------------------------------------------------------------------------
def _video_to_audio(src: str, dest: str):
    """Run ffmpeg to extract ``src``'s audio into ``dest`` as 16 kHz PCM s16le.

    ffmpeg output is discarded; a non-zero exit raises
    ``subprocess.CalledProcessError``.
    """
    command = [
        "ffmpeg",
        "-i", src,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-y",
        dest,
    ]
    subprocess.run(
        command,
        check=True,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
def _run_pipeline(
    job_id: str,
    file_path: str,
    model: str,
    device: str,
    language: str,
    do_srt: bool,
    do_txt: bool,
    do_srt_nh: bool,
    do_txt_nh: bool,
    initial_prompt: str = "",
):
    """Run conversion, transcription and optional diarization for one job.

    Progress is reported through ``_update_job`` (``converting`` →
    ``transcribing`` → ``diarizing`` → ``combining`` → ``completed``), and any
    exception is recorded on the job as ``failed``. The job's upload directory
    under ``JOBS_DIR`` is always removed at the end.

    Args:
        job_id: Id of the job being processed.
        file_path: Path of the uploaded file.
        model, device, language, initial_prompt: Forwarded to ``transcribe``.
        do_srt, do_txt: Produce speaker-labelled outputs (triggers diarization).
        do_srt_nh, do_txt_nh: Produce outputs without speaker labels.
    """
    try:
        audio_path = file_path
        if not file_path.lower().endswith(".wav"):
            # splitext only looks at the final path component. The previous
            # rsplit(".", 1) could cut at a dot inside a directory name when
            # the upload had no extension, placing the WAV outside the job dir.
            wav_path = os.path.splitext(file_path)[0] + "_converted.wav"
            _update_job(job_id, status="converting")
            _video_to_audio(file_path, wav_path)
            audio_path = wav_path

        _update_job(job_id, status="transcribing")
        segments, elapsed = transcribe(audio_path, model, device, language, initial_prompt=initial_prompt)
        if not segments:
            _update_job(job_id, status="failed", error="Transcription returned no segments")
            return

        results: dict = {}
        # Default segment dump without speakers; replaced below when
        # diarization runs.
        raw_segments_json = [
            {"start": s.start, "end": s.end, "text": s.text}
            for s in segments
        ]
        if do_txt_nh:
            results["txt_nh"] = to_txt(segments, with_speaker=False)
        if do_srt_nh:
            results["srt_nh"] = to_srt(segments, with_speaker=False)

        if do_srt or do_txt:
            _update_job(job_id, status="diarizing")
            diarization = diarize(audio_path)
            if diarization is None:
                _update_job(job_id, status="failed", error="Diarization failed")
                return
            _update_job(job_id, status="combining")
            final_segments = combine(segments, diarization)
            if do_srt:
                results["srt"] = to_srt(final_segments, with_speaker=True)
            if do_txt:
                results["txt"] = to_txt(final_segments, with_speaker=True)
            raw_segments_json = [
                {"start": s["start"], "end": s["end"], "text": s["text"], "speaker": s["speaker"]}
                for s in final_segments
            ]

        _update_job(
            job_id,
            status="completed",
            transcription_time=round(elapsed, 2),
            segments=raw_segments_json,
            results=results,
        )
    except Exception as exc:
        # Top-level boundary of the background task: record the failure on
        # the job rather than letting it vanish.
        _update_job(job_id, status="failed", error=str(exc))
    finally:
        job_dir = os.path.join(JOBS_DIR, job_id)
        if os.path.exists(job_dir):
            shutil.rmtree(job_dir, ignore_errors=True)
- # ---------------------------------------------------------------------------
- # Skill / usage guide
- # ---------------------------------------------------------------------------
- _SKILL_MD = """# Transcriptor — Usage Guide
- ## What is this?
- Transcriptor is an audio and video transcription API with speaker diarization.
- It converts speech to text (Whisper) and identifies who is speaking at each moment (PyAnnote).
- ---
- ## Authentication
- ### Web Interface
- Log in with your **email** and **password**. Your session is saved in the browser.
- On first login, you will be prompted to set a new password.
- ### REST API
- Every request requires an `X-API-Key` header.
- API keys are managed by an administrator via the Admin Panel.
- ---
- ## Web Interface
- Open the app at `http://<host>:8010/` and follow these steps:
- ### 1 — Upload
- - Drag and drop any audio or video file onto the upload zone, or click to browse.
- - Supported formats: MP3, MP4, WAV, OGG, M4A, WebM, MKV, and more.
- - Files that are not WAV are automatically converted before processing.
- ### 2 — Settings
- | Setting | Options | Default |
- |---------|---------|---------|
- | Language | es, en, pt, fr, de, it, ja, zh, auto | es |
- | Model | large-v3, large-v2, medium, small, base | large-v3 |
- **Output formats** (select one or more):
- | Format | Description |
- |--------|-------------|
- | `.txt` | Plain text transcript with speaker labels |
- | `.srt` | Subtitle file with timestamps and speaker labels |
- | `.txt (no spk)` | Plain text without speaker labels |
- | `.srt (no spk)` | Subtitle file without speaker labels |
- > Formats with speaker labels trigger diarization (slower).
- > No-speaker formats skip diarization and finish faster.
- **Initial Prompt (optional)**
- Expand the "Initial Prompt" field to provide Whisper with context before transcription begins.
- Use it for proper nouns, acronyms, technical vocabulary, or speaker names that Whisper might
- otherwise misspell. The prompt is not included in the output — it only guides the model.
- Example: `"Participants: Dr. Ramírez and Lic. Ortega. Topic: quarterly budget review."`
- ### 3 — Processing
- After submitting, the job moves through these stages:
- ```
- pending → converting → transcribing → diarizing → combining → completed
- ```
- - **converting**: video/audio is normalised to 16 kHz WAV
- - **transcribing**: Whisper extracts word-level timestamps
- - **diarizing**: PyAnnote identifies each speaker turn
- - **combining**: words are aligned to speaker turns and merged
- ### 4 — Results
- Once completed, download buttons appear for each requested format.
- Click any button to download the file instantly.
- You can also preview the first segments with timestamps and speaker labels directly on the page.
- ---
- ## Jobs History
- The history table at the bottom of the page shows **only your own jobs**.
- - Click **↻ Refresh** to reload from the server.
- - Click any **completed** row to restore its results view.
- - Use the format buttons in each row to re-download files from previous jobs.
- Jobs are persisted on disk and survive server restarts.
- ---
- ## Admin Panel
- The Admin Panel is accessible to admin users via the **Admin Panel** button in the top navigation.
- ### Users tab
- - View all registered users with their roles, password status, and API key assignment.
- - **Create User**: set email, name, default password, and role.
- - **Reset Password**: assign a new default password (user is forced to change on next login).
- - **Generate / Revoke API Key**: manage REST API access per user.
- - **Delete User**: permanently remove an account.
- ### History tab
- - Global view of **all jobs across all accounts**, with the submitting user shown per row.
- - Columns: File, User, Status, Language, Model, Time, Created.
- - Click **↻ Refresh** to reload.
- ### Metrics tab
- - Overview of total jobs processed.
- - Breakdown by status, model, language, and user.
- - Average transcription time.
- ---
- ## REST API
- All endpoints require `X-API-Key` header.
- ### Verify credentials
- ```
- GET /auth/verify
- ```
- ### Submit a transcription job
- ```
- POST /transcribe
- Content-Type: multipart/form-data
- file — audio or video file (required)
- language — language code, default: es
- model — whisper model, default: large-v3
- device — cuda or cpu, default: cuda
- txt — true/false, default: false
- srt — true/false, default: false
- txt_nh — true/false, default: false
- srt_nh — true/false, default: false
- initial_prompt — text hint passed to Whisper before transcription, default: (empty)
- ```
- Response: `{"job_id": "...", "status": "pending"}`
- ### Poll job status
- ```
- GET /jobs/{job_id}
- ```
- Returns the full job object including segments and results when completed.
- ### List your jobs
- ```
- GET /jobs
- ```
- Returns jobs submitted by the authenticated account only (no file content).
- ### Download a result file
- ```
- GET /jobs/{job_id}/download/{fmt}
- ```
- `fmt` is one of: `txt`, `srt`, `txt_nh`, `srt_nh`
- ### Delete a job
- ```
- DELETE /jobs/{job_id}
- ```
- Removes the job from memory and from disk.
- ### Admin — list all jobs (admin only)
- ```
- GET /admin/jobs
- ```
- Returns all jobs across all accounts, sorted by creation date descending.
- Each entry includes `submitted_by` (email of the submitting account).
- ### This guide
- ```
- GET /skill
- ```
- Returns this markdown document.
- ---
- ## Tips
- - Use `large-v3` for best accuracy. Use `small` or `base` for faster results on short clips.
- - Set `language` explicitly — auto-detection adds latency.
- - If you only need a transcript without identifying speakers, use `txt_nh` or `srt_nh`; it skips the diarization step and finishes much faster.
- - Use `initial_prompt` when you know the topic, speaker names, or domain vocabulary upfront — it measurably reduces hallucinations and misspellings on proper nouns.
- - The segments preview on the results page shows the first 5 segments. The full content is in the downloaded file.
- """
@app.get("/skill", response_class=PlainTextResponse)
def skill_guide():
    # Serve the embedded usage guide, labelled as markdown.
    guide = PlainTextResponse(content=_SKILL_MD, media_type="text/markdown")
    return guide
- # ---------------------------------------------------------------------------
- # Pydantic request models
- # ---------------------------------------------------------------------------
# Body of POST /auth/login.
class LoginRequest(BaseModel):
    email: str
    password: str


# Body of POST /auth/change-password. The handler requires current_password
# unless the account still has its default password.
class ChangePasswordRequest(BaseModel):
    new_password: str
    current_password: Optional[str] = None


# Body of POST /admin/users. The handler accepts only "user" or "admin"
# as role.
class CreateUserRequest(BaseModel):
    email: str
    name: str
    password: str
    role: str = "user"


# Body of PATCH /admin/users/{user_id}/reset-password.
class ResetPasswordRequest(BaseModel):
    new_password: str
- # ---------------------------------------------------------------------------
- # Auth endpoints
- # ---------------------------------------------------------------------------
@app.get("/auth/verify")
def auth_verify(
    x_session_token: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None),
):
    # Report who the caller is: a valid session token wins, otherwise a valid
    # API key; anything else is a 401.
    session = _get_session(x_session_token) if x_session_token else None
    if session:
        account = _load_users().get(session["user_id"])
        still_default = account.get("is_default_password", False) if account else False
        return {
            "email": session["email"],
            "name": session["name"],
            "role": session["role"],
            "is_default_password": still_default,
        }

    if x_api_key:
        known_keys = _load_keys()
        if x_api_key in known_keys:
            owner = known_keys[x_api_key]
            return {
                "email": owner["email"],
                "name": owner.get("name", ""),
                "role": "user",
                "is_default_password": False,
            }

    raise HTTPException(status_code=401, detail="Invalid credentials")
@app.post("/auth/login")
def auth_login(req: LoginRequest):
    # Find the account by exact e-mail match, check the password, and open a
    # session on success.
    user = None
    for candidate in _load_users().values():
        if candidate["email"] == req.email:
            user = candidate
            break

    if not user or not _verify_password(req.password, user["password_hash"], user["password_salt"]):
        raise HTTPException(status_code=401, detail="Invalid email or password")

    profile = {"email": user["email"], "name": user["name"], "role": user["role"]}
    return {
        "session_token": _create_session(user),
        "user": profile,
        "is_default_password": user.get("is_default_password", False),
    }
@app.post("/auth/logout")
def auth_logout(x_session_token: Optional[str] = Header(None)):
    # Forget the session if a token was supplied; always answer success.
    if not x_session_token:
        return {"message": "Logged out"}
    with _sessions_lock:
        _sessions.pop(x_session_token, None)
    return {"message": "Logged out"}
@app.post("/auth/change-password")
def auth_change_password(req: ChangePasswordRequest, session: dict = Depends(verify_session)):
    # Change the caller's own password. Accounts still on their default
    # password may skip the current-password check.
    users = _load_users()
    account = users.get(session["user_id"])
    if not account:
        raise HTTPException(status_code=404, detail="User not found")

    must_confirm_current = not account.get("is_default_password")
    if must_confirm_current:
        if not req.current_password:
            raise HTTPException(status_code=400, detail="current_password required")
        if not _verify_password(req.current_password, account["password_hash"], account["password_salt"]):
            raise HTTPException(status_code=401, detail="Wrong current password")

    if len(req.new_password) < 8:
        raise HTTPException(status_code=400, detail="Password must be at least 8 characters")

    new_hash, new_salt = _hash_password(req.new_password)
    account.update(
        password_hash=new_hash,
        password_salt=new_salt,
        is_default_password=False,
    )
    _save_users(users)
    return {"message": "Password changed"}
- # ---------------------------------------------------------------------------
- # Admin endpoints
- # ---------------------------------------------------------------------------
@app.get("/admin/users")
def admin_list_users(_admin: dict = Depends(require_admin)):
    # List every account (no password material) and whether an API key is
    # registered for its e-mail.
    users = _load_users()
    emails_with_key = {entry["email"] for entry in _load_keys().values()}

    listing = []
    for account in users.values():
        listing.append(
            {
                "id": account["id"],
                "email": account["email"],
                "name": account["name"],
                "role": account["role"],
                "is_default_password": account.get("is_default_password", False),
                "created_at": account.get("created_at", ""),
                "has_api_key": account["email"] in emails_with_key,
            }
        )
    return listing
@app.post("/admin/users", status_code=201)
def admin_create_user(req: CreateUserRequest, _admin: dict = Depends(require_admin)):
    # Create an account flagged as still using its default password.
    if req.role not in ("user", "admin"):
        raise HTTPException(status_code=400, detail="role must be 'user' or 'admin'")

    users = _load_users()
    for existing in users.values():
        if existing["email"] == req.email:
            raise HTTPException(status_code=409, detail="Email already exists")

    user_id = str(uuid.uuid4())
    pw_hash, pw_salt = _hash_password(req.password)
    record = {
        "id": user_id,
        "email": req.email,
        "name": req.name,
        "role": req.role,
        "password_hash": pw_hash,
        "password_salt": pw_salt,
        "is_default_password": True,
        "created_at": datetime.now().isoformat(),
    }
    users[user_id] = record
    _save_users(users)
    return {"id": user_id, "email": req.email, "name": req.name, "role": req.role}
@app.delete("/admin/users/{user_id}", status_code=204)
def admin_delete_user(user_id: str, session: dict = Depends(require_admin)):
    # Delete an account and everything that could still authenticate as it.
    # Before this fix the account's API keys and open sessions kept working
    # after deletion (sessions for up to the session TTL).
    #
    # Responses: 404 unknown user, 400 when an admin targets their own
    # account, 204 on success.
    users = _load_users()
    if user_id not in users:
        raise HTTPException(status_code=404, detail="User not found")
    if users[user_id]["email"] == session["email"]:
        raise HTTPException(status_code=400, detail="Cannot delete your own account")

    removed = users.pop(user_id)
    _save_users(users)

    # Revoke API keys registered for the deleted account's e-mail.
    keys = _load_keys()
    stale_keys = [k for k, v in keys.items() if v.get("email") == removed["email"]]
    if stale_keys:
        for k in stale_keys:
            del keys[k]
        _save_keys(keys)

    # Drop any live sessions that belong to the deleted account.
    with _sessions_lock:
        stale_tokens = [t for t, s in _sessions.items() if s.get("user_id") == user_id]
        for t in stale_tokens:
            del _sessions[t]

    return Response(status_code=204)
@app.patch("/admin/users/{user_id}/reset-password")
def admin_reset_password(user_id: str, req: ResetPasswordRequest, _admin: dict = Depends(require_admin)):
    # Set a new password and flag it as a default one again.
    users = _load_users()
    if user_id not in users:
        raise HTTPException(status_code=404, detail="User not found")

    new_hash, new_salt = _hash_password(req.new_password)
    users[user_id].update(
        password_hash=new_hash,
        password_salt=new_salt,
        is_default_password=True,
    )
    _save_users(users)
    return {"message": "Password reset"}
@app.post("/admin/users/{user_id}/api-key")
def admin_generate_api_key(user_id: str, _admin: dict = Depends(require_admin)):
    # Issue a fresh API key for the user, replacing any key already registered
    # for the same e-mail.
    users = _load_users()
    if user_id not in users:
        raise HTTPException(status_code=404, detail="User not found")
    owner = users[user_id]

    # Keep only keys that belong to other accounts.
    keys = {
        key: meta
        for key, meta in _load_keys().items()
        if meta["email"] != owner["email"]
    }

    new_key = "tk_" + secrets.token_hex(24)
    keys[new_key] = {
        "email": owner["email"],
        "name": owner["name"],
        "created_at": datetime.now().isoformat(),
    }
    _save_keys(keys)
    return {"api_key": new_key}
@app.delete("/admin/users/{user_id}/api-key", status_code=204)
def admin_revoke_api_key(user_id: str, _admin: dict = Depends(require_admin)):
    # Remove every API key registered for the user's e-mail.
    users = _load_users()
    if user_id not in users:
        raise HTTPException(status_code=404, detail="User not found")
    owner_email = users[user_id]["email"]

    remaining = {
        key: meta
        for key, meta in _load_keys().items()
        if meta["email"] != owner_email
    }
    _save_keys(remaining)
    return Response(status_code=204)
@app.get("/admin/metrics")
def admin_metrics(_admin: dict = Depends(require_admin)):
    # Aggregate job counts by status, model, language and submitter, plus the
    # mean of all non-zero transcription times.
    with _jobs_lock:
        snapshot = list(_jobs.values())

    by_status: dict = {}
    by_model: dict = {}
    by_language: dict = {}
    by_user: dict = {}
    # (job field, fallback label, counter) for each breakdown.
    breakdowns = (
        ("status", "unknown", by_status),
        ("model", "unknown", by_model),
        ("language", "unknown", by_language),
        ("submitted_by", "api", by_user),
    )

    durations = []
    for job in snapshot:
        for field, fallback, counter in breakdowns:
            label = job.get(field, fallback)
            counter[label] = counter.get(label, 0) + 1
        if job.get("transcription_time"):
            durations.append(job["transcription_time"])

    average = round(sum(durations) / len(durations), 1) if durations else None
    return {
        "total_jobs": len(snapshot),
        "by_status": by_status,
        "by_model": by_model,
        "by_language": by_language,
        "by_user": by_user,
        "avg_transcription_time": average,
    }
- # ---------------------------------------------------------------------------
- # Transcription endpoints (API key OR session token)
- # ---------------------------------------------------------------------------
@app.post("/transcribe", status_code=202)
async def start_transcription(
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    model: str = Form("large-v3"),
    device: str = Form("cuda"),
    language: str = Form("es"),
    srt: bool = Form(False),
    txt: bool = Form(False),
    srt_nh: bool = Form(False),
    txt_nh: bool = Form(False),
    initial_prompt: str = Form(""),
    user: dict = Depends(verify_any_auth),
):
    # Store the upload in a per-job directory, register the job as "pending"
    # and schedule the pipeline as a background task.
    job_id = str(uuid.uuid4())
    job_dir = os.path.join(JOBS_DIR, job_id)
    os.makedirs(job_dir)

    # The client controls the filename: reduce it to a bare name so it cannot
    # escape job_dir ("../x", "C:\\x") or replace it (an absolute path makes
    # os.path.join discard job_dir entirely).
    client_name = (file.filename or "").replace("\\", "/").replace("\x00", "")
    safe_name = os.path.basename(client_name)
    if safe_name in ("", ".", ".."):
        safe_name = "upload"
    file_path = os.path.join(job_dir, safe_name)

    try:
        # Copy in 1 MiB chunks instead of holding the whole upload in memory.
        with open(file_path, "wb") as f:
            while chunk := await file.read(1024 * 1024):
                f.write(chunk)
    except Exception:
        # No job was registered yet, so nothing else would clean this up.
        shutil.rmtree(job_dir, ignore_errors=True)
        raise

    with _jobs_lock:
        _jobs[job_id] = {
            "job_id": job_id,
            "status": "pending",
            # Sanitised name: always a string, and safe to reuse when building
            # download filenames.
            "filename": safe_name,
            "model": model,
            "language": language,
            "submitted_by": user.get("email", "unknown"),
            "created_at": datetime.now().isoformat(),
            "error": None,
            "segments": None,
            "results": {},
        }

    background_tasks.add_task(
        _run_pipeline,
        job_id=job_id,
        file_path=file_path,
        model=model,
        device=device,
        language=language,
        do_srt=srt,
        do_txt=txt,
        do_srt_nh=srt_nh,
        do_txt_nh=txt_nh,
        initial_prompt=initial_prompt,
    )
    return {"job_id": job_id, "status": "pending"}
@app.get("/admin/jobs")
def admin_list_jobs(_admin: dict = Depends(require_admin)):
    # Summaries of every job (bulky fields stripped), newest first.
    bulky = ("segments", "results")
    rows = []
    with _jobs_lock:
        for job in _jobs.values():
            summary = {field: value for field, value in job.items() if field not in bulky}
            summary["formats"] = list(job.get("results", {}))
            rows.append(summary)
    rows.sort(key=lambda entry: entry.get("created_at", ""), reverse=True)
    return rows
@app.get("/jobs")
def list_jobs(user: dict = Depends(verify_any_auth)):
    # Summaries (bulky fields stripped) of the jobs submitted by the caller's
    # e-mail.
    caller_email = user.get("email", "")
    bulky = ("segments", "results")
    rows = []
    with _jobs_lock:
        for job in _jobs.values():
            if job.get("submitted_by") == caller_email:
                summary = {field: value for field, value in job.items() if field not in bulky}
                summary["formats"] = list(job.get("results", {}))
                rows.append(summary)
    return rows
@app.get("/jobs/{job_id}")
def get_job(job_id: str, user: dict = Depends(verify_any_auth)):
    # Return the full job record, but only to its submitter or to an admin.
    # Previously the authenticated user was ignored, so anyone holding a job
    # id could read another account's transcript. A foreign job is reported
    # as 404 (same as a missing one) so its existence is not revealed.
    with _jobs_lock:
        job = _jobs.get(job_id)
    is_admin = user.get("role") == "admin"
    if job is None or not (is_admin or job.get("submitted_by") == user.get("email")):
        raise HTTPException(status_code=404, detail="Job not found")
    return job
@app.get("/jobs/{job_id}/download/{fmt}")
def download_result(
    job_id: str,
    fmt: Literal["srt", "txt", "srt_nh", "txt_nh"],
    user: dict = Depends(verify_any_auth),
):
    # Send one result file of a completed job as an attachment.
    #
    # Fixes over the previous version:
    #   * only the submitter or an admin may download (foreign jobs look like
    #     missing ones: 404);
    #   * a job stored without a filename no longer crashes os.path.splitext;
    #   * filenames containing quotes, control characters or non-ASCII text
    #     no longer produce a broken or un-encodable Content-Disposition.
    from urllib.parse import quote  # local import: not among the file-level imports

    with _jobs_lock:
        job = _jobs.get(job_id)
    is_admin = user.get("role") == "admin"
    if job is None or not (is_admin or job.get("submitted_by") == user.get("email")):
        raise HTTPException(status_code=404, detail="Job not found")
    if job["status"] != "completed":
        raise HTTPException(status_code=400, detail=f"Job is '{job['status']}', not completed")
    if fmt not in job["results"]:
        raise HTTPException(status_code=404, detail=f"Format '{fmt}' was not requested for this job")

    ext = fmt.split("_")[0]
    stem = os.path.splitext(job.get("filename") or "upload")[0]
    filename = f"{stem}_{fmt}.{ext}"

    # Plain-ASCII fallback that is safe inside a quoted header parameter.
    fallback = "".join(
        ch if ch.isascii() and ch.isprintable() and ch not in '"\\' else "_"
        for ch in filename
    )
    disposition = f'attachment; filename="{fallback}"'
    if fallback != filename:
        # Carry the real name in the extended filename* parameter as well.
        disposition += "; filename*=UTF-8''" + quote(filename, safe="")

    return PlainTextResponse(
        content=job["results"][fmt],
        headers={"Content-Disposition": disposition},
    )
@app.delete("/jobs/{job_id}", status_code=204)
def delete_job(job_id: str, user: dict = Depends(verify_any_auth)):
    # Remove a job from memory and from disk; only its submitter or an admin
    # may do so. Previously any authenticated caller could delete any job.
    # A foreign job is reported as 404, same as a missing one.
    is_admin = user.get("role") == "admin"
    with _jobs_lock:
        job = _jobs.get(job_id)
        if job is None or not (is_admin or job.get("submitted_by") == user.get("email")):
            raise HTTPException(status_code=404, detail="Job not found")
        del _jobs[job_id]

    path = os.path.join(PERSIST_DIR, f"{job_id}.json")
    try:
        os.remove(path)
    except FileNotFoundError:
        # Never persisted (job not finished) or already removed: nothing to do.
        pass
    return Response(status_code=204)
- # ---------------------------------------------------------------------------
- # Static frontend — mounted last so API routes take precedence
- # ---------------------------------------------------------------------------
STATIC_DIR = os.path.join(os.path.dirname(__file__), "static")


@app.get("/")
def index():
    # Serve the frontend entry page.
    index_path = os.path.join(STATIC_DIR, "index.html")
    return FileResponse(index_path)


app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")
|