Adding repoaccess script
1 files changed, 147 insertions(+), 0 deletions(-)

A => repoaccess.py
A => repoaccess.py +147 -0
@@ 0,0 1,147 @@ 
+#!/usr/bin/env python3
+"""
+    Simple script to give repo access to a specific user.
+
+        $ python3 repoaccess.py <repo type> <~username/repo_name> <username_to_give_access_to> <permission>
+
+    repo types are "hg" and "git"
+
+    permissions are "ro" for read-only and "rw" for read-write
+
+    This needs to be run on the same system that the particular service
+    (repo_type) is being served from.
+"""
+import os
+import re
+import sys
+import argparse
+
+from srht.config import cfg
+from srht.database import db, DbSession
+from scmsrht.repos.access import AccessMode
+
+
+class RepoHelper:
+    user_model = None
+    access_model = None
+    repo_model = None
+
+    def __init__(self, repo_type):
+        _db = DbSession(cfg(f'{repo_type}.sr.ht', 'connection-string'))
+        _db.init()
+        getattr(self, f'set_models_{repo_type}')()
+
+    def set_models_hg(self):
+        from hgsrht.types import User, Access, Repository
+        self.user_model = User
+        self.access_model = Access
+        self.repo_model = Repository
+
+    def set_models_git(self):
+        from gitsrht.types import User, Access, Repository
+        self.user_model = User
+        self.access_model = Access
+        self.repo_model = Repository
+
+    def get_user(self, username):
+        if username.startswith('~'):
+            username = username[1:]
+        return self.user_model.query.filter_by(username=username).first()
+
+    def get_repo(self, owner_name, repo_name):
+        user = self.get_user(owner_name)
+        if not user:
+            return None
+
+        return self.repo_model.query.filter_by(
+            owner_id=user.id,
+            name=repo_name,
+        ).first()
+
+    def grant_access(self, repo_full, to_user, perm):
+        """ repo_full should be in format: ~username/repo_name
+            to_user is the user to give access to
+        """
+        owner, repo = repo_full.split('/')
+        repo = self.get_repo(owner, repo)
+        if not repo:
+            sys.stderr.write(f'Repository {repo_full} not found.\n')
+            return 1
+        user = self.get_user(to_user)
+        if not user:
+            sys.stderr.write(f'User {to_user} not found.\n')
+            return 1
+
+        # Check for existing access
+        access = self.access_model.query.filter_by(
+            repo_id=repo.id,
+            user_id=user.id,
+        ).first()
+        if access:
+            # scmsrht.repos.access.AccessMode - Enum
+            if access.mode.value == perm:
+                print(f'{to_user} already has {perm} access to {repo_full}.')
+                return 0
+            else:
+                access.mode = getattr(AccessMode, perm)
+                print(f'Updated {repo_full} access for {to_user} to {perm}.')
+        else:
+            # Grant access
+            grant = self.access_model()
+            grant.repo_id = repo.id
+            grant.user_id = user.id
+            grant.mode = getattr(AccessMode, perm)
+            db.session.add(grant)
+            print(f'Granted {perm} access to user {to_user} for {repo_full}.')
+
+        db.session.commit()
+        return 0
+
+
+def main():
+    def _repo_type(value):
+        if value not in ['hg', 'git']:
+            raise ValueError
+        return value
+
+    def _repository(value):
+        if not re.search(r'^~[-\w]+\/[-\w]+$', value):
+            raise ValueError
+        return value
+
+    def _perms(value):
+        if value not in ['ro', 'rw']:
+            raise ValueError
+        return value
+
+    parser = argparse.ArgumentParser(
+        description='Give access to hg or git repositories.'
+    )
+    parser.add_argument(
+        'repo_type',
+        type=_repo_type,
+        help='Repository type (hg or git)',
+    )
+    parser.add_argument(
+        'repository',
+        type=_repository,
+        help='Repository name (~username/repo_name)',
+    )
+    parser.add_argument(
+        'username',
+        type=str,
+        help='Username of user you\'re giving access to.',
+    )
+    parser.add_argument(
+        'perms',
+        type=_perms,
+        help='Permissions being granted (ro or rw)',
+    )
+    args = parser.parse_args()
+
+    repo = RepoHelper(args.repo_type)
+    sys.exit(repo.grant_access(args.repository, args.username, args.perms))
+
+
+if __name__ == '__main__':
+    main()