Webhook

OneLiveRec provides webhooks that allow external services to react to livestream monitoring and recording events.

Webhooks enable automation such as:

  • uploading recordings automatically
  • sending notifications when streams start
  • triggering post-processing pipelines
  • monitoring system errors

Each event is delivered as an HTTP POST request with a JSON payload.


Event Structure

All webhook events share a common structure.

Example Event Payload
{
  "id": "398ea849-7c4b-45c4-bd2f-093df217266c",
  "ts": "2026-03-11T05:22:38.393477700Z",
  "type": "event_type",
  "data": {}
}
Field   Type     Description
id      string   Unique event identifier
ts      string   Event timestamp (RFC3339)
type    string   Event type
data    object   Event-specific payload
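Because every event shares this envelope, a receiver can validate it once and then dispatch on the type. A minimal sketch, assuming nothing beyond the table above (parse_event is an illustrative helper, not part of OneLiveRec):

```python
import json

def parse_event(body: str):
    """Validate the common webhook envelope and return (type, data)."""
    event = json.loads(body)
    # Every OneLiveRec event carries the same four top-level fields.
    for field in ("id", "ts", "type", "data"):
        if field not in event:
            raise ValueError(f"missing envelope field: {field}")
    return event["type"], event["data"]
```

A handler would then branch on the returned type string ("live_start", "error", ...).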

live_info

Represents metadata describing a livestream.

Example
{
  "uid": "<USER ID>",
  "uname": "<USER NAME>",
  "avatar": "https://live.example.com/avatar.jpg",
  "title": "<TITLE>",
  "cover": "https://live.example.com/cover.jpg",
  "categories": [
    "<PRIMARY CATEGORY>",
    "<SECONDARY CATEGORY>"
  ],
  "status": "live",
  "live_id": "<LIVE ID>",
  "start_time": "2026-03-11T05:22:38Z"
}
Field        Type            Description
uid          string          Platform user ID
uname        string          Streamer username
avatar       string          Avatar URL
title        string          Stream title
cover        string          Stream cover image
categories   string[]        Stream categories
status       string          live or offline
live_id      string | null   Live session ID (null when offline)
start_time   string | null   Stream start time, RFC3339 (null when offline)
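For type-safe handling, these fields can be mirrored in a small model. A dataclass sketch (field names follow the table above; from_dict is a hypothetical helper, not part of OneLiveRec):

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class LiveInfo:
    uid: str
    uname: str
    avatar: str
    title: str
    cover: str
    categories: List[str] = field(default_factory=list)
    status: str = "offline"
    live_id: Optional[str] = None     # null while the channel is offline
    start_time: Optional[str] = None  # RFC3339, null while offline

    @classmethod
    def from_dict(cls, d: dict) -> "LiveInfo":
        # Pick only known fields so extra keys in future payloads don't break parsing.
        return cls(**{k: d.get(k) for k in cls.__dataclass_fields__})
```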

app_launch

Triggered when OneLiveRec starts.

Example
{
  "type": "app_launch"
}

Typical uses:

  • monitoring startup
  • health checks
  • logging

app_exit

Triggered when OneLiveRec exits.

Example
{
  "type": "app_exit"
}

Typical uses:

  • alerting
  • automatic restart workflows

live_start

Triggered when a monitored channel goes live.

Example
{
  "type": "live_start",
  "data": {
    "platform": "<PLATFORM>",
    "channel": "<CHANNEL>",
    "url": "https://live.example.com",
    "live_info": {}
  }
}
Field       Description
platform    Platform name
channel     Channel identifier
url         Livestream URL
live_info   Current livestream metadata
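A typical live_start consumer simply formats a notification from these fields. A sketch (format_live_start is an illustrative helper):

```python
def format_live_start(data: dict) -> str:
    """Build a human-readable notification from a live_start payload."""
    info = data.get("live_info", {})
    title = info.get("title", "(untitled)")
    return f"{data['platform']}/{data['channel']} is live: {title} ({data['url']})"
```

The resulting string can be forwarded to any chat or push-notification service.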

live_end

Triggered when a livestream ends.

Example
{
  "type": "live_end",
  "data": {
    "platform": "<PLATFORM>",
    "channel": "<CHANNEL>",
    "url": "https://live.example.com",
    "live_info": {}
  }
}

Typical uses:

  • archive recordings
  • trigger post-processing

video_file_create

Triggered when a recording file starts being written.

Example
{
  "type": "video_file_create",
  "data": {
    "platform": "<PLATFORM>",
    "channel": "<CHANNEL>",
    "path": "path/to/file.ts"
  }
}

video_file_finish

Triggered when a recording file finishes writing.

Example
{
  "type": "video_file_finish",
  "data": {
    "platform": "<PLATFORM>",
    "channel": "<CHANNEL>",
    "path": "path/to/file.mp4",
    "filesize": 1073741824,
    "duration": 3600
  }
}
Field      Type     Description
path       string   Output file path
filesize   number   File size in bytes
duration   number   Duration in seconds
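These fields make it easy to log a readable summary when a file finishes. A sketch (summarize_recording is an illustrative helper; the unit conversions follow the table above):

```python
def summarize_recording(data: dict) -> str:
    """Render filesize (bytes) and duration (seconds) in readable units."""
    size_gib = data["filesize"] / (1 << 30)          # bytes -> GiB
    mins, secs = divmod(int(data["duration"]), 60)   # seconds -> h:mm:ss
    hrs, mins = divmod(mins, 60)
    return f"{data['path']}: {size_gib:.2f} GiB, {hrs:02d}:{mins:02d}:{secs:02d}"
```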

video_transmux_finish

Triggered when video transmuxing finishes.

Examples:

  • .ts → .mp4
  • .flv → .mp4
  • .ts → .mkv
Example
{
  "type": "video_transmux_finish",
  "data": {
    "platform": "<PLATFORM>",
    "channel": "<CHANNEL>",
    "input": "path/to/input.file",
    "output": "path/to/output.file"
  }
}

title_change

Triggered when a stream title changes during a live session.

Example
{
  "type": "title_change",
  "data": {
    "platform": "<PLATFORM>",
    "channel": "<CHANNEL>",
    "old_live_info": {},
    "new_live_info": {}
  }
}
Field           Description
old_live_info   Stream metadata before the change
new_live_info   Stream metadata after the change
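Since the payload carries both snapshots, a handler can diff them to see exactly what changed; the same pattern works for category_change below. changed_fields is an illustrative helper:

```python
def changed_fields(old: dict, new: dict) -> dict:
    """Return {field: (old_value, new_value)} for every field that differs."""
    keys = old.keys() | new.keys()
    return {
        k: (old.get(k), new.get(k))
        for k in sorted(keys)
        if old.get(k) != new.get(k)
    }
```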

category_change

Triggered when a stream category changes.

Example
{
  "type": "category_change",
  "data": {
    "platform": "<PLATFORM>",
    "channel": "<CHANNEL>",
    "old_live_info": {},
    "new_live_info": {}
  }
}

error

Triggered when OneLiveRec encounters an error.

Example
{
  "type": "error",
  "data": {
    "platform": "<PLATFORM>",
    "channel": "<CHANNEL>",
    "error": "Something went wrong"
  }
}

Typical uses:

  • monitoring
  • alert systems
  • automatic retries
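When the same failure fires repeatedly, an alerting consumer usually throttles per channel so it doesn't spam. A sketch (should_alert and its cooldown are illustrative, not an OneLiveRec feature):

```python
import time

def should_alert(last_sent: dict, key: str, cooldown: float = 300.0) -> bool:
    """Return True at most once per `cooldown` seconds for each key."""
    now = time.monotonic()
    if now - last_sent.get(key, float("-inf")) >= cooldown:
        last_sent[key] = now
        return True
    return False
```

Here key would typically be built from the payload, e.g. f"{platform}-{channel}".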

OneLiveRec can be integrated with automation scripts using webhooks.

The example script uploader.py automatically uploads finished recordings to Pixeldrain.

# /// script
# dependencies = [
#     "fastapi",
#     "uvicorn",
#     "httpx",
#     "pydantic",
#     "loguru",
#     "tenacity",
#     "tqdm",
#     "anyio",
# ]
# ///
import os
import httpx
import uvicorn
import secrets
import itertools
import anyio
from typing import Dict, Any
from fastapi import FastAPI, BackgroundTasks, Query, HTTPException
from pydantic import BaseModel
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_fixed, before_sleep_log
from tqdm import tqdm

# --- CONFIGURATION ---
PIXELDRAIN_API_KEY = "5f45f184-64bb-4eaa-be19-4a5f0459db49"
WEBHOOK_SECRET_KEY = "my_super_secret_webhook_key_123"
PORT = 5000

# --- PATH MARKERS ---
DOCKER_REC_MARKER = "/app/rec"
DOCKER_BASE = "/app"

# --- LOGGING CONFIG ---
LOG_DIR = "./logs"
os.makedirs(LOG_DIR, exist_ok=True)
logger.add(
    f"{LOG_DIR}/uploader_{{time:YYYY-MM-DD}}.log",
    rotation="10 MB",
    retention="30 days",
    compression="zip",
    level="DEBUG",
    enqueue=True,
)

# --- RETRY SETTINGS ---
MAX_RETRIES = 3
RETRY_DELAY = 10

bar_position = itertools.count()

# --- UTILS ---
class UploadProgressGenerator:
    """An async iterator that yields chunks and updates tqdm for httpx 'content'."""

    def __init__(self, file_path, pbar):
        self.file_path = file_path
        self.pbar = pbar

    async def __aiter__(self):
        # anyio provides asynchronous file access compatible with httpx streaming
        async with await anyio.open_file(self.file_path, "rb") as f:
            while True:
                chunk = await f.read(1024 * 1024)  # 1MB chunks
                if not chunk:
                    break
                self.pbar.update(len(chunk))
                yield chunk

# --- CORE LOGIC ---
@retry(
    stop=stop_after_attempt(MAX_RETRIES),
    wait=wait_fixed(RETRY_DELAY),
    before_sleep=before_sleep_log(logger, "DEBUG"),
    reraise=True,
)
async def execute_upload(file_path: str, filename: str, position: int):
    file_size = os.path.getsize(file_path)
    url = f"https://pixeldrain.com/api/file/{filename}"
    pbar = tqdm(
        total=file_size,
        unit="B",
        unit_scale=True,
        desc=f"🚀 {filename}",
        position=position,
        leave=False,
    )
    try:
        async with httpx.AsyncClient(timeout=60) as client:
            # Create the async generator for the request body
            upload_gen = UploadProgressGenerator(file_path, pbar)
            # PUT with 'content' as an async iterable is the most stable stream method
            response = await client.put(
                url, auth=("", PIXELDRAIN_API_KEY), content=upload_gen
            )
            if response.status_code not in [200, 201]:
                logger.error(f"Server rejected PUT: {response.text}")
                raise Exception(f"Pixeldrain Error {response.status_code}")
            return response.json().get("id")
    finally:
        pbar.close()

async def process_and_upload(incoming_path: str):
    # Map container paths (e.g. /app/rec/...) back to the host working directory.
    if incoming_path.startswith(DOCKER_REC_MARKER):
        rel_path = os.path.relpath(incoming_path, DOCKER_BASE)
        real_path = os.path.abspath(os.path.join(os.getcwd(), rel_path))
    else:
        real_path = incoming_path
    if not os.path.exists(real_path):
        logger.error(f"File not found: {real_path}")
        return
    filename = os.path.basename(real_path)
    pos = next(bar_position)
    try:
        logger.info(f"Stream Upload Started: {filename}")
        file_id = await execute_upload(real_path, filename, pos)
        logger.success(f"SUCCESS: {filename} -> https://pixeldrain.com/u/{file_id}")
    except Exception as e:
        logger.error(f"Failed to upload {filename} after {MAX_RETRIES} attempts.")
        logger.exception(e)

# --- WEB SERVER ---
app = FastAPI()

class OneLiveRecWebhook(BaseModel):
    id: str
    ts: str
    type: str
    data: Dict[str, Any]

@app.post("/webhook")
async def handle_webhook(
    payload: OneLiveRecWebhook, background_tasks: BackgroundTasks, key: str = Query(...)
):
    if not secrets.compare_digest(key, WEBHOOK_SECRET_KEY):
        raise HTTPException(status_code=403, detail="Forbidden")
    logger.debug(
        f"Received {payload.type} from {payload.data.get('platform')}-{payload.data.get('channel')}"
    )
    if payload.type == "video_transmux_finish":
        file_path = payload.data.get("output")
        if file_path:
            logger.info(f"Webhook Received: {os.path.basename(file_path)}")
            background_tasks.add_task(process_and_upload, file_path)
            return {"status": "success"}
    return {"status": "ignored"}

if __name__ == "__main__":
    logger.info(f"Uploader running at {os.getcwd()}")
    uvicorn.run(app, host="0.0.0.0", port=PORT)

uv is an all-in-one Python manager: it handles Python versions and dependencies for you, so you don't need to install Python or pip separately.

  • Windows (PowerShell):
Terminal window
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"
  • macOS / Linux:
Terminal window
curl -LsSf https://astral.sh/uv/install.sh | sh
  • Restart your terminal after running the command.
  1. Save the code above as uploader.py in your OneLiveRec root folder (where your compose.yaml and rec/ folder are).
  2. Open the script and edit these two lines:
  • PIXELDRAIN_API_KEY: Your key from Pixeldrain.
  • WEBHOOK_SECRET_KEY: A random password you make up.

Navigate to your folder in the terminal and type:

Terminal window
uv run uploader.py
  • What happens? uv will automatically download the dependencies and then start the server on port 5000.
  1. Open OneLiveRec
  2. Go to Settings -> Automation -> Webhooks
  3. Create a new webhook and set the URL to: http://YOUR_IP:5000/webhook?key=YOUR_SECRET_KEY

flowchart TD
  A[OneLiveRec] -->|Webhook POST| B[Webhook Server]
  B --> C{Event Type}
  C -->|live_start| D[Notifications]
  C -->|video_file_finish| E[Post Processing]
  C -->|video_transmux_finish| F[Upload Automation]
  F --> G[Pixeldrain API]
  C -->|error| H[Alert System]

The example server exposes:

POST /webhook

Example URL:

http://localhost:5000/webhook?key=SECRET

The automation listens for the event:

video_transmux_finish

When triggered:

  1. extract output file path
  2. resolve Docker path
  3. upload the file
  4. retry if upload fails

Example logic:

if payload.type == "video_transmux_finish":
    file_path = payload.data.get("output")
    background_tasks.add_task(process_and_upload, file_path)

Uploads display progress using tqdm.

Example output:

🚀 stream_record_01.mp4 1.2GB / 1.2GB

Webhook endpoints should always be protected.

Recommended options:

  • secret query parameters
  • reverse proxy authentication
  • HMAC signatures
  • IP allowlists
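The example script uses a secret query parameter compared in constant time. HMAC signatures are stronger because the secret never travels in the URL; note that OneLiveRec does not currently define an HMAC scheme, so the signature header in this sketch is an assumption about what a signing sender or proxy could add:

```python
import hashlib
import hmac

def verify_signature(secret: bytes, body: bytes, signature_hex: str) -> bool:
    """Constant-time check of a hex HMAC-SHA256 signature over the raw request body."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    # hmac.compare_digest avoids leaking where the strings first differ.
    return hmac.compare_digest(expected, signature_hex)
```

In a receiver, the signature would come from a header such as X-Signature (hypothetical) and `body` must be the raw bytes, not the re-serialized JSON.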