#!/usr/bin/python3 import glob import os import re import rpm import sys import subprocess import tempfile from shutil import copyfile from pyrpm.spec import Spec, replace_macros from gitea import * from git import Repo from functools import cmp_to_key from configobj import ConfigObj cfg = ConfigObj(infile='/etc/autodist/config-git') gitea = Gitea(cfg["GITEA_URL"], cfg["GITEA_TOKEN"]) org = Organization.request(gitea, cfg["GITEA_ORGANIZATION"]) def commitReleaseFromDir(pkg_info): src_dir = pkg_info["src"] if src_dir.endswith(".src.rpm"): # src is a SRPM, use autospec to extract in a temporary folder src_temp_dir = tempfile.TemporaryDirectory() src_dir = src_temp_dir.name subprocess.run(["autospec", "-x", pkg_info["src"], f'--destdir={src_dir}'], stdout=subprocess.PIPE) # Delete binary source archives for pattern in [ '*.zip','*.tar.bz2','*.tar.xz','*.tar.gz','*.tgz','*.txz','*.iso', '*.run','*.dll','*.bin','*.jar']: for filename in glob.glob(f'{src_dir}/{pattern}'): os.remove(filename) #print(src_dir) #for line in sys.stdin: # line = line.rstrip() # break # Parse spec file spec = Spec.from_file(f'{src_dir}/{pkg_info["name"]}.spec') #specfiletemp_dir = tempfile.TemporaryDirectory() #spec = Specfile(f'{src_dir}/{pkg_info["name"]}.spec', # sourcedir=specfiletemp_dir.name) # Get used information from specfile commit_text = "" header_split = [] for c in spec.changelog.split('\n'): if c == "": break if c[0:2] == "* ": header_split = c.split(" ") else: if commit_text != "": commit_text += "\n" if c[0:2] == "- ": commit_text += c[2:] else: commit_text += c commit_text = f'{commit_text} [release {header_split[-1]};' + \ f'{header_split[1]} {header_split[2]} {header_split[3]} {header_split[4]}]' spec_description = replace_macros(spec.description, spec) try: # Find repository gitea_repo = org.get_repository(pkg_info['name']) except NotFoundException: # Repository does not exist -> create org.create_repo( repoName=pkg_info['name'], description=spec.summary, private=False, autoInit=True, gitignores=None, # license=spec.license, # readme=spec.description, issue_labels=None, default_branch="main", ) gitea_repo = org.get_repository(pkg_info['name']) # Set/update repository description and website url spec_url = replace_macros(spec.url, spec) spec_summary = replace_macros(spec.summary, spec) if gitea_repo.description != spec_summary or gitea_repo.website != spec_url: gitea_repo.description = spec_summary gitea_repo.website = spec_url gitea_repo.commit() # Clone repo repo_url = f'{cfg["GITEA_SSH_URL"]}{pkg_info["name"]}.git' # Clone repository to temporary folder temp_dir = tempfile.TemporaryDirectory() repo = Repo.clone_from(repo_url, temp_dir.name) # Check if tag already exists new_tag = f'{pkg_info["v"]}-{pkg_info["r"]}'.replace('~','+') if new_tag in repo.tags: print(f'Skipping {pkg_info["name"]} release ' f'{pkg_info["e"]}:{pkg_info["v"]}-{pkg_info["r"]}: tag {new_tag} already exists') return # Set committer user and email repo.config_writer().set_value("user", "name", cfg["COMMITTER_USER"]).release() repo.config_writer().set_value("user", "email", cfg["COMMITTER_EMAIL"]).release() # Create/update README.md with open(f'{temp_dir.name}/README.md', "w") as readme_file: readme_file.write(f"# {pkg_info['name']}\n\n{spec_description}") repo.index.add(['README.md']) # Update files dir_list = os.listdir(src_dir) for dir_file in dir_list: copyfile(f'{src_dir}/{dir_file}', f'{temp_dir.name}/{dir_file}') # Add/modify files repo.index.add([dir_file]) # Remove deleted files temp_dir_list = os.listdir(temp_dir.name) for temp_dir_file in temp_dir_list: if temp_dir_file == ".git" or temp_dir_file == "README.md": continue if not temp_dir_file in dir_list: repo.index.remove([temp_dir_file], working_tree = True) # Commit print(f'Committing {pkg_info["name"]} release {new_tag}...') repo.index.commit(commit_text) origin = repo.remote(name='origin') origin.push() # Create and commit tag repo.create_tag(new_tag) origin.push(new_tag) #for line in sys.stdin: # line = line.rstrip() # break temp_dir.cleanup() def comparePkgInfo(item1, item2): return rpm.labelCompare( (str(item1['e']), item1['v'], item1['r']), (str(item2['e']), item2['v'], item2['r'])) def findAndCommitPackageReleases(pkgname, pkgvr): pkgs_info = [] # Find from archive dir dirs = [f for f in os.listdir(f'{cfg["ARCHIVE_DIR"]}/{pkgname[0:1]}') if re.match(f'{pkgname}-[^-]*-[^-]*$', f)] for dir in dirs: pkg_dir = f'{cfg["ARCHIVE_DIR"]}/{pkgname[0:1]}/{dir}' spec = Spec.from_file(f'{pkg_dir}/{pkgname}.spec') parts = re.split(f'{pkgname}-([^-]*)-([^-]*)$', dir) epoch = 0 if spec.epoch is None else int(spec.epoch) version = parts[1] release = parts[2] pkgs_info.append( {'name': pkgname, 'e': epoch, 'v': version, 'r': release, 'src': pkg_dir}) # Find from OLD_DIR dirs = [f for f in os.listdir(f'{cfg["OLD_DIR"]}') if re.match(f'{pkgname}_[0-9]*.[0-9]*$', f)] for dir in dirs: srpms_list = glob.glob(f'{cfg["OLD_DIR"]}/{dir}/{pkgname}-*.src.rpm') for srpm in srpms_list: parts = re.split('.*-([^-]*)-([^-]*).src.rpm$', srpm) result = subprocess.run(['rpm', '-q', '--queryformat=%{epoch}', '-p', srpm], stdout=subprocess.PIPE).stdout.decode('utf-8') # result is "(none)" if no Epoch is set epoch = "0" if len(result) > 2 else result version = parts[1] release = parts[2] pkgs_info.append( {'name': pkgname, 'e': epoch, 'v': version, 'r': release, 'src': srpm}) # Find from SRPMS_DIR srpms_list = [f for f in os.listdir(cfg["SRPMS_DIR"]) if re.match(f'{pkgname}-[^-]*-[^-]*$', f)] for srpm in srpms_list: src_path = f'{cfg["SRPMS_DIR"]}/{srpm}' parts = re.split('.*-([^-]*)-([^-]*).src.rpm$', srpm) result = subprocess.run(['rpm', '-q', '--queryformat=%{epoch}', '-p', src_path], stdout=subprocess.PIPE).stdout.decode('utf-8') # result is "(none)" if no Epoch is set epoch = "0" if len(result) > 2 else result version = parts[1] release = parts[2] pkgs_info.append( {'name': pkgname, 'e': epoch, 'v': version, 'r': release, 'src': src_path }) # Sort releases pkgs_info.sort(key=cmp_to_key(comparePkgInfo)) for pkg_info in pkgs_info: vr = f'{pkg_info["v"]}-{pkg_info["r"]}' if pkgvr is not None and pkgvr != vr: continue #pkg_dir = f'{cfg["ARCHIVE_DIR"]}/{pkgname[0:1]}/{pkgname}-{vr}' commitReleaseFromDir(pkg_info) def main(): args = sys.argv[1:] if len(args) == 0: from_pkg = "ico" from_reached = False dir_list = sorted(filter(os.path.isfile, glob.glob(f'{cfg["SRPMS_DIR"]}/*.src.rpm'))) for dir_file in dir_list: parts = re.split('.*/([^/]*)-([^-]*)-([^-]*).src.rpm$', dir_file) pkgname = parts[1] if not from_reached and pkgname != from_pkg: continue from_reached = True print(f'Processing package {pkgname}...') findAndCommitPackageReleases(pkgname, None) else: if len(args) < 1 or len(args) > 2: print("Usage: autodist-git [pkgname [version-release]]\n") exit(1) pkgname = args[0] pkgvr = None if len(args) > 1: pkgvr = args[1] findAndCommitPackageReleases(pkgname, pkgvr) print("All done.\n") main()