job_render.py (19536B)
#!/usr/bin/python3
"""
Renders job reports for moombox downloads and coordinates publishing:
torrent creation/seeding (torf + qBittorrent), gofile guest uploads, and
WebDAV mirroring of the torrent and rendered report.
"""

import asyncio
import datetime
import enum
import itertools
import logging
import pathlib
import re
from typing import Self

import gofile.api  # type: ignore
import httpx
import microdot  # type: ignore
import msgspec
import qbittorrentapi  # type: ignore
import stamina
import torf  # type: ignore

from .config import WebDavConfig, config_ctx
from .database import database_ctx

app = microdot.Microdot()

log = logging.getLogger(__name__)

# Strong references to fire-and-forget tasks so the event loop does not
# garbage-collect them mid-flight (asyncio.create_task keeps only a weak ref).
background_tasks = set()
# Singleton render task per job id; see get_process_job_task().
render_tasks: dict[str, asyncio.Task] = {}
# In-flight gofile uploads keyed by local file path, to avoid double-uploading
# the same file from concurrent renders.
upload_tasks: dict[pathlib.Path, asyncio.Task] = {}

# https://www.japanesewithanime.com/2017/05/quotation-marks-japanese.html#sumitsukikakko
# strip extraneous spaces and hashtag
_pattern_bracketed_kw = re.compile(r"^【[\s#]*(?P<topic>.*?)[\s]?】")

# bae prefers the double angle brackets because she's quirky like that
_pattern_angled_bracketed_kw = re.compile(r"^≪[\s#]*(?P<topic>.*?)[\s]?≫")


class JobUploadCriteria(enum.StrEnum):
    """When a job's output should be uploaded automatically."""

    MANUALLY = "manual"
    WHEN_FINISHED = "finished"
    IF_PRIVATE = "private"
    IF_CUT = "cut"

    @property
    def automatic(self) -> bool:
        # Everything except MANUALLY is some form of automatic trigger.
        return self != JobUploadCriteria.MANUALLY


class MoomboxDownloadStatus(enum.StrEnum):
    """Download lifecycle states reported by the moombox /status endpoint."""

    UNKNOWN = "Unknown"
    UNAVAILABLE = "Unavailable"
    WAITING = "Waiting"
    DOWNLOADING = "Downloading"
    MUXING = "Muxing"
    FINISHED = "Finished"
    ERROR = "Error"


class MoomboxJobInfo(msgspec.Struct, kw_only=True, frozen=True):
    """A single job entry as deserialized from moombox's /status payload."""

    id: str
    title: str | None
    author: str | None
    channel_id: str | None
    video_id: str | None
    manifest_progress: dict
    status: MoomboxDownloadStatus = MoomboxDownloadStatus.UNKNOWN
    output_paths: set[str] | None = msgspec.field(default_factory=set)
    scheduled_start_datetime: datetime.datetime | None = None

    @property
    def uploadability_state(self) -> JobUploadCriteria | None:
        # Returns a JobUploadCriteria that can be tested for equality to run,
        # or None if the job is not in an uploadable state at all.
        if self.status != MoomboxDownloadStatus.FINISHED:
            return None
        if not self.files_available:
            return None
        if not self.is_configured_for_upload:
            return None
        return JobUploadCriteria.WHEN_FINISHED

    @property
    def is_configured_for_upload(self) -> bool:
        # A job can only be auto-uploaded when its channel has a config entry.
        if not self.channel_id:
            return False
        config = config_ctx.get()
        channel_config = config.get_channel_config_by_id(self.channel_id)
        if not channel_config:
            return False
        return True

    @property
    def files_available(self) -> bool:
        # True only if at least one output path is known AND all of them exist.
        return bool(self.output_pathobjs) and all(p.exists() for p in self.output_pathobjs)

    @property
    def output_pathobjs(self) -> set[pathlib.Path]:
        return set(map(pathlib.Path, self.output_paths or []))


def _extract_name_from_stream_title(title: str) -> str | None:
    """
    Attempts to extract a preset name from the stream title.

    Matches either the corner-bracket (【…】) or double-angle (≪…≫) prefix
    convention; returns the stripped topic text, or None if no match.
    """
    match = _pattern_bracketed_kw.search(title) or _pattern_angled_bracketed_kw.search(title)
    if match:
        return str(match["topic"]).strip()
    return None


class JobConfig(msgspec.Struct):
    """Per-job render settings: display name, upload policy, optional note."""

    name: str
    upload: JobUploadCriteria
    author_note: str | None = None

    @classmethod
    def from_moombox_job(cls, job: MoomboxJobInfo) -> Self:
        """Generate a default config by heuristically classifying the job's title."""
        name = "Unspecified Stream"
        upload = JobUploadCriteria.MANUALLY

        if not job.title:
            return cls(name, upload)

        name = _extract_name_from_stream_title(job.title) or name

        # mapping of phrases and any possible synonyms
        # if a synonym isn't present then an empty set should be present
        phrases = {
            "unarchived": {"no archive", "unarchive"},
            "karaoke": {"rock n' rawr", "歌枠", "sing"},
            "rebroadcast": {"kara-rewind"},
        }

        phrase_set = set(phrases.keys()) | set(itertools.chain.from_iterable(phrases.values()))
        # not sure why mypy complains about job.title being possibly None here if left bare...
        matched_phrases = set(
            filter(lambda term: term in (job.title.lower() if job.title else ""), phrase_set)
        )

        # dedupe synonymous terms: collapse any matched synonym into its canonical phrase
        for phrase, syn_phrases in phrases.items():
            if syn_phrases and matched_phrases & syn_phrases:
                matched_phrases = (matched_phrases - syn_phrases) | {phrase}

        if matched_phrases == {"unarchived"}:
            name = "Unarchived Stream"
            upload = JobUploadCriteria.WHEN_FINISHED
        elif matched_phrases == {"karaoke"}:
            name = "Karaoke"
            upload = JobUploadCriteria.IF_CUT
        elif matched_phrases == {"unarchived", "karaoke"}:
            name = "Unarchived Karaoke"
            upload = JobUploadCriteria.WHEN_FINISHED
        elif matched_phrases == {"unarchived", "karaoke", "rebroadcast"}:
            name = "Unarchived Karaoke (rebroadcast)"
            upload = JobUploadCriteria.MANUALLY

        # cannot auto-upload if no channel configuration exists
        if not job.is_configured_for_upload:
            upload = JobUploadCriteria.MANUALLY

        return cls(name, upload)


def stream_display_date(dt: datetime.datetime) -> datetime.date:
    """
    Given the stream start time, returns a display date.

    If the stream start time is within the last 10 minutes of the day, we treat the next day
    as the stream date instead based on the assumption that the streamer has a waiting screen
    set up.
    """
    end_of_day = (23, 50)
    return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0)


async def get_moombox_job_by_id(jobid: str) -> MoomboxJobInfo | None:
    """
    Fetch moombox's /status listing and return the job matching `jobid`,
    or None if no job with that id is present.

    Retries the HTTP request indefinitely with capped exponential back-off.
    """
    config = config_ctx.get()
    async with httpx.AsyncClient() as client:
        # BUG FIX: stamina's retry context must be consumed with `async for`
        # inside a coroutine; a plain `for` performs blocking sleeps between
        # attempts, stalling the whole event loop.
        async for attempt in stamina.retry_context(
            on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
        ):
            with attempt:
                result = await client.get(f"{config.moombox_url}/status")
        for job in result.json():
            if job["id"] == jobid:
                return msgspec.convert(job, type=MoomboxJobInfo)
    return None


async def gofile_upload_and_persist(
    cursor, upload_file: pathlib.Path, job_id: str, token: str, folder_id: str
):
    """
    Upload a single file to the job's gofile folder, then record it in the
    gofile_files table so it won't be re-uploaded on a later run.

    NOTE(review): the `cursor` parameter is unused — a fresh cursor is created
    from the context database below. Kept for call compatibility.
    """
    with upload_file.open("rb") as upload_filereader:
        await gofile.api.upload_single(upload_filereader, token, folder_id)
    database = database_ctx.get()
    cursor = database.cursor()
    cursor.execute(
        "INSERT INTO gofile_files VALUES (?, ?)",
        (job_id, str(upload_file)),
    )
    database.commit()


async def do_gofile_upload(job: MoomboxJobInfo):
    """
    Prepares a guest upload for the specific job's output files, then returns the folder URL
    while the uploads are running in the background.

    This operation is idempotent; if it is interrupted, running this function will only upload
    files that weren't already.
    """
    database = database_ctx.get()

    known_files = set()
    for known_file, *_ in database.execute(
        "SELECT filepath FROM gofile_files WHERE job_id = ?", (job.id,)
    ):
        known_files.add(pathlib.Path(known_file))

    pending_files = set(map(pathlib.Path, job.output_paths or [])) - known_files

    token = None
    folder_id = None
    gofile_url = None

    cursor = database.cursor()
    cursor.execute(
        "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job.id,)
    )
    job_token_info = cursor.fetchone()
    if job_token_info:
        token, folder_id, gofile_url = job_token_info
    else:
        # ROBUSTNESS FIX: without a stored token we must bootstrap from an
        # actual upload; if there is nothing pending we cannot (and need not)
        # create a folder.  Previously min() would raise ValueError here.
        if not pending_files:
            return None
        # get the smallest file to quickly get the upload result
        pending_file = min(pending_files, key=lambda p: p.stat().st_size)
        if pending_file in upload_tasks:
            # another task is uploading the same file; just block until that one is available
            upload = await upload_tasks[pending_file]
            token, folder_id, gofile_url = (
                upload.result.guest_token,
                upload.result.parent_folder,
                upload.result.download_page,
            )
        else:
            with pending_file.open("rb") as fo:
                # BUG FIX: wrap the coroutine in a Task before storing it.
                # A bare coroutine placed in upload_tasks would raise
                # "cannot reuse already awaited coroutine" if a concurrent
                # caller awaited the same dict entry; Tasks are await-many-safe.
                upload_tasks[pending_file] = asyncio.create_task(
                    gofile.api.upload_single(fo)
                )
                upload = await upload_tasks[pending_file]
            token, folder_id, gofile_url = (
                upload.result.guest_token,
                upload.result.parent_folder,
                upload.result.download_page,
            )
        cursor.execute(
            "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)",
            (job.id, token, folder_id, gofile_url),
        )
        cursor.execute(
            "INSERT INTO gofile_files VALUES (?, ?)",
            (job.id, str(pending_file)),
        )
        database.commit()
        pending_files.discard(pending_file)

    # upload any remaining files in the background
    for pending_file in pending_files:
        # don't double-dip and upload an in-progress file
        if pending_file in upload_tasks:
            continue
        upload_tasks[pending_file] = asyncio.create_task(
            gofile_upload_and_persist(cursor, pending_file, job.id, token, folder_id)
        )
    return gofile_url


async def do_webdav_upload(webdav: WebDavConfig, filepath: pathlib.Path, target: str):
    """
    Upload `filepath` to `{webdav.base_url}/{target}` if the remote file does
    not already exist (checked via HEAD).  Retries forever on connect timeouts,
    warning only once.
    """
    auth = httpx.BasicAuth(username=webdav.username, password=webdav.password)
    async with httpx.AsyncClient(auth=auth) as client:
        connection_warning_seen = False
        while True:
            try:
                dest = f"{webdav.base_url}/{target}"
                file_check = await client.head(dest)
                if file_check.status_code == httpx.codes.NOT_FOUND:
                    with filepath.open("rb") as fh:
                        await client.put(dest, content=fh.read())
                return
            except httpx.ConnectTimeout:
                if not connection_warning_seen:
                    log.warning(f"Failed to connect to {webdav.base_url}. Retrying...")
                    connection_warning_seen = True
                await asyncio.sleep(10)


async def _process_job(jobid):
    """
    Render the report for a job: build/seed a torrent from its output files,
    start gofile + WebDAV uploads, render the markdown report, and record
    completion in the job_status table.

    Returns the rendered markdown, or an (error message, status) tuple when
    the job is unknown, or None when output files are not yet all on disk.
    """
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    # TODO fill in necessary fields in template, etc.
    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    config = config_ctx.get()

    channel = config.get_channel_config_by_id(job.channel_id)
    stream_time = job.scheduled_start_datetime or datetime.datetime.now(tz=datetime.UTC)

    torrent_output_dir = pathlib.Path("torrents")
    torrent_output_dir.mkdir(exist_ok=True)

    display_date = stream_display_date(stream_time)
    display_date_str = display_date.strftime("%Y%m%d")
    torrent_name = f"[{display_date_str}] {job_config.name} ({job.video_id})"

    folder_name = None
    if stream_time:
        folder_name = stream_time.strftime("%Y-%m-%dT%H:%MZ")

    readme_finalized = False
    render_kwargs = {}
    if job.output_paths:
        torrent_file = torrent_output_dir / f"{job.id} ({job.video_id}).torrent"
        source_files = list(map(pathlib.Path, job.output_paths))

        # bail early if any output file hasn't landed on disk yet;
        # the render task is re-runnable later
        if not all(f.exists() for f in source_files):
            return

        save_path = str(source_files[0].parent)

        if not torrent_file.exists():
            t = torf.Torrent(
                trackers=config.torrent.trackers,
            )

            # TODO extract key phrases from title / description

            t.filepaths = source_files
            # note that the name also sets the subdirectory for the files
            # as such, using other names for the same files *will* change the infohash
            t.name = torrent_name
            success = await asyncio.to_thread(t.generate)
            if not success:
                raise RuntimeError("Failed to generate torrent")
            t.write(torrent_file)
            t.write(torrent_output_dir / f"{torrent_name}.torrent")

            # send torrent to qbittorrent for seeding
            if config.qbittorrent:
                if config.qbittorrent.default_save_path:
                    save_path = config.qbittorrent.default_save_path
                qbt_client = qbittorrentapi.Client(
                    host=config.qbittorrent.host,
                    username=config.qbittorrent.username,
                    password=config.qbittorrent.password,
                    VERIFY_WEBUI_CERTIFICATE=False,
                    RAISE_NOTIMPLEMENTEDERROR_FOR_UNIMPLEMENTED_API_ENDPOINTS=True,
                )
                # torrents_add is a blocking client call; run it off-loop
                task = asyncio.create_task(
                    asyncio.to_thread(
                        qbt_client.torrents_add,
                        torrent_files=torrent_file,
                        save_path=save_path,
                        content_layout="NoSubfolder",
                        is_paused=config.qbittorrent.start_paused,
                    )
                )
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
        else:
            t = torf.Torrent.read(torrent_file)
            log.debug(t)
            log.debug(t.magnet(size=False))
            log.debug(t.files)

        # punt file to webdav remote
        if config.webdav and channel and channel.webdav_path:
            target_base = f"{channel.webdav_path}/{folder_name}"
            task = asyncio.create_task(
                do_webdav_upload(
                    config.webdav, torrent_file, f"{target_base}/{torrent_name}.torrent"
                )
            )
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

        render_kwargs["magnet_url"] = t.magnet(size=False)

        # punt files to gofile
        render_kwargs["gofile_url"] = await do_gofile_upload(job)
        readme_finalized = True
    rendered_job = await microdot.jinja.Template("job.md").render_async(
        job=job,
        author_override=channel.name if channel else None,
        stream_time=stream_time.strftime("%Y-%m-%dT%H:%M:%SZ") if stream_time else "(unknown)",
        config=job_config,
        **render_kwargs,
    )
    if readme_finalized and config.webdav and channel and channel.webdav_path:
        rendered_job_file = torrent_output_dir / f"{job.id} ({job.video_id}).md"
        rendered_job_file.write_text(rendered_job)
        target_base = f"{channel.webdav_path}/{folder_name}"
        task = asyncio.create_task(
            do_webdav_upload(config.webdav, rendered_job_file, f"{target_base}/REPORT.md")
        )
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    # NOTE(review): completed=True is recorded even when the readme was not
    # finalized; job_auto_monitor will then skip this job although a rerun is
    # allowed below.  Confirm this is intentional.
    database = database_ctx.get()
    cursor = database.cursor()
    cursor.execute(
        "INSERT INTO job_status VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET completed=excluded.completed",
        (jobid, True),
    )
    database.commit()
    if not readme_finalized and jobid in render_tasks:
        # allow rerun if not finalized
        del render_tasks[jobid]
    return rendered_job


def get_process_job_task(jobid: str) -> asyncio.Task:
    """
    Creates or retrieves a singleton job rendering task.
    This is decoupled from the request callback to allow cancellations and multiple consumers.
    """
    # drop a previously failed task so it can be retried
    if jobid in render_tasks and render_tasks[jobid].done() and render_tasks[jobid].exception():
        del render_tasks[jobid]
    if jobid not in render_tasks:
        render_tasks[jobid] = asyncio.create_task(_process_job(jobid))
    return render_tasks[jobid]


def _get_saved_jobconf(jobid: str) -> JobConfig | None:
    """Load the persisted JobConfig for a job from the job_config table, if any."""
    database = database_ctx.get()
    cursor = database.cursor()
    cursor.execute("SELECT config FROM job_config WHERE job_id = ?", (jobid,))
    result = cursor.fetchone()
    if result:
        job_config_payload, *_ = result
        return msgspec.json.decode(job_config_payload, type=JobConfig)
    return None


@app.get("/<jobid>/config")
async def show_job_editor(request, jobid: str):
    """Render the edit form for a job's config (saved config or derived default)."""
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    return (
        await microdot.jinja.Template("edit.html").render_async(job=job, config=job_config),
        {
            "Content-Type": "text/html; charset=utf-8",
        },
    )


@app.post("/<jobid>/config")
async def apply_job_config(request, jobid: str):
    """Persist a JobConfig submitted from the edit form and re-render it."""
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404
    jobconf = msgspec.convert(
        {item: request.form[item] for item in request.form}, type=JobConfig
    )
    database = database_ctx.get()

    cursor = database.cursor()
    cursor.execute(
        "INSERT INTO job_config VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET config=excluded.config",
        (jobid, msgspec.json.encode(jobconf)),
    )
    database.commit()

    return (
        await microdot.jinja.Template("edit.html").render_async(job=job, config=jobconf),
        {
            "Content-Type": "text/html; charset=utf-8",
        },
    )


@app.get("/<jobid>")
async def show_job(request, jobid: str):
    """Return the rendered report for a job, kicking off processing if needed."""
    return await get_process_job_task(jobid)


@app.post("/<jobid>")
async def upload_job(request, jobid: str):
    """Trigger processing/upload for a job and report success or torrent failure."""
    try:
        await get_process_job_task(jobid)
        return await microdot.jinja.Template("job/upload_success.html").render_async()
    except torf.TorfError:
        log.exception(f"Failed to upload job {jobid}")
        return await microdot.jinja.Template("job/upload_error.html").render_async()


async def job_auto_monitor():
    """
    Poll moombox every two minutes and schedule uploads for finished jobs whose
    configured upload criteria are met and that aren't already completed.
    """
    while True:
        config = config_ctx.get()

        async with httpx.AsyncClient() as client:
            # BUG FIX: `async for` — see get_moombox_job_by_id; a sync `for`
            # here would block the event loop between retry attempts.
            async for attempt in stamina.retry_context(
                on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
            ):
                with attempt:
                    result = await client.get(f"{config.moombox_url}/status")

        jobs = []
        for serialized_job in result.json():
            try:
                jobs.append(msgspec.convert(serialized_job, type=MoomboxJobInfo))
            except msgspec.ValidationError:
                # skip entries that don't deserialize; best-effort monitor
                pass

        database = database_ctx.get()
        cursor = database.cursor()
        configs = {
            job_id: msgspec.json.decode(job_conf, type=JobConfig)
            for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config")
        }
        statuses = {
            job_id: bool(completed)
            for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status")
        }

        for job in reversed(jobs):
            if not job.uploadability_state:
                continue
            job_conf = configs.get(job.id)
            if not job_conf:
                job_conf = JobConfig.from_moombox_job(job)
            if job.uploadability_state != job_conf.upload:
                continue
            if not config.autoupload.active:
                log.info(f"'{job.title}' meets conditions for upload")
                continue
            if statuses.get(job.id):
                continue
            log.info(f"'{job.title}' is scheduled for upload")
            get_process_job_task(job.id)

        await asyncio.sleep(120)