autotako

Service that monitors moombox for completed livestream downloads and uploads them for distribution
git clone https://code.alwayswait.ing/autotako
Log | Files | Refs | README

job_render.py (19858B)


      1 #!/usr/bin/python3
      2 
      3 import asyncio
      4 import datetime
      5 import enum
      6 import itertools
      7 import logging
      8 import pathlib
      9 import re
     10 from typing import Self
     11 
     12 import gofile.api  # type: ignore
     13 import httpx
     14 import microdot  # type: ignore
     15 import msgspec
     16 import qbittorrentapi  # type: ignore
     17 import stamina
     18 import torf  # type: ignore
     19 
     20 from .config import WebDavConfig, config_ctx
     21 from .database import database_ctx
     22 
app = microdot.Microdot()

log = logging.getLogger(__name__)

# strong references to fire-and-forget tasks so they aren't garbage-collected mid-flight
background_tasks = set()
# singleton render task per moombox job ID (managed by get_process_job_task)
render_tasks: dict[str, asyncio.Task] = {}
# in-flight gofile uploads keyed by the file being uploaded
upload_tasks: dict[pathlib.Path, asyncio.Task] = {}

# https://www.japanesewithanime.com/2017/05/quotation-marks-japanese.html#sumitsukikakko
# strip extraneous spaces and hashtag
_pattern_bracketed_kw = re.compile(r"^【[\s#]*(?P<topic>.*?)[\s]?】")

# bae prefers the double angle brackets because she's quirky like that
_pattern_angled_bracketed_kw = re.compile(r"^≪[\s#]*(?P<topic>.*?)[\s]?≫")
     37 
     38 
     39 class JobUploadCriteria(enum.StrEnum):
     40     MANUALLY = "manual"
     41     WHEN_FINISHED = "finished"
     42     IF_PRIVATE = "private"
     43     IF_CUT = "cut"
     44 
     45     @property
     46     def automatic(self) -> bool:
     47         return self != JobUploadCriteria.MANUALLY
     48 
     49 
class MoomboxDownloadStatus(enum.StrEnum):
    """Download status strings as reported in moombox's /status payload."""

    UNKNOWN = "Unknown"
    UNAVAILABLE = "Unavailable"
    WAITING = "Waiting"
    DOWNLOADING = "Downloading"
    MUXING = "Muxing"
    FINISHED = "Finished"
    ERROR = "Error"
     58 
     59 
     60 class MoomboxJobInfo(msgspec.Struct, kw_only=True, frozen=True):
     61     id: str
     62     title: str | None
     63     author: str | None
     64     channel_id: str | None
     65     video_id: str | None
     66     manifest_progress: dict
     67     status: MoomboxDownloadStatus = MoomboxDownloadStatus.UNKNOWN
     68     output_paths: set[str] | None = msgspec.field(default_factory=set)
     69     scheduled_start_datetime: datetime.datetime | None = None
     70 
     71     @property
     72     def uploadability_state(self) -> JobUploadCriteria | None:
     73         # returns a JobUploadCriteria that can be tested for equality to run
     74         if self.status != MoomboxDownloadStatus.FINISHED:
     75             return None
     76         if not self.files_available:
     77             return None
     78         if not self.is_configured_for_upload:
     79             return None
     80         return JobUploadCriteria.WHEN_FINISHED
     81 
     82     @property
     83     def is_configured_for_upload(self) -> bool:
     84         if not self.channel_id:
     85             return False
     86         config = config_ctx.get()
     87         channel_config = config.get_channel_config_by_id(self.channel_id)
     88         if not channel_config:
     89             return False
     90         return True
     91 
     92     @property
     93     def files_available(self) -> bool:
     94         return bool(self.output_pathobjs) and all(p.exists() for p in self.output_pathobjs)
     95 
     96     @property
     97     def output_pathobjs(self) -> set[pathlib.Path]:
     98         return set(map(pathlib.Path, self.output_paths or []))
     99 
    100 
    101 def _extract_name_from_stream_title(title: str) -> str | None:
    102     """
    103     Attempts to extract a preset name from the stream title.
    104     """
    105     match = _pattern_bracketed_kw.search(title) or _pattern_angled_bracketed_kw.search(title)
    106     if match:
    107         return str(match["topic"]).strip()
    108     return None
    109 
    110 
    111 class JobConfig(msgspec.Struct):
    112     name: str
    113     upload: JobUploadCriteria
    114     author_note: str | None = None
    115 
    116     @classmethod
    117     def from_moombox_job(cls, job: MoomboxJobInfo) -> Self:
    118         # generate a default config from the job
    119         name = "Unspecified Stream"
    120         upload = JobUploadCriteria.MANUALLY
    121 
    122         if not job.title:
    123             return cls(name, upload)
    124 
    125         name = _extract_name_from_stream_title(job.title) or name
    126 
    127         # mapping of phrases and any possible synonyms
    128         # if a synonym isn't present then an empty set should be present
    129         phrases = {
    130             "unarchived": {"no archive", "unarchive"},
    131             "karaoke": {"rock n' rawr", "歌枠", "sing"},
    132             "rebroadcast": {"kara-rewind"},
    133         }
    134 
    135         phrase_set = set(phrases.keys()) | set(itertools.chain.from_iterable(phrases.values()))
    136         # not sure why mypy complains about job.title being possibly None here if left bare...
    137         matched_phrases = set(
    138             filter(lambda term: term in (job.title.lower() if job.title else ""), phrase_set)
    139         )
    140 
    141         # dedupe synonymous terms
    142         for phrase, syn_phrases in phrases.items():
    143             if syn_phrases and matched_phrases & syn_phrases:
    144                 matched_phrases = (matched_phrases - syn_phrases) | {phrase}
    145 
    146         if matched_phrases == {"unarchived"}:
    147             name = "Unarchived Stream"
    148             upload = JobUploadCriteria.WHEN_FINISHED
    149         elif matched_phrases == {"karaoke"}:
    150             name = "Karaoke"
    151             upload = JobUploadCriteria.IF_CUT
    152         elif matched_phrases == {"unarchived", "karaoke"}:
    153             name = "Unarchived Karaoke"
    154             upload = JobUploadCriteria.WHEN_FINISHED
    155         elif matched_phrases == {"unarchived", "karaoke", "rebroadcast"}:
    156             name = "Unarchived Karaoke (rebroadcast)"
    157             upload = JobUploadCriteria.MANUALLY
    158 
    159         # cannot auto-upload if no channel configuration exists
    160         if not job.is_configured_for_upload:
    161             upload = JobUploadCriteria.MANUALLY
    162 
    163         return cls(name, upload)
    164 
    165 
    166 def stream_display_date(dt: datetime.datetime) -> datetime.date:
    167     """
    168     Given the stream start time, returns a display date.
    169 
    170     If the stream start time is within the last 10 minutes of the day, we treat the next day
    171     as the stream date instead based on the assumption that the streamer has a waiting screen
    172     set up.
    173     """
    174     end_of_day = (23, 50)
    175     return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0)
    176 
    177 
async def get_moombox_job_by_id(jobid: str) -> MoomboxJobInfo | None:
    """
    Fetches moombox's /status listing and returns the job matching ``jobid``,
    or None if no such job is listed.

    The status request is retried indefinitely with backoff on any httpx error.
    """
    config = config_ctx.get()
    async with httpx.AsyncClient() as client:
        for attempt in stamina.retry_context(
            on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
        ):
            with attempt:
                result = await client.get(f"{config.moombox_url}/status")
        for job in result.json():
            if job["id"] == jobid:
                return msgspec.convert(job, type=MoomboxJobInfo)
        return None
    190 
    191 
    192 async def gofile_upload_and_persist(
    193     cursor, upload_file: pathlib.Path, job_id: str, token: str, folder_id: str
    194 ):
    195     with upload_file.open("rb") as upload_filereader:
    196         await gofile.api.upload_single(upload_filereader, token, folder_id)
    197         database = database_ctx.get()
    198         cursor = database.cursor()
    199         cursor.execute(
    200             "INSERT INTO gofile_files VALUES (?, ?)",
    201             (job_id, str(upload_file)),
    202         )
    203         database.commit()
    204 
    205 
async def do_gofile_upload(job: MoomboxJobInfo):
    """
    Prepares a guest upload for the specific job's output files, then returns the folder URL
    while the uploads are running in the background.

    This operation is idempotent; if it is interrupted, running this function will only upload
    files that weren't already.
    """
    database = database_ctx.get()

    # files already uploaded (and recorded) on a previous run
    known_files = set()
    for known_file, *_ in database.execute(
        "SELECT filepath FROM gofile_files WHERE job_id = ?", (job.id,)
    ):
        known_files.add(pathlib.Path(known_file))

    pending_files = set(map(pathlib.Path, job.output_paths or [])) - known_files

    token = None
    folder_id = None
    gofile_url = None

    # reuse the saved guest token / folder for this job if one exists
    cursor = database.cursor()
    cursor.execute(
        "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job.id,)
    )
    job_token_info = cursor.fetchone()
    if job_token_info:
        token, folder_id, gofile_url = job_token_info
    else:
        # get the smallest file to quickly get the upload result
        # NOTE(review): min() raises ValueError when pending_files is empty (no saved
        # token but all files already recorded) — confirm callers guarantee a file here
        pending_file = min(pending_files, key=lambda p: p.stat().st_size)
        if pending_file in upload_tasks:
            # another task is uploading the same file; just block until that one is available
            # NOTE(review): if this entry is a gofile_upload_and_persist task it resolves
            # to None, and the .result accesses below would raise — verify which task
            # types can occupy upload_tasks at this point
            upload = await upload_tasks[pending_file]
            token, folder_id, gofile_url = (
                upload.result.guest_token,
                upload.result.parent_folder,
                upload.result.download_page,
            )
        else:
            with pending_file.open("rb") as fo:
                # NOTE(review): this stores a bare coroutine, not an asyncio.Task; a
                # concurrent caller hitting the branch above would await the same
                # coroutine a second time and raise RuntimeError
                upload_tasks[pending_file] = gofile.api.upload_single(fo)
                upload = await upload_tasks[pending_file]
                token, folder_id, gofile_url = (
                    upload.result.guest_token,
                    upload.result.parent_folder,
                    upload.result.download_page,
                )
            # persist the session and the first uploaded file for idempotency
            cursor.execute(
                "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)",
                (job.id, token, folder_id, gofile_url),
            )
            cursor.execute(
                "INSERT INTO gofile_files VALUES (?, ?)",
                (job.id, str(pending_file)),
            )
            database.commit()
        pending_files.discard(pending_file)

    # upload any remaining files in the background
    for pending_file in pending_files:
        # don't double-dip and upload an in-progress file
        if pending_file in upload_tasks:
            continue
        upload_tasks[pending_file] = asyncio.create_task(
            gofile_upload_and_persist(cursor, pending_file, job.id, token, folder_id)
        )
    return gofile_url
    275 
    276 
async def do_webdav_upload(webdav: WebDavConfig, filepath: pathlib.Path, target: str):
    """
    Uploads ``filepath`` to ``{webdav.base_url}/{target}``, skipping the PUT when the
    remote file already exists (HEAD returns anything other than 404).

    Connection timeouts are retried forever at 10-second intervals, warning only once.
    """
    auth = httpx.BasicAuth(username=webdav.username, password=webdav.password)
    async with httpx.AsyncClient(auth=auth) as client:
        connection_warning_seen = False
        while True:
            try:
                dest = f"{webdav.base_url}/{target}"
                file_check = await client.head(dest)
                if file_check.status_code == httpx.codes.NOT_FOUND:
                    # NOTE(review): the entire file is buffered in memory before the PUT;
                    # large outputs may warrant a streaming body instead
                    with filepath.open("rb") as fh:
                        await client.put(dest, content=fh.read())
                return
            except httpx.ConnectTimeout:
                # NOTE(review): only ConnectTimeout is retried — other transport errors
                # (e.g. httpx.ConnectError) propagate to the caller; confirm intended
                if not connection_warning_seen:
                    log.warning(f"Failed to connect to {webdav.base_url}.  Retrying...")
                    connection_warning_seen = True
                await asyncio.sleep(10)
    294 
    295 
async def _process_job(jobid):
    """
    Renders the distribution report for a single moombox job.

    When output files are available this also: creates (or reloads) the job's torrent,
    hands it to qbittorrent for seeding, mirrors the torrent and rendered report to
    WebDAV, starts gofile uploads, and marks the job completed in the database.

    Returns the rendered markdown; returns a ("Couldn't find matching job", 404) tuple
    for unknown jobs, and None when not all output files exist on disk yet.
    """
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    # TODO fill in necessary fields in template, etc.
    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    config = config_ctx.get()

    channel = config.get_channel_config_by_id(job.channel_id)
    stream_time = job.scheduled_start_datetime or datetime.datetime.now(tz=datetime.UTC)

    torrent_output_dir = pathlib.Path("torrents")
    torrent_output_dir.mkdir(exist_ok=True)

    display_date = stream_display_date(stream_time)
    display_date_str = display_date.strftime("%Y%m%d")
    torrent_name = f"[{display_date_str}] {job_config.name} ({job.video_id})"

    folder_name = None
    if stream_time:
        # NOTE(review): stream_time is always truthy here (it falls back to now()
        # above), so this guard appears vestigial
        folder_name = stream_time.strftime("%Y-%m-%dT%H:%MZ")

    readme_finalized = False
    render_kwargs = {}
    if job.output_paths:
        torrent_file = torrent_output_dir / f"{job.id} ({job.video_id}).torrent"
        source_files = list(map(pathlib.Path, job.output_paths))

        # NOTE(review): this early None return skips the render_tasks cleanup at the
        # bottom, leaving a completed task cached with a None result — confirm
        # get_process_job_task recovers from that
        if not all(f.exists() for f in source_files):
            return

        save_path = str(source_files[0].parent)

        if not torrent_file.exists():
            t = torf.Torrent(
                trackers=config.torrent.trackers,
            )

            # TODO extract key phrases from title / description

            t.filepaths = source_files
            # note that the name also sets the subdirectory for the files
            # as such, using other names for the same files *will* change the infohash
            t.name = torrent_name
            # torrent hashing is CPU-bound; run it off the event loop
            success = await asyncio.to_thread(t.generate)
            if not success:
                raise RuntimeError("Failed to generate torrent")
            t.write(torrent_file)
            t.write(torrent_output_dir / f"{torrent_name}.torrent")

            # send torrent to qbittorrent for seeding
            if config.qbittorrent:
                if config.qbittorrent.default_save_path:
                    save_path = config.qbittorrent.default_save_path
                qbt_client = qbittorrentapi.Client(
                    host=config.qbittorrent.host,
                    username=config.qbittorrent.username,
                    password=config.qbittorrent.password,
                    VERIFY_WEBUI_CERTIFICATE=False,
                    RAISE_NOTIMPLEMENTEDERROR_FOR_UNIMPLEMENTED_API_ENDPOINTS=True,
                )
                task = asyncio.create_task(
                    asyncio.to_thread(
                        qbt_client.torrents_add,
                        torrent_files=torrent_file,
                        save_path=save_path,
                        content_layout="NoSubfolder",
                        # annoyingly, the API doesn't translate these kwargs
                        # - is_paused for 4.5.2 (Debian 12, qbittorrent-api==2022.7.33)
                        # - is_stopped for 5.1.0 (Debian 13, qbittorrent-api==2025.11.1)
                        is_paused=config.qbittorrent.start_paused,
                        is_stopped=config.qbittorrent.start_paused,
                    )
                )
                # hold a strong reference so the task isn't garbage-collected early
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
        else:
            # torrent generated on a previous run; reuse it so the infohash is stable
            t = torf.Torrent.read(torrent_file)
        log.debug(t)
        log.debug(t.magnet(size=False))
        log.debug(t.files)

        # punt file to webdav remote
        if config.webdav and channel and channel.webdav_path:
            target_base = f"{channel.webdav_path}/{folder_name}"
            task = asyncio.create_task(
                do_webdav_upload(
                    config.webdav, torrent_file, f"{target_base}/{torrent_name}.torrent"
                )
            )
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

        render_kwargs["magnet_url"] = t.magnet(size=False)

        # punt files to gofile
        render_kwargs["gofile_url"] = await do_gofile_upload(job)
        readme_finalized = True
    rendered_job = await microdot.jinja.Template("job.md").render_async(
        job=job,
        author_override=channel.name if channel else None,
        stream_time=stream_time.strftime("%Y-%m-%dT%H:%M:%SZ") if stream_time else "(unknown)",
        config=job_config,
        **render_kwargs,
    )
    if readme_finalized and config.webdav and channel and channel.webdav_path:
        rendered_job_file = torrent_output_dir / f"{job.id} ({job.video_id}).md"
        rendered_job_file.write_text(rendered_job)
        target_base = f"{channel.webdav_path}/{folder_name}"
        task = asyncio.create_task(
            do_webdav_upload(config.webdav, rendered_job_file, f"{target_base}/REPORT.md")
        )
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

        # mark the job completed so job_auto_monitor won't schedule it again
        database = database_ctx.get()
        cursor = database.cursor()
        cursor.execute(
            "INSERT INTO job_status VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET completed=excluded.completed",
            (jobid, True),
        )
        database.commit()
    if not readme_finalized and jobid in render_tasks:
        # allow rerun if not finalized
        del render_tasks[jobid]
    return rendered_job
    424 
    425 
    426 def get_process_job_task(jobid: str) -> asyncio.Task:
    427     """
    428     Creates or retrieves a singleton job rendering task.
    429     This is decoupled from the request callback to allow cancellations and multiple consumers.
    430     """
    431     if jobid in render_tasks and render_tasks[jobid].done() and render_tasks[jobid].exception():
    432         del render_tasks[jobid]
    433     if jobid not in render_tasks:
    434         render_tasks[jobid] = asyncio.create_task(_process_job(jobid))
    435     return render_tasks[jobid]
    436 
    437 
    438 def _get_saved_jobconf(jobid: str) -> JobConfig | None:
    439     database = database_ctx.get()
    440     cursor = database.cursor()
    441     cursor.execute("SELECT config FROM job_config WHERE job_id = ?", (jobid,))
    442     result = cursor.fetchone()
    443     if result:
    444         job_config_payload, *_ = result
    445         return msgspec.json.decode(job_config_payload, type=JobConfig)
    446     return None
    447 
    448 
@app.get("/<jobid>/config")
async def show_job_editor(request, jobid: str):
    """Renders the HTML editor for a job's configuration (saved, or derived default)."""
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    return (
        await microdot.jinja.Template("edit.html").render_async(job=job, config=job_config),
        {
            "Content-Type": "text/html; charset=utf-8",
        },
    )
    463 
    464 
@app.post("/<jobid>/config")
async def apply_job_config(request, jobid: str):
    """Validates and persists the submitted job configuration, then re-renders the editor."""
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404
    # convert the raw form fields into a validated JobConfig
    jobconf = msgspec.convert(
        {item: request.form[item] for item in request.form}, type=JobConfig
    )
    database = database_ctx.get()

    cursor = database.cursor()
    # upsert so resubmitting the form replaces any earlier configuration
    cursor.execute(
        "INSERT INTO job_config VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET config=excluded.config",
        (jobid, msgspec.json.encode(jobconf)),
    )
    database.commit()

    return (
        await microdot.jinja.Template("edit.html").render_async(job=job, config=jobconf),
        {
            "Content-Type": "text/html; charset=utf-8",
        },
    )
    488 
    489 
@app.get("/<jobid>")
async def show_job(request, jobid: str):
    """Returns the rendered job report, sharing one render task across concurrent requests."""
    return await get_process_job_task(jobid)
    493 
    494 
@app.post("/<jobid>")
async def upload_job(request, jobid: str):
    """Triggers (or joins) the render/upload pipeline for a job and reports the outcome."""
    try:
        await get_process_job_task(jobid)
        return await microdot.jinja.Template("job/upload_success.html").render_async()
    except torf.TorfError:
        log.exception(f"Failed to upload job {jobid}")
        return await microdot.jinja.Template("job/upload_error.html").render_async()
    503 
    504 
async def job_auto_monitor():
    """
    Background loop that polls moombox every two minutes and schedules render/upload
    tasks for jobs whose state matches their configured upload criteria.
    """
    while True:
        config = config_ctx.get()

        async with httpx.AsyncClient() as client:
            # retry the status poll indefinitely with backoff on any httpx error
            for attempt in stamina.retry_context(
                on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
            ):
                with attempt:
                    result = await client.get(f"{config.moombox_url}/status")

        jobs = []
        for serialized_job in result.json():
            try:
                jobs.append(msgspec.convert(serialized_job, type=MoomboxJobInfo))
            except msgspec.ValidationError:
                # skip payload entries that don't match the expected job schema
                pass

        database = database_ctx.get()
        cursor = database.cursor()
        # saved per-job configuration overrides, keyed by job ID
        configs = {
            job_id: msgspec.json.decode(job_conf, type=JobConfig)
            for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config")
        }
        # completion flags persisted by _process_job, keyed by job ID
        statuses = {
            job_id: bool(completed)
            for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status")
        }

        for job in reversed(jobs):
            if not job.uploadability_state:
                continue
            job_conf = configs.get(job.id)
            if not job_conf:
                job_conf = JobConfig.from_moombox_job(job)
            if job.uploadability_state != job_conf.upload:
                continue
            if not config.autoupload.active:
                # auto-upload disabled: only report that the job is eligible
                log.info(f"'{job.title}' meets conditions for upload")
                continue
            if statuses.get(job.id):
                # already completed on a previous run
                continue
            log.info(f"'{job.title}' is scheduled for upload")
            get_process_job_task(job.id)

        await asyncio.sleep(120)