gofile

Module and tool to upload files to gofile.io
git clone https://code.alwayswait.ing/gofile.git

commit 900f403fca939187b71272955b2d65847b0921be
parent 5df7b8ded155ec52e4322e585014214b4886d861
Author: archiveanon <>
Date:   Wed, 25 Dec 2024 20:24:06 +0000

Convert to async

Diffstat:
M src/gofile/api.py | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
1 file changed, 59 insertions(+), 30 deletions(-)

diff --git a/src/gofile/api.py b/src/gofile/api.py
@@ -1,13 +1,14 @@
 #!/usr/bin/python3

 import argparse
+import asyncio
 import collections
 import enum
 import os
 import pathlib
 import random
 import textwrap
-from typing import Generic, Iterator, Optional, TypeVar
+from typing import AsyncIterator, Generic, Iterator, Optional, TypeVar

 # normally I'd prefer using standard modules but streaming POST data is really important
 import httpx
@@ -84,37 +85,41 @@ class GofileServerResponse(msgspec.Struct, Generic[T]):
     data: T


-def _gofile_api_get(*args, type: T, **kwargs) -> T:
+async def _gofile_api_get(*args, type: T, **kwargs) -> T:
     # performs a GET request and extracts the 'data' property from the response as a given type
     # if the status is not 'ok', an exception is raised
-    r = httpx.get(*args, **kwargs)
-    result = msgspec.json.decode(r.text, type=GofileServerResponse[type])
+    async with httpx.AsyncClient() as client:
+        r = await client.get(*args, **kwargs)
+        result = msgspec.json.decode(r.text, type=GofileServerResponse[T])

-    if result.status != GofileStatus.OK:
-        raise Exception(result)
-    return result.data
+        if result.status != GofileStatus.OK:
+            raise Exception(result)
+        return result.data


-def _gofile_api_post(*args, type: T, **kwargs) -> T:
+async def _gofile_api_post(*args, type: T, **kwargs) -> T:
     # performs a POST request and extracts the 'data' property from the response as a given type
     # if the status is not 'ok', an exception is raised
-    r = httpx.post(*args, **kwargs)
-    result = msgspec.json.decode(r.text, type=GofileServerResponse[type])
+    async with httpx.AsyncClient() as client:
+        r = await client.post(*args, **kwargs)
+        result = msgspec.json.decode(r.text, type=GofileServerResponse[T])

-    if result.status != GofileStatus.OK:
-        raise Exception(result)
-    return result.data
+        if result.status != GofileStatus.OK:
+            raise Exception(result)
+        return result.data


-def get_upload_server() -> GofileServerResult:
-    server_list = _gofile_api_get("https://api.gofile.io/servers", type=GofileServerListResult)
+async def get_upload_server() -> GofileServerResult:
+    server_list = await _gofile_api_get(
+        "https://api.gofile.io/servers", type=GofileServerListResult
+    )

     if not server_list.servers:
         return GofileServerResult()
     return random.choice(server_list.servers).to_base_server()


-def upload_single(
+async def upload_single(
     file: pathlib.Path,
     token: Optional[str] = None,
     folder_id: Optional[str] = None,
@@ -122,10 +127,10 @@ def upload_single(
 ) -> GofileUpload:
     # we return a GofileUpload instead of a GofileUploadResult so there's consistency between upload_single / upload_multiple
     if not file.exists() or not file.is_file():
-        raise ValueError(f"Attempting to upload missing or non-file '{args.path}'")
+        raise ValueError(f"Attempting to upload missing or non-file '{file}'")

     if not server:
-        upload_server_result = get_upload_server()
+        upload_server_result = await get_upload_server()
         server = upload_server_result.server

     # automatically shorten long file names in small terminals (e.g. split panes)
@@ -158,7 +163,7 @@ def upload_single(
     if folder_id:
         post_data["folderId"] = folder_id

-    upload_result = _gofile_api_post(
+    upload_result = await _gofile_api_post(
         f"https://{server}.gofile.io/contents/uploadfile",
         files=post_data,
         type=GofileUploadResult,
@@ -171,16 +176,16 @@ def upload_single(
     return GofileUpload(file, upload_result)


-def upload_multiple(
+async def upload_multiple(
     files: Iterator[pathlib.Path], token: Optional[str] = None, folder_id: Optional[str] = None
-) -> list[GofileUpload]:
+) -> AsyncIterator[GofileUpload]:
     # returns a generator of GofileUpload instances in the same order files were given
     first_file, *other_files = files

-    upload_server_result = get_upload_server()
+    upload_server_result = await get_upload_server()
     server = upload_server_result.server

-    first_upload = upload_single(first_file, token, folder_id, server)
+    first_upload = await upload_single(first_file, token, folder_id, server)
     if not token:
         token = first_upload.result.guest_token

@@ -189,12 +194,12 @@ def upload_multiple(

     yield first_upload

-    for file in other_files:
-        upload = upload_single(file, token, folder_id, server)
-        yield upload
+    uploads = [upload_single(file, token, folder_id, server) for file in other_files]
+    for upload in asyncio.as_completed(uploads):
+        yield await upload


-def main():
+async def main():
     # you may save and reuse a guest token
     # there's no clear indicator on whether or not it expires
     # there may be duplicate filenames in a folder
@@ -211,6 +216,19 @@ def main():
         action="store_true",
         help="Uploads are queued in ascending size to get a download link as fast as possible",
     )
+    parser.add_argument(
+        "--upload-count-threshold",
+        type=int,
+        help="Bail if there are more than this number of files present (ensuring you don't accidentally something important)",
+        default=10,
+    )
+    parser.add_argument(
+        "-r",
+        "--recursive",
+        action="store_true",
+        help="Get files from subdirectories",
+        default=False,
+    )

     args = parser.parse_args()

@@ -225,13 +243,24 @@ def main():
         if args.fast_link:
             # sort by ascending size so we can produce a link as fast as possible
             files = sorted(files, key=lambda x: os.stat(x).st_size)
+        files = [f for f in files if f.name not in ("desktop.ini",)]
+        if not files:
+            print(f"Directory '{args.path}' is empty")
+            return
+        if args.upload_count_threshold and args.upload_count_threshold < len(files):
+            print(
+                f"Directory '{args.path}' has {len(files)} files; only expecting "
+                f"{args.upload_count_threshold}. Pass --upload-count-threshold to confirm "
+                "that you really did intend to upload this directory."
+            )
+            return
     elif args.path.is_file():
         files = [args.path]

     uploads = upload_multiple(files, token=args.token, folder_id=args.folder_id)

     # force generator to evaluate and yield the first result
-    first_upload = next(uploads)
+    first_upload = await anext(uploads)
     print(f"- Download URL: {first_upload.result.download_page}")
     print(f"- Folder ID: {first_upload.result.parent_folder}")

@@ -241,7 +270,7 @@ def main():

     # let the rest of the uploads process
     try:
-        for upload in uploads:
+        async for upload in uploads:
             pass
     except KeyboardInterrupt:
         # reprint the information in case we bail out
@@ -252,4 +281,4 @@


 if __name__ == "__main__":
-    main()
+    asyncio.run(main())
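
For reference, a minimal usage sketch of the module as it looks after this commit. It is not part of the diff: it assumes the package is importable as gofile.api, uses placeholder file paths, and needs Python 3.10+ for the anext() builtin.

import asyncio
import pathlib

from gofile.api import upload_multiple


async def demo() -> None:
    # placeholder paths; replace with real files
    files = [pathlib.Path("a.bin"), pathlib.Path("b.bin")]

    # upload_multiple() is now an async generator; nothing runs until the first anext()
    uploads = upload_multiple(files)

    # the first upload picks an upload server and obtains a guest token
    first = await anext(uploads)
    print(first.result.download_page)

    # the remaining uploads run concurrently and are yielded as they finish
    async for upload in uploads:
        print(upload.result.download_page)


asyncio.run(demo())

Note that after this commit only the first upload keeps its position: the rest go through asyncio.as_completed(), so they are yielded in completion order rather than the order the files were given.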