# HG changeset patch # User Peter Sanchez # Date 1576785506 28800 # Thu Dec 19 11:58:26 2019 -0800 # Node ID 03214b9b651c294319f06d5b9700c8fa98710a14 # Parent 3931cb79bbee0fa4f45ebf5f0f4381443a25c7d7 Adding repoaccess script diff --git a/repoaccess.py b/repoaccess.py new file mode 100755 --- /dev/null +++ b/repoaccess.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +""" + Simple script to give repo access to a specific user. + + $ python3 repoaccess.py <~username/repo_name> + + repo types are "hg" and "git" + + permissions are "ro" for read-only and "rw" for read-write + + This needs to be run on the same system that the particular service + (repo_type) is being served from. +""" +import os +import re +import sys +import argparse + +from srht.config import cfg +from srht.database import db, DbSession +from scmsrht.repos.access import AccessMode + + +class RepoHelper: + user_model = None + access_model = None + repo_model = None + + def __init__(self, repo_type): + _db = DbSession(cfg(f'{repo_type}.sr.ht', 'connection-string')) + _db.init() + getattr(self, f'set_models_{repo_type}')() + + def set_models_hg(self): + from hgsrht.types import User, Access, Repository + self.user_model = User + self.access_model = Access + self.repo_model = Repository + + def set_models_git(self): + from gitsrht.types import User, Access, Repository + self.user_model = User + self.access_model = Access + self.repo_model = Repository + + def get_user(self, username): + if username.startswith('~'): + username = username[1:] + return self.user_model.query.filter_by(username=username).first() + + def get_repo(self, owner_name, repo_name): + user = self.get_user(owner_name) + if not user: + return None + + return self.repo_model.query.filter_by( + owner_id=user.id, + name=repo_name, + ).first() + + def grant_access(self, repo_full, to_user, perm): + """ repo_full should be in format: ~username/repo_name + to_user is the user to give access to + """ + owner, repo = repo_full.split('/') + repo = self.get_repo(owner, repo) + if not repo: + sys.stderr.write(f'Repository {repo_full} not found.\n') + return 1 + user = self.get_user(to_user) + if not user: + sys.stderr.write(f'User {to_user} not found.\n') + return 1 + + # Check for existing access + access = self.access_model.query.filter_by( + repo_id=repo.id, + user_id=user.id, + ).first() + if access: + # scmsrht.repos.access.AccessMode - Enum + if access.mode.value == perm: + print(f'{to_user} already has {perm} access to {repo_full}.') + return 0 + else: + access.mode = getattr(AccessMode, perm) + print(f'Updated {repo_full} access for {to_user} to {perm}.') + else: + # Grant access + grant = self.access_model() + grant.repo_id = repo.id + grant.user_id = user.id + grant.mode = getattr(AccessMode, perm) + db.session.add(grant) + print(f'Granted {perm} access to user {to_user} for {repo_full}.') + + db.session.commit() + return 0 + + +def main(): + def _repo_type(value): + if value not in ['hg', 'git']: + raise ValueError + return value + + def _repository(value): + if not re.search(r'^~[-\w]+\/[-\w]+$', value): + raise ValueError + return value + + def _perms(value): + if value not in ['ro', 'rw']: + raise ValueError + return value + + parser = argparse.ArgumentParser( + description='Give access to hg or git repositories.' + ) + parser.add_argument( + 'repo_type', + type=_repo_type, + help='Repository type (hg or git)', + ) + parser.add_argument( + 'repository', + type=_repository, + help='Repository name (~username/repo_name)', + ) + parser.add_argument( + 'username', + type=str, + help='Username of user you\'re giving access to.', + ) + parser.add_argument( + 'perms', + type=_perms, + help='Permissions being granted (ro or rw)', + ) + args = parser.parse_args() + + repo = RepoHelper(args.repo_type) + sys.exit(repo.grant_access(args.repository, args.username, args.perms)) + + +if __name__ == '__main__': + main()