autotako

Service that monitors moombox for completed livestream downloads and uploads them for distribution
git clone https://code.alwayswait.ing/autotako
Log | Files | Refs | README

job_render.py (19536B)


      1 #!/usr/bin/python3
      2 
      3 import asyncio
      4 import datetime
      5 import enum
      6 import itertools
      7 import logging
      8 import pathlib
      9 import re
     10 from typing import Self
     11 
     12 import gofile.api  # type: ignore
     13 import httpx
     14 import microdot  # type: ignore
     15 import msgspec
     16 import qbittorrentapi  # type: ignore
     17 import stamina
     18 import torf  # type: ignore
     19 
     20 from .config import WebDavConfig, config_ctx
     21 from .database import database_ctx
     22 
app = microdot.Microdot()

log = logging.getLogger(__name__)

# strong references to fire-and-forget tasks so they aren't garbage-collected mid-flight
background_tasks: set[asyncio.Task] = set()
# singleton render task per moombox job id (see get_process_job_task)
render_tasks: dict[str, asyncio.Task] = {}
# in-flight gofile uploads keyed by source file, shared to avoid double-uploading
upload_tasks: dict[pathlib.Path, asyncio.Task] = {}

# https://www.japanesewithanime.com/2017/05/quotation-marks-japanese.html#sumitsukikakko
# strip extraneous spaces and hashtag
_pattern_bracketed_kw = re.compile(r"^【[\s#]*(?P<topic>.*?)[\s]?】")

# bae prefers the double angle brackets because she's quirky like that
_pattern_angled_bracketed_kw = re.compile(r"^≪[\s#]*(?P<topic>.*?)[\s]?≫")
     37 
     38 
     39 class JobUploadCriteria(enum.StrEnum):
     40     MANUALLY = "manual"
     41     WHEN_FINISHED = "finished"
     42     IF_PRIVATE = "private"
     43     IF_CUT = "cut"
     44 
     45     @property
     46     def automatic(self) -> bool:
     47         return self != JobUploadCriteria.MANUALLY
     48 
     49 
     50 class MoomboxDownloadStatus(enum.StrEnum):
     51     UNKNOWN = "Unknown"
     52     UNAVAILABLE = "Unavailable"
     53     WAITING = "Waiting"
     54     DOWNLOADING = "Downloading"
     55     MUXING = "Muxing"
     56     FINISHED = "Finished"
     57     ERROR = "Error"
     58 
     59 
     60 class MoomboxJobInfo(msgspec.Struct, kw_only=True, frozen=True):
     61     id: str
     62     title: str | None
     63     author: str | None
     64     channel_id: str | None
     65     video_id: str | None
     66     manifest_progress: dict
     67     status: MoomboxDownloadStatus = MoomboxDownloadStatus.UNKNOWN
     68     output_paths: set[str] | None = msgspec.field(default_factory=set)
     69     scheduled_start_datetime: datetime.datetime | None = None
     70 
     71     @property
     72     def uploadability_state(self) -> JobUploadCriteria | None:
     73         # returns a JobUploadCriteria that can be tested for equality to run
     74         if self.status != MoomboxDownloadStatus.FINISHED:
     75             return None
     76         if not self.files_available:
     77             return None
     78         if not self.is_configured_for_upload:
     79             return None
     80         return JobUploadCriteria.WHEN_FINISHED
     81 
     82     @property
     83     def is_configured_for_upload(self) -> bool:
     84         if not self.channel_id:
     85             return False
     86         config = config_ctx.get()
     87         channel_config = config.get_channel_config_by_id(self.channel_id)
     88         if not channel_config:
     89             return False
     90         return True
     91 
     92     @property
     93     def files_available(self) -> bool:
     94         return bool(self.output_pathobjs) and all(p.exists() for p in self.output_pathobjs)
     95 
     96     @property
     97     def output_pathobjs(self) -> set[pathlib.Path]:
     98         return set(map(pathlib.Path, self.output_paths or []))
     99 
    100 
    101 def _extract_name_from_stream_title(title: str) -> str | None:
    102     """
    103     Attempts to extract a preset name from the stream title.
    104     """
    105     match = _pattern_bracketed_kw.search(title) or _pattern_angled_bracketed_kw.search(title)
    106     if match:
    107         return str(match["topic"]).strip()
    108     return None
    109 
    110 
    111 class JobConfig(msgspec.Struct):
    112     name: str
    113     upload: JobUploadCriteria
    114     author_note: str | None = None
    115 
    116     @classmethod
    117     def from_moombox_job(cls, job: MoomboxJobInfo) -> Self:
    118         # generate a default config from the job
    119         name = "Unspecified Stream"
    120         upload = JobUploadCriteria.MANUALLY
    121 
    122         if not job.title:
    123             return cls(name, upload)
    124 
    125         name = _extract_name_from_stream_title(job.title) or name
    126 
    127         # mapping of phrases and any possible synonyms
    128         # if a synonym isn't present then an empty set should be present
    129         phrases = {
    130             "unarchived": {"no archive", "unarchive"},
    131             "karaoke": {"rock n' rawr", "歌枠", "sing"},
    132             "rebroadcast": {"kara-rewind"},
    133         }
    134 
    135         phrase_set = set(phrases.keys()) | set(itertools.chain.from_iterable(phrases.values()))
    136         # not sure why mypy complains about job.title being possibly None here if left bare...
    137         matched_phrases = set(
    138             filter(lambda term: term in (job.title.lower() if job.title else ""), phrase_set)
    139         )
    140 
    141         # dedupe synonymous terms
    142         for phrase, syn_phrases in phrases.items():
    143             if syn_phrases and matched_phrases & syn_phrases:
    144                 matched_phrases = (matched_phrases - syn_phrases) | {phrase}
    145 
    146         if matched_phrases == {"unarchived"}:
    147             name = "Unarchived Stream"
    148             upload = JobUploadCriteria.WHEN_FINISHED
    149         elif matched_phrases == {"karaoke"}:
    150             name = "Karaoke"
    151             upload = JobUploadCriteria.IF_CUT
    152         elif matched_phrases == {"unarchived", "karaoke"}:
    153             name = "Unarchived Karaoke"
    154             upload = JobUploadCriteria.WHEN_FINISHED
    155         elif matched_phrases == {"unarchived", "karaoke", "rebroadcast"}:
    156             name = "Unarchived Karaoke (rebroadcast)"
    157             upload = JobUploadCriteria.MANUALLY
    158 
    159         # cannot auto-upload if no channel configuration exists
    160         if not job.is_configured_for_upload:
    161             upload = JobUploadCriteria.MANUALLY
    162 
    163         return cls(name, upload)
    164 
    165 
    166 def stream_display_date(dt: datetime.datetime) -> datetime.date:
    167     """
    168     Given the stream start time, returns a display date.
    169 
    170     If the stream start time is within the last 10 minutes of the day, we treat the next day
    171     as the stream date instead based on the assumption that the streamer has a waiting screen
    172     set up.
    173     """
    174     end_of_day = (23, 50)
    175     return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0)
    176 
    177 
    178 async def get_moombox_job_by_id(jobid: str) -> MoomboxJobInfo | None:
    179     config = config_ctx.get()
    180     async with httpx.AsyncClient() as client:
    181         for attempt in stamina.retry_context(
    182             on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
    183         ):
    184             with attempt:
    185                 result = await client.get(f"{config.moombox_url}/status")
    186         for job in result.json():
    187             if job["id"] == jobid:
    188                 return msgspec.convert(job, type=MoomboxJobInfo)
    189         return None
    190 
    191 
async def gofile_upload_and_persist(
    cursor, upload_file: pathlib.Path, job_id: str, token: str, folder_id: str
):
    """
    Upload a single file to the job's gofile folder, then record it in the
    gofile_files table so reruns of do_gofile_upload skip it.
    """
    # NOTE(review): the `cursor` parameter is shadowed by a freshly created
    # cursor below and is never used — confirm whether the parameter can be
    # dropped or was meant to be reused
    with upload_file.open("rb") as upload_filereader:
        await gofile.api.upload_single(upload_filereader, token, folder_id)
        database = database_ctx.get()
        cursor = database.cursor()
        cursor.execute(
            "INSERT INTO gofile_files VALUES (?, ?)",
            (job_id, str(upload_file)),
        )
        database.commit()
    204 
    205 
async def do_gofile_upload(job: MoomboxJobInfo):
    """
    Prepares a guest upload for the specific job's output files, then returns the folder URL
    while the uploads are running in the background.

    This operation is idempotent; if it is interrupted, running this function will only upload
    files that weren't already.
    """
    database = database_ctx.get()

    # files recorded as already uploaded in a previous (possibly interrupted) run
    known_files = set()
    for known_file, *_ in database.execute(
        "SELECT filepath FROM gofile_files WHERE job_id = ?", (job.id,)
    ):
        known_files.add(pathlib.Path(known_file))

    pending_files = set(map(pathlib.Path, job.output_paths or [])) - known_files

    token = None
    folder_id = None
    gofile_url = None

    # reuse the guest token / folder already created for this job, if any
    cursor = database.cursor()
    cursor.execute(
        "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job.id,)
    )
    job_token_info = cursor.fetchone()
    if job_token_info:
        token, folder_id, gofile_url = job_token_info
    else:
        # get the smallest file to quickly get the upload result
        # NOTE(review): min() raises ValueError when pending_files is empty and no
        # token row exists — confirm callers guarantee at least one pending file here
        pending_file = min(pending_files, key=lambda p: p.stat().st_size)
        if pending_file in upload_tasks:
            # another task is uploading the same file; just block until that one is available
            # NOTE(review): upload_tasks mixes asyncio.Tasks (background branch) with a
            # bare coroutine (stored below); awaiting a coroutine a second time raises
            # RuntimeError — verify this path only ever sees Task entries
            upload = await upload_tasks[pending_file]
            token, folder_id, gofile_url = (
                upload.result.guest_token,
                upload.result.parent_folder,
                upload.result.download_page,
            )
        else:
            with pending_file.open("rb") as fo:
                # publish the in-flight upload so concurrent callers don't double-upload
                upload_tasks[pending_file] = gofile.api.upload_single(fo)
                upload = await upload_tasks[pending_file]
                token, folder_id, gofile_url = (
                    upload.result.guest_token,
                    upload.result.parent_folder,
                    upload.result.download_page,
                )
            # persist the token and the first uploaded file so reruns are idempotent
            cursor.execute(
                "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)",
                (job.id, token, folder_id, gofile_url),
            )
            cursor.execute(
                "INSERT INTO gofile_files VALUES (?, ?)",
                (job.id, str(pending_file)),
            )
            database.commit()
        pending_files.discard(pending_file)

    # upload any remaining files in the background
    for pending_file in pending_files:
        # don't double-dip and upload an in-progress file
        if pending_file in upload_tasks:
            continue
        upload_tasks[pending_file] = asyncio.create_task(
            gofile_upload_and_persist(cursor, pending_file, job.id, token, folder_id)
        )
    return gofile_url
    275 
    276 
    277 async def do_webdav_upload(webdav: WebDavConfig, filepath: pathlib.Path, target: str):
    278     auth = httpx.BasicAuth(username=webdav.username, password=webdav.password)
    279     async with httpx.AsyncClient(auth=auth) as client:
    280         connection_warning_seen = False
    281         while True:
    282             try:
    283                 dest = f"{webdav.base_url}/{target}"
    284                 file_check = await client.head(dest)
    285                 if file_check.status_code == httpx.codes.NOT_FOUND:
    286                     with filepath.open("rb") as fh:
    287                         await client.put(dest, content=fh.read())
    288                 return
    289             except httpx.ConnectTimeout:
    290                 if not connection_warning_seen:
    291                     log.warning(f"Failed to connect to {webdav.base_url}.  Retrying...")
    292                     connection_warning_seen = True
    293                 await asyncio.sleep(10)
    294 
    295 
async def _process_job(jobid):
    """
    Render the report for a moombox job: build (or reuse) its torrent, hand it to
    qbittorrent for seeding, mirror files to WebDAV and gofile, and return the
    rendered job markdown.

    Returns a (message, 404) tuple when the job is unknown, None when not all
    source files exist yet, otherwise the rendered template text.
    """
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    # TODO fill in necessary fields in template, etc.
    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    config = config_ctx.get()

    channel = config.get_channel_config_by_id(job.channel_id)
    # fall back to "now" when moombox didn't report a scheduled start time
    stream_time = job.scheduled_start_datetime or datetime.datetime.now(tz=datetime.UTC)

    torrent_output_dir = pathlib.Path("torrents")
    torrent_output_dir.mkdir(exist_ok=True)

    display_date = stream_display_date(stream_time)
    display_date_str = display_date.strftime("%Y%m%d")
    torrent_name = f"[{display_date_str}] {job_config.name} ({job.video_id})"

    folder_name = None
    if stream_time:
        folder_name = stream_time.strftime("%Y-%m-%dT%H:%MZ")

    readme_finalized = False
    render_kwargs = {}
    if job.output_paths:
        torrent_file = torrent_output_dir / f"{job.id} ({job.video_id}).torrent"
        source_files = list(map(pathlib.Path, job.output_paths))

        # NOTE(review): this early None return leaves the finished task cached in
        # render_tasks (the cleanup at the bottom is skipped) — confirm retries work
        if not all(f.exists() for f in source_files):
            return

        save_path = str(source_files[0].parent)

        if not torrent_file.exists():
            t = torf.Torrent(
                trackers=config.torrent.trackers,
            )

            # TODO extract key phrases from title / description

            t.filepaths = source_files
            # note that the name also sets the subdirectory for the files
            # as such, using other names for the same files *will* change the infohash
            t.name = torrent_name
            # piece hashing is CPU-bound, so push it off the event loop
            success = await asyncio.to_thread(t.generate)
            if not success:
                raise RuntimeError("Failed to generate torrent")
            # write under both the job-id name (for reuse lookups) and display name
            t.write(torrent_file)
            t.write(torrent_output_dir / f"{torrent_name}.torrent")

            # send torrent to qbittorrent for seeding
            if config.qbittorrent:
                if config.qbittorrent.default_save_path:
                    save_path = config.qbittorrent.default_save_path
                qbt_client = qbittorrentapi.Client(
                    host=config.qbittorrent.host,
                    username=config.qbittorrent.username,
                    password=config.qbittorrent.password,
                    VERIFY_WEBUI_CERTIFICATE=False,
                    RAISE_NOTIMPLEMENTEDERROR_FOR_UNIMPLEMENTED_API_ENDPOINTS=True,
                )
                task = asyncio.create_task(
                    asyncio.to_thread(
                        qbt_client.torrents_add,
                        torrent_files=torrent_file,
                        save_path=save_path,
                        content_layout="NoSubfolder",
                        is_paused=config.qbittorrent.start_paused,
                    )
                )
                # keep a strong reference so the task isn't garbage-collected early
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
        else:
            # torrent was generated on a previous run; reuse it to keep the infohash
            t = torf.Torrent.read(torrent_file)
        log.debug(t)
        log.debug(t.magnet(size=False))
        log.debug(t.files)

        # punt file to webdav remote
        if config.webdav and channel and channel.webdav_path:
            target_base = f"{channel.webdav_path}/{folder_name}"
            task = asyncio.create_task(
                do_webdav_upload(
                    config.webdav, torrent_file, f"{target_base}/{torrent_name}.torrent"
                )
            )
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

        render_kwargs["magnet_url"] = t.magnet(size=False)

        # punt files to gofile
        render_kwargs["gofile_url"] = await do_gofile_upload(job)
        readme_finalized = True
    rendered_job = await microdot.jinja.Template("job.md").render_async(
        job=job,
        author_override=channel.name if channel else None,
        stream_time=stream_time.strftime("%Y-%m-%dT%H:%M:%SZ") if stream_time else "(unknown)",
        config=job_config,
        **render_kwargs,
    )
    if readme_finalized and config.webdav and channel and channel.webdav_path:
        rendered_job_file = torrent_output_dir / f"{job.id} ({job.video_id}).md"
        rendered_job_file.write_text(rendered_job)
        target_base = f"{channel.webdav_path}/{folder_name}"
        task = asyncio.create_task(
            do_webdav_upload(config.webdav, rendered_job_file, f"{target_base}/REPORT.md")
        )
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

        # mark the job complete so job_auto_monitor won't schedule it again
        database = database_ctx.get()
        cursor = database.cursor()
        cursor.execute(
            "INSERT INTO job_status VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET completed=excluded.completed",
            (jobid, True),
        )
        database.commit()
    if not readme_finalized and jobid in render_tasks:
        # allow rerun if not finalized
        del render_tasks[jobid]
    return rendered_job
    420 
    421 
    422 def get_process_job_task(jobid: str) -> asyncio.Task:
    423     """
    424     Creates or retrieves a singleton job rendering task.
    425     This is decoupled from the request callback to allow cancellations and multiple consumers.
    426     """
    427     if jobid in render_tasks and render_tasks[jobid].done() and render_tasks[jobid].exception():
    428         del render_tasks[jobid]
    429     if jobid not in render_tasks:
    430         render_tasks[jobid] = asyncio.create_task(_process_job(jobid))
    431     return render_tasks[jobid]
    432 
    433 
    434 def _get_saved_jobconf(jobid: str) -> JobConfig | None:
    435     database = database_ctx.get()
    436     cursor = database.cursor()
    437     cursor.execute("SELECT config FROM job_config WHERE job_id = ?", (jobid,))
    438     result = cursor.fetchone()
    439     if result:
    440         job_config_payload, *_ = result
    441         return msgspec.json.decode(job_config_payload, type=JobConfig)
    442     return None
    443 
    444 
    445 @app.get("/<jobid>/config")
    446 async def show_job_editor(request, jobid: str):
    447     job = await get_moombox_job_by_id(jobid)
    448     if not job:
    449         return "Couldn't find matching job", 404
    450 
    451     job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)
    452 
    453     return (
    454         await microdot.jinja.Template("edit.html").render_async(job=job, config=job_config),
    455         {
    456             "Content-Type": "text/html; charset=utf-8",
    457         },
    458     )
    459 
    460 
    461 @app.post("/<jobid>/config")
    462 async def apply_job_config(request, jobid: str):
    463     job = await get_moombox_job_by_id(jobid)
    464     if not job:
    465         return "Couldn't find matching job", 404
    466     jobconf = msgspec.convert(
    467         {item: request.form[item] for item in request.form}, type=JobConfig
    468     )
    469     database = database_ctx.get()
    470 
    471     cursor = database.cursor()
    472     cursor.execute(
    473         "INSERT INTO job_config VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET config=excluded.config",
    474         (jobid, msgspec.json.encode(jobconf)),
    475     )
    476     database.commit()
    477 
    478     return (
    479         await microdot.jinja.Template("edit.html").render_async(job=job, config=jobconf),
    480         {
    481             "Content-Type": "text/html; charset=utf-8",
    482         },
    483     )
    484 
    485 
    486 @app.get("/<jobid>")
    487 async def show_job(request, jobid: str):
    488     return await get_process_job_task(jobid)
    489 
    490 
    491 @app.post("/<jobid>")
    492 async def upload_job(request, jobid: str):
    493     try:
    494         await get_process_job_task(jobid)
    495         return await microdot.jinja.Template("job/upload_success.html").render_async()
    496     except torf.TorfError:
    497         log.exception(f"Failed to upload job {jobid}")
    498         return await microdot.jinja.Template("job/upload_error.html").render_async()
    499 
    500 
    501 async def job_auto_monitor():
    502     while True:
    503         config = config_ctx.get()
    504 
    505         async with httpx.AsyncClient() as client:
    506             for attempt in stamina.retry_context(
    507                 on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
    508             ):
    509                 with attempt:
    510                     result = await client.get(f"{config.moombox_url}/status")
    511 
    512         jobs = []
    513         for serialized_job in result.json():
    514             try:
    515                 jobs.append(msgspec.convert(serialized_job, type=MoomboxJobInfo))
    516             except msgspec.ValidationError:
    517                 pass
    518 
    519         database = database_ctx.get()
    520         cursor = database.cursor()
    521         configs = {
    522             job_id: msgspec.json.decode(job_conf, type=JobConfig)
    523             for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config")
    524         }
    525         statuses = {
    526             job_id: bool(completed)
    527             for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status")
    528         }
    529 
    530         for job in reversed(jobs):
    531             if not job.uploadability_state:
    532                 continue
    533             job_conf = configs.get(job.id)
    534             if not job_conf:
    535                 job_conf = JobConfig.from_moombox_job(job)
    536             if job.uploadability_state != job_conf.upload:
    537                 continue
    538             if not config.autoupload.active:
    539                 log.info(f"'{job.title}' meets conditions for upload")
    540                 continue
    541             if statuses.get(job.id):
    542                 continue
    543             log.info(f"'{job.title}' is scheduled for upload")
    544             get_process_job_task(job.id)
    545 
    546         await asyncio.sleep(120)