autotako

Service that monitors moombox for completed livestream downloads and uploads them for distribution
git clone https://code.alwayswait.ing/autotako.git
Log | Files | Refs

job_render.py (18977B)


      1 #!/usr/bin/python3
      2 
      3 import asyncio
      4 import datetime
      5 import enum
      6 import itertools
      7 import pathlib
      8 import re
      9 import traceback
     10 from typing import Self
     11 
     12 import gofile.api  # type: ignore
     13 import httpx
     14 import microdot  # type: ignore
     15 import msgspec
     16 import qbittorrentapi  # type: ignore
     17 import stamina
     18 import torf  # type: ignore
     19 
     20 from .config import WebDavConfig, config_ctx
     21 from .database import database_ctx
     22 
app = microdot.Microdot()

# strong references to fire-and-forget tasks so the event loop doesn't garbage-collect them
background_tasks: set[asyncio.Task] = set()
# singleton render task per moombox job id (managed by get_process_job_task)
render_tasks: dict[str, asyncio.Task] = {}
# in-flight gofile uploads keyed by local file path, used to avoid duplicate uploads
upload_tasks: dict[pathlib.Path, asyncio.Task] = {}

# https://www.japanesewithanime.com/2017/05/quotation-marks-japanese.html#sumitsukikakko
# strip extraneous spaces and hashtag
_pattern_bracketed_kw = re.compile(r"^【[\s#]*(?P<topic>.*?)[\s]?】")

# bae prefers the double angle brackets because she's quirky like that
_pattern_angled_bracketed_kw = re.compile(r"^≪[\s#]*(?P<topic>.*?)[\s]?≫")
     35 
     36 
     37 class JobUploadCriteria(enum.StrEnum):
     38     MANUALLY = "manual"
     39     WHEN_FINISHED = "finished"
     40     IF_PRIVATE = "private"
     41     IF_CUT = "cut"
     42 
     43     @property
     44     def automatic(self) -> bool:
     45         return self != JobUploadCriteria.MANUALLY
     46 
     47 
class MoomboxDownloadStatus(enum.StrEnum):
    """Per-job download states, deserialized from moombox's /status payload."""

    UNKNOWN = "Unknown"
    UNAVAILABLE = "Unavailable"
    WAITING = "Waiting"
    DOWNLOADING = "Downloading"
    MUXING = "Muxing"
    FINISHED = "Finished"
    ERROR = "Error"
     56 
     57 
     58 class MoomboxJobInfo(msgspec.Struct, kw_only=True, frozen=True):
     59     id: str
     60     title: str | None
     61     author: str | None
     62     channel_id: str | None
     63     video_id: str | None
     64     manifest_progress: dict
     65     status: MoomboxDownloadStatus = MoomboxDownloadStatus.UNKNOWN
     66     output_paths: set[str] | None = msgspec.field(default_factory=set)
     67     scheduled_start_datetime: datetime.datetime | None = None
     68 
     69     @property
     70     def uploadability_state(self) -> JobUploadCriteria | None:
     71         # returns a JobUploadCriteria that can be tested for equality to run
     72         if self.status != MoomboxDownloadStatus.FINISHED:
     73             return None
     74         if not self.files_available:
     75             return None
     76         if not self.is_configured_for_upload:
     77             return None
     78         return JobUploadCriteria.WHEN_FINISHED
     79 
     80     @property
     81     def is_configured_for_upload(self) -> bool:
     82         if not self.channel_id:
     83             return False
     84         config = config_ctx.get()
     85         channel_config = config.get_channel_config_by_id(self.channel_id)
     86         if not channel_config:
     87             return False
     88         return True
     89 
     90     @property
     91     def files_available(self) -> bool:
     92         return bool(self.output_pathobjs) and all(p.exists() for p in self.output_pathobjs)
     93 
     94     @property
     95     def output_pathobjs(self) -> set[pathlib.Path]:
     96         return set(map(pathlib.Path, self.output_paths or []))
     97 
     98 
     99 def _extract_name_from_stream_title(title: str) -> str | None:
    100     """
    101     Attempts to extract a preset name from the stream title.
    102     """
    103     match = _pattern_bracketed_kw.search(title) or _pattern_angled_bracketed_kw.search(title)
    104     if match:
    105         return str(match["topic"]).strip()
    106     return None
    107 
    108 
    109 class JobConfig(msgspec.Struct):
    110     name: str
    111     upload: JobUploadCriteria
    112     author_note: str | None = None
    113 
    114     @classmethod
    115     def from_moombox_job(cls, job: MoomboxJobInfo) -> Self:
    116         # generate a default config from the job
    117         name = "Unspecified Stream"
    118         upload = JobUploadCriteria.MANUALLY
    119 
    120         if not job.title:
    121             return cls(name, upload)
    122 
    123         name = _extract_name_from_stream_title(job.title) or name
    124 
    125         # mapping of phrases and any possible synonyms
    126         # if a synonym isn't present then an empty set should be present
    127         phrases = {
    128             "unarchived": {"no archive", "unarchive"},
    129             "karaoke": {"rock n' rawr", "歌枠", "sing"},
    130             "rebroadcast": {"kara-rewind"},
    131         }
    132 
    133         phrase_set = set(phrases.keys()) | set(itertools.chain.from_iterable(phrases.values()))
    134         # not sure why mypy complains about job.title being possibly None here if left bare...
    135         matched_phrases = set(
    136             filter(lambda term: term in (job.title.lower() if job.title else ""), phrase_set)
    137         )
    138 
    139         # dedupe synonymous terms
    140         for phrase, syn_phrases in phrases.items():
    141             if syn_phrases and matched_phrases & syn_phrases:
    142                 matched_phrases = (matched_phrases - syn_phrases) | {phrase}
    143 
    144         if matched_phrases == {"unarchived"}:
    145             name = "Unarchived Stream"
    146             upload = JobUploadCriteria.WHEN_FINISHED
    147         elif matched_phrases == {"karaoke"}:
    148             name = "Karaoke"
    149             upload = JobUploadCriteria.IF_CUT
    150         elif matched_phrases == {"unarchived", "karaoke"}:
    151             name = "Unarchived Karaoke"
    152             upload = JobUploadCriteria.WHEN_FINISHED
    153         elif matched_phrases == {"unarchived", "karaoke", "rebroadcast"}:
    154             name = "Unarchived Karaoke (rebroadcast)"
    155             upload = JobUploadCriteria.MANUALLY
    156 
    157         # cannot auto-upload if no channel configuration exists
    158         if not job.is_configured_for_upload:
    159             upload = JobUploadCriteria.MANUALLY
    160 
    161         return cls(name, upload)
    162 
    163 
    164 def stream_display_date(dt: datetime.datetime) -> datetime.date:
    165     """
    166     Given the stream start time, returns a display date.
    167 
    168     If the stream start time is within the last 10 minutes of the day, we treat the next day
    169     as the stream date instead based on the assumption that the streamer has a waiting screen
    170     set up.
    171     """
    172     end_of_day = (23, 50)
    173     return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0)
    174 
    175 
    176 async def get_moombox_job_by_id(jobid: str) -> MoomboxJobInfo | None:
    177     config = config_ctx.get()
    178     async with httpx.AsyncClient() as client:
    179         for attempt in stamina.retry_context(
    180             on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
    181         ):
    182             with attempt:
    183                 result = await client.get(f"{config.moombox_url}/status")
    184         for job in result.json():
    185             if job["id"] == jobid:
    186                 return msgspec.convert(job, type=MoomboxJobInfo)
    187         return None
    188 
    189 
    190 async def gofile_upload_and_persist(
    191     cursor, upload_file: pathlib.Path, job_id: str, token: str, folder_id: str
    192 ):
    193     with upload_file.open("rb") as upload_filereader:
    194         await gofile.api.upload_single(upload_filereader, token, folder_id)
    195         database = database_ctx.get()
    196         cursor = database.cursor()
    197         cursor.execute(
    198             "INSERT INTO gofile_files VALUES (?, ?)",
    199             (job_id, str(upload_file)),
    200         )
    201         database.commit()
    202 
    203 
async def do_gofile_upload(job: MoomboxJobInfo):
    """
    Prepares a guest upload for the specific job's output files, then returns the folder URL
    while the uploads are running in the background.

    This operation is idempotent; if it is interrupted, running this function will only upload
    files that weren't already.
    """
    database = database_ctx.get()

    # files already uploaded (and recorded) by a previous run
    known_files = set()
    for known_file, *_ in database.execute(
        "SELECT filepath FROM gofile_files WHERE job_id = ?", (job.id,)
    ):
        known_files.add(pathlib.Path(known_file))

    pending_files = set(map(pathlib.Path, job.output_paths or [])) - known_files

    token = None
    folder_id = None
    gofile_url = None

    # reuse the guest token / folder from a previous partial upload when available
    cursor = database.cursor()
    cursor.execute(
        "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job.id,)
    )
    job_token_info = cursor.fetchone()
    if job_token_info:
        token, folder_id, gofile_url = job_token_info
    else:
        # get the smallest file to quickly get the upload result
        # NOTE(review): min() raises ValueError when pending_files is empty, i.e. if every
        # file is recorded in gofile_files but no gofile_tokens row exists — confirm that
        # state is unreachable
        pending_file = min(pending_files, key=lambda p: p.stat().st_size)
        if pending_file in upload_tasks:
            # another task is uploading the same file; just block until that one is available
            # NOTE(review): background entries are created below from
            # gofile_upload_and_persist(), which does not appear to return an object with a
            # .result attribute — verify this path against the task types actually stored
            upload = await upload_tasks[pending_file]
            token, folder_id, gofile_url = (
                upload.result.guest_token,
                upload.result.parent_folder,
                upload.result.download_page,
            )
        else:
            with pending_file.open("rb") as fo:
                # NOTE(review): this stores a bare coroutine rather than an asyncio.Task; a
                # second consumer awaiting the same entry would raise "cannot reuse already
                # awaited coroutine" — consider asyncio.ensure_future()/create_task() here
                upload_tasks[pending_file] = gofile.api.upload_single(fo)
                upload = await upload_tasks[pending_file]
                token, folder_id, gofile_url = (
                    upload.result.guest_token,
                    upload.result.parent_folder,
                    upload.result.download_page,
                )
            # persist the token and the first uploaded file so a rerun can resume
            cursor.execute(
                "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)",
                (job.id, token, folder_id, gofile_url),
            )
            cursor.execute(
                "INSERT INTO gofile_files VALUES (?, ?)",
                (job.id, str(pending_file)),
            )
            database.commit()
        pending_files.discard(pending_file)

    # upload any remaining files in the background
    for pending_file in pending_files:
        # don't double-dip and upload an in-progress file
        # NOTE(review): completed entries are never removed from upload_tasks, so a file
        # stays marked "in progress" for the process lifetime — confirm that's intended
        if pending_file in upload_tasks:
            continue
        upload_tasks[pending_file] = asyncio.create_task(
            gofile_upload_and_persist(cursor, pending_file, job.id, token, folder_id)
        )
    return gofile_url
    273 
    274 
    275 async def do_webdav_upload(webdav: WebDavConfig, filepath: pathlib.Path, target: str):
    276     auth = httpx.BasicAuth(username=webdav.username, password=webdav.password)
    277     async with httpx.AsyncClient(auth=auth) as client:
    278         dest = f"{webdav.base_url}/{target}"
    279         file_check = await client.head(dest)
    280         if file_check.status_code == httpx.codes.NOT_FOUND:
    281             with filepath.open("rb") as fh:
    282                 await client.put(dest, content=fh.read())
    283 
    284 
async def _process_job(jobid):
    """
    Renders the distribution report for a moombox job and triggers its side effects:
    torrent creation/seeding, WebDAV uploads, and gofile uploads.

    Returns the rendered report text (or a 404 tuple when the job is unknown).
    Only when the job had output files to process ("readme_finalized") is it marked
    completed in the database; otherwise the singleton render task entry is dropped
    so rendering can be retried.
    """
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    # TODO fill in necessary fields in template, etc.
    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    config = config_ctx.get()

    channel = config.get_channel_config_by_id(job.channel_id)
    stream_time = job.scheduled_start_datetime or datetime.datetime.now(tz=datetime.UTC)

    torrent_output_dir = pathlib.Path("torrents")
    torrent_output_dir.mkdir(exist_ok=True)

    display_date = stream_display_date(stream_time)
    display_date_str = display_date.strftime("%Y%m%d")
    torrent_name = f"[{display_date_str}] {job_config.name} ({job.video_id})"

    folder_name = None
    # NOTE(review): stream_time is always truthy here (it falls back to now() above),
    # so this condition — and the "(unknown)" fallback in the render call below —
    # never triggers
    if stream_time:
        folder_name = stream_time.strftime("%Y-%m-%dT%H:%MZ")

    readme_finalized = False
    render_kwargs = {}
    if job.output_paths:
        torrent_file = torrent_output_dir / f"{job.id} ({job.video_id}).torrent"
        source_files = list(map(pathlib.Path, job.output_paths))

        # default seed location: the directory containing the first output file
        save_path = str(source_files[0].parent)

        if not torrent_file.exists():
            t = torf.Torrent(
                trackers=config.torrent.trackers,
            )

            # TODO extract key phrases from title / description

            t.filepaths = source_files
            # note that the name also sets the subdirectory for the files
            # as such, using other names for the same files *will* change the infohash
            t.name = torrent_name
            # hashing is CPU-bound, so run it off the event loop
            success = await asyncio.to_thread(t.generate)
            if not success:
                raise RuntimeError("Failed to generate torrent")
            # written twice: once under a stable job-id name, once under the display name
            t.write(torrent_file)
            t.write(torrent_output_dir / f"{torrent_name}.torrent")

            # send torrent to qbittorrent for seeding
            if config.qbittorrent:
                if config.qbittorrent.default_save_path:
                    save_path = config.qbittorrent.default_save_path
                qbt_client = qbittorrentapi.Client(
                    host=config.qbittorrent.host,
                    username=config.qbittorrent.username,
                    password=config.qbittorrent.password,
                    VERIFY_WEBUI_CERTIFICATE=False,
                    RAISE_NOTIMPLEMENTEDERROR_FOR_UNIMPLEMENTED_API_ENDPOINTS=True,
                )
                task = asyncio.create_task(
                    asyncio.to_thread(
                        qbt_client.torrents_add,
                        torrent_files=torrent_file,
                        save_path=save_path,
                        content_layout="NoSubfolder",
                        is_paused=config.qbittorrent.start_paused,
                    )
                )
                # keep a strong reference until the background task finishes
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
        else:
            t = torf.Torrent.read(torrent_file)
        print(t)
        print(t.magnet(size=False))
        print(t.files)

        # punt file to webdav remote
        if config.webdav and channel and channel.webdav_path:
            target_base = f"{channel.webdav_path}/{folder_name}"
            task = asyncio.create_task(
                do_webdav_upload(
                    config.webdav, torrent_file, f"{target_base}/{torrent_name}.torrent"
                )
            )
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

        render_kwargs["magnet_url"] = t.magnet(size=False)

        # punt files to gofile
        render_kwargs["gofile_url"] = await do_gofile_upload(job)
        readme_finalized = True
    rendered_job = await microdot.jinja.Template("job.md").render_async(
        job=job,
        author_override=channel.name if channel else None,
        stream_time=stream_time.strftime("%Y-%m-%dT%H:%M:%SZ") if stream_time else "(unknown)",
        config=job_config,
        **render_kwargs,
    )
    if readme_finalized and config.webdav and channel and channel.webdav_path:
        # publish the rendered report next to the torrent on the webdav remote
        rendered_job_file = torrent_output_dir / f"{job.id} ({job.video_id}).md"
        rendered_job_file.write_text(rendered_job)
        target_base = f"{channel.webdav_path}/{folder_name}"
        task = asyncio.create_task(
            do_webdav_upload(config.webdav, rendered_job_file, f"{target_base}/REPORT.md")
        )
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

        # record completion so job_auto_monitor won't schedule this job again
        database = database_ctx.get()
        cursor = database.cursor()
        cursor.execute(
            "INSERT INTO job_status VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET completed=excluded.completed",
            (jobid, True),
        )
        database.commit()
    if not readme_finalized and jobid in render_tasks:
        # allow rerun if not finalized
        del render_tasks[jobid]
    return rendered_job
    406 
    407 
    408 def get_process_job_task(jobid: str) -> asyncio.Task:
    409     """
    410     Creates or retrieves a singleton job rendering task.
    411     This is decoupled from the request callback to allow cancellations and multiple consumers.
    412     """
    413     if jobid in render_tasks and render_tasks[jobid].done() and render_tasks[jobid].exception():
    414         del render_tasks[jobid]
    415     if jobid not in render_tasks:
    416         render_tasks[jobid] = asyncio.create_task(_process_job(jobid))
    417     return render_tasks[jobid]
    418 
    419 
    420 def _get_saved_jobconf(jobid: str) -> JobConfig | None:
    421     database = database_ctx.get()
    422     cursor = database.cursor()
    423     cursor.execute("SELECT config FROM job_config WHERE job_id = ?", (jobid,))
    424     result = cursor.fetchone()
    425     if result:
    426         job_config_payload, *_ = result
    427         return msgspec.json.decode(job_config_payload, type=JobConfig)
    428     return None
    429 
    430 
    431 @app.get("/<jobid>/config")
    432 async def show_job_editor(request, jobid: str):
    433     job = await get_moombox_job_by_id(jobid)
    434     if not job:
    435         return "Couldn't find matching job", 404
    436 
    437     job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)
    438 
    439     return (
    440         await microdot.jinja.Template("edit.html").render_async(job=job, config=job_config),
    441         {
    442             "Content-Type": "text/html; charset=utf-8",
    443         },
    444     )
    445 
    446 
    447 @app.post("/<jobid>/config")
    448 async def apply_job_config(request, jobid: str):
    449     job = await get_moombox_job_by_id(jobid)
    450     if not job:
    451         return "Couldn't find matching job", 404
    452     jobconf = msgspec.convert(
    453         {item: request.form[item] for item in request.form}, type=JobConfig
    454     )
    455     database = database_ctx.get()
    456 
    457     cursor = database.cursor()
    458     cursor.execute(
    459         "INSERT INTO job_config VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET config=excluded.config",
    460         (jobid, msgspec.json.encode(jobconf)),
    461     )
    462     database.commit()
    463 
    464     return (
    465         await microdot.jinja.Template("edit.html").render_async(job=job, config=jobconf),
    466         {
    467             "Content-Type": "text/html; charset=utf-8",
    468         },
    469     )
    470 
    471 
    472 @app.get("/<jobid>")
    473 async def show_job(request, jobid: str):
    474     return await get_process_job_task(jobid)
    475 
    476 
    477 @app.post("/<jobid>")
    478 async def upload_job(request, jobid: str):
    479     try:
    480         await get_process_job_task(jobid)
    481         return await microdot.jinja.Template("job/upload_success.html").render_async()
    482     except torf.TorfError:
    483         traceback.print_exc()
    484         return await microdot.jinja.Template("job/upload_error.html").render_async()
    485 
    486 
async def job_auto_monitor():
    """
    Background loop that polls moombox's /status endpoint every two minutes and
    schedules render/upload tasks for jobs whose configured upload criteria are met.
    """
    while True:
        config = config_ctx.get()

        async with httpx.AsyncClient() as client:
            # retry the status poll indefinitely with capped backoff
            for attempt in stamina.retry_context(
                on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
            ):
                with attempt:
                    result = await client.get(f"{config.moombox_url}/status")

        jobs = []
        for serialized_job in result.json():
            try:
                jobs.append(msgspec.convert(serialized_job, type=MoomboxJobInfo))
            except msgspec.ValidationError:
                # skip entries that don't match our schema instead of aborting the poll
                pass

        # saved per-job configs and completion flags, keyed by job id
        database = database_ctx.get()
        cursor = database.cursor()
        configs = {
            job_id: msgspec.json.decode(job_conf, type=JobConfig)
            for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config")
        }
        statuses = {
            job_id: bool(completed)
            for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status")
        }

        # iterate in reverse of the listing order moombox returned
        for job in reversed(jobs):
            if not job.uploadability_state:
                continue
            job_conf = configs.get(job.id)
            if not job_conf:
                job_conf = JobConfig.from_moombox_job(job)
            if job.uploadability_state != job_conf.upload:
                continue
            if not config.autoupload.active:
                # dry-run mode: log candidates without scheduling anything
                # NOTE(review): this logs jobs even when they're already marked completed,
                # since the statuses check comes after — confirm that's intended
                print(f"'{job.title}' meets conditions for upload")
                continue
            if statuses.get(job.id):
                # already processed in an earlier run
                continue
            print(f"'{job.title}' is scheduled for upload")
            get_process_job_task(job.id)

        await asyncio.sleep(120)