ginger-dev-list
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Ginger-dev-list] [PATCH 2/2] User model changes


From: dhbarboza82
Subject: [Ginger-dev-list] [PATCH 2/2] User model changes
Date: Fri, 24 Jul 2015 10:53:19 -0300

From: Daniel Henrique Barboza <address@hidden>

libuser-specific helper functions were added to models/users.py to
isolate the logic of the libuser API from the basic operations
(create/delete/list) of the models. This also makes it easier to
isolate the libuser API for unit testing.

Signed-off-by: Daniel Henrique Barboza <address@hidden>
---
 models/users.py | 203 ++++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 139 insertions(+), 64 deletions(-)

diff --git a/models/users.py b/models/users.py
index ffd0b6a..0d73785 100644
--- a/models/users.py
+++ b/models/users.py
@@ -17,7 +17,7 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
 
-
+import crypt
 import grp
 import os
 import pwd
@@ -32,49 +32,143 @@ SUDOERS_FILE = '/etc/sudoers.d/%s_conf'
 SUDOERS_LINE = '%s\tALL=(ALL)\tALL\n'
 
 
+def get_groups():
+    adm = libuser.admin()
+    return adm.enumerateGroups()
+
+
+def get_group_obj(groupname):
+    adm = libuser.admin()
+    return adm.lookupGroupByName(groupname)
+
+
+def get_group_obj_by_gid(gid):
+    adm = libuser.admin()
+    return adm.lookupGroupById(gid)
+
+
+def get_group_gid(groupname):
+    adm = libuser.admin()
+    gid = adm.lookupGroupByName(groupname).get('pw_gid')[0]
+    return gid
+
+
+def create_group(groupname):
+    adm = libuser.admin()
+    group = adm.lookupGroupByName(groupname)
+    if not group:
+        new_group = adm.initGroup(groupname)
+        gid = new_group[libuser.GIDNUMBER]
+        adm.addGroup(new_group)
+        return gid[0]
+    return group.get('pw_gid')[0]
+
+
+def delete_group(groupname):
+    adm = libuser.admin()
+    group_obj = adm.lookupGroupById(
+        int(get_group_gid(groupname))
+    )
+
+    if group_obj is None:
+        kimchi_log.error('Could not delete group "%s"', groupname)
+        raise OperationFailed('GINUSER0012E', {'group': groupname})
+
+    if not adm.enumerateUsersByGroup(groupname):
+        try:
+            adm.deleteGroup(group_obj)
+        except Exception as e:
+            kimchi_log.error('Could not delete group "%s": %s', groupname, e)
+            raise OperationFailed('GINUSER0012E', {'group': groupname})
+
+
+def get_users_from_group(groupname):
+    adm = libuser.admin()
+    group_obj = adm.lookupGroupById(
+        int(get_group_gid(groupname))
+    )
+    if group_obj is not None:
+        return adm.enumerateUsersByGroup(groupname)
+    return None
+
+
+def get_users(exclude_system_users=True):
+    if exclude_system_users:
+        return [user.pw_name for user in pwd.getpwall()
+                if user.pw_uid >= 1000]
+
+    admin = libuser.admin()
+    return admin.enumerateUsers()
+
+
+def get_user_obj(username):
+    adm = libuser.admin()
+    return adm.lookupUserByName(username)
+
+
+def create_user(name, plain_passwd, profile=None):
+    adm = libuser.admin()
+    user = adm.lookupUserByName(name)
+    if user:
+        msg = 'User/Login "%s" already in use' % name
+        kimchi_log.error(msg)
+        raise OperationFailed('GINUSER0008E', {'user': name})
+
+    try:
+        new_user = adm.initUser(name)
+        if profile == "kimchiuser":
+            new_user[libuser.LOGINSHELL] = '/sbin/nologin'
+        adm.addUser(new_user)
+        enc_pwd = crypt.crypt(plain_passwd)
+        adm.setpassUser(new_user, enc_pwd, True)
+    except Exception as e:
+        kimchi_log.error('Could not create user %s', name, e)
+        raise OperationFailed('GINUSER0009E', {'user': name})
+
+    return new_user
+
+
+def delete_user(username):
+    adm = libuser.admin()
+    user_obj = adm.lookupUserByName(username)
+
+    if user_obj is None:
+        kimchi_log.error('User "%s" does not exist', username)
+        raise OperationFailed('GINUSER0011E', {'user': username})
+    try:
+        adm.deleteUser(user_obj, True, True)
+    except Exception as e:
+        kimchi_log.error('Could not delete user %s: %s', username, e)
+        raise OperationFailed('GINUSER0010E', {'user': username})
+
+
+def add_user_to_primary_group(username, groupname):
+    user_obj = get_user_obj(username)
+    user_obj[libuser.GIDNUMBER] = get_group_gid(groupname)
+
+
 class UsersModel(object):
     """
     The model class for basic management of users in the host system
     """
 
     def create(self, params):
-        adm = libuser.admin()
+        username = params['name']
+        passwd = params['password']
         profile = params['profile']
-        # Login/Group already in use ?
-        user = adm.lookupUserByName(params['name'])
-        group = adm.lookupGroupByName(params['group'])
-        if user:
-            msg = 'User/Login "%s" already in use' % params['name']
-            kimchi_log.error(msg)
-            raise OperationFailed('GINUSER0008E', {'user': params['name']})
-        # Adding user/group
-        try:
-            new_user = adm.initUser(params['name'])
-            # Handling user shell according to profile, else, use default shell
-            if profile == "kimchiuser":
-                new_user[libuser.LOGINSHELL] = '/sbin/nologin'
-            # Creating group or adding user to existing group
-            if group is None:
-                new_group = adm.initGroup(params['group'])
-                gid = new_group[libuser.GIDNUMBER]
-                adm.addGroup(new_group)
-            else:
-                gid = adm.lookupGroupByName(params['group']).get('pw_gid')[0]
-            new_user[libuser.GIDNUMBER] = gid
-            adm.addUser(new_user)
-            # Password should come encrypted
-            adm.setpassUser(new_user, params['password'], True)
-        except Exception as e:
-            kimchi_log.error('Could not create user %s', params['name'], e)
-            raise OperationFailed('GINUSER0009E', {'user': params['name']})
+        groupname = params['group']
+
+        create_user(username, passwd, profile=profile)
+        create_group(groupname)
+        add_user_to_primary_group(username, groupname)
 
         # Handle profiles
         if profile in ["virtuser", "admin"]:
-            self._add_user_to_kvm_group(adm, params['name'])
+            self._add_user_to_kvm_group(username)
         if profile == "admin":
-            self._add_user_to_sudoers(params['name'])
+            self._add_user_to_sudoers(username)
 
-        return params['name']
+        return username
 
     def _add_user_to_sudoers(self, user):
         try:
@@ -88,9 +182,10 @@ class UsersModel(object):
                              user, e.message)
             raise OperationFailed('GINUSER0007E', {'user': user})
 
-    def _add_user_to_kvm_group(self, adm, user):
+    def _add_user_to_kvm_group(self, user):
         # Add new user to KVM group
-        kvmgrp = adm.lookupGroupByName('kvm')
+        adm = libuser.admin()
+        kvmgrp = get_group_obj('kvm')
         kvmgrp.add('gr_mem', user)
         ret = adm.modifyGroup(kvmgrp)
         if ret != 1:
@@ -101,42 +196,22 @@ class UsersModel(object):
             raise OperationFailed('GINUSER0006E', {'user': user})
 
     def get_list(self):
-        # Get list of users in the host excluding system users
-        return [user.pw_name for user in pwd.getpwall() if user.pw_uid >= 1000]
+        return get_users()
 
 
 class UserModel(object):
     def delete(self, user):
-        adm = libuser.admin()
-        user_obj = adm.lookupUserByName(user)
-        # Check if user exist
-        if user_obj is None:
-            kimchi_log.error('User "%s" does not exist', user)
-            raise OperationFailed('GINUSER0011E', {'user': user})
-        group_obj = adm.lookupGroupById(int(user_obj.get('pw_gid')[0]))
-        # Delete user with its home and mails too
-        try:
-            adm.deleteUser(user_obj, True, True)
-        except Exception as e:
-            kimchi_log.error('Could not delete user %s: %s', user, e)
-            raise OperationFailed('GINUSER0010E', {'user': user})
+        user_obj = get_user_obj(user)
+        group_obj = get_group_obj_by_gid(
+            int(user_obj.get('pw_gid')[0])
+        )
 
-        # Handle user according to its profile
-        self._delete_profile_settings(user)
+        delete_user(user)
+        if group_obj is not None:
+            groupname = group_obj.get('gr_name')[0]
+            delete_group(groupname)
 
-        # Delete group if no users are assigned to it
-        # It is not possible to delete user/group at same time
-        if group_obj is None:
-            msg = 'Group for user "%s" does not exist for removal' % user
-            kimchi_log.warn(msg)
-            raise OperationFailed('GINUSER0013E', {'user': user})
-        group = group_obj.get('gr_name')[0]
-        if not adm.enumerateUsersByGroup(group):
-            try:
-                adm.deleteGroup(group_obj)
-            except Exception as e:
-                kimchi_log.error('Could not delete group "%s": %s', group, e)
-                raise OperationFailed('GINUSER0012E', {'group': group})
+        self._delete_profile_settings(user)
 
     def lookup(self, user):
         try:
-- 
2.4.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]