autodist/autodist-git

423 lines
16 KiB
Python
Executable File

#!/usr/bin/python3
"""
autodist git integration
"""
import argparse
import glob
import os
import re
import subprocess
import sys
import tempfile
from shutil import copyfile
from functools import cmp_to_key
from gitea import Gitea, NotFoundException, Organization, Repository
import rpm
from pyrpm.spec import Spec, replace_macros
from git import Repo
from configobj import ConfigObj
# Settings are read from the system-wide configuration file.
cfg = ConfigObj(infile='/etc/autodist/config-git')
# Gitea client built from the configured server URL and token.
gitea = Gitea(cfg["GITEA_URL"], cfg["GITEA_TOKEN"])
# Organization used for all repository operations below; requested once at
# module load time.
org = Organization.request(gitea, cfg["GITEA_ORGANIZATION"])
def comparePkgInfo(item1, item2):
    """Compare two package-info mappings by epoch, version and release.

    Both arguments are mappings with the keys 'e', 'v' and 'r'.  The epoch
    is converted to a string before the comparison.  The result is whatever
    rpm.labelCompare() returns for the two triples, so the function can be
    used as a cmp-style callback (e.g. through functools.cmp_to_key).
    """
    def as_label(item):
        # (epoch, version, release) triple in the form labelCompare receives
        return (str(item['e']), item['v'], item['r'])

    left = as_label(item1)
    right = as_label(item2)
    return rpm.labelCompare(left, right)
def giteaGetRepoTags(repo_name):
    """Request the tags of a repository of the configured organization.

    Returns the response of the repository `tags` endpoint, or None when
    the request raises NotFoundException.
    """
    organization = cfg["GITEA_ORGANIZATION"]
    endpoint = f'/repos/{organization}/{repo_name}/tags'
    try:
        tags = gitea.requests_get(endpoint)
    except NotFoundException:
        return None
    return tags
def giteaGetRepository(repo_name):
    """Look up a repository of the configured organization by name.

    Returns the Repository parsed from the server response, or None when
    NotFoundException is raised.
    """
    organization = cfg["GITEA_ORGANIZATION"]
    endpoint = f'/repos/{organization}/{repo_name}'
    try:
        payload = gitea.requests_get(endpoint)
        return Repository.parse_response(gitea, payload)
    except NotFoundException:
        return None
def giteaGetRepositories(page, limit):
    """Request one page of the repositories of the configured organization.

    `page` and `limit` are passed through as query parameters.  Returns a
    list of Repository objects, or None when NotFoundException is raised.
    """
    organization = cfg["GITEA_ORGANIZATION"]
    endpoint = f'/orgs/{organization}/repos?page={page}&limit={limit}'
    try:
        repositories = []
        for entry in gitea.requests_get(endpoint):
            repositories.append(Repository.parse_response(gitea, entry))
        return repositories
    except NotFoundException:
        return None
def findOrCreateRepo(pkg_name, pkg_description=None, create=True):
    """Return the repository of a package, creating it when requested.

    The repository name is the package name with every '+' replaced by
    'Plus', because '+' is not allowed in repository names.

    Returns the existing repository when there is one.  Otherwise returns
    None when `create` is false, or creates a public, auto-initialised
    repository with default branch "main" and returns the result of looking
    it up again.
    """
    repo_name = pkg_name.replace('+','Plus')

    existing = giteaGetRepository(repo_name)
    if existing is not None or not create:
        # Either found, or missing and the caller asked not to create it
        return existing

    # Repository does not exist -> create
    org.create_repo(
        repoName=repo_name,
        description=pkg_description,
        private=False,
        autoInit=True,
        gitignores=None,
        issue_labels=None,
        default_branch="main",
    )
    return giteaGetRepository(repo_name)
def archiveRepo(pkg_name):
    """Mark the repository of a package as archived.

    The repository name is the package name with every '+' replaced by
    'Plus'.  Returns True when the archived flag was set and committed,
    False when the repository was already archived, and None when the
    repository was not found.
    """
    repo_name = pkg_name.replace('+','Plus')
    gitea_repo = giteaGetRepository(repo_name)

    if gitea_repo is None:
        print(f'archiveRepo: repository {repo_name} not found')
        return None

    if gitea_repo.archived:
        print(f'Repository {repo_name} is already archived')
        return False

    gitea_repo.archived = True
    gitea_repo.commit()
    return True
# Filename patterns of binary source archives that are kept out of git
_BINARY_ARCHIVE_PATTERNS = (
    '*.zip', '*.tar.bz2', '*.tar.xz', '*.tar.gz', '*.tgz', '*.txz', '*.iso',
    '*.run', '*.dll', '*.bin', '*.jar', '*.msi', '*.deb')


def _changelogCommitInfo(changelog, default_user, default_email):
    """Derive commit message and author from the first changelog entry.

    Lines are read up to the first empty one.  A line starting with "* " is
    the entry header, expected as
    "* <four date fields> <name ...> <email in brackets> <release>";
    every other line goes into the message, with a leading "- " stripped.

    Returns a (message, user, email) tuple.  Without a usable header the
    defaults are returned for user and email and the message carries no
    "[release ...]" suffix.
    """
    parts = []
    header = None
    user, email = default_user, default_email
    for line in (changelog or "").split('\n'):
        if line == "":
            break
        if line.startswith("* "):
            # split() without argument tolerates padded dates ("Jan  1")
            # and trailing blanks, which would shift the field indexes
            fields = line.split()
            if len(fields) >= 7:
                header = fields
                user = " ".join(fields[5:-2]) or default_user
                email = fields[-2][1:-1] or default_email
            continue
        parts.append(line[2:] if line.startswith("- ") else line)
    # Entries that are empty before the first text do not produce newlines
    message = "\n".join(parts).lstrip("\n")
    if header is not None:
        message = (f'{message} [release {header[-1]};'
                   f'{header[1]} {header[2]} {header[3]} {header[4]}]')
    return message, user, email


def _specDescription(spec):
    """Return the spec description up to the first line starting with '#'."""
    if spec.description is None:
        return ""
    kept = []
    for line in spec.description.split("\n"):
        if line.startswith("#"):
            # Commented sections following the description are dropped
            break
        kept.append(line)
    return replace_macros("\n".join(kept).lstrip("\n"), spec)


def _expandSpecField(value, spec):
    """Expand macros in an optional spec field; a missing field becomes ''."""
    return "" if value is None else replace_macros(value, spec)


def _listSourceFiles(src_dir):
    """Return the entries of src_dir, leaving out binary source archives.

    Archives are skipped rather than deleted so that the source directory
    is never modified.
    """
    skipped = set()
    for pattern in _BINARY_ARCHIVE_PATTERNS:
        for path in glob.glob(f'{glob.escape(src_dir)}/{pattern}'):
            skipped.add(os.path.basename(path))
    return [entry for entry in os.listdir(src_dir) if entry not in skipped]


def _commitSourceDir(options, pkg_info, gitea_repo, repo, temp_dir, src_dir):
    """Commit and tag the package sources found in src_dir."""
    spec = Spec.from_file(f'{src_dir}/{pkg_info["name"]}.spec')
    commit_text, commit_user, commit_email = _changelogCommitInfo(
        spec.changelog, cfg["COMMITTER_USER"], cfg["COMMITTER_EMAIL"])
    spec_description = _specDescription(spec)

    # Set/update gitea repository description and website url
    spec_url = _expandSpecField(spec.url, spec)
    spec_summary = _expandSpecField(spec.summary, spec)
    if gitea_repo.description != spec_summary or gitea_repo.website != spec_url:
        gitea_repo.description = spec_summary
        gitea_repo.website = spec_url
        gitea_repo.commit()

    # Check if tag already exists ('~' is replaced by '+' in tag names)
    new_tag = f'{pkg_info["v"]}-{pkg_info["r"]}'.replace('~','+')
    if new_tag in repo.tags:
        if options.verbose:
            print(f'Skipping {pkg_info["name"]} release '
                  f'{pkg_info["e"]}:{pkg_info["v"]}-{pkg_info["r"]}: tag {new_tag} already exists')
        return

    work_dir = temp_dir.name

    # Create/update README.md
    with open(file=f'{work_dir}/README.md', mode="w", encoding="utf-8") as readme_file:
        readme_file.write(f"# {pkg_info['name']}\n\n{spec_description}")
    repo.index.add(['README.md'])

    # Add/modify files
    source_files = _listSourceFiles(src_dir)
    for name in source_files:
        copyfile(f'{src_dir}/{name}', f'{work_dir}/{name}')
    if source_files:
        repo.index.add(source_files)

    # Remove files that are no longer part of the sources
    kept = set(source_files)
    stale = [entry for entry in os.listdir(work_dir)
             if entry not in (".git", "README.md") and entry not in kept]
    if stale:
        repo.index.remove(stale, working_tree=True)

    # Set committer user and email
    repo.config_writer().set_value("user", "name", commit_user).release()
    repo.config_writer().set_value("user", "email", commit_email).release()

    # Commit; fall back to the tag message when the changelog gave no text
    print(f'Committing {pkg_info["name"]} release {new_tag}...')
    repo.index.commit(commit_text or f'Release {new_tag}')

    # Create the annotated tag (pushing is left to the caller)
    repo.create_tag(new_tag, message=f'Release {new_tag}')


def commitReleaseFromDir(options, pkg_info, gitea_repo, repo, temp_dir):
    """Commit one package release into the cloned repository and tag it.

    Parameters:
      options    -- parsed command line options; only `verbose` is read
      pkg_info   -- mapping with keys 'name', 'e', 'v', 'r' and 'src';
                    'src' is either a directory holding the sources or the
                    path of a source RPM (name ending in ".src.rpm")
      gitea_repo -- Gitea repository object; its description and website
                    are updated from the spec Summary and URL
      repo       -- repository object of the local clone
      temp_dir   -- temporary directory object holding the clone working
                    tree (its `name` attribute is used)

    A source RPM is extracted with `autospec -x` into a scratch directory
    that is removed again before returning, also when an error occurs.
    Nothing is committed when the release tag already exists.
    """
    src = pkg_info["src"]
    if not src.endswith(".src.rpm"):
        _commitSourceDir(options, pkg_info, gitea_repo, repo, temp_dir, src)
        return

    # src is a SRPM, use autospec to extract in a temporary folder
    with tempfile.TemporaryDirectory() as extract_dir:
        subprocess.run(["autospec", "-x", src, f'--destdir={extract_dir}'],
                       stdout=subprocess.PIPE, check=False)
        _commitSourceDir(options, pkg_info, gitea_repo, repo, temp_dir, extract_dir)
def _querySrpmEpoch(srpm_path):
    """Return the epoch of a source RPM as a string, "0" when unset.

    The value is read with `rpm -q --queryformat=%{epoch} -p`; anything
    that is not a plain number (such as the "(none)" printed when no Epoch
    is set, or empty output) is reported as "0".
    """
    output = subprocess.run(
        ['rpm', '-q', '--queryformat=%{epoch}', '-p', srpm_path],
        stdout=subprocess.PIPE, check=False).stdout.decode('utf-8').strip()
    return output if output.isdigit() else "0"


def findAndCommitPackageReleases(options, pkgname, pkgvr):
    """Collect all known releases of a package and commit the missing ones.

    Releases are looked up in three places taken from the configuration:
    the per-letter sub-directory of ARCHIVE_DIR, the source RPMs inside the
    matching sub-directories of OLD_DIR, and the source RPMs in SRPMS_DIR.
    They are sorted with comparePkgInfo and committed in that order into a
    temporary clone of the package repository; releases whose tag already
    exists are skipped.  New commits are pushed at the end.

    Parameters:
      options -- parsed command line options; only `verbose` is read here
      pkgname -- name of the package
      pkgvr   -- "version-release" to restrict the run to, or None for all
    """
    print(f'Processing package {pkgname} on {cfg["GITEA_URL"]}...')
    pkgs_info = []

    # Escape the whole name: '+' and '.' are common in package names
    name_re = re.escape(pkgname)
    archive_re = re.compile(f'{name_re}-([^-]*)-([^-]*)$')
    old_dir_re = re.compile(f'{name_re}_[0-9]*.[0-9]*$')
    srpm_re = re.compile(rf'{name_re}-([^-]*)-([^-]*)\.src\.rpm$')

    # Find from archive dir
    archive_base = f'{cfg["ARCHIVE_DIR"]}/{pkgname[0:1]}'
    for entry in os.listdir(archive_base):
        found = archive_re.match(entry)
        if found is None:
            continue
        pkg_dir = f'{archive_base}/{entry}'
        spec = Spec.from_file(f'{pkg_dir}/{pkgname}.spec')
        epoch = 0 if spec.epoch is None else int(spec.epoch)
        pkgs_info.append(
            {'name': pkgname, 'e': epoch, 'v': found.group(1),
             'r': found.group(2), 'src': pkg_dir})

    # Find from OLD_DIR
    old_base = cfg["OLD_DIR"]
    for entry in os.listdir(old_base):
        if old_dir_re.match(entry) is None:
            continue
        candidates = glob.glob(
            f'{glob.escape(f"{old_base}/{entry}")}/{glob.escape(pkgname)}-*.src.rpm')
        for srpm in candidates:
            # The glob also matches other packages sharing the name prefix
            # (e.g. "<pkgname>-devel-..."); keep exact name matches only
            found = srpm_re.match(os.path.basename(srpm))
            if found is None:
                continue
            pkgs_info.append(
                {'name': pkgname, 'e': _querySrpmEpoch(srpm),
                 'v': found.group(1), 'r': found.group(2), 'src': srpm})

    # Find from SRPMS_DIR
    for entry in os.listdir(cfg["SRPMS_DIR"]):
        found = srpm_re.match(entry)
        if found is None:
            continue
        src_path = f'{cfg["SRPMS_DIR"]}/{entry}'
        pkgs_info.append(
            {'name': pkgname, 'e': _querySrpmEpoch(src_path),
             'v': found.group(1), 'r': found.group(2), 'src': src_path})

    # Sort releases
    pkgs_info.sort(key=cmp_to_key(comparePkgInfo))

    # Find or create and get repository instance
    gitea_repo = findOrCreateRepo(pkgname)

    # Clone repository to temporary folder; the folder is removed again
    # even when cloning, committing or pushing fails
    repo_url = f'{cfg["GITEA_SSH_URL"]}{gitea_repo.name}.git'
    temp_dir = tempfile.TemporaryDirectory()
    try:
        repo = Repo.clone_from(repo_url, temp_dir.name)
        # Set to push annotated tags with commits
        repo.config_writer().set_value('push', 'followTags', 'true').release()

        new_commits = False
        for pkg_info in pkgs_info:
            vr = f'{pkg_info["v"]}-{pkg_info["r"]}'
            if pkgvr is not None and pkgvr != vr:
                continue
            # Check if tag already exists ('~' is replaced by '+' in tags)
            new_tag = vr.replace('~','+')
            if new_tag in repo.tags:
                if options.verbose:
                    print(f'Skipping {pkgname} release '
                          f'{pkg_info["e"]}:{vr}: tag {new_tag} already exists')
                continue
            commitReleaseFromDir(options, pkg_info, gitea_repo, repo, temp_dir)
            new_commits = True

        if new_commits:
            print(f"Pushing commits and tags for {pkgname}...")
            repo.remote(name='origin').push()
        else:
            print(f"No new commits for {pkgname}.")
    finally:
        temp_dir.cleanup()
def _splitSrpmPath(path):
    """Split a source RPM path into (name, version, release).

    Returns None when the file name is not of the form
    "<name>-<version>-<release>.src.rpm".
    """
    found = re.match(r'.*/([^/]*)-([^-]*)-([^-]*)\.src\.rpm$', path)
    return None if found is None else found.groups()


def _syncRepoPackages(options, srpm_paths):
    """Sync every package whose current release has no tag on the server."""
    for srpm_path in srpm_paths:
        fields = _splitSrpmPath(srpm_path)
        if fields is None:
            # Not a <name>-<version>-<release>.src.rpm file name
            continue
        pkg_name, version, release = fields
        if options.topkg is not None and pkg_name > options.topkg:
            break
        if options.frompkg is not None and pkg_name < options.frompkg:
            continue
        # Tags carry '+' in place of '~', so compare in that form
        pkg_item = {'e': 0, 'v': version.replace('~','+'), 'r': release}
        pkg_vr = f'{version}-{release}'
        # Replace '+' for repository name as it is not allowed
        repo_name = pkg_name.replace('+','Plus')
        if options.delete:
            gitea_repo = giteaGetRepository(repo_name)
            if gitea_repo is not None:
                print(f'Deleting repository for {pkg_name}...')
                gitea_repo.delete()
        repo_tags = giteaGetRepoTags(repo_name)
        found_equal = False
        found_newer = False
        found_older = False
        if repo_tags is None:
            found_newer = True
        else:
            for repo_tag in repo_tags:
                tag_fields = re.search('([^-]*)-([^-]*)$', repo_tag["name"])
                if tag_fields is None:
                    # Tag is not in <version>-<release> form: ignore it
                    continue
                tag_item = {'e': 0, 'v': tag_fields.group(1), 'r': tag_fields.group(2)}
                compare = comparePkgInfo(pkg_item, tag_item)
                if compare == 0:
                    found_equal = True
                elif compare > 0:
                    found_older = True
                elif compare < 0:
                    found_newer = True
        if not found_equal:
            if repo_tags is not None:
                print(f'{pkg_name} ({pkg_vr}): needs update')
            findAndCommitPackageReleases(options, pkg_name, None)
        if found_newer and options.verbose:
            print(f'{pkg_name} ({pkg_vr}): found_equal={found_equal} found_newer={found_newer} found_older={found_older}')


def _archiveMissingRepos(srpm_paths):
    """Archive the repositories that have no source RPM in srpm_paths."""
    # Compare on repository names ('+' -> 'Plus'); mapping repository names
    # back to package names is ambiguous for names that contain "Plus"
    known_repo_names = set()
    for srpm_path in srpm_paths:
        fields = _splitSrpmPath(srpm_path)
        if fields is not None:
            known_repo_names.add(fields[0].replace('+','Plus'))

    page_size = 50
    page = 1
    while True:
        gitea_repos = giteaGetRepositories(page, page_size)
        if not gitea_repos:
            # No (more) repositories, or the organization was not found
            break
        for gitea_repo in gitea_repos:
            if not gitea_repo.archived and gitea_repo.name not in known_repo_names:
                print(f'Archiving repository {gitea_repo.name}')
                archiveRepo(gitea_repo.name)
        if len(gitea_repos) < page_size:
            break
        page += 1


def main():
    """Parse the command line and run the selected sub-command.

    Sub-commands:
      syncpkg     -- sync one package, optionally a single version-release,
                     optionally deleting the repository first
      archiverepo -- archive the repository of one package
      syncrepo    -- sync all source RPMs of SRPMS_DIR and archive the
                     repositories that have no source RPM there
    Without a sub-command the help text is printed.
    """
    parser = argparse.ArgumentParser(prog='autodist-git',
        description='RPM repository sync and management with git service.',
        epilog="Copyright (c) 2023-2024 by Silvan Calarco <silvan.calarco@mambasoft.it> - GPL v3 License")
    subparsers = parser.add_subparsers(help='sub-command help', dest='mode')
    parser.add_argument('-v', '--verbose', help="verbose output", action='store_true')
    parser_syncpkg = subparsers.add_parser('syncpkg', help="sync a specified package")
    parser_syncpkg.add_argument('pkgname', help="name of package")
    parser_syncpkg.add_argument('pkgver', help="version of package", nargs='?')
    parser_syncpkg.add_argument('-d', '--delete', action='store_true', help="delete and recreate existing repository",
        required=False)
    parser_archiverepo = subparsers.add_parser('archiverepo', help="archive a specified repository package on git server")
    parser_archiverepo.add_argument('pkgname', help="name of package")
    parser_syncrepo = subparsers.add_parser('syncrepo', help="sync base repository with git server")
    parser_syncrepo.add_argument('--from', dest='frompkg', help="from package name", required=False)
    parser_syncrepo.add_argument('--to', dest='topkg', help="to package name", required=False)
    parser_syncrepo.add_argument('-d', '--delete', action='store_true', help="delete and recreate existing repositories",
        required=False)
    try:
        options = parser.parse_args()
    except SystemExit as exc:
        # argparse leaves through SystemExit: keep status 1 for usage
        # errors, but let successful exits such as --help report success
        sys.exit(1 if exc.code else 0)

    if options.mode == 'syncpkg':
        if options.delete:
            if options.pkgver is not None:
                print("ERROR: specifying pkgver is not allowed with -d option")
                sys.exit(1)
            repo_name = options.pkgname.replace('+','Plus')
            gitea_repo = giteaGetRepository(repo_name)
            if gitea_repo is not None:
                print(f'Deleting repository for {options.pkgname}...')
                gitea_repo.delete()
        findAndCommitPackageReleases(options, options.pkgname, options.pkgver)
    elif options.mode == 'archiverepo':
        print(f'Archiving repository for {options.pkgname}...')
        archiveRepo(options.pkgname)
    elif options.mode == 'syncrepo':
        # Get list for packages from SRPMS dir
        dir_list = sorted(filter(os.path.isfile, glob.glob(f'{cfg["SRPMS_DIR"]}/*.src.rpm')))
        # Check for package updates to sync with git repositories
        print("Checking for package updates to sync with git repositories...")
        _syncRepoPackages(options, dir_list)
        # Check for git repositories to archive
        print("Checking for git repositories to archive...")
        _archiveMissingRepos(dir_list)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()