autotako

Service that monitors moombox for completed livestream downloads and uploads them for distribution
git clone https://code.alwayswait.ing/autotako.git

commit 8ef7208d4049c6e76fe5b3c1ed292d78b3d1e02f
Author: archiveanon <>
Date:   Thu, 16 Jan 2025 01:21:37 +0000

Initial commit

Diffstat:
 A .gitignore                        |    4 ++++
 A .justfile                         |    7 +++++++
 A pyproject.toml                    |   30 ++++++++++++++++++++++++++++++
 A src/autotako/__init__.py          |    0
 A src/autotako/app.py               |   95 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 A src/autotako/config.py            |   40 ++++++++++++++++++++++++++++++++++++++++
 A src/autotako/database.py          |   24 ++++++++++++++++++++++++
 A src/autotako/feed.py              |   10 ++++++++++
 A src/autotako/job_render.py        |  216 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 A src/autotako/templates/index.html |   12 ++++++++++++
 A src/autotako/templates/job.md     |   20 ++++++++++++++++++++
11 files changed, 458 insertions(+), 0 deletions(-)

diff --git a/.gitignore b/.gitignore
@@ -0,0 +1,4 @@
+.venv
+*.egg-info
+__pycache__
+config.toml
diff --git a/.justfile b/.justfile
@@ -0,0 +1,7 @@
+test:
+    ruff check src
+    mypy -p src
+
+format:
+    ruff check src --select I001 --fix
+    ruff format src
diff --git a/pyproject.toml b/pyproject.toml
@@ -0,0 +1,30 @@
+[project]
+name = "autotako"
+description = "Automated archive distribution."
+version = "0.0.1"
+
+dependencies = [
+    "aiohttp ~= 3.11.10",
+    "microdot ~= 2.0.7",
+    "msgspec ~= 0.19.0",
+]
+
+requires-python = ">= 3.11"
+
+[project.scripts]
+autotako = "autotako.app:main"
+
+[project.optional-dependencies]
+dev = [
+    "mypy == 1.9.0",
+    "ruff == 0.3.7",
+]
+
+[build-system]
+build-backend = 'setuptools.build_meta'
+requires = [
+    'setuptools',
+]
+
+[tool.ruff]
+line-length = 96
diff --git a/src/autotako/__init__.py b/src/autotako/__init__.py
diff --git a/src/autotako/app.py b/src/autotako/app.py
@@ -0,0 +1,95 @@
+#!/usr/bin/python3
+
+import argparse
+import datetime
+import json
+import pathlib
+
+import microdot  # type: ignore
+import microdot.jinja  # type: ignore
+import mistune
+import msgspec
+import rfeed  # type: ignore
+
+from . import database, job_render
+from .config import AppConfig, config_ctx
+
+
+def _sizeof_fmt(num: int | float, suffix: str = "B") -> str:
+    # https://stackoverflow.com/a/1094933
+    for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
+        if abs(num) < 1024.0:
+            return f"{num:3.2f}{unit}{suffix}"
+        num /= 1024.0
+    return f"{num:.2f}Yi{suffix}"
+
+
+def create_app():
+    # TODO implement
+    app = microdot.Microdot()
+
+    microdot.jinja.Template.initialize(
+        pathlib.Path(__file__).parent / "templates", enable_async=True
+    )
+
+    microdot.jinja._jinja_env.filters["byteformat"] = _sizeof_fmt
+
+    config_filepath = pathlib.Path("config.toml")
+    config = msgspec.toml.decode(config_filepath.read_bytes(), type=AppConfig)
+    config_ctx.set(config)
+
+    database.setup_database(pathlib.Path("database.db3"))
+
+    @app.get("/")
+    async def index(request):
+        return await microdot.jinja.Template("index.html").render_async(message="hello")
+
+    @app.post("/submit")
+    async def process_job(request):
+        return ""
+
+    app.mount(job_render.app, url_prefix="/render")
+
+    @app.get("/feed")
+    async def sample_feed(request):
+        job = json.loads(pathlib.Path("sample.json").read_text("utf8"))
+
+        description = await microdot.jinja.Template("job.md").render_async(
+            job=job, author_override=None
+        )
+
+        item = rfeed.Item(
+            title=job["title"],
+            author=job["author"],
+            description=mistune.html(description),
+        )
+
+        feed = rfeed.Feed(
+            title="archives.alwayswait.ing",
+            description="= w=)b",
+            link="https://archives.alwayswait.ing/rss",
+            lastBuildDate=datetime.datetime.now(tz=datetime.UTC),
+            items=[item],
+        )
+
+        return feed.rss(), {
+            "Content-Type": "application/xml",
+        }
+
+    return app
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--host", default="0.0.0.0")
+    parser.add_argument("--port", default=5000, type=int)
+    parser.add_argument("--debug", action="store_true")
+
+    args = parser.parse_args()
+
+    app = create_app()
+    app.run(args.host, args.port, args.debug)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/autotako/config.py b/src/autotako/config.py
@@ -0,0 +1,40 @@
+#!/usr/bin/python3
+
+from contextvars import ContextVar
+
+import msgspec
+
+
+class ChannelConfig(msgspec.Struct):
+    id: str
+    name: str | None = None
+
+
+class TorrentConfig(msgspec.Struct):
+    trackers: list[list[str]]
+
+
+class QBittorrentConfig(msgspec.Struct):
+    host: str
+    username: str
+    password: str
+
+    # defaults to the directory containing the files
+    # override if acting on a remote
+    default_save_path: str | None = None
+
+
+class AppConfig(msgspec.Struct):
+    moombox_url: str
+    torrent: TorrentConfig
+    channels: list[ChannelConfig] = msgspec.field(name="channel", default_factory=list)
+    qbittorrent: QBittorrentConfig | None = None
+
+    def get_channel_config_by_id(self, channel_id: str) -> ChannelConfig | None:
+        for channel in self.channels:
+            if channel.id == channel_id:
+                return channel
+        return None
+
+
+config_ctx: ContextVar[AppConfig] = ContextVar("config")
diff --git a/src/autotako/database.py b/src/autotako/database.py
@@ -0,0 +1,24 @@
+#!/usr/bin/python3
+
+import pathlib
+import sqlite3
+from contextvars import ContextVar
+
+# holds the database context so it can be used across modules without circular imports
+database_ctx: ContextVar[sqlite3.Connection] = ContextVar("database")
+
+
+def setup_database(database_path: pathlib.Path):
+    database = sqlite3.connect(database_path)
+
+    database.execute(
+        "CREATE TABLE IF NOT EXISTS gofile_tokens "
+        "(job_id TEXT PRIMARY KEY, token TEXT, folder_id TEXT, url TEXT)"
+    )
+    database.execute(
+        "CREATE TABLE IF NOT EXISTS gofile_files "
+        "(job_id TEXT NOT NULL, filepath TEXT, FOREIGN KEY(job_id) REFERENCES gofile_tokens(job_id))"
+    )
+    database.commit()
+
+    database_ctx.set(database)
diff --git a/src/autotako/feed.py b/src/autotako/feed.py
@@ -0,0 +1,10 @@
+#!/usr/bin/python3
+
+
+def main():
+    # TODO implement
+    pass
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/autotako/job_render.py b/src/autotako/job_render.py
@@ -0,0 +1,216 @@
+#!/usr/bin/python3
+
+import asyncio
+import datetime
+import pathlib
+
+import gofile.api  # type: ignore
+import httpx
+import microdot  # type: ignore
+import qbittorrentapi  # type: ignore
+import torf  # type: ignore
+
+from .config import config_ctx
+from .database import database_ctx
+
+app = microdot.Microdot()
+
+background_tasks = set()
+upload_tasks: dict[pathlib.Path, asyncio.Task] = {}
+
+
+def stream_display_date(dt: datetime.datetime) -> datetime.date:
+    """
+    Given the stream start time, returns a display date.
+
+    If the stream start time is within the last 10 minutes of the day, we treat the next day
+    as the stream date instead based on the assumption that the streamer has a waiting screen
+    set up.
+    """
+    end_of_day = (23, 50)
+    return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0)
+
+
+async def get_moombox_job_by_id(jobid: str) -> dict | None:
+    config = config_ctx.get()
+    async with httpx.AsyncClient() as client:
+        result = await client.get(f"{config.moombox_url}/status")
+        for job in result.json():
+            if job["id"] == jobid:
+                return job
+    return None
+
+
+async def gofile_upload_and_persist(
+    cursor, upload_file: pathlib.Path, job_id: str, token: str, folder_id: str
+):
+    with upload_file.open("rb") as upload_filereader:
+        await gofile.api.upload_single(upload_filereader, token, folder_id)
+    database = database_ctx.get()
+    cursor = database.cursor()
+    cursor.execute(
+        "INSERT INTO gofile_files VALUES (?, ?)",
+        (job_id, str(upload_file)),
+    )
+    database.commit()
+
+
+async def do_gofile_upload(job: dict):
+    """
+    Prepares a guest upload for the specific job's output files, then returns the folder URL
+    while the uploads are running in the background.
+
+    This operation is idempotent; if it is interrupted, running this function will only upload
+    files that weren't already.
+    """
+    database = database_ctx.get()
+
+    known_files = set()
+    for known_file, *_ in database.execute(
+        "SELECT filepath FROM gofile_files WHERE job_id = ?", (job["id"],)
+    ):
+        known_files.add(pathlib.Path(known_file))
+
+    pending_files = set(map(pathlib.Path, job["output_paths"])) - known_files
+
+    token = None
+    folder_id = None
+    gofile_url = None
+
+    cursor = database.cursor()
+    cursor.execute(
+        "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job["id"],)
+    )
+    job_token_info = cursor.fetchone()
+    if job_token_info:
+        token, folder_id, gofile_url = job_token_info
+    else:
+        # get the smallest file to quickly get the upload result
+        pending_file = min(pending_files, key=lambda p: p.stat().st_size)
+        if pending_file in upload_tasks:
+            # another task is uploading the same file; just block until that one is available
+            upload = await upload_tasks[pending_file]
+            token, folder_id, gofile_url = (
+                upload.result.guest_token,
+                upload.result.parent_folder,
+                upload.result.download_page,
+            )
+        else:
+            with pending_file.open("rb") as fo:
+                upload_tasks[pending_file] = gofile.api.upload_single(fo)
+                upload = await upload_tasks[pending_file]
+            token, folder_id, gofile_url = (
+                upload.result.guest_token,
+                upload.result.parent_folder,
+                upload.result.download_page,
+            )
+            cursor.execute(
+                "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)",
+                (job["id"], token, folder_id, gofile_url),
+            )
+            cursor.execute(
+                "INSERT INTO gofile_files VALUES (?, ?)",
+                (job["id"], str(pending_file)),
+            )
+            database.commit()
+            pending_files.discard(pending_file)
+
+    # upload any remaining files in the background
+    for pending_file in pending_files:
+        # don't double-dip and upload an in-progress file
+        if pending_file in upload_tasks:
+            continue
+        upload_tasks[pending_file] = asyncio.create_task(
+            gofile_upload_and_persist(cursor, pending_file, job["id"], token, folder_id)
+        )
+    return gofile_url
+
+
+@app.get("/<jobid>")
+async def show_job(request, jobid):
+    job = await get_moombox_job_by_id(jobid)
+    if not job:
+        return "Couldn't find matching job", 404
+
+    config = config_ctx.get()
+
+    channel = config.get_channel_config_by_id(job["channel_id"])
+    stream_time = (
+        datetime.datetime.fromisoformat(job["scheduled_start_datetime"])
+        if "scheduled_start_datetime" in job and job["scheduled_start_datetime"]
+        else datetime.datetime.now(tz=datetime.UTC)
+    )
+
+    torrent_output_dir = pathlib.Path("torrents")
+    torrent_output_dir.mkdir(exist_ok=True)
+
+    display_date = stream_display_date(stream_time)
+    display_date_str = display_date.strftime("%Y%m%d")
+    torrent_name = f"[{display_date_str}] Unarchived Karaoke ({job['video_id']})"
+
+    render_kwargs = {}
+    if job["output_paths"]:
+        torrent_file = torrent_output_dir / f"{job['id']} ({job['video_id']}).torrent"
+        source_files = list(map(pathlib.Path, job["output_paths"]))
+
+        save_path = str(source_files[0].parent)
+
+        if not torrent_file.exists():
+            t = torf.Torrent(
+                trackers=config.torrent.trackers,
+            )
+
+            # TODO extract key phrases from title / description
+
+            t.filepaths = source_files
+            # note that the name also sets the subdirectory for the files
+            # as such, using other names for the same files *will* change the infohash
+            t.name = torrent_name
+            success = await asyncio.to_thread(t.generate)
+            if not success:
+                raise RuntimeError("Failed to generate torrent")
+            t.write(torrent_file)
+            t.write(torrent_output_dir / f"{torrent_name}.torrent")

+            # TODO punt file to remote
+
+            # send torrent to qbittorrent for seeding
+            if config.qbittorrent:
+                if config.qbittorrent.default_save_path:
+                    save_path = config.qbittorrent.default_save_path
+                qbt_client = qbittorrentapi.Client(
+                    host=config.qbittorrent.host,
+                    username=config.qbittorrent.username,
+                    password=config.qbittorrent.password,
+                    VERIFY_WEBUI_CERTIFICATE=False,
+                    RAISE_NOTIMPLEMENTEDERROR_FOR_UNIMPLEMENTED_API_ENDPOINTS=True,
+                )
+                task = asyncio.create_task(
+                    asyncio.to_thread(
+                        qbt_client.torrents_add,
+                        torrent_files=torrent_file,
+                        save_path=save_path,
+                        content_layout="NoSubfolder",
+                        is_paused=True,
+                    )
+                )
+                background_tasks.add(task)
+                task.add_done_callback(background_tasks.discard)
+        else:
+            t = torf.Torrent.read(torrent_file)
+            print(t)
+            print(t.magnet(size=False))
+            print(t.files)
+
+            # TODO punt file to remote
+
+        render_kwargs["magnet_url"] = t.magnet(size=False)
+
+    # punt files to gofile
+    render_kwargs["gofile_url"] = await do_gofile_upload(job)
+    return await microdot.jinja.Template("job.md").render_async(
+        job=job,
+        author_override=channel.name if channel else None,
+        stream_time=stream_time.strftime("%Y-%m-%dT%H:%MZ") if stream_time else "(unknown)",
+        **render_kwargs,
+    )
diff --git a/src/autotako/templates/index.html b/src/autotako/templates/index.html
@@ -0,0 +1,11 @@
+<!DOCTYPE html>
+<html>
+  <head>
+    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
+    <title>Hello!</title>
+    <style type="text/css">.body { width: auto; }</style>
+  </head>
+  <body>
+    {{ message }}
+  </body>
+</html>
\ No newline at end of file
diff --git a/src/autotako/templates/job.md b/src/autotako/templates/job.md
@@ -0,0 +1,20 @@
+Archive of {{author_override or job.author}}'s stream on {{job.scheduled_start_datetime}}.
+Original URL: https://youtu.be/{{job.video_id}}
+
+Torrent: {{magnet_url}}
+Gofile (expires after download inactivity): {{gofile_url}}
+
+Number of fragments: {{job.manifest_progress.values() | sum(attribute="video_seq") + 1}}
+
+### Detailed QC report:
+{% for broadcast, progress in job.manifest_progress.items() %}
+- `{{broadcast}}` ({{progress.video_format.qualityLabel}}):
+    - Fragments: {{progress.video_seq + 1}} video, {{progress.audio_seq + 1}} audio
+    - Raw download size: {{(progress.total_downloaded or 0) | byteformat}} ({{"{:,}".format(progress.total_downloaded or 0)}} bytes)
+    - Muxed output size: {{(progress.output.total_size or 0) | byteformat}} ({{"{:,}".format(progress.output.total_size or 0)}} bytes)
+    - Output duration: {{(progress.output.out_time or "Not muxed").split(".") | first}}
+{% endfor %}
+{% if job.manifest_progress | length > 1 -%}
+WARN: Fragments may overlap on multi-broadcast videos, or if unlucky, some portions may be missing entirely.
+Ideally the operator is available to manually process the raw download into a single video.
+{%- endif %}
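
For reference, a minimal config.toml matching the AppConfig schema above could look like the sketch below. Every value is a placeholder: the moombox URL, tracker tiers, channel ID, and qBittorrent credentials all depend on the deployment.

moombox_url = "http://127.0.0.1:8000"

[torrent]
# each inner list is one tracker tier
trackers = [["udp://tracker.example.invalid:1337/announce"]]

# zero or more [[channel]] tables; `name` overrides the author shown in job.md
[[channel]]
id = "UC0000000000000000000000"
name = "Some Streamer"

# optional; omit this table to skip seeding through qBittorrent
[qbittorrent]
host = "http://127.0.0.1:8080"
username = "admin"
password = "changeme"
# default_save_path = "/srv/seeding"

The app reads this file (along with database.db3 and the torrents/ output directory) relative to the working directory it is started from.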
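
The date rollover in stream_display_date can be sanity-checked from a Python shell; the timestamps here are made up:

>>> import datetime
>>> from autotako.job_render import stream_display_date
>>> # 23:55 falls within the last 10 minutes of the day, so the next day is used
>>> stream_display_date(datetime.datetime(2025, 1, 15, 23, 55, tzinfo=datetime.UTC))
datetime.date(2025, 1, 16)
>>> # 23:45 is before the (23, 50) cutoff and keeps its own date
>>> stream_display_date(datetime.datetime(2025, 1, 15, 23, 45, tzinfo=datetime.UTC))
datetime.date(2025, 1, 15)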