autotako

Service that monitors moombox for completed livestream downloads and uploads them for distribution
git clone https://code.alwayswait.ing/autotako.git

commit 13a5e2a6c440079de909e07e2574bb49e06b4b8b
parent ac67546779040c0b148ceee0f07c50a49dc43a19
Author: archiveanon <>
Date:   Thu, 23 Jan 2025 09:41:58 +0000

Implement automatic job uploading
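
A background job_auto_monitor task now polls moombox's /status endpoint,
compares each finished job against its configured upload criteria, and
schedules the render/upload pipeline when they match (gated by the new
autoupload.active config flag). Per-job name and upload criteria are
persisted in the new job_config and job_status tables and can be edited
from a new per-job config page; the index view now surfaces auto-upload
scheduling and completion state for each job.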

Diffstat:
M pyproject.toml                    |   1 +
M src/autotako/app.py               |  49 ++++++++++++++++++++++++++++++++++++++++++++++---
M src/autotako/config.py            |   5 +++++
M src/autotako/database.py          |   7 +++++++
M src/autotako/job_render.py        | 269 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
M src/autotako/static/style.css     |  23 ++++++++++++++++++++++-
M src/autotako/templates/edit.html  |  23 +++++++++++++++++++++--
M src/autotako/templates/index.html |  28 +++++++++++++++++++++++-----
M src/autotako/templates/job.md     |   2 +-
9 files changed, 373 insertions(+), 34 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
@@ -7,6 +7,7 @@
 dependencies = [
     "aiohttp ~= 3.11.10",
     "microdot ~= 2.0.7",
     "msgspec ~= 0.19.0",
+    "stamina ~= 24.3.0",
 ]
 requires-python = ">= 3.11"
diff --git a/src/autotako/app.py b/src/autotako/app.py
@@ -1,9 +1,11 @@
 #!/usr/bin/python3

 import argparse
+import asyncio
 import datetime
 import json
 import pathlib
+from typing import NamedTuple

 import httpx
 import microdot  # type: ignore
@@ -14,9 +16,15 @@ import rfeed  # type: ignore

 from . import database, job_render
 from .config import AppConfig, config_ctx
+from .database import database_ctx
+from .job_render import JobConfig, MoomboxJobInfo, job_auto_monitor
+
+background_tasks = set()


 def _sizeof_fmt(num: int | float, suffix: str = "B") -> str:
+    # we implement this instead of using the built-in filesizeformat filter to accurately match
+    # ytarchive (and hoshinova by extension)
     # https://stackoverflow.com/a/1094933
     for unit in ("", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi"):
         if abs(num) < 1024.0:
@@ -25,6 +33,12 @@ def _sizeof_fmt(num: int | float, suffix: str = "B") -> str:
     return f"{num:.2f}Yi{suffix}"


+class JobInfo(NamedTuple):
+    job: MoomboxJobInfo
+    completed: bool
+    conf: JobConfig | None
+
+
 def create_app():
     # TODO implement
     app = microdot.Microdot()
@@ -45,9 +59,30 @@ def create_app():
     async def index(request):
         async with httpx.AsyncClient() as client:
             result = await client.get(f"{config.moombox_url}/status")
+        jobs = msgspec.convert(result.json(), type=list[MoomboxJobInfo])
+
+        db = database_ctx.get()
+        cursor = db.cursor()
+        configs = {
+            job_id: msgspec.json.decode(job_conf, type=JobConfig)
+            for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config")
+        }
+        statuses = {
+            job_id: bool(completed)
+            for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status")
+        }
+        job_info = [
+            JobInfo(
+                job,
+                statuses.get(job.id) or False,
+                configs.get(job.id, JobConfig.from_moombox_job(job)),
+            )
+            for job in jobs
+        ]
+
         return (
             await microdot.jinja.Template("index.html").render_async(
-                moombox_jobs=reversed(result.json()),
+                moombox_jobs=reversed(job_info),
                 moombox_url=config.moombox_url,
             ),
             {
@@ -95,10 +130,14 @@ def create_app():
             "Content-Type": "application/xml",
         }

+    monitor_task = asyncio.create_task(job_auto_monitor())
+    background_tasks.add(monitor_task)
+    monitor_task.add_done_callback(background_tasks.discard)
+
     return app


-def main():
+async def amain():
     parser = argparse.ArgumentParser()
     parser.add_argument("--host", default="0.0.0.0")
     parser.add_argument("--port", default=5000)
@@ -107,7 +146,11 @@ def main():
     args = parser.parse_args()

     app = create_app()
-    app.run(args.host, args.port, args.debug)
+    await app.start_server(args.host, args.port, args.debug)
+
+
+def main():
+    asyncio.run(amain())


 if __name__ == "__main__":
diff --git a/src/autotako/config.py b/src/autotako/config.py
@@ -32,9 +32,14 @@ class WebDavConfig(msgspec.Struct):
     password: str


+class AutoUploadConfig(msgspec.Struct):
+    active: bool = False
+
+
 class AppConfig(msgspec.Struct):
     moombox_url: str
     torrent: TorrentConfig
+    autoupload: AutoUploadConfig = msgspec.field(default_factory=AutoUploadConfig)
     channels: list[ChannelConfig] = msgspec.field(name="channel", default_factory=list)
     webdav: WebDavConfig | None = None
     qbittorrent: QBittorrentConfig | None = None
diff --git a/src/autotako/database.py b/src/autotako/database.py
@@ -19,6 +19,13 @@ def setup_database(database_path: pathlib.Path):
         "CREATE TABLE IF NOT EXISTS gofile_files "
         "(job_id TEXT NOT NULL, filepath TEXT, FOREIGN KEY(job_id) REFERENCES gofile_tokens(job_id))"
     )
+    database.execute(
+        "CREATE TABLE IF NOT EXISTS job_config (job_id TEXT PRIMARY KEY, config TEXT)"
+    )
+    database.execute(
+        "CREATE TABLE IF NOT EXISTS job_status (job_id TEXT PRIMARY KEY, completed BOOL)"
+    )
+
     database.commit()
     database_ctx.set(database)
diff --git a/src/autotako/job_render.py b/src/autotako/job_render.py
@@ -2,13 +2,18 @@
 import asyncio
 import datetime
+import enum
+import itertools
 import pathlib
 import traceback
+from typing import Self

 import gofile.api  # type: ignore
 import httpx
 import microdot  # type: ignore
+import msgspec
 import qbittorrentapi  # type: ignore
+import stamina
 import torf  # type: ignore

 from .config import WebDavConfig, config_ctx

@@ -21,6 +26,120 @@ render_tasks: dict[str, asyncio.Task] = {}
 upload_tasks: dict[pathlib.Path, asyncio.Task] = {}


+class JobUploadCriteria(enum.StrEnum):
+    MANUALLY = "manual"
+    WHEN_FINISHED = "finished"
+    IF_PRIVATE = "private"
+    IF_CUT = "cut"
+
+    @property
+    def automatic(self) -> bool:
+        return self != JobUploadCriteria.MANUALLY
+
+
+class MoomboxDownloadStatus(enum.StrEnum):
+    UNKNOWN = "Unknown"
+    UNAVAILABLE = "Unavailable"
+    WAITING = "Waiting"
+    DOWNLOADING = "Downloading"
+    MUXING = "Muxing"
+    FINISHED = "Finished"
+    ERROR = "Error"
+
+
+class MoomboxJobInfo(msgspec.Struct, kw_only=True, frozen=True):
+    id: str
+    title: str | None
+    author: str | None
+    channel_id: str | None
+    video_id: str
+    manifest_progress: dict
+    status: MoomboxDownloadStatus = MoomboxDownloadStatus.UNKNOWN
+    output_paths: set[str] | None = msgspec.field(default_factory=set)
+    scheduled_start_datetime: datetime.datetime | None = None
+
+    @property
+    def uploadability_state(self) -> JobUploadCriteria | None:
+        # returns a JobUploadCriteria that can be tested for equality to run
+        if self.status != MoomboxDownloadStatus.FINISHED:
+            return None
+        if not self.files_available:
+            return None
+        if not self.is_configured_for_upload:
+            return None
+        return JobUploadCriteria.WHEN_FINISHED
+
+    @property
+    def is_configured_for_upload(self) -> bool:
+        if not self.channel_id:
+            return False
+        config = config_ctx.get()
+        channel_config = config.get_channel_config_by_id(self.channel_id)
+        if not channel_config:
+            return False
+        return True
+
+    @property
+    def files_available(self) -> bool:
+        return bool(self.output_pathobjs) and all(p.exists() for p in self.output_pathobjs)
+
+    @property
+    def output_pathobjs(self) -> set[pathlib.Path]:
+        return set(map(pathlib.Path, self.output_paths or []))
+
+
+class JobConfig(msgspec.Struct):
+    name: str
+    upload: JobUploadCriteria
+
+    @classmethod
+    def from_moombox_job(cls, job: MoomboxJobInfo) -> Self:
+        # generate a default config from the job
+        name = "Unspecified Stream"
+        upload = JobUploadCriteria.MANUALLY
+
+        if not job.title:
+            return cls(name, upload)
+
+        # mapping of phrases and any possible synonyms
+        # if a synonym isn't present then an empty set should be present
+        phrases = {
+            "unarchived": {"no archive", "unarchive"},
+            "karaoke": {"rock n' rawr", "歌枠", "sing"},
+            "rebroadcast": {"kara-rewind"},
+        }
+
+        phrase_set = set(phrases.keys()) | set(itertools.chain.from_iterable(phrases.values()))
+
+        # not sure why mypy complains about job.title being possibly None here if left bare...
+        matched_phrases = set(
+            filter(lambda term: term in (job.title.lower() if job.title else ""), phrase_set)
+        )
+
+        # dedupe synonymous terms
+        for phrase, syn_phrases in phrases.items():
+            if syn_phrases and matched_phrases & syn_phrases:
+                matched_phrases = (matched_phrases - syn_phrases) | {phrase}
+
+        if matched_phrases == {"unarchived"}:
+            name = "Unarchived Stream"
+            upload = JobUploadCriteria.WHEN_FINISHED
+        elif matched_phrases == {"karaoke"}:
+            name = "Karaoke"
+            upload = JobUploadCriteria.IF_CUT
+        elif matched_phrases == {"unarchived", "karaoke"}:
+            name = "Unarchived Karaoke"
+            upload = JobUploadCriteria.WHEN_FINISHED
+        elif matched_phrases == {"unarchived", "karaoke", "rebroadcast"}:
+            name = "Unarchived Karaoke (rebroadcast)"
+            upload = JobUploadCriteria.MANUALLY
+
+        # cannot auto-upload if no channel configuration exists
+        if not job.is_configured_for_upload:
+            upload = JobUploadCriteria.MANUALLY
+
+        return cls(name, upload)
+
+
 def stream_display_date(dt: datetime.datetime) -> datetime.date:
     """
     Given the stream start time, returns a display date.
@@ -33,13 +152,17 @@ def stream_display_date(dt: datetime.datetime) -> datetime.date:
     return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0)


-async def get_moombox_job_by_id(jobid: str) -> dict | None:
+async def get_moombox_job_by_id(jobid: str) -> MoomboxJobInfo | None:
     config = config_ctx.get()
     async with httpx.AsyncClient() as client:
-        result = await client.get(f"{config.moombox_url}/status")
+        for attempt in stamina.retry_context(
+            on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
+        ):
+            with attempt:
+                result = await client.get(f"{config.moombox_url}/status")
     for job in result.json():
         if job["id"] == jobid:
-            return job
+            return msgspec.convert(job, type=MoomboxJobInfo)
     return None


@@ -57,7 +180,7 @@ async def gofile_upload_and_persist(
     database.commit()


-async def do_gofile_upload(job: dict):
+async def do_gofile_upload(job: MoomboxJobInfo):
     """
     Prepares a guest upload for the specific job's output files, then returns the folder URL
     while the uploads are running in the background.
@@ -69,11 +192,11 @@ async def do_gofile_upload(job: dict):
     known_files = set()
     for known_file, *_ in database.execute(
-        "SELECT filepath FROM gofile_files WHERE job_id = ?", (job["id"],)
+        "SELECT filepath FROM gofile_files WHERE job_id = ?", (job.id,)
     ):
         known_files.add(pathlib.Path(known_file))

-    pending_files = set(map(pathlib.Path, job["output_paths"])) - known_files
+    pending_files = set(map(pathlib.Path, job.output_paths or [])) - known_files

     token = None
     folder_id = None

@@ -81,7 +204,7 @@ async def do_gofile_upload(job: dict):
     cursor = database.cursor()

     cursor.execute(
-        "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job["id"],)
+        "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job.id,)
     )
     job_token_info = cursor.fetchone()
     if job_token_info:
@@ -108,11 +231,11 @@ async def do_gofile_upload(job: dict):
             )
             cursor.execute(
                 "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)",
-                (job["id"], token, folder_id, gofile_url),
+                (job.id, token, folder_id, gofile_url),
             )
             cursor.execute(
                 "INSERT INTO gofile_files VALUES (?, ?)",
-                (job["id"], str(pending_file)),
+                (job.id, str(pending_file)),
             )
             database.commit()
             pending_files.discard(pending_file)
@@ -123,7 +246,7 @@ async def do_gofile_upload(job: dict):
         if pending_file in upload_tasks:
             continue
         upload_tasks[pending_file] = asyncio.create_task(
-            gofile_upload_and_persist(cursor, pending_file, job["id"], token, folder_id)
+            gofile_upload_and_persist(cursor, pending_file, job.id, token, folder_id)
         )

     return gofile_url
@@ -143,21 +266,20 @@ async def _process_job(jobid):
     if not job:
         return "Couldn't find matching job", 404

+    # TODO fill in necessary fields in template, etc.
+    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)
+
     config = config_ctx.get()
-    channel = config.get_channel_config_by_id(job["channel_id"])
-    stream_time = (
-        datetime.datetime.fromisoformat(job["scheduled_start_datetime"])
-        if "scheduled_start_datetime" in job and job["scheduled_start_datetime"]
-        else datetime.datetime.now(tz=datetime.UTC)
-    )
+    channel = config.get_channel_config_by_id(job.channel_id)
+    stream_time = job.scheduled_start_datetime or datetime.datetime.now(tz=datetime.UTC)

     torrent_output_dir = pathlib.Path("torrents")
     torrent_output_dir.mkdir(exist_ok=True)

     display_date = stream_display_date(stream_time)
     display_date_str = display_date.strftime("%Y%m%d")

-    torrent_name = f"[{display_date_str}] Unarchived Karaoke ({job['video_id']})"
+    torrent_name = f"[{display_date_str}] {job_config.name} ({job.video_id})"

     folder_name = None
     if stream_time:
@@ -165,9 +287,9 @@ async def _process_job(jobid):
     readme_finalized = False
     render_kwargs = {}

-    if job["output_paths"]:
-        torrent_file = torrent_output_dir / f"{job['id']} ({job['video_id']}).torrent"
-        source_files = list(map(pathlib.Path, job["output_paths"]))
+    if job.output_paths:
+        torrent_file = torrent_output_dir / f"{job.id} ({job.video_id}).torrent"
+        source_files = list(map(pathlib.Path, job.output_paths))

         save_path = str(source_files[0].parent)

@@ -235,11 +357,11 @@ async def _process_job(jobid):
     rendered_job = await microdot.jinja.Template("job.md").render_async(
         job=job,
         author_override=channel.name if channel else None,
-        stream_time=stream_time.strftime("%Y-%m-%dT%H:%MZ") if stream_time else "(unknown)",
+        stream_time=stream_time.strftime("%Y-%m-%dT%H:%M:%SZ") if stream_time else "(unknown)",
         **render_kwargs,
     )
     if readme_finalized and config.webdav and channel and channel.webdav_path:
-        rendered_job_file = torrent_output_dir / f"{job['id']} ({job['video_id']}).md"
+        rendered_job_file = torrent_output_dir / f"{job.id} ({job.video_id}).md"
         rendered_job_file.write_text(rendered_job)
         target_base = f"{channel.webdav_path}/{folder_name}"
         task = asyncio.create_task(
@@ -247,6 +369,14 @@ async def _process_job(jobid):
         )
         background_tasks.add(task)
         task.add_done_callback(background_tasks.discard)
+
+        database = database_ctx.get()
+        cursor = database.cursor()
+        cursor.execute(
+            "INSERT INTO job_status VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET completed=excluded.completed",
+            (jobid, True),
+        )
+        database.commit()
     if not readme_finalized and jobid in render_tasks:
         # allow rerun if not finalized
         del render_tasks[jobid]
@@ -265,6 +395,58 @@ def get_process_job_task(jobid: str) -> asyncio.Task:
     return render_tasks[jobid]


+def _get_saved_jobconf(jobid: str) -> JobConfig | None:
+    database = database_ctx.get()
+    cursor = database.cursor()
+    cursor.execute("SELECT config FROM job_config WHERE job_id = ?", (jobid,))
+    result = cursor.fetchone()
+    if result:
+        job_config_payload, *_ = result
+        return msgspec.json.decode(job_config_payload, type=JobConfig)
+    return None
+
+
+@app.get("/<jobid>/config")
+async def show_job_editor(request, jobid: str):
+    job = await get_moombox_job_by_id(jobid)
+    if not job:
+        return "Couldn't find matching job", 404
+
+    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)
+
+    return (
+        await microdot.jinja.Template("edit.html").render_async(job=job, config=job_config),
+        {
+            "Content-Type": "text/html; charset=utf-8",
+        },
+    )
+
+
+@app.post("/<jobid>/config")
+async def apply_job_config(request, jobid: str):
+    job = await get_moombox_job_by_id(jobid)
+    if not job:
+        return "Couldn't find matching job", 404
+    jobconf = msgspec.convert(
+        {item: request.form[item] for item in request.form}, type=JobConfig
+    )
+    database = database_ctx.get()
+
+    cursor = database.cursor()
+    cursor.execute(
+        "INSERT INTO job_config VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET config=excluded.config",
+        (jobid, msgspec.json.encode(jobconf)),
+    )
+    database.commit()
+
+    return (
+        await microdot.jinja.Template("edit.html").render_async(job=job, config=jobconf),
+        {
+            "Content-Type": "text/html; charset=utf-8",
+        },
+    )
+
+
 @app.get("/<jobid>")
 async def show_job(request, jobid: str):
     return await get_process_job_task(jobid)
@@ -278,3 +460,46 @@ async def upload_job(request, jobid: str):
     except torf.TorfError:
         traceback.print_exc()
         return await microdot.jinja.Template("job/upload_error.html").render_async()
+
+
+async def job_auto_monitor():
+    while True:
+        config = config_ctx.get()
+
+        async with httpx.AsyncClient() as client:
+            for attempt in stamina.retry_context(
+                on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
+            ):
+                with attempt:
+                    result = await client.get(f"{config.moombox_url}/status")
+
+        jobs = msgspec.convert(result.json(), type=list[MoomboxJobInfo])
+
+        database = database_ctx.get()
+        cursor = database.cursor()
+        configs = {
+            job_id: msgspec.json.decode(job_conf, type=JobConfig)
+            for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config")
+        }
+        statuses = {
+            job_id: bool(completed)
+            for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status")
+        }
+
+        for job in reversed(jobs):
+            if not job.uploadability_state:
+                continue
+            job_conf = configs.get(job.id)
+            if not job_conf:
+                job_conf = JobConfig.from_moombox_job(job)
+            if job.uploadability_state != job_conf.upload:
+                continue
+            if not config.autoupload.active:
+                print(f"'{job.title}' meets conditions for upload")
+                continue
+            if statuses.get(job.id):
+                continue
+            print(f"'{job.title}' is scheduled for upload")
+            get_process_job_task(job.id)
+
+        await asyncio.sleep(120)
diff --git a/src/autotako/static/style.css b/src/autotako/static/style.css
@@ -56,12 +56,33 @@ a {
 .job__autoupload {
     color: var(--sl-color-neutral-500);
 }
-.job__autoupload--active sl-icon-button::part(base) {
+.job__autoupload--active sl-icon {
     color: var(--sl-color-primary-500);
 }
+.job__manualupload--unconfigured sl-icon-button::part(base) {
+    color: var(--sl-color-warning-600);
+}
 .job__manualupload--done sl-icon-button::part(base) {
     color: var(--sl-color-success-600);
 }
 .job__manualupload--fail sl-icon-button::part(base) {
     color: var(--sl-color-danger-600);
 }
+.job-editor {
+    max-width: 38rem;
+    margin: auto;
+    padding: var(--sl-spacing-medium);
+}
+.job-editor__form {
+    display: flex;
+    flex-direction: column;
+    gap: var(--sl-spacing-small);
+    padding: var(--sl-spacing-medium) 0;
+}
+.job-editor__title {
+    font-size: var(--sl-font-size-large);
+    font-weight: var(--sl-font-weight-semibold);
+}
+.return {
+    padding-bottom: var(--sl-spacing-small);
+}
diff --git a/src/autotako/templates/edit.html b/src/autotako/templates/edit.html
@@ -7,7 +7,26 @@
         <script type="module" src="https://cdn.jsdelivr.net/npm/@shoelace-style/shoelace@2.16.0/cdn/shoelace-autoloader.js" crossorigin="anonymous"></script>
         <link rel="stylesheet" href="/static/style.css" type="text/css" />
     </head>
-    <body>
-
+    <body class="job-editor">
+        <sl-button class="return" href="/">
+            <sl-icon slot="prefix" name="arrow-left"></sl-icon>
+            Back to overview
+        </sl-button>
+        <main>
+            <div class="job-editor__title">{{ job.title }}</div>
+            <form method="post" class="job-editor__form">
+                <sl-input name="name" label="Torrent name (exclude date, video ID)" value="{{ config.name }}"></sl-input>
+                <sl-radio-group name="upload" value="{{ config.upload }}" label="Upload process:">
+                    <sl-radio value="manual">Manual</sl-radio>
+                    <sl-radio value="finished">When finished</sl-radio>
+                    <sl-radio value="private">If made private</sl-radio>
+                    <sl-radio value="cut">If contents cut</sl-radio>
+                </sl-radio-group>
+                <div>
+                    <sl-button variant="primary" type="submit">Save</sl-button>
+                    <sl-button variant="default" href="/render/{{ job.id }}/config">Cancel</sl-button>
+                </div>
+            </form>
+        </main>
     </body>
 </html>
diff --git a/src/autotako/templates/index.html b/src/autotako/templates/index.html
@@ -11,9 +11,9 @@
     <body>
         <h2>autotako control panel = w=)b</h2>
         <div class="job-table">
-{% for job in moombox_jobs %}
-{% set no_outputs = (not job.output_paths and job.status == "Finished") %}
-            <div class="job__item {{- ' job__item--disabled' if no_outputs }}">
+{% for job, completed, conf in moombox_jobs %}
+{% set no_outputs = (job.status == "Finished" and not job.files_available) %}
+            <div class="job__item {{- ' job__item--disabled' if no_outputs}}">
                 <div class="job__info">
                     <div class="job__title">
                         <a href="{{ moombox_url }}job/{{ job.id }}">{{ job.title }}</a>
@@ -23,18 +23,36 @@
                     </div>
                 </div>
                 <div class="job__description">
-                    Unarchived Karaoke
-                    <sl-icon-button name="pencil-square"></sl-icon-button>
+                    {{ conf.name or "Unspecified Stream" }}
+                    <sl-icon-button href="render/{{ job.id }}/config" name="pencil-square"></sl-icon-button>
                 </div>
                 <div class="job__controls">
+                    {% if conf.upload and conf.upload.automatic %}
+                    <div class="job__autoupload job__autoupload--active">
+                        <sl-tooltip content="Scheduled for auto-upload (on: {{ conf.upload | lower }})">
+                            <sl-icon name="stopwatch"></sl-icon>
+                        </sl-tooltip>
+                    </div>
+                    {% else %}
                     <div class="job__autoupload">
                         <sl-tooltip content="Not scheduled for auto-upload">
                             <sl-icon name="stopwatch"></sl-icon>
                         </sl-tooltip>
                     </div>
+                    {% endif %}
+                    {% if completed %}
+                    {% include "job/upload_success.html" %}
+                    {% elif not job.is_configured_for_upload %}
+                    <div class="job__manualupload job__manualupload--unconfigured">
+                        <sl-tooltip content="Channel not configured for one-click upload">
+                            <sl-icon-button name="exclamation-triangle"></sl-icon-button>
+                        </sl-tooltip>
+                    </div>
+                    {% else %}
                     <div class="job__manualupload" hx-target="this">
                         <sl-icon-button hx-post="render/{{ job.id }}" hx-swap="outerHTML" name="cloud-upload" {{- ' disabled' if no_outputs }}></sl-icon-button>
                     </div>
+                    {% endif %}
                 </div>
             </div>
 {% endfor %}
diff --git a/src/autotako/templates/job.md b/src/autotako/templates/job.md
@@ -1,4 +1,4 @@
-Archive of {{author_override or job.author}}'s stream on {{job.scheduled_start_datetime}}.
+Archive of {{author_override or job.author}}'s stream on {{stream_time}}.

 Original URL: https://youtu.be/{{job.video_id}}
 Torrent: {{magnet_url}}