job_render.py (20634B)
#!/usr/bin/python3
"""
Job rendering and publishing for finished moombox downloads.

For a finished job this module builds a torrent of the output files, hands it
to qBittorrent for seeding, mirrors the files to gofile and (optionally) a
WebDAV remote, and renders a markdown report from a template.  A background
monitor loop (``job_auto_monitor``) triggers this pipeline automatically for
jobs whose saved/derived configuration allows it.
"""

import asyncio
import datetime
import enum
import itertools
import json
import logging
import pathlib
import re
import urllib.parse
from typing import Self

import gofile.api  # type: ignore
import httpx
import microdot  # type: ignore
import msgspec
import stamina
import torf  # type: ignore

from .config import QBittorrentConfig, WebDavConfig, config_ctx
from .database import database_ctx

app = microdot.Microdot()

log = logging.getLogger(__name__)

# Strong references to fire-and-forget tasks so the event loop does not
# garbage-collect them mid-flight.
background_tasks = set()
# Singleton render task per job id; see get_process_job_task().
render_tasks: dict[str, asyncio.Task] = {}
# In-flight gofile uploads keyed by file path.  Entries are never removed, so
# this also acts as a per-session "already uploading/uploaded" guard.
upload_tasks: dict[pathlib.Path, asyncio.Task] = {}

# https://www.japanesewithanime.com/2017/05/quotation-marks-japanese.html#sumitsukikakko
# strip extraneous spaces and hashtag
_pattern_bracketed_kw = re.compile(r"^【[\s#]*(?P<topic>.*?)[\s]?】")

# bae prefers the double angle brackets because she's quirky like that
_pattern_angled_bracketed_kw = re.compile(r"^≪[\s#]*(?P<topic>.*?)[\s]?≫")


class JobUploadCriteria(enum.StrEnum):
    """Describes when a job should be uploaded."""

    MANUALLY = "manual"
    WHEN_FINISHED = "finished"
    IF_PRIVATE = "private"
    IF_CUT = "cut"

    @property
    def automatic(self) -> bool:
        # anything other than MANUALLY may be triggered by the monitor loop
        return self != JobUploadCriteria.MANUALLY


class MoomboxDownloadStatus(enum.StrEnum):
    """Download states reported by the moombox status endpoint."""

    UNKNOWN = "Unknown"
    UNAVAILABLE = "Unavailable"
    WAITING = "Waiting"
    DOWNLOADING = "Downloading"
    MUXING = "Muxing"
    FINISHED = "Finished"
    ERROR = "Error"


class MoomboxJobInfo(msgspec.Struct, kw_only=True, frozen=True):
    """Snapshot of a single moombox job as returned by its status API."""

    id: str
    title: str | None
    author: str | None
    channel_id: str | None
    video_id: str | None
    manifest_progress: dict
    status: MoomboxDownloadStatus = MoomboxDownloadStatus.UNKNOWN
    output_paths: set[str] | None = msgspec.field(default_factory=set)
    scheduled_start_datetime: datetime.datetime | None = None
    download_finish_datetime: datetime.datetime | None = None

    @property
    def uploadability_state(self) -> JobUploadCriteria | None:
        # returns a JobUploadCriteria that can be tested for equality to run
        # NOTE(review): this only ever yields WHEN_FINISHED, so jobs configured
        # with IF_PRIVATE / IF_CUT never match in job_auto_monitor — confirm
        # whether that is intentional.
        if self.status != MoomboxDownloadStatus.FINISHED:
            return None
        if not self.files_available:
            return None
        if not self.is_configured_for_upload:
            return None
        return JobUploadCriteria.WHEN_FINISHED

    @property
    def is_configured_for_upload(self) -> bool:
        # uploads require a channel configuration entry for this job's channel
        if not self.channel_id:
            return False
        config = config_ctx.get()
        channel_config = config.get_channel_config_by_id(self.channel_id)
        if not channel_config:
            return False
        return True

    @property
    def files_available(self) -> bool:
        # build the path set once instead of twice per check
        paths = self.output_pathobjs
        return bool(paths) and all(p.exists() for p in paths)

    @property
    def output_pathobjs(self) -> set[pathlib.Path]:
        return set(map(pathlib.Path, self.output_paths or []))

    @property
    def last_activity_datetime(self) -> datetime.datetime:
        # this does not need to be very accurate; falls back to the epoch so
        # jobs without timestamps sort before everything else
        return (
            self.download_finish_datetime
            or self.scheduled_start_datetime
            or datetime.datetime.fromtimestamp(0, tz=datetime.UTC)
        )


def _extract_name_from_stream_title(title: str) -> str | None:
    """
    Attempts to extract a preset name from the stream title.

    Looks for a leading 【...】 or ≪...≫ bracketed topic; returns the stripped
    topic text, or None when no bracketed prefix is present.
    """
    match = _pattern_bracketed_kw.search(title) or _pattern_angled_bracketed_kw.search(title)
    if match:
        return str(match["topic"]).strip()
    return None


class JobConfig(msgspec.Struct):
    """User-editable per-job settings: display name and upload criteria."""

    name: str
    upload: JobUploadCriteria
    author_note: str | None = None

    @classmethod
    def from_moombox_job(cls, job: MoomboxJobInfo) -> Self:
        # generate a default config from the job
        name = "Unspecified Stream"
        upload = JobUploadCriteria.MANUALLY

        if not job.title:
            return cls(name, upload)

        name = _extract_name_from_stream_title(job.title) or name

        # mapping of phrases and any possible synonyms
        # if a synonym isn't present then an empty set should be present
        phrases = {
            "unarchived": {"no archive", "unarchive"},
            "karaoke": {"rock n' rawr", "歌枠", "sing"},
            "rebroadcast": {"kara-rewind"},
        }

        phrase_set = set(phrases.keys()) | set(itertools.chain.from_iterable(phrases.values()))
        # not sure why mypy complains about job.title being possibly None here if left bare...
        matched_phrases = set(
            filter(lambda term: term in (job.title.lower() if job.title else ""), phrase_set)
        )

        # dedupe synonymous terms: collapse any matched synonym onto its
        # canonical phrase so the comparisons below see canonical names only
        for phrase, syn_phrases in phrases.items():
            if syn_phrases and matched_phrases & syn_phrases:
                matched_phrases = (matched_phrases - syn_phrases) | {phrase}

        if matched_phrases == {"unarchived"}:
            name = "Unarchived Stream"
            upload = JobUploadCriteria.WHEN_FINISHED
        elif matched_phrases == {"karaoke"}:
            name = "Karaoke"
            upload = JobUploadCriteria.IF_CUT
        elif matched_phrases == {"unarchived", "karaoke"}:
            name = "Unarchived Karaoke"
            upload = JobUploadCriteria.WHEN_FINISHED
        elif matched_phrases == {"unarchived", "karaoke", "rebroadcast"}:
            name = "Unarchived Karaoke (rebroadcast)"
            upload = JobUploadCriteria.MANUALLY

        # cannot auto-upload if no channel configuration exists
        if not job.is_configured_for_upload:
            upload = JobUploadCriteria.MANUALLY

        return cls(name, upload)


def stream_display_date(dt: datetime.datetime) -> datetime.date:
    """
    Given the stream start time, returns a display date.

    If the stream start time is within the last 10 minutes of the day, we treat the next day
    as the stream date instead based on the assumption that the streamer has a waiting screen
    set up.
    """
    end_of_day = (23, 50)
    return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0)


async def get_moombox_job_by_id(jobid: str) -> MoomboxJobInfo | None:
    """
    Fetches the moombox status list and returns the job matching *jobid*.

    Retries HTTP failures indefinitely with backoff; returns None when no job
    with that id exists.
    """
    config = config_ctx.get()
    async with httpx.AsyncClient() as client:
        for attempt in stamina.retry_context(
            on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
        ):
            with attempt:
                result = await client.get(urllib.parse.urljoin(config.moombox_url, "status"))
                # surface non-2xx responses so the retry context handles them
                # (consistent with job_auto_monitor below)
                result.raise_for_status()
                for job in result.json():
                    if job["id"] == jobid:
                        return msgspec.convert(job, type=MoomboxJobInfo)
    return None


async def gofile_upload_and_persist(
    cursor, upload_file: pathlib.Path, job_id: str, token: str, folder_id: str
):
    """
    Uploads a single file into an existing gofile folder and records the
    completed upload in the ``gofile_files`` table.

    The *cursor* parameter is kept for call-site compatibility but a fresh
    cursor is opened after the upload completes: the await may be long-lived,
    so the caller's cursor is not relied upon.
    """
    with upload_file.open("rb") as upload_filereader:
        await gofile.api.upload_single(upload_filereader, token, folder_id)
    database = database_ctx.get()
    cursor = database.cursor()
    cursor.execute(
        "INSERT INTO gofile_files VALUES (?, ?)",
        (job_id, str(upload_file)),
    )
    database.commit()


async def do_gofile_upload(job: MoomboxJobInfo):
    """
    Prepares a guest upload for the specific job's output files, then returns the folder URL
    while the uploads are running in the background.

    This operation is idempotent; if it is interrupted, running this function will only upload
    files that weren't already.
    """
    database = database_ctx.get()

    # files already recorded as uploaded for this job
    known_files = set()
    for known_file, *_ in database.execute(
        "SELECT filepath FROM gofile_files WHERE job_id = ?", (job.id,)
    ):
        known_files.add(pathlib.Path(known_file))

    pending_files = set(map(pathlib.Path, job.output_paths or [])) - known_files

    token = None
    folder_id = None
    gofile_url = None

    cursor = database.cursor()
    cursor.execute(
        "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job.id,)
    )
    job_token_info = cursor.fetchone()
    if job_token_info:
        token, folder_id, gofile_url = job_token_info
    else:
        if not pending_files:
            # no token recorded and nothing left to upload: there is no folder
            # to report, so bail out instead of crashing in min() below
            return None
        # get the smallest file to quickly get the upload result
        pending_file = min(pending_files, key=lambda p: p.stat().st_size)
        if pending_file in upload_tasks:
            # another task is uploading the same file; just block until that one is available
            upload = await upload_tasks[pending_file]
            token, folder_id, gofile_url = (
                upload.result.guest_token,
                upload.result.parent_folder,
                upload.result.download_page,
            )
        else:
            with pending_file.open("rb") as fo:
                # wrap in a Task before storing: a bare coroutine placed in the
                # shared upload_tasks dict would raise if a concurrent caller
                # awaited it a second time.  Awaiting inside the `with` also
                # keeps the file handle open for the duration of the upload.
                upload_tasks[pending_file] = asyncio.create_task(
                    gofile.api.upload_single(fo)
                )
                upload = await upload_tasks[pending_file]
            token, folder_id, gofile_url = (
                upload.result.guest_token,
                upload.result.parent_folder,
                upload.result.download_page,
            )
        cursor.execute(
            "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)",
            (job.id, token, folder_id, gofile_url),
        )
        cursor.execute(
            "INSERT INTO gofile_files VALUES (?, ?)",
            (job.id, str(pending_file)),
        )
        database.commit()
        pending_files.discard(pending_file)

    # upload any remaining files in the background
    for pending_file in pending_files:
        # don't double-dip and upload an in-progress file
        if pending_file in upload_tasks:
            continue
        upload_tasks[pending_file] = asyncio.create_task(
            gofile_upload_and_persist(cursor, pending_file, job.id, token, folder_id)
        )
    return gofile_url


async def do_webdav_upload(webdav: WebDavConfig, filepath: pathlib.Path, target: str):
    """
    Uploads *filepath* to the WebDAV remote at path *target* unless something
    already exists there (checked via HEAD).

    Connection timeouts are retried every 10 seconds forever; other transport
    or HTTP errors propagate to the caller.
    """
    auth = httpx.BasicAuth(username=webdav.username, password=webdav.password)
    async with httpx.AsyncClient(auth=auth) as client:
        connection_warning_seen = False
        while True:
            try:
                dest = urllib.parse.urljoin(webdav.base_url, target)
                file_check = await client.head(dest)
                if file_check.status_code == httpx.codes.NOT_FOUND:
                    with filepath.open("rb") as fh:
                        await client.put(dest, content=fh.read())
                return
            except httpx.ConnectTimeout:
                # warn once, then retry quietly
                if not connection_warning_seen:
                    log.warning(f"Failed to connect to {webdav.base_url}. Retrying...")
                    connection_warning_seen = True
                await asyncio.sleep(10)


async def get_qbittorrent_session(qbt_config: QBittorrentConfig) -> httpx.AsyncClient:
    """
    Logs into the qBittorrent WebUI API and returns an authenticated client.

    Raises httpx.HTTPStatusError when the login request fails.
    """
    # verify=False: qBittorrent WebUI commonly runs with a self-signed cert;
    # TLS verification is deliberately disabled for this local connection
    client = httpx.AsyncClient(
        base_url=urllib.parse.urljoin(qbt_config.host, "api/v2/"), verify=False
    )
    r = await client.post(
        "auth/login",
        data={"username": qbt_config.username, "password": qbt_config.password},
    )
    r.raise_for_status()
    return client


async def _process_job(jobid: str):
    """
    Runs the full publishing pipeline for a job: builds (or reloads) the
    torrent, queues seeding/WebDAV/gofile uploads in the background, renders
    the markdown report, and marks the job completed in the database.

    Returns the rendered report, or an error tuple when the job is unknown.
    """
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    # TODO fill in necessary fields in template, etc.
    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    config = config_ctx.get()

    channel = config.get_channel_config_by_id(job.channel_id)
    stream_time = job.scheduled_start_datetime or datetime.datetime.now(tz=datetime.UTC)

    torrent_output_dir = pathlib.Path("torrents")
    torrent_output_dir.mkdir(exist_ok=True)

    display_date = stream_display_date(stream_time)
    display_date_str = display_date.strftime("%Y%m%d")
    torrent_name = f"[{display_date_str}] {job_config.name} ({job.video_id})"

    folder_name = None
    if stream_time:
        folder_name = stream_time.strftime("%Y-%m-%dT%H:%MZ")

    readme_finalized = False
    render_kwargs = {}
    if job.output_paths:
        torrent_file = torrent_output_dir / f"{job.id} ({job.video_id}).torrent"
        source_files = list(map(pathlib.Path, job.output_paths))

        # output files may not have landed on disk yet; bail and let a later
        # invocation retry (the task is evicted from render_tasks below)
        if not all(f.exists() for f in source_files):
            return

        save_path = str(source_files[0].parent)

        if not torrent_file.exists():
            t = torf.Torrent(
                trackers=config.torrent.trackers,
            )

            # TODO extract key phrases from title / description

            t.filepaths = source_files
            # note that the name also sets the subdirectory for the files
            # as such, using other names for the same files *will* change the infohash
            t.name = torrent_name
            success = await asyncio.to_thread(t.generate)
            if not success:
                raise RuntimeError("Failed to generate torrent")
            t.write(torrent_file)
            t.write(torrent_output_dir / f"{torrent_name}.torrent")

            # send torrent to qbittorrent for seeding
            if config.qbittorrent:
                if config.qbittorrent.default_save_path:
                    save_path = config.qbittorrent.default_save_path
                client = await get_qbittorrent_session(config.qbittorrent)
                files = [
                    (
                        "torrents",
                        (
                            torrent_file.name,
                            # read the payload up front so the background task
                            # does not hold (and leak) an open file handle
                            torrent_file.read_bytes(),
                            "application/x-bittorrent",
                        ),
                    ),
                ]
                data = {
                    "savepath": save_path,
                    "contentLayout": "NoSubfolder",
                    # both spellings are sent for compatibility: qBittorrent 5
                    # uses "stopped" while older versions use "paused"
                    "stopped": "true" if config.qbittorrent.start_paused else "false",
                    "paused": "true" if config.qbittorrent.start_paused else "false",
                }
                task = asyncio.create_task(client.post("torrents/add", files=files, data=data))
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
        else:
            # torrent already generated on a previous run; reuse it
            t = torf.Torrent.read(torrent_file)
            log.debug(t)
            log.debug(t.magnet(size=False))
            log.debug(t.files)

        # punt file to webdav remote
        if config.webdav and channel and channel.webdav_path:
            target_base = f"{channel.webdav_path}/{folder_name}"
            task = asyncio.create_task(
                do_webdav_upload(
                    config.webdav, torrent_file, f"{target_base}/{torrent_name}.torrent"
                )
            )
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

        render_kwargs["magnet_url"] = t.magnet(size=False)

        # punt files to gofile
        render_kwargs["gofile_url"] = await do_gofile_upload(job)
        readme_finalized = True
    rendered_job = await microdot.jinja.Template("job.md").render_async(
        job=job,
        author_override=channel.name if channel else None,
        stream_time=stream_time.strftime("%Y-%m-%dT%H:%M:%SZ") if stream_time else "(unknown)",
        config=job_config,
        **render_kwargs,
    )
    if readme_finalized and config.webdav and channel and channel.webdav_path:
        rendered_job_file = torrent_output_dir / f"{job.id} ({job.video_id}).md"
        rendered_job_file.write_text(rendered_job)
        target_base = f"{channel.webdav_path}/{folder_name}"
        task = asyncio.create_task(
            do_webdav_upload(config.webdav, rendered_job_file, f"{target_base}/REPORT.md")
        )
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    database = database_ctx.get()
    cursor = database.cursor()
    cursor.execute(
        "INSERT INTO job_status VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET completed=excluded.completed",
        (jobid, True),
    )
    database.commit()
    if not readme_finalized and jobid in render_tasks:
        # allow rerun if not finalized
        del render_tasks[jobid]
    return rendered_job


def get_process_job_task(jobid: str) -> asyncio.Task:
    """
    Creates or retrieves a singleton job rendering task.
    This is decoupled from the request callback to allow cancellations and multiple consumers.
    """
    # evict a failed task so the next request retries instead of re-raising
    if jobid in render_tasks and render_tasks[jobid].done() and render_tasks[jobid].exception():
        del render_tasks[jobid]
    if jobid not in render_tasks:
        render_tasks[jobid] = asyncio.create_task(_process_job(jobid))
    return render_tasks[jobid]


def _get_saved_jobconf(jobid: str) -> JobConfig | None:
    """Loads the saved JobConfig for *jobid* from the database, if any."""
    database = database_ctx.get()
    cursor = database.cursor()
    cursor.execute("SELECT config FROM job_config WHERE job_id = ?", (jobid,))
    result = cursor.fetchone()
    if result:
        job_config_payload, *_ = result
        return msgspec.json.decode(job_config_payload, type=JobConfig)
    return None


@app.get("/<jobid>/config")
async def show_job_editor(request, jobid: str):
    """Renders the job configuration editor, seeded with saved or derived settings."""
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    return (
        await microdot.jinja.Template("edit.html").render_async(job=job, config=job_config),
        {
            "Content-Type": "text/html; charset=utf-8",
        },
    )


@app.post("/<jobid>/config")
async def apply_job_config(request, jobid: str):
    """Persists submitted job configuration and re-renders the editor."""
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404
    jobconf = msgspec.convert(
        {item: request.form[item] for item in request.form}, type=JobConfig
    )
    database = database_ctx.get()

    cursor = database.cursor()
    cursor.execute(
        "INSERT INTO job_config VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET config=excluded.config",
        (jobid, msgspec.json.encode(jobconf)),
    )
    database.commit()

    return (
        await microdot.jinja.Template("edit.html").render_async(job=job, config=jobconf),
        {
            "Content-Type": "text/html; charset=utf-8",
        },
    )


@app.get("/<jobid>")
async def show_job(request, jobid: str):
    """Returns the rendered job report, kicking off processing if needed."""
    return await get_process_job_task(jobid)


@app.post("/<jobid>")
async def upload_job(request, jobid: str):
    """Triggers job processing and reports success or torrent-build failure."""
    try:
        await get_process_job_task(jobid)
        return await microdot.jinja.Template("job/upload_success.html").render_async()
    except torf.TorfError:
        log.exception(f"Failed to upload job {jobid}")
        return await microdot.jinja.Template("job/upload_error.html").render_async()


async def job_auto_monitor():
    """
    Background loop: polls the moombox status endpoint every two minutes and
    schedules processing for jobs whose upload criteria are met and that have
    not already completed.
    """
    while True:
        config = config_ctx.get()

        async with httpx.AsyncClient() as client:
            for attempt in stamina.retry_context(
                on=(httpx.HTTPError, json.decoder.JSONDecodeError),
                attempts=None,
                timeout=None,
                wait_initial=0.5,
                wait_max=10.0,
            ):
                with attempt:
                    result = await client.get(
                        urllib.parse.urljoin(config.moombox_url, "status")
                    )
                    result.raise_for_status()
                    # parse inside the retry block so malformed payloads are
                    # retried too; keep the result instead of parsing twice
                    payload = result.json()

        jobs = []
        for serialized_job in payload:
            try:
                jobs.append(msgspec.convert(serialized_job, type=MoomboxJobInfo))
            except msgspec.ValidationError:
                # skip entries that don't match the expected schema
                pass

        database = database_ctx.get()
        cursor = database.cursor()
        configs = {
            job_id: msgspec.json.decode(job_conf, type=JobConfig)
            for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config")
        }
        statuses = {
            job_id: bool(completed)
            for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status")
        }

        for job in reversed(jobs):
            if not job.uploadability_state:
                continue
            job_conf = configs.get(job.id)
            if not job_conf:
                job_conf = JobConfig.from_moombox_job(job)
            if job.uploadability_state != job_conf.upload:
                continue
            if not config.autoupload.active:
                # dry-run mode: report eligibility without scheduling
                log.info(f"'{job.title}' meets conditions for upload")
                continue
            if statuses.get(job.id):
                # already completed previously
                continue
            log.info(f"'{job.title}' is scheduled for upload")
            get_process_job_task(job.id)

        await asyncio.sleep(120)