job_render.py (19051B)
1 #!/usr/bin/python3 2 3 import asyncio 4 import datetime 5 import enum 6 import itertools 7 import pathlib 8 import re 9 import traceback 10 from typing import Self 11 12 import gofile.api # type: ignore 13 import httpx 14 import microdot # type: ignore 15 import msgspec 16 import qbittorrentapi # type: ignore 17 import stamina 18 import torf # type: ignore 19 20 from .config import WebDavConfig, config_ctx 21 from .database import database_ctx 22 23 app = microdot.Microdot() 24 25 background_tasks = set() 26 render_tasks: dict[str, asyncio.Task] = {} 27 upload_tasks: dict[pathlib.Path, asyncio.Task] = {} 28 29 # https://www.japanesewithanime.com/2017/05/quotation-marks-japanese.html#sumitsukikakko 30 # strip extraneous spaces and hashtag 31 _pattern_bracketed_kw = re.compile(r"^【[\s#]*(?P<topic>.*?)[\s]?】") 32 33 # bae prefers the double angle brackets because she's quirky like that 34 _pattern_angled_bracketed_kw = re.compile(r"^≪[\s#]*(?P<topic>.*?)[\s]?≫") 35 36 37 class JobUploadCriteria(enum.StrEnum): 38 MANUALLY = "manual" 39 WHEN_FINISHED = "finished" 40 IF_PRIVATE = "private" 41 IF_CUT = "cut" 42 43 @property 44 def automatic(self) -> bool: 45 return self != JobUploadCriteria.MANUALLY 46 47 48 class MoomboxDownloadStatus(enum.StrEnum): 49 UNKNOWN = "Unknown" 50 UNAVAILABLE = "Unavailable" 51 WAITING = "Waiting" 52 DOWNLOADING = "Downloading" 53 MUXING = "Muxing" 54 FINISHED = "Finished" 55 ERROR = "Error" 56 57 58 class MoomboxJobInfo(msgspec.Struct, kw_only=True, frozen=True): 59 id: str 60 title: str | None 61 author: str | None 62 channel_id: str | None 63 video_id: str | None 64 manifest_progress: dict 65 status: MoomboxDownloadStatus = MoomboxDownloadStatus.UNKNOWN 66 output_paths: set[str] | None = msgspec.field(default_factory=set) 67 scheduled_start_datetime: datetime.datetime | None = None 68 69 @property 70 def uploadability_state(self) -> JobUploadCriteria | None: 71 # returns a JobUploadCriteria that can be tested for equality to run 72 if self.status != MoomboxDownloadStatus.FINISHED: 73 return None 74 if not self.files_available: 75 return None 76 if not self.is_configured_for_upload: 77 return None 78 return JobUploadCriteria.WHEN_FINISHED 79 80 @property 81 def is_configured_for_upload(self) -> bool: 82 if not self.channel_id: 83 return False 84 config = config_ctx.get() 85 channel_config = config.get_channel_config_by_id(self.channel_id) 86 if not channel_config: 87 return False 88 return True 89 90 @property 91 def files_available(self) -> bool: 92 return bool(self.output_pathobjs) and all(p.exists() for p in self.output_pathobjs) 93 94 @property 95 def output_pathobjs(self) -> set[pathlib.Path]: 96 return set(map(pathlib.Path, self.output_paths or [])) 97 98 99 def _extract_name_from_stream_title(title: str) -> str | None: 100 """ 101 Attempts to extract a preset name from the stream title. 102 """ 103 match = _pattern_bracketed_kw.search(title) or _pattern_angled_bracketed_kw.search(title) 104 if match: 105 return str(match["topic"]).strip() 106 return None 107 108 109 class JobConfig(msgspec.Struct): 110 name: str 111 upload: JobUploadCriteria 112 author_note: str | None = None 113 114 @classmethod 115 def from_moombox_job(cls, job: MoomboxJobInfo) -> Self: 116 # generate a default config from the job 117 name = "Unspecified Stream" 118 upload = JobUploadCriteria.MANUALLY 119 120 if not job.title: 121 return cls(name, upload) 122 123 name = _extract_name_from_stream_title(job.title) or name 124 125 # mapping of phrases and any possible synonyms 126 # if a synonym isn't present then an empty set should be present 127 phrases = { 128 "unarchived": {"no archive", "unarchive"}, 129 "karaoke": {"rock n' rawr", "歌枠", "sing"}, 130 "rebroadcast": {"kara-rewind"}, 131 } 132 133 phrase_set = set(phrases.keys()) | set(itertools.chain.from_iterable(phrases.values())) 134 # not sure why mypy complains about job.title being possibly None here if left bare... 135 matched_phrases = set( 136 filter(lambda term: term in (job.title.lower() if job.title else ""), phrase_set) 137 ) 138 139 # dedupe synonymous terms 140 for phrase, syn_phrases in phrases.items(): 141 if syn_phrases and matched_phrases & syn_phrases: 142 matched_phrases = (matched_phrases - syn_phrases) | {phrase} 143 144 if matched_phrases == {"unarchived"}: 145 name = "Unarchived Stream" 146 upload = JobUploadCriteria.WHEN_FINISHED 147 elif matched_phrases == {"karaoke"}: 148 name = "Karaoke" 149 upload = JobUploadCriteria.IF_CUT 150 elif matched_phrases == {"unarchived", "karaoke"}: 151 name = "Unarchived Karaoke" 152 upload = JobUploadCriteria.WHEN_FINISHED 153 elif matched_phrases == {"unarchived", "karaoke", "rebroadcast"}: 154 name = "Unarchived Karaoke (rebroadcast)" 155 upload = JobUploadCriteria.MANUALLY 156 157 # cannot auto-upload if no channel configuration exists 158 if not job.is_configured_for_upload: 159 upload = JobUploadCriteria.MANUALLY 160 161 return cls(name, upload) 162 163 164 def stream_display_date(dt: datetime.datetime) -> datetime.date: 165 """ 166 Given the stream start time, returns a display date. 167 168 If the stream start time is within the last 10 minutes of the day, we treat the next day 169 as the stream date instead based on the assumption that the streamer has a waiting screen 170 set up. 171 """ 172 end_of_day = (23, 50) 173 return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0) 174 175 176 async def get_moombox_job_by_id(jobid: str) -> MoomboxJobInfo | None: 177 config = config_ctx.get() 178 async with httpx.AsyncClient() as client: 179 for attempt in stamina.retry_context( 180 on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0 181 ): 182 with attempt: 183 result = await client.get(f"{config.moombox_url}/status") 184 for job in result.json(): 185 if job["id"] == jobid: 186 return msgspec.convert(job, type=MoomboxJobInfo) 187 return None 188 189 190 async def gofile_upload_and_persist( 191 cursor, upload_file: pathlib.Path, job_id: str, token: str, folder_id: str 192 ): 193 with upload_file.open("rb") as upload_filereader: 194 await gofile.api.upload_single(upload_filereader, token, folder_id) 195 database = database_ctx.get() 196 cursor = database.cursor() 197 cursor.execute( 198 "INSERT INTO gofile_files VALUES (?, ?)", 199 (job_id, str(upload_file)), 200 ) 201 database.commit() 202 203 204 async def do_gofile_upload(job: MoomboxJobInfo): 205 """ 206 Prepares a guest upload for the specific job's output files, then returns the folder URL 207 while the uploads are running in the background. 208 209 This operation is idempotent; if it is interrupted, running this function will only upload 210 files that weren't already. 211 """ 212 database = database_ctx.get() 213 214 known_files = set() 215 for known_file, *_ in database.execute( 216 "SELECT filepath FROM gofile_files WHERE job_id = ?", (job.id,) 217 ): 218 known_files.add(pathlib.Path(known_file)) 219 220 pending_files = set(map(pathlib.Path, job.output_paths or [])) - known_files 221 222 token = None 223 folder_id = None 224 gofile_url = None 225 226 cursor = database.cursor() 227 cursor.execute( 228 "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job.id,) 229 ) 230 job_token_info = cursor.fetchone() 231 if job_token_info: 232 token, folder_id, gofile_url = job_token_info 233 else: 234 # get the smallest file to quickly get the upload result 235 pending_file = min(pending_files, key=lambda p: p.stat().st_size) 236 if pending_file in upload_tasks: 237 # another task is uploading the same file; just block until that one is available 238 upload = await upload_tasks[pending_file] 239 token, folder_id, gofile_url = ( 240 upload.result.guest_token, 241 upload.result.parent_folder, 242 upload.result.download_page, 243 ) 244 else: 245 with pending_file.open("rb") as fo: 246 upload_tasks[pending_file] = gofile.api.upload_single(fo) 247 upload = await upload_tasks[pending_file] 248 token, folder_id, gofile_url = ( 249 upload.result.guest_token, 250 upload.result.parent_folder, 251 upload.result.download_page, 252 ) 253 cursor.execute( 254 "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)", 255 (job.id, token, folder_id, gofile_url), 256 ) 257 cursor.execute( 258 "INSERT INTO gofile_files VALUES (?, ?)", 259 (job.id, str(pending_file)), 260 ) 261 database.commit() 262 pending_files.discard(pending_file) 263 264 # upload any remaining files in the background 265 for pending_file in pending_files: 266 # don't double-dip and upload an in-progress file 267 if pending_file in upload_tasks: 268 continue 269 upload_tasks[pending_file] = asyncio.create_task( 270 gofile_upload_and_persist(cursor, pending_file, job.id, token, folder_id) 271 ) 272 return gofile_url 273 274 275 async def do_webdav_upload(webdav: WebDavConfig, filepath: pathlib.Path, target: str): 276 auth = httpx.BasicAuth(username=webdav.username, password=webdav.password) 277 async with httpx.AsyncClient(auth=auth) as client: 278 dest = f"{webdav.base_url}/{target}" 279 file_check = await client.head(dest) 280 if file_check.status_code == httpx.codes.NOT_FOUND: 281 with filepath.open("rb") as fh: 282 await client.put(dest, content=fh.read()) 283 284 285 async def _process_job(jobid): 286 job = await get_moombox_job_by_id(jobid) 287 if not job: 288 return "Couldn't find matching job", 404 289 290 # TODO fill in necessary fields in template, etc. 291 job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job) 292 293 config = config_ctx.get() 294 295 channel = config.get_channel_config_by_id(job.channel_id) 296 stream_time = job.scheduled_start_datetime or datetime.datetime.now(tz=datetime.UTC) 297 298 torrent_output_dir = pathlib.Path("torrents") 299 torrent_output_dir.mkdir(exist_ok=True) 300 301 display_date = stream_display_date(stream_time) 302 display_date_str = display_date.strftime("%Y%m%d") 303 torrent_name = f"[{display_date_str}] {job_config.name} ({job.video_id})" 304 305 folder_name = None 306 if stream_time: 307 folder_name = stream_time.strftime("%Y-%m-%dT%H:%MZ") 308 309 readme_finalized = False 310 render_kwargs = {} 311 if job.output_paths: 312 torrent_file = torrent_output_dir / f"{job.id} ({job.video_id}).torrent" 313 source_files = list(map(pathlib.Path, job.output_paths)) 314 315 if not all(f.exists() for f in source_files): 316 return 317 318 save_path = str(source_files[0].parent) 319 320 if not torrent_file.exists(): 321 t = torf.Torrent( 322 trackers=config.torrent.trackers, 323 ) 324 325 # TODO extract key phrases from title / description 326 327 t.filepaths = source_files 328 # note that the name also sets the subdirectory for the files 329 # as such, using other names for the same files *will* change the infohash 330 t.name = torrent_name 331 success = await asyncio.to_thread(t.generate) 332 if not success: 333 raise RuntimeError("Failed to generate torrent") 334 t.write(torrent_file) 335 t.write(torrent_output_dir / f"{torrent_name}.torrent") 336 337 # send torrent to qbittorrent for seeding 338 if config.qbittorrent: 339 if config.qbittorrent.default_save_path: 340 save_path = config.qbittorrent.default_save_path 341 qbt_client = qbittorrentapi.Client( 342 host=config.qbittorrent.host, 343 username=config.qbittorrent.username, 344 password=config.qbittorrent.password, 345 VERIFY_WEBUI_CERTIFICATE=False, 346 RAISE_NOTIMPLEMENTEDERROR_FOR_UNIMPLEMENTED_API_ENDPOINTS=True, 347 ) 348 task = asyncio.create_task( 349 asyncio.to_thread( 350 qbt_client.torrents_add, 351 torrent_files=torrent_file, 352 save_path=save_path, 353 content_layout="NoSubfolder", 354 is_paused=config.qbittorrent.start_paused, 355 ) 356 ) 357 background_tasks.add(task) 358 task.add_done_callback(background_tasks.discard) 359 else: 360 t = torf.Torrent.read(torrent_file) 361 print(t) 362 print(t.magnet(size=False)) 363 print(t.files) 364 365 # punt file to webdav remote 366 if config.webdav and channel and channel.webdav_path: 367 target_base = f"{channel.webdav_path}/{folder_name}" 368 task = asyncio.create_task( 369 do_webdav_upload( 370 config.webdav, torrent_file, f"{target_base}/{torrent_name}.torrent" 371 ) 372 ) 373 background_tasks.add(task) 374 task.add_done_callback(background_tasks.discard) 375 376 render_kwargs["magnet_url"] = t.magnet(size=False) 377 378 # punt files to gofile 379 render_kwargs["gofile_url"] = await do_gofile_upload(job) 380 readme_finalized = True 381 rendered_job = await microdot.jinja.Template("job.md").render_async( 382 job=job, 383 author_override=channel.name if channel else None, 384 stream_time=stream_time.strftime("%Y-%m-%dT%H:%M:%SZ") if stream_time else "(unknown)", 385 config=job_config, 386 **render_kwargs, 387 ) 388 if readme_finalized and config.webdav and channel and channel.webdav_path: 389 rendered_job_file = torrent_output_dir / f"{job.id} ({job.video_id}).md" 390 rendered_job_file.write_text(rendered_job) 391 target_base = f"{channel.webdav_path}/{folder_name}" 392 task = asyncio.create_task( 393 do_webdav_upload(config.webdav, rendered_job_file, f"{target_base}/REPORT.md") 394 ) 395 background_tasks.add(task) 396 task.add_done_callback(background_tasks.discard) 397 398 database = database_ctx.get() 399 cursor = database.cursor() 400 cursor.execute( 401 "INSERT INTO job_status VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET completed=excluded.completed", 402 (jobid, True), 403 ) 404 database.commit() 405 if not readme_finalized and jobid in render_tasks: 406 # allow rerun if not finalized 407 del render_tasks[jobid] 408 return rendered_job 409 410 411 def get_process_job_task(jobid: str) -> asyncio.Task: 412 """ 413 Creates or retrieves a singleton job rendering task. 414 This is decoupled from the request callback to allow cancellations and multiple consumers. 415 """ 416 if jobid in render_tasks and render_tasks[jobid].done() and render_tasks[jobid].exception(): 417 del render_tasks[jobid] 418 if jobid not in render_tasks: 419 render_tasks[jobid] = asyncio.create_task(_process_job(jobid)) 420 return render_tasks[jobid] 421 422 423 def _get_saved_jobconf(jobid: str) -> JobConfig | None: 424 database = database_ctx.get() 425 cursor = database.cursor() 426 cursor.execute("SELECT config FROM job_config WHERE job_id = ?", (jobid,)) 427 result = cursor.fetchone() 428 if result: 429 job_config_payload, *_ = result 430 return msgspec.json.decode(job_config_payload, type=JobConfig) 431 return None 432 433 434 @app.get("/<jobid>/config") 435 async def show_job_editor(request, jobid: str): 436 job = await get_moombox_job_by_id(jobid) 437 if not job: 438 return "Couldn't find matching job", 404 439 440 job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job) 441 442 return ( 443 await microdot.jinja.Template("edit.html").render_async(job=job, config=job_config), 444 { 445 "Content-Type": "text/html; charset=utf-8", 446 }, 447 ) 448 449 450 @app.post("/<jobid>/config") 451 async def apply_job_config(request, jobid: str): 452 job = await get_moombox_job_by_id(jobid) 453 if not job: 454 return "Couldn't find matching job", 404 455 jobconf = msgspec.convert( 456 {item: request.form[item] for item in request.form}, type=JobConfig 457 ) 458 database = database_ctx.get() 459 460 cursor = database.cursor() 461 cursor.execute( 462 "INSERT INTO job_config VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET config=excluded.config", 463 (jobid, msgspec.json.encode(jobconf)), 464 ) 465 database.commit() 466 467 return ( 468 await microdot.jinja.Template("edit.html").render_async(job=job, config=jobconf), 469 { 470 "Content-Type": "text/html; charset=utf-8", 471 }, 472 ) 473 474 475 @app.get("/<jobid>") 476 async def show_job(request, jobid: str): 477 return await get_process_job_task(jobid) 478 479 480 @app.post("/<jobid>") 481 async def upload_job(request, jobid: str): 482 try: 483 await get_process_job_task(jobid) 484 return await microdot.jinja.Template("job/upload_success.html").render_async() 485 except torf.TorfError: 486 traceback.print_exc() 487 return await microdot.jinja.Template("job/upload_error.html").render_async() 488 489 490 async def job_auto_monitor(): 491 while True: 492 config = config_ctx.get() 493 494 async with httpx.AsyncClient() as client: 495 for attempt in stamina.retry_context( 496 on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0 497 ): 498 with attempt: 499 result = await client.get(f"{config.moombox_url}/status") 500 501 jobs = [] 502 for serialized_job in result.json(): 503 try: 504 jobs.append(msgspec.convert(serialized_job, type=MoomboxJobInfo)) 505 except msgspec.ValidationError: 506 pass 507 508 database = database_ctx.get() 509 cursor = database.cursor() 510 configs = { 511 job_id: msgspec.json.decode(job_conf, type=JobConfig) 512 for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config") 513 } 514 statuses = { 515 job_id: bool(completed) 516 for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status") 517 } 518 519 for job in reversed(jobs): 520 if not job.uploadability_state: 521 continue 522 job_conf = configs.get(job.id) 523 if not job_conf: 524 job_conf = JobConfig.from_moombox_job(job) 525 if job.uploadability_state != job_conf.upload: 526 continue 527 if not config.autoupload.active: 528 print(f"'{job.title}' meets conditions for upload") 529 continue 530 if statuses.get(job.id): 531 continue 532 print(f"'{job.title}' is scheduled for upload") 533 get_process_job_task(job.id) 534 535 await asyncio.sleep(120)