@@ 0,0 1,147 @@
+#!/usr/bin/env python3
+"""
+ Simple script to give repo access to a specific user.
+
+ $ python3 repoaccess.py <repo type> <~username/repo_name> <username_to_give_access_to> <permission>
+
+ repo types are "hg" and "git"
+
+ permissions are "ro" for read-only and "rw" for read-write
+
+ This needs to be run on the same system that the particular service
+ (repo_type) is being served from.
+"""
+import os
+import re
+import sys
+import argparse
+
+from srht.config import cfg
+from srht.database import db, DbSession
+from scmsrht.repos.access import AccessMode
+
+
+class RepoHelper:
+ user_model = None
+ access_model = None
+ repo_model = None
+
+ def __init__(self, repo_type):
+ _db = DbSession(cfg(f'{repo_type}.sr.ht', 'connection-string'))
+ _db.init()
+ getattr(self, f'set_models_{repo_type}')()
+
+ def set_models_hg(self):
+ from hgsrht.types import User, Access, Repository
+ self.user_model = User
+ self.access_model = Access
+ self.repo_model = Repository
+
+ def set_models_git(self):
+ from gitsrht.types import User, Access, Repository
+ self.user_model = User
+ self.access_model = Access
+ self.repo_model = Repository
+
+ def get_user(self, username):
+ if username.startswith('~'):
+ username = username[1:]
+ return self.user_model.query.filter_by(username=username).first()
+
+ def get_repo(self, owner_name, repo_name):
+ user = self.get_user(owner_name)
+ if not user:
+ return None
+
+ return self.repo_model.query.filter_by(
+ owner_id=user.id,
+ name=repo_name,
+ ).first()
+
+ def grant_access(self, repo_full, to_user, perm):
+ """ repo_full should be in format: ~username/repo_name
+ to_user is the user to give access to
+ """
+ owner, repo = repo_full.split('/')
+ repo = self.get_repo(owner, repo)
+ if not repo:
+ sys.stderr.write(f'Repository {repo_full} not found.\n')
+ return 1
+ user = self.get_user(to_user)
+ if not user:
+ sys.stderr.write(f'User {to_user} not found.\n')
+ return 1
+
+ # Check for existing access
+ access = self.access_model.query.filter_by(
+ repo_id=repo.id,
+ user_id=user.id,
+ ).first()
+ if access:
+ # scmsrht.repos.access.AccessMode - Enum
+ if access.mode.value == perm:
+ print(f'{to_user} already has {perm} access to {repo_full}.')
+ return 0
+ else:
+ access.mode = getattr(AccessMode, perm)
+ print(f'Updated {repo_full} access for {to_user} to {perm}.')
+ else:
+ # Grant access
+ grant = self.access_model()
+ grant.repo_id = repo.id
+ grant.user_id = user.id
+ grant.mode = getattr(AccessMode, perm)
+ db.session.add(grant)
+ print(f'Granted {perm} access to user {to_user} for {repo_full}.')
+
+ db.session.commit()
+ return 0
+
+
+def main():
+ def _repo_type(value):
+ if value not in ['hg', 'git']:
+ raise ValueError
+ return value
+
+ def _repository(value):
+ if not re.search(r'^~[-\w]+\/[-\w]+$', value):
+ raise ValueError
+ return value
+
+ def _perms(value):
+ if value not in ['ro', 'rw']:
+ raise ValueError
+ return value
+
+ parser = argparse.ArgumentParser(
+ description='Give access to hg or git repositories.'
+ )
+ parser.add_argument(
+ 'repo_type',
+ type=_repo_type,
+ help='Repository type (hg or git)',
+ )
+ parser.add_argument(
+ 'repository',
+ type=_repository,
+ help='Repository name (~username/repo_name)',
+ )
+ parser.add_argument(
+ 'username',
+ type=str,
+ help='Username of user you\'re giving access to.',
+ )
+ parser.add_argument(
+ 'perms',
+ type=_perms,
+ help='Permissions being granted (ro or rw)',
+ )
+ args = parser.parse_args()
+
+ repo = RepoHelper(args.repo_type)
+ sys.exit(repo.grant_access(args.repository, args.username, args.perms))
+
+
+if __name__ == '__main__':
+ main()