docker-compose-updater/main.py

140 lines
4.8 KiB
Python
Raw Normal View History

2021-04-10 12:04:16 +00:00
#!/usr/bin/env python3
import re
import sys
import requests
from distutils.version import LooseVersion
from ruamel.yaml import YAML
from config import COMPOSE_FILE_PATH
REGISTRY_URL = "https://registry.hub.docker.com/v2/"
RED = "\033[31m"
BLU = "\033[34m"
WHT = "\033[37m"
GRY = "\033[90m"
NC = "\033[0m"
class DockerAuth(requests.auth.AuthBase):
def __init__(self):
self.auth_token = None
def __call__(self, request):
if self.auth_token:
request.headers["Authorization"] = f"Bearer {self.auth_token}"
request.register_hook('response', self.handle_401)
return request
def handle_401(self, response, **kwargs):
if not 400 <= response.status_code < 500:
return response
if "Www-Authenticate" not in response.headers:
return response
auth_header = response.headers["Www-Authenticate"]
regex = re.compile(r'(\w+)[=]\"([\w:/.]+)\"')
auth_data = dict(regex.findall(auth_header))
if "realm" not in auth_data or \
"service" not in auth_data or \
"scope" not in auth_data:
exit_msg("Unexpected auth header")
params = {
"service": auth_data["service"],
"client_id": "fuck-your-stupid-undocumented-shit",
"scope": auth_data["scope"]
}
print(f"{GRY}Request to {auth_data['realm']}{NC}")
resp = requests.get(auth_data["realm"], params=params)
self.auth_token = resp.json()["token"]
print(f"{GRY}Request to {response.url}{NC}")
response = requests.get(response.url, headers={"Authorization": f"Bearer {self.auth_token}"})
return response
def exit_msg(msg, exit_code=1):
if exit_code > 0:
COLOR = RED
else:
COLOR = BLU
print(f"{COLOR}{msg}{NC}", file=sys.stderr)
sys.exit(exit_code)
def main():
loader = YAML()
with open(COMPOSE_FILE_PATH) as f:
yaml_data = loader.load(f)
if "services" not in yaml_data:
exit_msg("Invalid compose file. No services")
summary = {}
service_length = 0
old_length = 0
new_length = 0
for k, v in yaml_data["services"].items():
2022-03-11 22:04:50 +00:00
if "build" in v:
print(f'{GRY}Skipping service "{v}", as it is build locally{NC}')
continue
2021-04-10 12:04:16 +00:00
if "image" not in v:
exit_msg("Invalid compose file. No image")
image_str = v["image"]
if not any(char.isdigit() for char in image_str):
continue
image_name, image_version = image_str.split(":")
print(f'Working on service "{k}" with image "{image_name}": {image_version}')
if "-" in image_version:
i = image_version.find("-")
pure_version = image_version[:i]
variant = image_version[i:]
else:
pure_version = image_version
variant = ""
specificity = pure_version.count(".")
old_version = LooseVersion(pure_version)
regex_str = ".".join([r"\d+"] * (specificity + 1)) + variant + "$"
regex = re.compile(regex_str)
if "/" not in image_name:
image_name = "library/" + image_name
url = REGISTRY_URL + image_name + "/tags/list"
print(f"{GRY}Request to {url}{NC}")
resp = requests.get(url, auth=DockerAuth())
tags = resp.json()["tags"]
newest_full_version = image_version
newest_version = old_version
for tag in tags:
if not regex.match(tag):
continue
if variant:
new_pure_version = tag[:-len(variant)]
else:
new_pure_version = tag
new_version = LooseVersion(new_pure_version)
if new_version > old_version:
newest_version = new_version
newest_full_version = tag
if newest_version != old_version:
print(f"Found newer version: {newest_full_version}")
summary[k] = (image_version, newest_full_version)
service_length = max(service_length, len(k))
old_length = max(old_length, len(image_version))
new_length = max(new_length, len(newest_full_version))
yaml_data["services"][k]["image"] = f"{image_name}:{newest_full_version}"
if not summary:
exit_msg("No new versions.", 0)
print(f"{WHT}Summary:{NC}")
for service, (old, new) in summary.items():
print(f'{WHT}{service:>{service_length}}: {old:>{old_length}} -> {new:>{new_length}}{NC}')
user_input = input("Confirm ? [y/N] ")
if user_input.lower() != "y":
exit_msg("User did not confirm. Exit", 0)
loader.indent(mapping=2, sequence=4, offset=2)
with open(COMPOSE_FILE_PATH, "w") as f:
loader.dump(yaml_data, f)
exit_msg(f'Successfully written to file "{COMPOSE_FILE_PATH}"', 0)
if __name__ == "__main__":
main()