gofile

Module and tool to upload files to gofile.io
git clone https://code.alwayswait.ing/gofile
Log | Files | Refs

cli.py (4565B)


      1 #!/usr/bin/python3
      2 
      3 import argparse
      4 import asyncio
      5 import contextlib
      6 import os
      7 import pathlib
      8 import textwrap
      9 from types import ModuleType
     10 
     11 from . import api
     12 
# tqdm is an optional dependency: the name stays None when the import fails,
# and file_upload_progress falls back to plain print() messages in that case.
tqdm: ModuleType | None = None
try:
    import tqdm
except ImportError:
    pass
     18 
     19 
     20 @contextlib.contextmanager
     21 def file_upload_progress(file: pathlib.Path):
     22     if not tqdm:
     23         print(f"Uploading '{file.name}'...")
     24         yield file.open("rb")
     25         print(f"Closed '{file.name}'.")
     26         return
     27 
     28     termsize = os.get_terminal_size()
     29     short_file_name = textwrap.shorten(
     30         file.name, width=int(termsize.columns * 0.6), placeholder=f"\u2026{file.suffix}"
     31     )
     32 
     33     file_size = file.stat().st_size
     34     with tqdm.tqdm(
     35         desc=short_file_name,
     36         total=file_size,
     37         unit="B",
     38         unit_scale=True,
     39         unit_divisor=1024,
     40         dynamic_ncols=True,
     41     ) as progress:
     42         yield tqdm.utils.CallbackIOWrapper(progress.update, file.open("rb"), "read")
     43 
     44 
     45 async def main():
     46     # you may save and reuse a guest token
     47     # there's no clear indicator on whether or not it expires
     48     # there may be duplicate filenames in a folder
     49 
     50     parser = argparse.ArgumentParser()
     51     parser.add_argument("path", type=pathlib.Path, help="Path to single file or folder")
     52 
     53     parser.add_argument(
     54         "--token", type=str, help="Token for user upload, or empty for initial anonymous upload"
     55     )
     56     parser.add_argument("--folder-id", type=str, help="Folder to upload to")
     57     parser.add_argument(
     58         "--fast-link",
     59         action="store_true",
     60         help="Uploads are queued in ascending size to get a download link as fast as possible",
     61     )
     62     parser.add_argument(
     63         "--upload-count-threshold",
     64         type=int,
     65         help="Bail if there are more than this number of files present (ensuring you don't accidentally something important)",
     66         default=10,
     67     )
     68     parser.add_argument(
     69         "-r",
     70         "--recursive",
     71         action="store_true",
     72         help="Get files from subdirectories",
     73         default=False,
     74     )
     75 
     76     args = parser.parse_args()
     77 
     78     if not args.path.exists():
     79         print(f"Path '{args.path}' does not exist")
     80         return
     81 
     82     files = []
     83 
     84     if args.path.is_dir():
     85         files = list(args.path.glob("*"))
     86         if args.fast_link:
     87             # sort by ascending size so we can produce a link as fast as possible
     88             files = sorted(files, key=lambda x: os.stat(x).st_size)
     89         files = [f for f in files if f.name not in ("desktop.ini",)]
     90         if not files:
     91             print(f"Directory '{args.path}' is empty")
     92             return
     93         if args.upload_count_threshold and args.upload_count_threshold < len(files):
     94             print(
     95                 f"Directory '{args.path}' has {len(files)} files; only expecting "
     96                 f"{args.upload_count_threshold}. Pass --upload-count-threshold to confirm "
     97                 "that you really did intend to upload this directory."
     98             )
     99             return
    100     elif args.path.is_file():
    101         files = [args.path]
    102 
    103     with contextlib.ExitStack() as stack:
    104         first_file, *other_files = files
    105 
    106         first_upload = await api.upload_single(
    107             stack.enter_context(file_upload_progress(first_file)),
    108             token=args.token,
    109             folder_id=args.folder_id,
    110         )
    111 
    112         print()
    113         print(f"- Download URL: {first_upload.result.download_page}")
    114         print(f"- Folder ID: {first_upload.result.parent_folder}")
    115 
    116         if first_upload.result.guest_token:
    117             print(f"- Guest token: {first_upload.result.guest_token}")
    118 
    119         # let the rest of the uploads process
    120         try:
    121             async with asyncio.TaskGroup() as tg:
    122                 for file in other_files:
    123                     tg.create_task(
    124                         api.upload_single(
    125                             stack.enter_context(file_upload_progress(file)),
    126                             token=first_upload.result.guest_token or args.token,
    127                             folder_id=first_upload.result.parent_folder or args.folder_id,
    128                         )
    129                     )
    130         except asyncio.CancelledError:
    131             # close all file progress meters and reprint the information if we bail out
    132             stack.close()
    133             print()
    134             print("Cancelling upload.  Resume with the following options:")
    135             print(f"- Folder ID: {first_upload.result.parent_folder}")
    136             print(f"- Token: {first_upload.result.guest_token or args.token}")
    137 
    138 
    139 if __name__ == "__main__":
    140     asyncio.run(main())