#!/usr/bin/env python3 import re import sys import requests from distutils.version import LooseVersion from ruamel.yaml import YAML from config import COMPOSE_FILE_PATH REGISTRY_URL = "https://registry.hub.docker.com/v2/" RED = "\033[31m" BLU = "\033[34m" WHT = "\033[37m" GRY = "\033[90m" NC = "\033[0m" class DockerAuth(requests.auth.AuthBase): def __init__(self): self.auth_token = None def __call__(self, request): if self.auth_token: request.headers["Authorization"] = f"Bearer {self.auth_token}" request.register_hook('response', self.handle_401) return request def handle_401(self, response, **kwargs): if not 400 <= response.status_code < 500: return response if "Www-Authenticate" not in response.headers: return response auth_header = response.headers["Www-Authenticate"] regex = re.compile(r'(\w+)[=]\"([\w:/.]+)\"') auth_data = dict(regex.findall(auth_header)) if "realm" not in auth_data or \ "service" not in auth_data or \ "scope" not in auth_data: exit_msg("Unexpected auth header") params = { "service": auth_data["service"], "client_id": "fuck-your-stupid-undocumented-shit", "scope": auth_data["scope"] } print(f"{GRY}Request to {auth_data['realm']}{NC}") resp = requests.get(auth_data["realm"], params=params) self.auth_token = resp.json()["token"] print(f"{GRY}Request to {response.url}{NC}") response = requests.get(response.url, headers={"Authorization": f"Bearer {self.auth_token}"}) return response def exit_msg(msg, exit_code=1): if exit_code > 0: COLOR = RED else: COLOR = BLU print(f"{COLOR}{msg}{NC}", file=sys.stderr) sys.exit(exit_code) def main(): loader = YAML() with open(COMPOSE_FILE_PATH) as f: yaml_data = loader.load(f) if "services" not in yaml_data: exit_msg("Invalid compose file. No services") summary = {} service_length = 0 old_length = 0 new_length = 0 for k, v in yaml_data["services"].items(): if "build" in v: print(f'{GRY}Skipping service "{k}", as it is build locally{NC}') continue if "image" not in v: exit_msg("Invalid compose file. No image") image_str = v["image"] if not any(char.isdigit() for char in image_str): continue image_name, image_version = image_str.split(":") print(f'Working on service "{k}" with image "{image_name}": {image_version}') if "-" in image_version: i = image_version.find("-") pure_version = image_version[:i] variant = image_version[i:] else: pure_version = image_version variant = "" specificity = pure_version.count(".") old_version = LooseVersion(pure_version) regex_str = ".".join([r"\d+"] * (specificity + 1)) + variant + "$" regex = re.compile(regex_str) if "/" not in image_name: image_name = "library/" + image_name url = REGISTRY_URL + image_name + "/tags/list" print(f"{GRY}Request to {url}{NC}") resp = requests.get(url, auth=DockerAuth()) tags = resp.json()["tags"] newest_full_version = image_version newest_version = old_version for tag in tags: if not regex.match(tag): continue if variant: new_pure_version = tag[:-len(variant)] else: new_pure_version = tag new_version = LooseVersion(new_pure_version) if new_version > old_version: newest_version = new_version newest_full_version = tag if newest_version != old_version: print(f"Found newer version: {newest_full_version}") summary[k] = (image_version, newest_full_version) service_length = max(service_length, len(k)) old_length = max(old_length, len(image_version)) new_length = max(new_length, len(newest_full_version)) yaml_data["services"][k]["image"] = f"{image_name}:{newest_full_version}" if not summary: exit_msg("No new versions.", 0) print(f"{WHT}Summary:{NC}") for service, (old, new) in summary.items(): print(f'{WHT}{service:>{service_length}}: {old:>{old_length}} -> {new:>{new_length}}{NC}') user_input = input("Confirm ? [y/N] ") if user_input.lower() != "y": exit_msg("User did not confirm. Exit", 0) loader.indent(mapping=2, sequence=4, offset=2) with open(COMPOSE_FILE_PATH, "w") as f: loader.dump(yaml_data, f) exit_msg(f'Successfully written to file "{COMPOSE_FILE_PATH}"', 0) if __name__ == "__main__": main()