job_render.py (18977B)
#!/usr/bin/python3

import asyncio
import datetime
import enum
import itertools
import pathlib
import re
import traceback
from typing import Self

import gofile.api  # type: ignore
import httpx
import microdot  # type: ignore
import msgspec
import qbittorrentapi  # type: ignore
import stamina
import torf  # type: ignore

from .config import WebDavConfig, config_ctx
from .database import database_ctx

app = microdot.Microdot()

background_tasks = set()
render_tasks: dict[str, asyncio.Task] = {}
upload_tasks: dict[pathlib.Path, asyncio.Task] = {}

# https://www.japanesewithanime.com/2017/05/quotation-marks-japanese.html#sumitsukikakko
# strip extraneous spaces and hashtag
_pattern_bracketed_kw = re.compile(r"^【[\s#]*(?P<topic>.*?)[\s]?】")

# bae prefers the double angle brackets because she's quirky like that
_pattern_angled_bracketed_kw = re.compile(r"^≪[\s#]*(?P<topic>.*?)[\s]?≫")


class JobUploadCriteria(enum.StrEnum):
    MANUALLY = "manual"
    WHEN_FINISHED = "finished"
    IF_PRIVATE = "private"
    IF_CUT = "cut"

    @property
    def automatic(self) -> bool:
        return self != JobUploadCriteria.MANUALLY


class MoomboxDownloadStatus(enum.StrEnum):
    UNKNOWN = "Unknown"
    UNAVAILABLE = "Unavailable"
    WAITING = "Waiting"
    DOWNLOADING = "Downloading"
    MUXING = "Muxing"
    FINISHED = "Finished"
    ERROR = "Error"


class MoomboxJobInfo(msgspec.Struct, kw_only=True, frozen=True):
    id: str
    title: str | None
    author: str | None
    channel_id: str | None
    video_id: str | None
    manifest_progress: dict
    status: MoomboxDownloadStatus = MoomboxDownloadStatus.UNKNOWN
    output_paths: set[str] | None = msgspec.field(default_factory=set)
    scheduled_start_datetime: datetime.datetime | None = None

    @property
    def uploadability_state(self) -> JobUploadCriteria | None:
        # returns a JobUploadCriteria that callers compare against the job's
        # configured upload criteria to decide whether an upload should run
        if self.status != MoomboxDownloadStatus.FINISHED:
            return None
        if not self.files_available:
            return None
        if not self.is_configured_for_upload:
            return None
        return JobUploadCriteria.WHEN_FINISHED

    @property
    def is_configured_for_upload(self) -> bool:
        if not self.channel_id:
            return False
        config = config_ctx.get()
        channel_config = config.get_channel_config_by_id(self.channel_id)
        if not channel_config:
            return False
        return True

    @property
    def files_available(self) -> bool:
        return bool(self.output_pathobjs) and all(p.exists() for p in self.output_pathobjs)

    @property
    def output_pathobjs(self) -> set[pathlib.Path]:
        return set(map(pathlib.Path, self.output_paths or []))


def _extract_name_from_stream_title(title: str) -> str | None:
    """
    Attempts to extract a preset name from the stream title.
    """
    match = _pattern_bracketed_kw.search(title) or _pattern_angled_bracketed_kw.search(title)
    if match:
        return str(match["topic"]).strip()
    return None
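
# A quick illustration of the two bracket patterns above (the sample titles
# are hypothetical; the results follow from the regexes):
#
#   _extract_name_from_stream_title("【#Karaoke】 late night singing!")
#   -> "Karaoke"
#   _extract_name_from_stream_title("≪ Unarchived Karaoke ≫ members only")
#   -> "Unarchived Karaoke"
#   _extract_name_from_stream_title("no brackets here")
#   -> None
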
class JobConfig(msgspec.Struct):
    name: str
    upload: JobUploadCriteria
    author_note: str | None = None

    @classmethod
    def from_moombox_job(cls, job: MoomboxJobInfo) -> Self:
        # generate a default config from the job
        name = "Unspecified Stream"
        upload = JobUploadCriteria.MANUALLY

        if not job.title:
            return cls(name, upload)

        name = _extract_name_from_stream_title(job.title) or name

        # mapping of phrases to any possible synonyms
        # a phrase with no synonyms should map to an empty set
        phrases = {
            "unarchived": {"no archive", "unarchive"},
            "karaoke": {"rock n' rawr", "歌枠", "sing"},
            "rebroadcast": {"kara-rewind"},
        }

        phrase_set = set(phrases.keys()) | set(itertools.chain.from_iterable(phrases.values()))
        # not sure why mypy complains about job.title being possibly None here if left bare...
        matched_phrases = set(
            filter(lambda term: term in (job.title.lower() if job.title else ""), phrase_set)
        )

        # dedupe synonymous terms
        for phrase, syn_phrases in phrases.items():
            if syn_phrases and matched_phrases & syn_phrases:
                matched_phrases = (matched_phrases - syn_phrases) | {phrase}

        if matched_phrases == {"unarchived"}:
            name = "Unarchived Stream"
            upload = JobUploadCriteria.WHEN_FINISHED
        elif matched_phrases == {"karaoke"}:
            name = "Karaoke"
            upload = JobUploadCriteria.IF_CUT
        elif matched_phrases == {"unarchived", "karaoke"}:
            name = "Unarchived Karaoke"
            upload = JobUploadCriteria.WHEN_FINISHED
        elif matched_phrases == {"unarchived", "karaoke", "rebroadcast"}:
            name = "Unarchived Karaoke (rebroadcast)"
            upload = JobUploadCriteria.MANUALLY

        # cannot auto-upload if no channel configuration exists
        if not job.is_configured_for_upload:
            upload = JobUploadCriteria.MANUALLY

        return cls(name, upload)
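
# Worked example of the synonym dedupe above (the title is hypothetical):
# given a title like "【unarchived karaoke】singing with bae", the substring
# scan matches {"unarchived", "unarchive", "karaoke", "sing"}; folding each
# synonym back into its canonical phrase leaves {"unarchived", "karaoke"},
# so the derived config is ("Unarchived Karaoke", WHEN_FINISHED), provided
# the job's channel is configured for upload.
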
def stream_display_date(dt: datetime.datetime) -> datetime.date:
    """
    Given the stream start time, returns a display date.

    If the stream start time is within the last 10 minutes of the day, we treat the next day
    as the stream date instead based on the assumption that the streamer has a waiting screen
    set up.
    """
    end_of_day = (23, 50)
    return dt.date() + datetime.timedelta(days=1 if (dt.hour, dt.minute) >= end_of_day else 0)
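
# For example (times are hypothetical):
#   stream_display_date(datetime.datetime(2024, 5, 1, 23, 55, tzinfo=datetime.UTC))
#   -> datetime.date(2024, 5, 2)   # within the waiting-screen window; use the next day
#   stream_display_date(datetime.datetime(2024, 5, 1, 23, 49, tzinfo=datetime.UTC))
#   -> datetime.date(2024, 5, 1)
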
async def get_moombox_job_by_id(jobid: str) -> MoomboxJobInfo | None:
    config = config_ctx.get()
    async with httpx.AsyncClient() as client:
        for attempt in stamina.retry_context(
            on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
        ):
            with attempt:
                result = await client.get(f"{config.moombox_url}/status")
                for job in result.json():
                    if job["id"] == jobid:
                        return msgspec.convert(job, type=MoomboxJobInfo)
    return None


async def gofile_upload_and_persist(
    upload_file: pathlib.Path, job_id: str, token: str, folder_id: str
):
    with upload_file.open("rb") as upload_filereader:
        await gofile.api.upload_single(upload_filereader, token, folder_id)
    database = database_ctx.get()
    cursor = database.cursor()
    cursor.execute(
        "INSERT INTO gofile_files VALUES (?, ?)",
        (job_id, str(upload_file)),
    )
    database.commit()


async def do_gofile_upload(job: MoomboxJobInfo):
    """
    Prepares a guest upload for the specific job's output files, then returns the folder URL
    while the uploads are running in the background.

    This operation is idempotent; if it is interrupted, running this function again will only
    upload files that weren't already uploaded.
    """
    database = database_ctx.get()

    known_files = set()
    for known_file, *_ in database.execute(
        "SELECT filepath FROM gofile_files WHERE job_id = ?", (job.id,)
    ):
        known_files.add(pathlib.Path(known_file))

    pending_files = set(map(pathlib.Path, job.output_paths or [])) - known_files

    token = None
    folder_id = None
    gofile_url = None

    cursor = database.cursor()
    cursor.execute(
        "SELECT token, folder_id, url FROM gofile_tokens WHERE job_id = ?", (job.id,)
    )
    job_token_info = cursor.fetchone()
    if job_token_info:
        token, folder_id, gofile_url = job_token_info
    else:
        # get the smallest file to quickly get the upload result
        pending_file = min(pending_files, key=lambda p: p.stat().st_size)
        if pending_file in upload_tasks:
            # another task is uploading the same file; just block until that one is available
            upload = await upload_tasks[pending_file]
        else:
            # keep the file handle open until the upload completes, and wrap the coroutine
            # in a task so any concurrent waiter can safely await it too
            with pending_file.open("rb") as fo:
                upload_tasks[pending_file] = asyncio.create_task(gofile.api.upload_single(fo))
                upload = await upload_tasks[pending_file]
        token, folder_id, gofile_url = (
            upload.result.guest_token,
            upload.result.parent_folder,
            upload.result.download_page,
        )
        cursor.execute(
            "INSERT INTO gofile_tokens VALUES (?, ?, ?, ?)",
            (job.id, token, folder_id, gofile_url),
        )
        cursor.execute(
            "INSERT INTO gofile_files VALUES (?, ?)",
            (job.id, str(pending_file)),
        )
        database.commit()
        pending_files.discard(pending_file)

    # upload any remaining files in the background
    for pending_file in pending_files:
        # don't double-dip and upload an in-progress file
        if pending_file in upload_tasks:
            continue
        upload_tasks[pending_file] = asyncio.create_task(
            gofile_upload_and_persist(pending_file, job.id, token, folder_id)
        )
    return gofile_url


async def do_webdav_upload(webdav: WebDavConfig, filepath: pathlib.Path, target: str):
    auth = httpx.BasicAuth(username=webdav.username, password=webdav.password)
    async with httpx.AsyncClient(auth=auth) as client:
        dest = f"{webdav.base_url}/{target}"
        file_check = await client.head(dest)
        if file_check.status_code == httpx.codes.NOT_FOUND:
            with filepath.open("rb") as fh:
                await client.put(dest, content=fh.read())
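
# A hypothetical invocation of do_webdav_upload, assuming a configured WebDAV
# remote; the HEAD check above makes re-running it a no-op for files that
# already exist on the remote:
#
#   await do_webdav_upload(
#       config.webdav,
#       pathlib.Path("torrents/example.torrent"),       # hypothetical local file
#       "channel/2024-05-01T23:55Z/example.torrent",    # hypothetical remote path
#   )
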
async def _process_job(jobid):
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    # TODO fill in necessary fields in template, etc.
    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    config = config_ctx.get()

    channel = config.get_channel_config_by_id(job.channel_id)
    stream_time = job.scheduled_start_datetime or datetime.datetime.now(tz=datetime.UTC)

    torrent_output_dir = pathlib.Path("torrents")
    torrent_output_dir.mkdir(exist_ok=True)

    display_date = stream_display_date(stream_time)
    display_date_str = display_date.strftime("%Y%m%d")
    torrent_name = f"[{display_date_str}] {job_config.name} ({job.video_id})"

    folder_name = None
    if stream_time:
        folder_name = stream_time.strftime("%Y-%m-%dT%H:%MZ")

    readme_finalized = False
    render_kwargs = {}
    if job.output_paths:
        torrent_file = torrent_output_dir / f"{job.id} ({job.video_id}).torrent"
        source_files = list(map(pathlib.Path, job.output_paths))

        save_path = str(source_files[0].parent)

        if not torrent_file.exists():
            t = torf.Torrent(
                trackers=config.torrent.trackers,
            )

            # TODO extract key phrases from title / description

            t.filepaths = source_files
            # note that the name also sets the subdirectory for the files
            # as such, using other names for the same files *will* change the infohash
            t.name = torrent_name
            success = await asyncio.to_thread(t.generate)
            if not success:
                raise RuntimeError("Failed to generate torrent")
            t.write(torrent_file)
            t.write(torrent_output_dir / f"{torrent_name}.torrent")

            # send torrent to qbittorrent for seeding
            if config.qbittorrent:
                if config.qbittorrent.default_save_path:
                    save_path = config.qbittorrent.default_save_path
                qbt_client = qbittorrentapi.Client(
                    host=config.qbittorrent.host,
                    username=config.qbittorrent.username,
                    password=config.qbittorrent.password,
                    VERIFY_WEBUI_CERTIFICATE=False,
                    RAISE_NOTIMPLEMENTEDERROR_FOR_UNIMPLEMENTED_API_ENDPOINTS=True,
                )
                task = asyncio.create_task(
                    asyncio.to_thread(
                        qbt_client.torrents_add,
                        torrent_files=torrent_file,
                        save_path=save_path,
                        content_layout="NoSubfolder",
                        is_paused=config.qbittorrent.start_paused,
                    )
                )
                background_tasks.add(task)
                task.add_done_callback(background_tasks.discard)
        else:
            t = torf.Torrent.read(torrent_file)
            print(t)
            print(t.magnet(size=False))
            print(t.files)

        # punt file to webdav remote
        if config.webdav and channel and channel.webdav_path:
            target_base = f"{channel.webdav_path}/{folder_name}"
            task = asyncio.create_task(
                do_webdav_upload(
                    config.webdav, torrent_file, f"{target_base}/{torrent_name}.torrent"
                )
            )
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

        render_kwargs["magnet_url"] = t.magnet(size=False)

        # punt files to gofile
        render_kwargs["gofile_url"] = await do_gofile_upload(job)
        readme_finalized = True
    rendered_job = await microdot.jinja.Template("job.md").render_async(
        job=job,
        author_override=channel.name if channel else None,
        stream_time=stream_time.strftime("%Y-%m-%dT%H:%M:%SZ") if stream_time else "(unknown)",
        config=job_config,
        **render_kwargs,
    )
    if readme_finalized and config.webdav and channel and channel.webdav_path:
        rendered_job_file = torrent_output_dir / f"{job.id} ({job.video_id}).md"
        rendered_job_file.write_text(rendered_job)
        target_base = f"{channel.webdav_path}/{folder_name}"
        task = asyncio.create_task(
            do_webdav_upload(config.webdav, rendered_job_file, f"{target_base}/REPORT.md")
        )
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    database = database_ctx.get()
    cursor = database.cursor()
    cursor.execute(
        "INSERT INTO job_status VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET completed=excluded.completed",
        (jobid, True),
    )
    database.commit()
    if not readme_finalized and jobid in render_tasks:
        # allow rerun if not finalized
        del render_tasks[jobid]
    return rendered_job
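
# The queries in this module appear to assume a schema along these lines
# (reconstructed from the statements themselves; the actual migrations live
# elsewhere, so treat this as illustrative only):
#
#   CREATE TABLE IF NOT EXISTS gofile_tokens (
#       job_id TEXT PRIMARY KEY, token TEXT, folder_id TEXT, url TEXT
#   );
#   CREATE TABLE IF NOT EXISTS gofile_files (job_id TEXT, filepath TEXT);
#   CREATE TABLE IF NOT EXISTS job_config (job_id TEXT PRIMARY KEY, config BLOB);
#   CREATE TABLE IF NOT EXISTS job_status (job_id TEXT PRIMARY KEY, completed INTEGER);
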
def get_process_job_task(jobid: str) -> asyncio.Task:
    """
    Creates or retrieves a singleton job rendering task.
    This is decoupled from the request callback to allow cancellations and multiple consumers.
    """
    if jobid in render_tasks and render_tasks[jobid].done() and render_tasks[jobid].exception():
        del render_tasks[jobid]
    if jobid not in render_tasks:
        render_tasks[jobid] = asyncio.create_task(_process_job(jobid))
    return render_tasks[jobid]


def _get_saved_jobconf(jobid: str) -> JobConfig | None:
    database = database_ctx.get()
    cursor = database.cursor()
    cursor.execute("SELECT config FROM job_config WHERE job_id = ?", (jobid,))
    result = cursor.fetchone()
    if result:
        job_config_payload, *_ = result
        return msgspec.json.decode(job_config_payload, type=JobConfig)
    return None


@app.get("/<jobid>/config")
async def show_job_editor(request, jobid: str):
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404

    job_config = _get_saved_jobconf(job.id) or JobConfig.from_moombox_job(job)

    return (
        await microdot.jinja.Template("edit.html").render_async(job=job, config=job_config),
        {
            "Content-Type": "text/html; charset=utf-8",
        },
    )


@app.post("/<jobid>/config")
async def apply_job_config(request, jobid: str):
    job = await get_moombox_job_by_id(jobid)
    if not job:
        return "Couldn't find matching job", 404
    jobconf = msgspec.convert(
        {item: request.form[item] for item in request.form}, type=JobConfig
    )
    database = database_ctx.get()

    cursor = database.cursor()
    cursor.execute(
        "INSERT INTO job_config VALUES (?, ?) ON CONFLICT(job_id) DO UPDATE SET config=excluded.config",
        (jobid, msgspec.json.encode(jobconf)),
    )
    database.commit()

    return (
        await microdot.jinja.Template("edit.html").render_async(job=job, config=jobconf),
        {
            "Content-Type": "text/html; charset=utf-8",
        },
    )


@app.get("/<jobid>")
async def show_job(request, jobid: str):
    return await get_process_job_task(jobid)


@app.post("/<jobid>")
async def upload_job(request, jobid: str):
    try:
        await get_process_job_task(jobid)
        return await microdot.jinja.Template("job/upload_success.html").render_async()
    except torf.TorfError:
        traceback.print_exc()
        return await microdot.jinja.Template("job/upload_error.html").render_async()


async def job_auto_monitor():
    while True:
        config = config_ctx.get()

        async with httpx.AsyncClient() as client:
            for attempt in stamina.retry_context(
                on=httpx.HTTPError, attempts=None, timeout=None, wait_initial=0.5, wait_max=10.0
            ):
                with attempt:
                    result = await client.get(f"{config.moombox_url}/status")

        jobs = []
        for serialized_job in result.json():
            try:
                jobs.append(msgspec.convert(serialized_job, type=MoomboxJobInfo))
            except msgspec.ValidationError:
                pass

        database = database_ctx.get()
        cursor = database.cursor()
        configs = {
            job_id: msgspec.json.decode(job_conf, type=JobConfig)
            for job_id, job_conf in cursor.execute("SELECT job_id, config FROM job_config")
        }
        statuses = {
            job_id: bool(completed)
            for job_id, completed in cursor.execute("SELECT job_id, completed FROM job_status")
        }

        for job in reversed(jobs):
            if not job.uploadability_state:
                continue
            job_conf = configs.get(job.id)
            if not job_conf:
                job_conf = JobConfig.from_moombox_job(job)
            if job.uploadability_state != job_conf.upload:
                continue
            if not config.autoupload.active:
                print(f"'{job.title}' meets conditions for upload")
                continue
            if statuses.get(job.id):
                continue
            print(f"'{job.title}' is scheduled for upload")
            get_process_job_task(job.id)

        await asyncio.sleep(120)
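
# This module only defines the sub-app and the monitor loop; the real wiring
# lives outside this file. A hypothetical entry point might look like:
#
#   async def main():
#       monitor = asyncio.create_task(job_auto_monitor())
#       await app.start_server(port=8000)  # microdot's async server
#
#   asyncio.run(main())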