autotako

Service to monitor moombox for completed livestream downloads to upload for distribution
git clone https://code.alwayswait.ing/autotako
Log | Files | Refs | README

job_render.py (19051B)


      1 #!/usr/bin/python3
      2 
      3 import asyncio
      4 import datetime
      5 import enum
      6 import itertools
      7 import pathlib
      8 import re
      9 import traceback
     10 from typing import Self
     11 
     12 import gofile.api  # type: ignore
     13 import httpx
     14 import microdot  # type: ignore
     15 import msgspec
     16 import qbittorrentapi  # type: ignore
     17 import stamina
     18 import torf  # type: ignore
     19 
     20 from .config import WebDavConfig, config_ctx
     21 from .database import database_ctx
     22 
     23 app = microdot.Microdot()
     24 
     25 background_tasks = set()
     26 render_tasks: dict[str, asyncio.Task] = {}
     27 upload_tasks: dict[pathlib.Path, asyncio.Task] = {}
     28 
     29 # https://www.japanesewithanime.com/2017/05/quotation-marks-japanese.html#sumitsukikakko
     30 # strip extraneous spaces and hashtag
     31 _pattern_bracketed_kw = re.compile(r"^【[\s#]*(?P<topic>.*?)[\s]?】")
     32 
     33 # bae prefers the double angle brackets because she's quirky like that
     34 _pattern_angled_bracketed_kw = re.compile(r"^≪[\s#]*(?P<topic>.*?)[\s]?≫")
     35 
     36 
     37 class JobUploadCriteria(enum.StrEnum):
     38     MANUALLY = "manual"
     39     WHEN_FINISHED = "finished"
     40     IF_PRIVATE = "private"
     41     IF_CUT = "cut"
     42 
     43     @property
     44     def automatic(self) -> bool:
     45         return self != JobUploadCriteria.MANUALLY
     46 
     47 
     48 class MoomboxDownloadStatus(enum.StrEnum):
     49     UNKNOWN = "Unknown"
     50     UNAVAILABLE = "Unavailable"
     51     WAITING = "Waiting"
     52     DOWNLOADING = "Downloading"
     53     MUXING = "Muxing"
     54     FINISHED = "Finished"
     55     ERROR = "Error"
     56 
     57 
     58 class MoomboxJobInfo(msgspec.Struct, kw_only=True, frozen=True):
     59     id: str
     60     title: str | None
     61     author: str | None
     62     channel_id: str | None
     63     video_id: str | None
     64     manifest_progress: dict
     65     status: MoomboxDownloadStatus = MoomboxDownloadStatus.UNKNOWN
     66     output_paths: set[str] | None = msgspec.field(default_factory=set)
     67     scheduled_start_datetime: datetime.datetime | None = None
     68 
     69     @property
     70     def uploadability_state(self) -> JobUploadCriteria | None:
     71         # returns a JobUploadCriteria that can be tested for equality to run
     72         if self.status != MoomboxDownloadStatus.FINISHED:
     73             return None
     74         if not self.files_available:
     75             return None
     76         if not self.is_configured_for_upload:
     77             return None
     78         return JobUploadCriteria.WHEN_FINISHED
     79 
     80     @property
     81     def is_configured_for_upload(self) -> bool:
     82         if not self.channel_id:
     83             return False
     84         config = config_ctx.get()
     85         channel_config = config.get_channel_config_by_id(self.channel_id)
     86         if not channel_config:
     87             return False
     88         return True
     89 
     90     @property
     91     def files_available(self) -> bool:
     92         return bool(self.output_pathobjs) and all(p.exists() for p in self.output_pathobjs)
     93 
     94     @property
     95     def output_pathobjs(self) -> set[pathlib.Path]:
     96         return set(map(pathlib.Path, self.output_paths or []))
     97 
     98 
     99 def _extract_name_from_stream_title(title: str) -> str | None:
    100     """
    101     Attempts to extract a preset name from the stream title.
    102     """
    103     match = _pattern_bracketed_kw.search(title) or _pattern_angled_bracketed_kw.search(title)
    104     if match:
    105         return str(match["topic"]).strip()
    106     return None
    107 
    108 
    109 class JobConfig(msgspec.Struct):
    110     name: str
    111     upload: JobUploadCriteria
    112     author_note: str | None = None
    113 
    114     @classmethod
    115     def from_moombox_job(cls, job: MoomboxJobInfo) -> Self:
    116         # generate a default config from the job
    117         name = "Unspecified Stream"
    118         upload = JobUploadCriteria.MANUALLY
    119 
    120         if not job.title:
    121             return cls(name, upload)
    122 
    123         name = _extract_name_from_stream_title(job.title) or name
    124 
    125         # mapping of phrases and any possible synonyms
    126         # if a synonym isn't present then an empty set should be present
    127         phrases = {
    128             "unarchived": {"no archive", "unarchive"},
    129             "karaoke": {"rock n' rawr", "歌枠", "sing"},
    130             "rebroadcast": {"kara-rewind"},
    131         }
    132 
    133         phrase_set = set(phrases.keys()) | set(itertools.chain.from_iterable(phrases.values()))
    134         # not sure why mypy complains about job.title being possibly None here if left bare...
    135         matched_phrases = set(
    136             filter(lambda term: term in (job.title.lower() if job.title else ""), phrase_set)
    137         )
    138 
    139         # dedupe synonymous terms
    140         for phrase, syn_phrases in phrases.items():
    141             if syn_phrases and matched_phrases & syn_phrases:
    142                 matched_phrases = (matched_phrases - syn_phrases) | {phrase}
    143 
    144         if matched_phrases == {"unarchived"}:
    145             name = "Unarchived Stream"
    146             upload = JobUploadCriteria.WHEN_FINISHED
    147         elif matched_phrases == {"karaoke"}:
    148             name = "Karaoke"
    149             upload = JobUploadCriteria.IF_CUT
    150         elif matched_phrases == {"unarchived", "karaoke"}:
    151             name = "Unarchived Karaoke"
    152             upload = JobUploadCriteria.WHEN_FINISHED
    153         elif matched_phrases == {"unarchived", "karaoke", "rebroadcast"}:
    154             name = "Unarchived Karaoke (rebroadcast)"
    155             upload = JobUploadCriteria.MANUALLY
    156 
    157         # cannot auto-upload if no channel configuration exists
    158         if not job.is_configured_for_upload:
    159             upload = JobUploadCriteria.MANUALLY
    160 
    161         return cls(name, upload)
    162 
    163 
    164 def stream_display_date(dt: datetime.datetime) -> datetime.date:
    165     """
    166     Given the stream start time, returns a display date.
    167 
    168     If the stream start time is within the last 10 minutes of the day, we treat the next day
    169     as the stream date instead based on the assumption that the streamer has a waiting screen
    170     set up.
    171     """
    172     end_of_day = (23, 50)
    173     return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0)
    174 
    175 
    176 async def get_moombox_job_by_id(jobid: str) -> MoomboxJobInfo | None:
    177     config = config_ctx.get()
    178     async with httpx.AsyncClient() as client:
    179         for attempt in stamina.retry_context(
    180             on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
    181         ):
    182             with attempt:
    183                 result = await client.get(f"{config.moombox_url}/status")
    184         for job in result.json():
    185             if job["id"] == jobid:
    186                 return msgspec.convert(job, type=MoomboxJobInfo)
    187         return None
    188 
    189 
    190 async def gofile_upload_and_persist(
    191     cursor, upload_file: pathlib.Path, job_id: str, token: str, folder_id: str
    192 ):
    193     with upload_file.open("rb") as upload_filereader:
    194         await gofile.api.upload_single(upload_filereader, token, folder_id)
    195         database = database_ctx.get()
    196         cursor = database.cursor()
    197         cursor.execute(
    198             "INSERT INTO gofile_files VALUES (?, ?)",
    199             (job_id, str(upload_file)),
    200         )
    201         database.commit()
    202 
    203 
    204 async def do_gofile_upload(job: MoomboxJobInfo):
    205     """
    206     Prepares a guest upload for the specific job's output files, then returns the folder URL
    207     while the uploads are running in the background.
    208 
    209     This operation is idempotent; if it is interrupted, running this function will only upload
    210     files that weren't already.
    211     """
    212     database = database_ctx.get()
    213 
    214     known_files = set()
    215     for known_file, *_ in database.execute(
    216         "SELECT filepath FROM gofile_files WHERE job_id = ?", (job.id,)
    217     ):
    218         known_files.add(pathlib.Path(known_file))
    219 
    220     pending_files = set(map(pathlib.Path, job.output_paths or [])) - known_files
    221 
    222     token = None
    223     folder_id = None
    224     gofile_url = None
    225 
    226     cursor = database.cursor()
    227     cursor.execute(
    228         "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job.id,)
    229     )
    230     job_token_info = cursor.fetchone()
    231     if job_token_info:
    232         token, folder_id, gofile_url = job_token_info
    233     else:
    234         # get the smallest file to quickly get the upload result
    235         pending_file = min(pending_files, key=lambda p: p.stat().st_size)
    236         if pending_file in upload_tasks:
    237             # another task is uploading the same file; just block until that one is available
    238             upload = await upload_tasks[pending_file]
    239             token, folder_id, gofile_url = (
    240                 upload.result.guest_token,
    241                 upload.result.parent_folder,
    242                 upload.result.download_page,
    243             )
    244         else:
    245             with pending_file.open("rb") as fo:
    246                 upload_tasks[pending_file] = gofile.api.upload_single(fo)
    247                 upload = await upload_tasks[pending_file]
    248                 token, folder_id, gofile_url = (
    249                     upload.result.guest_token,
    250                     upload.result.parent_folder,
    251                     upload.result.download_page,
    252                 )
    253             cursor.execute(
    254                 "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)",
    255                 (job.id, token, folder_id, gofile_url),
    256             )
    257             cursor.execute(
    258                 "INSERT INTO gofile_files VALUES (?, ?)",
    259                 (job.id, str(pending_file)),
    260             )
    261             database.commit()
    262         pending_files.discard(pending_file)
    263 
    264     # upload any remaining files in the background
    265     for pending_file in pending_files:
    266         # don't double-dip and upload an in-progress file
    267         if pending_file in upload_tasks:
    268             continue
    269         upload_tasks[pending_file] = asyncio.create_task(
    270             gofile_upload_and_persist(cursor, pending_file, job.id, token, folder_id)
    271         )
    272     return gofile_url
    273 
    274 
    275 async def do_webdav_upload(webdav: WebDavConfig, filepath: pathlib.Path, target: str):
    276     auth = httpx.BasicAuth(username=webdav.username, password=webdav.password)
    277     async with httpx.AsyncClient(auth=auth) as client:
    278         dest = f"{webdav.base_url}/{target}"
    279         file_check = await client.head(dest)
    280         if file_check.status_code == httpx.codes.NOT_FOUND:
    281             with filepath.open("rb") as fh:
    282                 await client.put(dest, content=fh.read())
    283 
    284 
    285 async def _process_job(jobid):
    286     job = await get_moombox_job_by_id(jobid)
    287     if not job:
    288         return "Couldn't find matching job", 404
    289 
    290     # TODO fill in necessary fields in template, etc.
    291     job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)
    292 
    293     config = config_ctx.get()
    294 
    295     channel = config.get_channel_config_by_id(job.channel_id)
    296     stream_time = job.scheduled_start_datetime or datetime.datetime.now(tz=datetime.UTC)
    297 
    298     torrent_output_dir = pathlib.Path("torrents")
    299     torrent_output_dir.mkdir(exist_ok=True)
    300 
    301     display_date = stream_display_date(stream_time)
    302     display_date_str = display_date.strftime("%Y%m%d")
    303     torrent_name = f"[{display_date_str}] {job_config.name} ({job.video_id})"
    304 
    305     folder_name = None
    306     if stream_time:
    307         folder_name = stream_time.strftime("%Y-%m-%dT%H:%MZ")
    308 
    309     readme_finalized = False
    310     render_kwargs = {}
    311     if job.output_paths:
    312         torrent_file = torrent_output_dir / f"{job.id} ({job.video_id}).torrent"
    313         source_files = list(map(pathlib.Path, job.output_paths))
    314 
    315         if not all(f.exists() for f in source_files):
    316             return
    317 
    318         save_path = str(source_files[0].parent)
    319 
    320         if not torrent_file.exists():
    321             t = torf.Torrent(
    322                 trackers=config.torrent.trackers,
    323             )
    324 
    325             # TODO extract key phrases from title / description
    326 
    327             t.filepaths = source_files
    328             # note that the name also sets the subdirectory for the files
    329             # as such, using other names for the same files *will* change the infohash
    330             t.name = torrent_name
    331             success = await asyncio.to_thread(t.generate)
    332             if not success:
    333                 raise RuntimeError("Failed to generate torrent")
    334             t.write(torrent_file)
    335             t.write(torrent_output_dir / f"{torrent_name}.torrent")
    336 
    337             # send torrent to qbittorrent for seeding
    338             if config.qbittorrent:
    339                 if config.qbittorrent.default_save_path:
    340                     save_path = config.qbittorrent.default_save_path
    341                 qbt_client = qbittorrentapi.Client(
    342                     host=config.qbittorrent.host,
    343                     username=config.qbittorrent.username,
    344                     password=config.qbittorrent.password,
    345                     VERIFY_WEBUI_CERTIFICATE=False,
    346                     RAISE_NOTIMPLEMENTEDERROR_FOR_UNIMPLEMENTED_API_ENDPOINTS=True,
    347                 )
    348                 task = asyncio.create_task(
    349                     asyncio.to_thread(
    350                         qbt_client.torrents_add,
    351                         torrent_files=torrent_file,
    352                         save_path=save_path,
    353                         content_layout="NoSubfolder",
    354                         is_paused=config.qbittorrent.start_paused,
    355                     )
    356                 )
    357                 background_tasks.add(task)
    358                 task.add_done_callback(background_tasks.discard)
    359         else:
    360             t = torf.Torrent.read(torrent_file)
    361         print(t)
    362         print(t.magnet(size=False))
    363         print(t.files)
    364 
    365         # punt file to webdav remote
    366         if config.webdav and channel and channel.webdav_path:
    367             target_base = f"{channel.webdav_path}/{folder_name}"
    368             task = asyncio.create_task(
    369                 do_webdav_upload(
    370                     config.webdav, torrent_file, f"{target_base}/{torrent_name}.torrent"
    371                 )
    372             )
    373             background_tasks.add(task)
    374             task.add_done_callback(background_tasks.discard)
    375 
    376         render_kwargs["magnet_url"] = t.magnet(size=False)
    377 
    378         # punt files to gofile
    379         render_kwargs["gofile_url"] = await do_gofile_upload(job)
    380         readme_finalized = True
    381     rendered_job = await microdot.jinja.Template("job.md").render_async(
    382         job=job,
    383         author_override=channel.name if channel else None,
    384         stream_time=stream_time.strftime("%Y-%m-%dT%H:%M:%SZ") if stream_time else "(unknown)",
    385         config=job_config,
    386         **render_kwargs,
    387     )
    388     if readme_finalized and config.webdav and channel and channel.webdav_path:
    389         rendered_job_file = torrent_output_dir / f"{job.id} ({job.video_id}).md"
    390         rendered_job_file.write_text(rendered_job)
    391         target_base = f"{channel.webdav_path}/{folder_name}"
    392         task = asyncio.create_task(
    393             do_webdav_upload(config.webdav, rendered_job_file, f"{target_base}/REPORT.md")
    394         )
    395         background_tasks.add(task)
    396         task.add_done_callback(background_tasks.discard)
    397 
    398         database = database_ctx.get()
    399         cursor = database.cursor()
    400         cursor.execute(
    401             "INSERT INTO job_status VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET completed=excluded.completed",
    402             (jobid, True),
    403         )
    404         database.commit()
    405     if not readme_finalized and jobid in render_tasks:
    406         # allow rerun if not finalized
    407         del render_tasks[jobid]
    408     return rendered_job
    409 
    410 
    411 def get_process_job_task(jobid: str) -> asyncio.Task:
    412     """
    413     Creates or retrieves a singleton job rendering task.
    414     This is decoupled from the request callback to allow cancellations and multiple consumers.
    415     """
    416     if jobid in render_tasks and render_tasks[jobid].done() and render_tasks[jobid].exception():
    417         del render_tasks[jobid]
    418     if jobid not in render_tasks:
    419         render_tasks[jobid] = asyncio.create_task(_process_job(jobid))
    420     return render_tasks[jobid]
    421 
    422 
    423 def _get_saved_jobconf(jobid: str) -> JobConfig | None:
    424     database = database_ctx.get()
    425     cursor = database.cursor()
    426     cursor.execute("SELECT config FROM job_config WHERE job_id = ?", (jobid,))
    427     result = cursor.fetchone()
    428     if result:
    429         job_config_payload, *_ = result
    430         return msgspec.json.decode(job_config_payload, type=JobConfig)
    431     return None
    432 
    433 
    434 @app.get("/<jobid>/config")
    435 async def show_job_editor(request, jobid: str):
    436     job = await get_moombox_job_by_id(jobid)
    437     if not job:
    438         return "Couldn't find matching job", 404
    439 
    440     job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)
    441 
    442     return (
    443         await microdot.jinja.Template("edit.html").render_async(job=job, config=job_config),
    444         {
    445             "Content-Type": "text/html; charset=utf-8",
    446         },
    447     )
    448 
    449 
    450 @app.post("/<jobid>/config")
    451 async def apply_job_config(request, jobid: str):
    452     job = await get_moombox_job_by_id(jobid)
    453     if not job:
    454         return "Couldn't find matching job", 404
    455     jobconf = msgspec.convert(
    456         {item: request.form[item] for item in request.form}, type=JobConfig
    457     )
    458     database = database_ctx.get()
    459 
    460     cursor = database.cursor()
    461     cursor.execute(
    462         "INSERT INTO job_config VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET config=excluded.config",
    463         (jobid, msgspec.json.encode(jobconf)),
    464     )
    465     database.commit()
    466 
    467     return (
    468         await microdot.jinja.Template("edit.html").render_async(job=job, config=jobconf),
    469         {
    470             "Content-Type": "text/html; charset=utf-8",
    471         },
    472     )
    473 
    474 
    475 @app.get("/<jobid>")
    476 async def show_job(request, jobid: str):
    477     return await get_process_job_task(jobid)
    478 
    479 
    480 @app.post("/<jobid>")
    481 async def upload_job(request, jobid: str):
    482     try:
    483         await get_process_job_task(jobid)
    484         return await microdot.jinja.Template("job/upload_success.html").render_async()
    485     except torf.TorfError:
    486         traceback.print_exc()
    487         return await microdot.jinja.Template("job/upload_error.html").render_async()
    488 
    489 
    490 async def job_auto_monitor():
    491     while True:
    492         config = config_ctx.get()
    493 
    494         async with httpx.AsyncClient() as client:
    495             for attempt in stamina.retry_context(
    496                 on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
    497             ):
    498                 with attempt:
    499                     result = await client.get(f"{config.moombox_url}/status")
    500 
    501         jobs = []
    502         for serialized_job in result.json():
    503             try:
    504                 jobs.append(msgspec.convert(serialized_job, type=MoomboxJobInfo))
    505             except msgspec.ValidationError:
    506                 pass
    507 
    508         database = database_ctx.get()
    509         cursor = database.cursor()
    510         configs = {
    511             job_id: msgspec.json.decode(job_conf, type=JobConfig)
    512             for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config")
    513         }
    514         statuses = {
    515             job_id: bool(completed)
    516             for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status")
    517         }
    518 
    519         for job in reversed(jobs):
    520             if not job.uploadability_state:
    521                 continue
    522             job_conf = configs.get(job.id)
    523             if not job_conf:
    524                 job_conf = JobConfig.from_moombox_job(job)
    525             if job.uploadability_state != job_conf.upload:
    526                 continue
    527             if not config.autoupload.active:
    528                 print(f"'{job.title}' meets conditions for upload")
    529                 continue
    530             if statuses.get(job.id):
    531                 continue
    532             print(f"'{job.title}' is scheduled for upload")
    533             get_process_job_task(job.id)
    534 
    535         await asyncio.sleep(120)