#!/usr/bin/python3
"""autodist git integration.

Synchronises RPM package sources (archived spec directories and source RPMs)
with repositories hosted on a Gitea server: one repository per package, one
commit and one annotated tag per package release.
"""

import argparse
import glob
import os
import re
import subprocess
import sys
import tempfile
from shutil import copyfile
from functools import cmp_to_key

from gitea import Gitea, NotFoundException, Organization, Repository
import rpm
from pyrpm.spec import Spec, replace_macros
from git import Repo
from configobj import ConfigObj

cfg = ConfigObj(infile='/etc/autodist/config-git')
gitea = Gitea(cfg["GITEA_URL"], cfg["GITEA_TOKEN"])
org = Organization.request(gitea, cfg["GITEA_ORGANIZATION"])

# Glob patterns of binary source archives that must not be committed to git.
_BINARY_ARCHIVE_PATTERNS = (
    '*.zip', '*.tar.bz2', '*.tar.xz', '*.tar.gz', '*.tgz', '*.txz', '*.iso',
    '*.run', '*.dll', '*.bin', '*.jar', '*.msi', '*.deb')


def _repoNameFor(pkg_name):
    """Return the repository name for a package ('+' is not allowed)."""
    return pkg_name.replace('+', 'Plus')


def comparePkgInfo(item1, item2):
    """Compare two package info dicts by (epoch, version, release).

    Returns the result of rpm.labelCompare: negative, zero or positive when
    item1 is respectively older than, equal to or newer than item2.
    """
    return rpm.labelCompare(
        (str(item1['e']), item1['v'], item1['r']),
        (str(item2['e']), item2['v'], item2['r']))


def giteaGetRepoTags(repo_name):
    """Return the tags of an organization repository, or None if not found."""
    try:
        path = f'/repos/{cfg["GITEA_ORGANIZATION"]}/{repo_name}/tags'
        return gitea.requests_get(path)
    except NotFoundException:
        return None


def giteaGetRepository(repo_name):
    """Return the Repository instance for repo_name, or None if not found."""
    try:
        path = f'/repos/{cfg["GITEA_ORGANIZATION"]}/{repo_name}'
        result = gitea.requests_get(path)
        return Repository.parse_response(gitea, result)
    except NotFoundException:
        return None


def _deleteRepoIfExists(pkg_name):
    """Delete the Gitea repository of pkg_name when it exists."""
    gitea_repo = giteaGetRepository(_repoNameFor(pkg_name))
    if gitea_repo is not None:
        print(f'Deleting repository for {pkg_name}...')
        gitea_repo.delete()


def findOrCreateRepo(pkg_name, pkg_description=None, create=True):
    """Return the Gitea repository for a package, creating it if missing.

    Returns None when the repository does not exist and create is False.
    """
    repo_name = _repoNameFor(pkg_name)

    gitea_repo = giteaGetRepository(repo_name)
    if gitea_repo is not None or not create:
        return gitea_repo

    # Repository does not exist -> create
    org.create_repo(
        repoName=repo_name,
        description=pkg_description,
        private=False,
        autoInit=True,
        gitignores=None,
        issue_labels=None,
        default_branch="main",
    )
    return giteaGetRepository(repo_name)


def archivePkg(pkg_name):
    """Mark the repository of a package as archived.

    Returns True when the repository has been archived, False when it was
    already archived and None when the repository does not exist.
    """
    repo_name = _repoNameFor(pkg_name)

    gitea_repo = giteaGetRepository(repo_name)
    if gitea_repo is None:
        print(f'archiveRepo: repository {repo_name} not found')
        return None

    if gitea_repo.archived:
        print(f'Repository {repo_name} is already archived')
        return False

    gitea_repo.archived = True
    gitea_repo.commit()
    return True


def _latestChangelogEntry(changelog, default_user, default_email,
                          fallback_release):
    """Build commit text, author name and email from the first changelog entry.

    Parsing stops at the first empty line. The header line ("* ...") is split
    on spaces: fields 1-4 are used as the date, the last field as the release,
    the one before it (stripped of its first and last character) as the email
    and everything between as the author name.

    When no header with at least five fields is found the defaults are used
    and the release is taken from fallback_release instead of failing.
    """
    commit_text = ""
    header_split = []
    commit_user = default_user
    commit_email = default_email

    # spec.changelog may be missing altogether
    for line in (changelog or "").split('\n'):
        if line == "":
            break
        if line[0:2] == "* ":
            header_split = line.split(" ")
            # Get committer name and email from changelog
            commit_user = " ".join(header_split[5:len(header_split) - 2])
            commit_email = header_split[-2][1:-1]
        else:
            if commit_text != "":
                commit_text += "\n"
            commit_text += line[2:] if line[0:2] == "- " else line

    if len(header_split) >= 5:
        commit_text = f'{commit_text} [release {header_split[-1]};' + \
            f'{header_split[1]} {header_split[2]} {header_split[3]} {header_split[4]}]'
    else:
        # No usable header: do not trust anything parsed from it
        commit_user = default_user
        commit_email = default_email
        commit_text = f'{commit_text} [release {fallback_release}]'.lstrip()

    return commit_text, commit_user, commit_email


def _specDescription(spec):
    """Return the spec description up to the first '#' line, macros expanded."""
    if spec.description is None:
        return ""

    spec_description = ""
    for line in spec.description.split("\n"):
        # Remove commented sections following spec description
        if line.startswith("#"):
            break
        if spec_description != "":
            spec_description += '\n'
        spec_description += line
    return replace_macros(spec_description, spec)


def _commitReleaseFromSourceDir(options, pkg_info, gitea_repo, repo, temp_dir,
                                src_dir):
    """Commit and tag the release found in src_dir into the cloned repo."""
    # Parse spec file
    spec = Spec.from_file(f'{src_dir}/{pkg_info["name"]}.spec')

    commit_text, commit_user, commit_email = _latestChangelogEntry(
        spec.changelog, cfg["COMMITTER_USER"], cfg["COMMITTER_EMAIL"],
        f'{pkg_info["v"]}-{pkg_info["r"]}')
    spec_description = _specDescription(spec)

    # Set/update gitea repository description and website url
    spec_url = replace_macros(spec.url, spec)
    spec_summary = replace_macros(spec.summary, spec)
    if gitea_repo.description != spec_summary or gitea_repo.website != spec_url:
        gitea_repo.description = spec_summary
        gitea_repo.website = spec_url
        gitea_repo.commit()

    # Check if tag already exists ('~' is replaced for the tag name)
    new_tag = f'{pkg_info["v"]}-{pkg_info["r"]}'.replace('~', '+')
    if new_tag in repo.tags:
        if options.verbose:
            print(f'Skipping {pkg_info["name"]} release '
                  f'{pkg_info["e"]}:{pkg_info["v"]}-{pkg_info["r"]}: tag {new_tag} already exists')
        return

    # Create/update README.md
    with open(file=f'{temp_dir.name}/README.md', mode="w",
              encoding="utf-8") as readme_file:
        readme_file.write(f"# {pkg_info['name']}\n\n{spec_description}")
    repo.index.add(['README.md'])

    # Add/modify files
    dir_list = os.listdir(src_dir)
    for dir_file in dir_list:
        copyfile(f'{src_dir}/{dir_file}', f'{temp_dir.name}/{dir_file}')
        repo.index.add([dir_file])

    # Remove files that are no longer part of the release
    kept_files = set(dir_list) | {".git", "README.md"}
    for temp_dir_file in os.listdir(temp_dir.name):
        if temp_dir_file not in kept_files:
            repo.index.remove([temp_dir_file], working_tree=True)

    # Set committer user and email
    repo.config_writer().set_value("user", "name", commit_user).release()
    repo.config_writer().set_value("user", "email", commit_email).release()

    # Commit
    print(f'Committing {pkg_info["name"]} release {new_tag}...')
    repo.index.commit(commit_text)

    # Create tag; it is pushed later together with the commits
    repo.create_tag(new_tag, message=f'Release {new_tag}')


def commitReleaseFromDir(options, pkg_info, gitea_repo, repo, temp_dir):
    """Commit one package release into the cloned repository and tag it.

    pkg_info["src"] is either a directory holding the spec file and sources,
    or a source RPM which is first extracted with autospec into a temporary
    directory (removed again when the function returns or raises).

    options   -- parsed command line options (only `verbose` is read)
    pkg_info  -- dict with keys 'name', 'e', 'v', 'r' and 'src'
    gitea_repo -- Gitea repository whose description/website are kept in sync
    repo      -- git.Repo working copy cloned into temp_dir
    temp_dir  -- tempfile.TemporaryDirectory holding the working copy
    """
    src = pkg_info["src"]
    if not src.endswith(".src.rpm"):
        _commitReleaseFromSourceDir(
            options, pkg_info, gitea_repo, repo, temp_dir, src)
        return

    # src is a SRPM, use autospec to extract in a temporary folder
    with tempfile.TemporaryDirectory() as extract_dir:
        subprocess.run(
            ["autospec", "-x", src, f'--destdir={extract_dir}'],
            stdout=subprocess.PIPE, check=False)
        # Delete binary source archives.
        # NOTE(review): archives are only purged from the temporary extraction
        # directory, never from a plain source directory - confirm this
        # matches the intended scope.
        for pattern in _BINARY_ARCHIVE_PATTERNS:
            for filename in glob.glob(f'{extract_dir}/{pattern}'):
                os.remove(filename)
        _commitReleaseFromSourceDir(
            options, pkg_info, gitea_repo, repo, temp_dir, extract_dir)


def _querySrpmEpoch(srpm_path):
    """Return the epoch of a source RPM as a string ("0" when unset)."""
    result = subprocess.run(
        ['rpm', '-q', '--queryformat=%{epoch}', '-p', srpm_path],
        stdout=subprocess.PIPE, check=False)
    epoch = result.stdout.decode('utf-8').strip()
    # rpm prints "(none)" if no Epoch is set and nothing at all on failure
    return epoch if epoch.isdigit() else "0"


def _collectPackageReleases(pkgname):
    """Return the package info dicts of every known release of pkgname.

    Releases are looked up in ARCHIVE_DIR (spec directories), OLD_DIR and
    SRPMS_DIR (source RPMs) and returned sorted from oldest to newest.
    """
    pkgs_info = []
    pkgnamere = re.escape(pkgname)
    srpm_re = re.compile(f'{pkgnamere}-([^-]*)-([^-]*)\\.src\\.rpm$')

    # Find from archive dir
    archive_base = f'{cfg["ARCHIVE_DIR"]}/{pkgname[0:1]}'
    archive_re = re.compile(f'{pkgnamere}-([^-]*)-([^-]*)$')
    for _dir in os.listdir(archive_base):
        match = archive_re.match(_dir)
        if match is None:
            continue
        pkg_dir = f'{archive_base}/{_dir}'
        spec = Spec.from_file(f'{pkg_dir}/{pkgname}.spec')
        epoch = 0 if spec.epoch is None else int(spec.epoch)
        pkgs_info.append(
            {'name': pkgname, 'e': epoch, 'v': match.group(1),
             'r': match.group(2), 'src': pkg_dir})

    # Find from OLD_DIR
    # NOTE(review): the '.' in this pattern matches any character; it is left
    # unchanged because the directory naming scheme is not visible here.
    old_re = re.compile(f'{pkgnamere}_[0-9]*.[0-9]*$')
    for _dir in os.listdir(f'{cfg["OLD_DIR"]}'):
        if old_re.match(_dir) is None:
            continue
        for srpm in glob.glob(f'{cfg["OLD_DIR"]}/{_dir}/{pkgname}-*.src.rpm'):
            # The glob also matches other packages sharing the name prefix
            match = srpm_re.match(os.path.basename(srpm))
            if match is None:
                continue
            pkgs_info.append(
                {'name': pkgname, 'e': _querySrpmEpoch(srpm),
                 'v': match.group(1), 'r': match.group(2), 'src': srpm})

    # Find from SRPMS_DIR
    for srpm in os.listdir(cfg["SRPMS_DIR"]):
        match = srpm_re.match(srpm)
        if match is None:
            continue
        src_path = f'{cfg["SRPMS_DIR"]}/{srpm}'
        pkgs_info.append(
            {'name': pkgname, 'e': _querySrpmEpoch(src_path),
             'v': match.group(1), 'r': match.group(2), 'src': src_path})

    # Sort releases
    pkgs_info.sort(key=cmp_to_key(comparePkgInfo))
    return pkgs_info


def findAndCommitPackageReleases(options, pkgname, pkgvr):
    """Commit, tag and push every release of pkgname missing from its repo.

    options -- parsed command line options (only `verbose` is read)
    pkgname -- name of the package
    pkgvr   -- "version-release" to restrict the sync to, or None for all
    """
    print(f'Processing package {pkgname} on {cfg["GITEA_URL"]}...')
    pkgs_info = _collectPackageReleases(pkgname)

    # Find or create and get repository instance
    gitea_repo = findOrCreateRepo(pkgname)

    # Clone repository to temporary folder; the folder is removed on exit
    # from the `with` block even when committing or pushing fails
    repo_url = f'{cfg["GITEA_SSH_URL"]}{gitea_repo.name}.git'
    temp_dir = tempfile.TemporaryDirectory()
    with temp_dir:
        repo = Repo.clone_from(repo_url, temp_dir.name)
        # Set to push annotated tags with commits
        repo.config_writer().set_value('push', 'followTags', 'true').release()

        new_commits = False
        for pkg_info in pkgs_info:
            vr = f'{pkg_info["v"]}-{pkg_info["r"]}'
            if pkgvr is not None and pkgvr != vr:
                continue

            # Check if tag already exists
            new_tag = vr.replace('~', '+')
            if new_tag in repo.tags:
                if options.verbose:
                    print(f'Skipping {pkgname} release '
                          f'{pkg_info["e"]}:{vr}: tag {new_tag} already exists')
                continue

            commitReleaseFromDir(options, pkg_info, gitea_repo, repo, temp_dir)
            new_commits = True

        if new_commits:
            print(f"Pushing commits and tags for {pkgname}...")
            origin = repo.remote(name='origin')
            origin.push()
        else:
            print(f"No new commits for {pkgname}.")


def _buildParser():
    """Return the command line parser with all sub-commands declared."""
    parser = argparse.ArgumentParser(
        prog='autodist-git',
        description='RPM repository sync and management with git service.',
        epilog="Copyright (c) 2023-2024 by Silvan Calarco - GPL v3 License")
    subparsers = parser.add_subparsers(help='sub-command help', dest='mode')
    parser.add_argument('-v', '--verbose', help="verbose output",
                        action='store_true')

    parser_syncpkg = subparsers.add_parser(
        'syncpkg', help="sync a specified package")
    parser_syncpkg.add_argument('pkgname', help="name of package")
    parser_syncpkg.add_argument('pkgver', help="version of package", nargs='?')
    parser_syncpkg.add_argument(
        '-d', '--delete', action='store_true',
        help="delete and recreate existing repository", required=False)

    parser_archiverepo = subparsers.add_parser(
        'archivepkg',
        help="archive a specified repository package on git server")
    parser_archiverepo.add_argument('pkgname', help="name of package")

    parser_syncrepo = subparsers.add_parser(
        'syncrepo', help="sync base repository with git server")
    parser_syncrepo.add_argument('--from', dest='frompkg',
                                 help="from package name", required=False)
    parser_syncrepo.add_argument('--to', dest='topkg',
                                 help="to package name", required=False)
    parser_syncrepo.add_argument(
        '-d', '--delete', action='store_true',
        help="delete and recreate existing repositories", required=False)
    return parser


def _syncPkg(options):
    """Run the 'syncpkg' sub-command."""
    if options.delete:
        if options.pkgver is not None:
            print("ERROR: specifying pkgver is not allowed with -d option")
            sys.exit(1)
        _deleteRepoIfExists(options.pkgname)
    findAndCommitPackageReleases(options, options.pkgname, options.pkgver)


def _syncRepo(options):
    """Run the 'syncrepo' sub-command over every source RPM in SRPMS_DIR."""
    srpm_re = re.compile(r'.*/([^/]*)-([^-]*)-([^-]*)\.src\.rpm$')
    tag_re = re.compile(r'([^-]*)-([^-]*)$')

    dir_list = sorted(filter(
        os.path.isfile, glob.glob(f'{cfg["SRPMS_DIR"]}/*.src.rpm')))
    for dir_file in dir_list:
        match = srpm_re.match(dir_file)
        if match is None:
            # Not a name-version-release.src.rpm file name
            continue
        pkg_name, version, release = match.groups()

        # dir_list is sorted, so nothing after topkg needs to be visited
        if options.topkg is not None and pkg_name > options.topkg:
            break
        if options.frompkg is not None and pkg_name < options.frompkg:
            continue

        # Tags carry '+' in place of '~', compare against the same form
        pkg_item = {'e': 0, 'v': version.replace('~', '+'), 'r': release}
        pkg_vr = f'{version}-{release}'

        if options.delete:
            _deleteRepoIfExists(pkg_name)

        repo_tags = giteaGetRepoTags(_repoNameFor(pkg_name))
        found_equal = False
        found_newer = False
        found_older = False
        if repo_tags is None:
            found_newer = True
        else:
            for repo_tag in repo_tags:
                tag_match = tag_re.search(repo_tag["name"])
                if tag_match is None:
                    # Not a version-release tag
                    continue
                tag_item = {'e': 0, 'v': tag_match.group(1),
                            'r': tag_match.group(2)}
                compare = comparePkgInfo(pkg_item, tag_item)
                if compare == 0:
                    found_equal = True
                elif compare > 0:
                    found_older = True
                else:
                    found_newer = True

        if not found_equal:
            if repo_tags is not None:
                print(f'{pkg_name} ({pkg_vr}): needs update')
            findAndCommitPackageReleases(options, pkg_name, None)

        # NOTE(review): nesting level of this diagnostic was reconstructed
        # from a whitespace-mangled source - confirm.
        if found_newer:
            if options.verbose:
                print(f'{pkg_name} ({pkg_vr}): found_equal={found_equal} found_newer={found_newer} found_older={found_older}')


def main():
    """Parse the command line and run the requested sub-command."""
    parser = _buildParser()
    try:
        options = parser.parse_args()
    except SystemExit as exc:
        # argparse exits with 0 for --help; any usage error keeps the
        # historical exit status 1
        sys.exit(0 if exc.code in (0, None) else 1)

    if options.mode == 'syncpkg':
        _syncPkg(options)
    elif options.mode == 'archivepkg':
        archivePkg(options.pkgname)
    elif options.mode == 'syncrepo':
        _syncRepo(options)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()