1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
| from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
from ldap3.core.exceptions import LDAPBindError
from ldap3 import MODIFY_REPLACE
from ldap3.utils.dn import safe_rdn
class LDAP(object):
def __init__(self, host, port, user, password, base_dn):
dn = "cn=%s,%s" % (user, base_dn)
self.server = Server(host=host, port=port)
self.base_dn = base_dn
self.__conn = Connection(self.server, dn, password, auto_bind=True)
def add_ou(self, ou, oid):
"""添加 OU,oid 保存至 st 字段"""
return self.__conn.add(ou, 'organizationalUnit', {"st": oid})
def add_user(self, userid, username, mobile, mail, title, ou_dn, gidnumber=501):
"""添加用户
:param userid: 用户 ID(如 "linan")
:param username: 姓名(cn)
:param ou_dn: 如 "ou=运维中心,dc=domain,dc=com"
"""
objectclass = ['top', 'person', 'inetOrgPerson', 'posixAccount']
add_dn = "cn=%s,%s" % (username, ou_dn)
password = '%s@qwe' % userid
return self.__conn.add(add_dn, objectclass, {
'mobile': mobile, 'sn': userid, 'mail': mail,
'userPassword': password, 'title': title, 'uid': username,
'gidNumber': gidnumber,
'uidNumber': userid.strip("xxx"),
'homeDirectory': '/home/users/%s' % userid,
'loginShell': '/bin/bash',
})
def get_userdn_by_mail(self, mail, base_dn=None):
"""通过邮箱查询用户 entry"""
status = self.__conn.search(
base_dn or self.base_dn,
search_filter='(mail={})'.format(mail),
search_scope=SUBTREE, attributes=ALL_ATTRIBUTES)
return self.__conn.entries[0] if status and self.__conn.entries else False
def get_userdn_by_args(self, base_dn=None, **kwargs):
"""多条件查询用户(& 逻辑),支持任意 LDAP 属性"""
search = "".join("(%s=%s)" % (k, v) for k, v in kwargs.items())
search_filter = '(&%s)' % search if search else ''
status = self.__conn.search(
base_dn or self.base_dn,
search_filter=search_filter,
search_scope=SUBTREE, attributes=ALL_ATTRIBUTES)
return self.__conn.entries if status and self.__conn.entries else False
def authenticate_userdn_by_mail(self, mail, password):
"""验证用户密码"""
entry = self.get_userdn_by_mail(mail=mail)
if not entry:
return False
try:
Connection(self.server, entry.entry_dn, password, auto_bind=True)
return True
except LDAPBindError:
return False
def update_user_info(self, user_dn, action=MODIFY_REPLACE, **kwargs):
"""修改用户属性(uid, mail, sn, mobile, title 等)"""
allow_key = "uid userPassword mail sn gidNumber uidNumber mobile title".split()
update_args = {}
for k, v in kwargs.items():
if k not in allow_key:
return False
update_args[k] = [(action, [v])]
return self.__conn.modify(user_dn, update_args)
def update_user_cn(self, user_dn, new_cn):
"""修改用户 CN"""
return self.__conn.modify_dn(user_dn, 'cn=%s' % new_cn)
def update_ou(self, dn, new_ou_dn):
"""移动用户到新 OU"""
rdn = safe_rdn(dn)
return self.__conn.modify_dn(dn, rdn[0], new_superior=new_ou_dn)
def delete_dn(self, dn):
"""删除 DN,如果是 OU 则递归删除子节点"""
if not dn.startswith("cn"):
all_users = self.get_userdn_by_args(base_dn=dn, objectClass="Person")
if all_users:
for u in all_users:
self.__conn.delete(u.entry_dn)
all_ous = self.get_userdn_by_args(base_dn=dn, objectClass="organizationalUnit")
if all_ous:
for ou in reversed(all_ous):
self.__conn.delete(ou.entry_dn)
return self.__conn.delete(dn)
|