""" Version: 3.0.0 Author: Salty Usage: See help `python flaccare.py -h` Requirements: 1) Make sure to install mutagen: `pip install mutagen` 2) Make sure to have the latest versions of the following added to PATH: * flac (https://xiph.org/flac/download.html) * metaflac (https://xiph.org/flac/download.html) """ import argparse import math import mimetypes import os import platform import shutil import signal import subprocess import time import traceback from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, ArgumentTypeError, RawTextHelpFormatter from multiprocessing import Pool from pathlib import Path, PurePath from threading import Event, Semaphore from typing import Iterator from mutagen.flac import FLAC from mutagen.id3 import PictureType class HelpFormatter(RawTextHelpFormatter, ArgumentDefaultsHelpFormatter): pass class SuccessResult: def __init__(self, path: str, savings: int): self.path = path self.savings = savings def __str__(self) -> str: return f"{res.path}\t{savings}\n" class FailureResult: def __init__(self, path: str, msg: str): self.path = path self.msg = msg.replace("\r\n", "\n").strip() def __str__(self) -> str: return f"{res.path}\n\n{res.msg}\n\n" """Utils""" def is_arg_folder(value: str): if not Path(value).is_dir(): raise ArgumentTypeError(f"invalid folder: {value}") return value def format_size(value): for mag, unit in enumerate([ "B", "KiB", "MiB", "GiB", "TiB" ]): if abs(value) < 1024.0: return f"{value:.0f} {unit}" if mag == 0 else f"{value:.2f} {unit}" value /= 1024 return f"{value:.1f} PiB" def walk(path: str, limiter: Semaphore, cancel: Event) -> Iterator[str]: for path, _, files in os.walk(path): if cancel.is_set(): return for file in files: if file.endswith(".flac"): limiter.acquire() yield PurePath(path).joinpath(file).as_posix() def run(args: list[str]) -> bool: if platform.system() == "Windows": kwargs = dict(creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) else: kwargs = dict(preexec_fn=os.setpgrp) res = subprocess.run(args, capture_output=True, timeout=300, **kwargs) return res.returncode == 0 """Worker""" def work_init(): signal.signal(signal.SIGINT, signal.SIG_IGN) def work_handler(path: str): try: return work(path) except Exception as ex: return FailureResult(path, "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))) def work(path: str): old_size = os.stat(path).st_size if args.extract: flac = FLAC(path) for picture in flac.pictures: if picture.type == PictureType.COVER_FRONT: cover_path = Path(path).parent.joinpath("cover").with_suffix(mimetypes.guess_extension(picture.mime) or "unk") if not cover_path.exists(): cover_path.write_bytes(picture.data) break if not run([ "flac", "-8", "-V", "-f", path ]): raise Exception("Failed to reencode.") if not run([ "metaflac", "--dont-use-padding", "--remove", "--block-type=PICTURE,PADDING", path ]): raise Exception("Failed to remove embedded images.") if not run([ "metaflac", f"--add-padding={args.padding}", path ]): raise Exception("Failed to add padding.") new_size = os.stat(path).st_size return SuccessResult(path, old_size - new_size) """Main""" parser = ArgumentParser(formatter_class=HelpFormatter, add_help=False, description="Reencodes with libFLAC, removes embedded images and excess padding.") parser.add_argument("path", type=is_arg_folder, help="path to music folder") parser.add_argument("--help", action="help", default=argparse.SUPPRESS, help="show help and exit") parser.add_argument("--padding", metavar="N", type=int, default=8192, help="bytes of padding to keep") parser.add_argument("--extract", action="store_true", help="extract embedded cover to file") parser.add_argument("--workers", metavar="N", type=int, default=math.floor(os.cpu_count() * .8), help="parallel encoder count") args = parser.parse_args() if __name__ == "__main__": if shutil.which("flac") is None: print("Could not find flac executable!") exit(1) if shutil.which("metaflac") is None: print("Could not find metaflac executable!") exit(1) print(f"Using {args.workers} workers...") count = 0 savings = 0 timestamp = time.time() log_success = open("log_success.txt", "a+", encoding="utf-8", buffering=1) log_failure = open("log_failure.txt", "a+", encoding="utf-8", buffering=1) pool = Pool(args.workers, work_init) limiter = Semaphore(args.workers) cancel = Event() def sigint_handler(signum, frame): print("Aborting...") cancel.set() signal.signal(signal.SIGINT, sigint_handler) for res in pool.imap(work_handler, walk(args.path, limiter, cancel)): limiter.release() count += 1 if isinstance(res, SuccessResult): savings += res.savings log_success.write(str(res)) elif isinstance(res, FailureResult): log_failure.write(str(res)) if count % max(args.workers, 10) == 0: print(f" {count:>10} | {format_size(savings):>10} | {PurePath(res.path).relative_to(args.path).as_posix()}") pool.close() log_success.close() log_failure.close() print(f"Saved {format_size(savings)} by processing {count} files in {(time.time() - timestamp) / 60:.2f} minutes")