autotako

Service that monitors moombox for completed livestream downloads and uploads them for distribution
git clone https://code.alwayswait.ing/autotako

job_render.py (20634B)


#!/usr/bin/python3

import asyncio
import datetime
import enum
import itertools
import json
import logging
import pathlib
import re
import urllib.parse
from typing import Self

import gofile.api  # type: ignore
import httpx
import microdot  # type: ignore
import msgspec
import stamina
import torf  # type: ignore

from .config import QBittorrentConfig, WebDavConfig, config_ctx
from .database import database_ctx

app = microdot.Microdot()

log = logging.getLogger(__name__)

background_tasks = set()
render_tasks: dict[str, asyncio.Task] = {}
upload_tasks: dict[pathlib.Path, asyncio.Task] = {}

# https://www.japanesewithanime.com/2017/05/quotation-marks-japanese.html#sumitsukikakko
# strip extraneous spaces and hashtag
_pattern_bracketed_kw = re.compile(r"^【[\s#]*(?P<topic>.*?)[\s]?】")

# bae prefers the double angle brackets because she's quirky like that
_pattern_angled_bracketed_kw = re.compile(r"^≪[\s#]*(?P<topic>.*?)[\s]?≫")


class JobUploadCriteria(enum.StrEnum):
    MANUALLY = "manual"
    WHEN_FINISHED = "finished"
    IF_PRIVATE = "private"
    IF_CUT = "cut"

    @property
    def automatic(self) -> bool:
        return self != JobUploadCriteria.MANUALLY


class MoomboxDownloadStatus(enum.StrEnum):
    UNKNOWN = "Unknown"
    UNAVAILABLE = "Unavailable"
    WAITING = "Waiting"
    DOWNLOADING = "Downloading"
    MUXING = "Muxing"
    FINISHED = "Finished"
    ERROR = "Error"


class MoomboxJobInfo(msgspec.Struct, kw_only=True, frozen=True):
    id: str
    title: str | None
    author: str | None
    channel_id: str | None
    video_id: str | None
    manifest_progress: dict
    status: MoomboxDownloadStatus = MoomboxDownloadStatus.UNKNOWN
    output_paths: set[str] | None = msgspec.field(default_factory=set)
    scheduled_start_datetime: datetime.datetime | None = None
    download_finish_datetime: datetime.datetime | None = None

    @property
    def uploadability_state(self) -> JobUploadCriteria | None:
        # returns the criterion under which this job could be auto-uploaded right now;
        # None means the job is not currently uploadable
        if self.status != MoomboxDownloadStatus.FINISHED:
            return None
        if not self.files_available:
            return None
        if not self.is_configured_for_upload:
            return None
        return JobUploadCriteria.WHEN_FINISHED

    @property
    def is_configured_for_upload(self) -> bool:
        if not self.channel_id:
            return False
        config = config_ctx.get()
        channel_config = config.get_channel_config_by_id(self.channel_id)
        if not channel_config:
            return False
        return True

    @property
    def files_available(self) -> bool:
        return bool(self.output_pathobjs) and all(p.exists() for p in self.output_pathobjs)

    @property
    def output_pathobjs(self) -> set[pathlib.Path]:
        return set(map(pathlib.Path, self.output_paths or []))

    @property
    def last_activity_datetime(self) -> datetime.datetime:
        # this does not need to be very accurate
        return (
            self.download_finish_datetime
            or self.scheduled_start_datetime
            or datetime.datetime.fromtimestamp(0, tz=datetime.UTC)
        )


def _extract_name_from_stream_title(title: str) -> str | None:
    """
    Attempts to extract a preset name from the stream title.
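
    Examples (hypothetical titles):

    >>> _extract_name_from_stream_title("【 #Karaoke 】night!")
    'Karaoke'
    >>> _extract_name_from_stream_title("no brackets here") is None
    True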
    115     """
    116     match = _pattern_bracketed_kw.search(title) or _pattern_angled_bracketed_kw.search(title)
    117     if match:
    118         return str(match["topic"]).strip()
    119     return None
    120 
    121 
    122 class JobConfig(msgspec.Struct):
    123     name: str
    124     upload: JobUploadCriteria
    125     author_note: str | None = None
    126 
    127     @classmethod
    128     def from_moombox_job(cls, job: MoomboxJobInfo) -> Self:
    129         # generate a default config from the job
    130         name = "Unspecified Stream"
    131         upload = JobUploadCriteria.MANUALLY
    132 
    133         if not job.title:
    134             return cls(name, upload)
    135 
    136         name = _extract_name_from_stream_title(job.title) or name
    137 
        # mapping of phrases to any possible synonyms;
        # a phrase with no synonyms maps to an empty set
        phrases = {
            "unarchived": {"no archive", "unarchive"},
            "karaoke": {"rock n' rawr", "歌枠", "sing"},
            "rebroadcast": {"kara-rewind"},
        }

        phrase_set = set(phrases.keys()) | set(itertools.chain.from_iterable(phrases.values()))
        # not sure why mypy complains about job.title being possibly None here if left bare...
        matched_phrases = set(
            filter(lambda term: term in (job.title.lower() if job.title else ""), phrase_set)
        )

        # dedupe synonymous terms
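        # e.g. matches {"no archive", "karaoke"} collapse to {"unarchived", "karaoke"}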
        for phrase, syn_phrases in phrases.items():
            if syn_phrases and matched_phrases & syn_phrases:
                matched_phrases = (matched_phrases - syn_phrases) | {phrase}

        if matched_phrases == {"unarchived"}:
            name = "Unarchived Stream"
            upload = JobUploadCriteria.WHEN_FINISHED
        elif matched_phrases == {"karaoke"}:
            name = "Karaoke"
            upload = JobUploadCriteria.IF_CUT
        elif matched_phrases == {"unarchived", "karaoke"}:
            name = "Unarchived Karaoke"
            upload = JobUploadCriteria.WHEN_FINISHED
        elif matched_phrases == {"unarchived", "karaoke", "rebroadcast"}:
            name = "Unarchived Karaoke (rebroadcast)"
            upload = JobUploadCriteria.MANUALLY

        # cannot auto-upload if no channel configuration exists
        if not job.is_configured_for_upload:
            upload = JobUploadCriteria.MANUALLY

        return cls(name, upload)


def stream_display_date(dt: datetime.datetime) -> datetime.date:
    """
    Given the stream start time, returns a display date.

    If the stream start time falls within the last ten minutes of the day, the next day is
    used as the stream date instead, on the assumption that the streamer has a waiting
    screen set up.
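
    >>> stream_display_date(datetime.datetime(2024, 3, 1, 23, 55, tzinfo=datetime.UTC))
    datetime.date(2024, 3, 2)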
    184     """
    185     end_of_day = (23, 50)
    186     return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0)
    187 
    188 
    189 async def get_moombox_job_by_id(jobid: str) -> MoomboxJobInfo | None:
    190     config = config_ctx.get()
    191     async with httpx.AsyncClient() as client:
    192         for attempt in stamina.retry_context(
    193             on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
    194         ):
    195             with attempt:
    196                 result = await client.get(urllib.parse.urljoin(config.moombox_url, "status"))
    197         for job in result.json():
    198             if job["id"] == jobid:
    199                 return msgspec.convert(job, type=MoomboxJobInfo)
    200         return None
    201 
    202 
async def gofile_upload_and_persist(
    upload_file: pathlib.Path, job_id: str, token: str, folder_id: str
):
    with upload_file.open("rb") as upload_filereader:
        await gofile.api.upload_single(upload_filereader, token, folder_id)
        database = database_ctx.get()
        cursor = database.cursor()
        cursor.execute(
            "INSERT INTO gofile_files VALUES (?, ?)",
            (job_id, str(upload_file)),
        )
        database.commit()


async def do_gofile_upload(job: MoomboxJobInfo):
    """
    Prepares a guest upload for the specific job's output files, then returns the folder URL
    while the uploads are running in the background.

    This operation is idempotent; if it is interrupted, running it again will only upload
    the files that weren't uploaded yet.
    """
    database = database_ctx.get()

    known_files = set()
    for known_file, *_ in database.execute(
        "SELECT filepath FROM gofile_files WHERE job_id = ?", (job.id,)
    ):
        known_files.add(pathlib.Path(known_file))

    pending_files = set(map(pathlib.Path, job.output_paths or [])) - known_files

    token = None
    folder_id = None
    gofile_url = None

    cursor = database.cursor()
    cursor.execute(
        "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job.id,)
    )
    job_token_info = cursor.fetchone()
    if job_token_info:
        token, folder_id, gofile_url = job_token_info
    else:
        # upload the smallest file first so we get a token and folder URL back quickly
        pending_file = min(pending_files, key=lambda p: p.stat().st_size)
        if pending_file in upload_tasks:
            # another task is already uploading this file; just wait for it to finish
            upload = await upload_tasks[pending_file]
            token, folder_id, gofile_url = (
                upload.result.guest_token,
                upload.result.parent_folder,
                upload.result.download_page,
            )
        else:
            with pending_file.open("rb") as fo:
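                # wrap the upload in a task so concurrent callers that find this entry
                # in upload_tasks can await it safely (a bare coroutine can only be
                # awaited once)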
                upload_tasks[pending_file] = asyncio.create_task(gofile.api.upload_single(fo))
                upload = await upload_tasks[pending_file]
                token, folder_id, gofile_url = (
                    upload.result.guest_token,
                    upload.result.parent_folder,
                    upload.result.download_page,
                )
            cursor.execute(
                "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)",
                (job.id, token, folder_id, gofile_url),
            )
            cursor.execute(
                "INSERT INTO gofile_files VALUES (?, ?)",
                (job.id, str(pending_file)),
            )
            database.commit()
        pending_files.discard(pending_file)

    # upload any remaining files in the background
    for pending_file in pending_files:
        # don't double-dip and upload an in-progress file
        if pending_file in upload_tasks:
            continue
        upload_tasks[pending_file] = asyncio.create_task(
            gofile_upload_and_persist(pending_file, job.id, token, folder_id)
        )
    return gofile_url


async def do_webdav_upload(webdav: WebDavConfig, filepath: pathlib.Path, target: str):
    auth = httpx.BasicAuth(username=webdav.username, password=webdav.password)
    async with httpx.AsyncClient(auth=auth) as client:
        connection_warning_seen = False
        while True:
            try:
                dest = urllib.parse.urljoin(webdav.base_url, target)
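                # check for an existing remote file first so retries skip files that
                # were already uploaded (keeps the operation idempotent)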
                file_check = await client.head(dest)
                if file_check.status_code == httpx.codes.NOT_FOUND:
                    with filepath.open("rb") as fh:
                        await client.put(dest, content=fh.read())
                return
            except httpx.ConnectTimeout:
                if not connection_warning_seen:
                    log.warning(f"Failed to connect to {webdav.base_url}. Retrying...")
                    connection_warning_seen = True
                await asyncio.sleep(10)


async def get_qbittorrent_session(qbt_config: QBittorrentConfig) -> httpx.AsyncClient:
    client = httpx.AsyncClient(
        base_url=urllib.parse.urljoin(qbt_config.host, "api/v2/"), verify=False
    )
    r = await client.post(
        "auth/login",
        data={"username": qbt_config.username, "password": qbt_config.password},
    )
    r.raise_for_status()
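    # a successful login sets qBittorrent's SID session cookie on this client,
    # which authenticates subsequent WebUI API calls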
    return client


async def _process_job(jobid):
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    # TODO fill in necessary fields in template, etc.
    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    config = config_ctx.get()

    channel = config.get_channel_config_by_id(job.channel_id)
    stream_time = job.scheduled_start_datetime or datetime.datetime.now(tz=datetime.UTC)

    torrent_output_dir = pathlib.Path("torrents")
    torrent_output_dir.mkdir(exist_ok=True)

    display_date = stream_display_date(stream_time)
    display_date_str = display_date.strftime("%Y%m%d")
    torrent_name = f"[{display_date_str}] {job_config.name} ({job.video_id})"

    folder_name = None
    if stream_time:
        folder_name = stream_time.strftime("%Y-%m-%dT%H:%MZ")

    readme_finalized = False
    render_kwargs = {}
    if job.output_paths:
        torrent_file = torrent_output_dir / f"{job.id} ({job.video_id}).torrent"
        source_files = list(map(pathlib.Path, job.output_paths))

        if not all(f.exists() for f in source_files):
            return

        save_path = str(source_files[0].parent)

        if not torrent_file.exists():
            t = torf.Torrent(
                trackers=config.torrent.trackers,
            )

            # TODO extract key phrases from title / description

            t.filepaths = source_files
            # note that the name also sets the subdirectory for the files
            # as such, using other names for the same files *will* change the infohash
            t.name = torrent_name
            success = await asyncio.to_thread(t.generate)
            if not success:
                raise RuntimeError("Failed to generate torrent")
            t.write(torrent_file)
            t.write(torrent_output_dir / f"{torrent_name}.torrent")

            # send torrent to qbittorrent for seeding
            if config.qbittorrent:
                if config.qbittorrent.default_save_path:
                    save_path = config.qbittorrent.default_save_path
                client = await get_qbittorrent_session(config.qbittorrent)
                files = [
                    (
                        "torrents",
                        (
                            torrent_file.name,
                            torrent_file.open("rb"),
                            "application/x-bittorrent",
                        ),
                    ),
                ]
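                # newer qBittorrent releases renamed torrents/add's "paused" flag to
                # "stopped"; sending both keeps old and new WebUI API versions working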
                data = {
                    "savepath": save_path,
                    "contentLayout": "NoSubfolder",
                    "stopped": "true" if config.qbittorrent.start_paused else "false",
                    "paused": "true" if config.qbittorrent.start_paused else "false",
                }
                task = asyncio.create_task(client.post("torrents/add", files=files, data=data))
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
        else:
            t = torf.Torrent.read(torrent_file)
        log.debug(t)
        log.debug(t.magnet(size=False))
        log.debug(t.files)

        # punt file to webdav remote
        if config.webdav and channel and channel.webdav_path:
            target_base = f"{channel.webdav_path}/{folder_name}"
            task = asyncio.create_task(
                do_webdav_upload(
                    config.webdav, torrent_file, f"{target_base}/{torrent_name}.torrent"
                )
            )
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

        render_kwargs["magnet_url"] = t.magnet(size=False)

        # punt files to gofile
        render_kwargs["gofile_url"] = await do_gofile_upload(job)
        readme_finalized = True
    rendered_job = await microdot.jinja.Template("job.md").render_async(
        job=job,
        author_override=channel.name if channel else None,
        stream_time=stream_time.strftime("%Y-%m-%dT%H:%M:%SZ") if stream_time else "(unknown)",
        config=job_config,
        **render_kwargs,
    )
    if readme_finalized and config.webdav and channel and channel.webdav_path:
        rendered_job_file = torrent_output_dir / f"{job.id} ({job.video_id}).md"
        rendered_job_file.write_text(rendered_job)
        target_base = f"{channel.webdav_path}/{folder_name}"
        task = asyncio.create_task(
            do_webdav_upload(config.webdav, rendered_job_file, f"{target_base}/REPORT.md")
        )
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

        database = database_ctx.get()
        cursor = database.cursor()
        cursor.execute(
            "INSERT INTO job_status VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET completed=excluded.completed",
            (jobid, True),
        )
        database.commit()
    if not readme_finalized and jobid in render_tasks:
        # allow rerun if not finalized
        del render_tasks[jobid]
    return rendered_job


def get_process_job_task(jobid: str) -> asyncio.Task:
    """
    Creates or retrieves a singleton job rendering task.
    This is decoupled from the request callback to allow cancellations and multiple consumers.
    """
    if jobid in render_tasks and render_tasks[jobid].done() and render_tasks[jobid].exception():
        del render_tasks[jobid]
    if jobid not in render_tasks:
        render_tasks[jobid] = asyncio.create_task(_process_job(jobid))
    return render_tasks[jobid]


def _get_saved_jobconf(jobid: str) -> JobConfig | None:
    database = database_ctx.get()
    cursor = database.cursor()
    cursor.execute("SELECT config FROM job_config WHERE job_id = ?", (jobid,))
    result = cursor.fetchone()
    if result:
        job_config_payload, *_ = result
        return msgspec.json.decode(job_config_payload, type=JobConfig)
    return None


@app.get("/<jobid>/config")
async def show_job_editor(request, jobid: str):
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    return (
        await microdot.jinja.Template("edit.html").render_async(job=job, config=job_config),
        {
            "Content-Type": "text/html; charset=utf-8",
        },
    )


@app.post("/<jobid>/config")
async def apply_job_config(request, jobid: str):
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404
    jobconf = msgspec.convert(
        {item: request.form[item] for item in request.form}, type=JobConfig
    )
    database = database_ctx.get()

    cursor = database.cursor()
    cursor.execute(
        "INSERT INTO job_config VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET config=excluded.config",
        (jobid, msgspec.json.encode(jobconf)),
    )
    database.commit()

    return (
        await microdot.jinja.Template("edit.html").render_async(job=job, config=jobconf),
        {
            "Content-Type": "text/html; charset=utf-8",
        },
    )


@app.get("/<jobid>")
async def show_job(request, jobid: str):
    return await get_process_job_task(jobid)


@app.post("/<jobid>")
async def upload_job(request, jobid: str):
    try:
        await get_process_job_task(jobid)
        return await microdot.jinja.Template("job/upload_success.html").render_async()
    except torf.TorfError:
        log.exception(f"Failed to upload job {jobid}")
        return await microdot.jinja.Template("job/upload_error.html").render_async()


async def job_auto_monitor():
    while True:
        config = config_ctx.get()

        async with httpx.AsyncClient() as client:
            for attempt in stamina.retry_context(
                on=(httpx.HTTPError, json.decoder.JSONDecodeError),
                attempts=None,
                timeout=None,
                wait_initial=0.5,
                wait_max=10.0,
            ):
                with attempt:
                    result = await client.get(
                        urllib.parse.urljoin(config.moombox_url, "status")
                    )
                    result.raise_for_status()
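                    # parse eagerly so a truncated body raises JSONDecodeError and retries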
                    result.json()

        jobs = []
        for serialized_job in result.json():
            try:
                jobs.append(msgspec.convert(serialized_job, type=MoomboxJobInfo))
            except msgspec.ValidationError:
                pass

        database = database_ctx.get()
        cursor = database.cursor()
        configs = {
            job_id: msgspec.json.decode(job_conf, type=JobConfig)
            for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config")
        }
        statuses = {
            job_id: bool(completed)
            for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status")
        }

        for job in reversed(jobs):
            if not job.uploadability_state:
                continue
            job_conf = configs.get(job.id)
            if not job_conf:
                job_conf = JobConfig.from_moombox_job(job)
            if job.uploadability_state != job_conf.upload:
                continue
            if not config.autoupload.active:
                log.info(f"'{job.title}' meets conditions for upload")
                continue
            if statuses.get(job.id):
                continue
            log.info(f"'{job.title}' is scheduled for upload")
            get_process_job_task(job.id)

        await asyncio.sleep(120)