-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathldapHelper.py
More file actions
148 lines (118 loc) · 4.06 KB
/
ldapHelper.py
File metadata and controls
148 lines (118 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import ldap, ldap.sasl, sys, getpass, subprocess
from linuxmusterLinuxclient7 import logging, constants, config, user, computer
# Cached LDAP connection shared by this module: assigned by _connect() and
# handed out by conn(). None while no connection is established.
_currentLdapConnection = None
def serverUrl():
    """
    Returns the LDAP URL of the server

    The hostname is taken from the ``serverHostname`` entry of the network
    configuration returned by ``config.network()``.

    :return: The server URL (``ldap://<serverHostname>``), or None if the
        network configuration could not be read
    :rtype: str
    """
    rc, networkConfig = config.network()

    if not rc:
        # Return a plain None (as baseDn() does) instead of a (False, None)
        # tuple: a non-empty tuple is truthy, so callers could not detect
        # the failure with a simple truth test.
        return None

    serverHostname = networkConfig["serverHostname"]
    return f"ldap://{serverHostname}"
def baseDn():
    """
    Builds the LDAP base DN from the configured domain

    Every dot-separated label of the ``domain`` entry of the network
    configuration becomes one ``dc=`` component, e.g. ``example.lan``
    becomes ``dc=example,dc=lan``.

    :return: The base DN, or None if the network configuration could not
        be read
    :rtype: str
    """
    ok, settings = config.network()

    if ok:
        labels = settings["domain"].split(".")
        return ",".join("dc=" + label for label in labels)

    return None
def conn():
    """
    Provides the shared ldap connection, connecting first if necessary

    :return: The ldap connection object, or None if ``_connect()`` did not
        succeed
    :rtype: ldap.ldapobject.LDAPObject
    """
    # _connect() runs first; the module-level connection is read afterwards
    return _currentLdapConnection if _connect() else None
def searchOne(filter):
    """Searches the LDAP with a filter and returns the first found object

    The search covers the whole subtree below ``baseDn()``. Attribute values
    of the first result are decoded to str: an attribute with exactly one
    value is stored as a plain str, an attribute with several values as a
    list of str. Attributes without any value and attributes containing a
    value that cannot be decoded (e.g. binary data) are left out of the
    result entirely.

    :param filter: A valid ldap filter
    :type filter: str
    :return: Tuple (success, ldap object as dict)
    :rtype: tuple
    """
    # Fetch the connection once and reuse it, instead of running the whole
    # connect check a second time for the actual search.
    connection = conn()
    if connection is None:
        logging.error("Cannot talk to LDAP")
        return False, None

    try:
        rawResult = connection.search_s(
            baseDn(),
            ldap.SCOPE_SUBTREE,
            filter
        )
    except Exception as e:
        logging.error("Error executing LDAP search!")
        logging.exception(e)
        return False, None

    try:
        # A first entry without a DN is not a real object
        if not rawResult or rawResult[0][0] is None:
            logging.debug(f"Search \"{filter}\" did not return any objects")
            return False, None

        result = {}
        for k, rawAttribute in rawResult[0][1].items():
            if rawAttribute is None:
                continue

            try:
                # Decode all values before storing anything, so that a
                # decode error cannot leave a truncated list in the result.
                values = [rawItem.decode() for rawItem in rawAttribute]
            except UnicodeDecodeError:
                # Not text (e.g. binary attribute): skip the attribute
                continue

            if len(values) == 1:
                result[k] = values[0]
            elif values:
                result[k] = values

        return True, result

    except Exception as e:
        logging.error("Error while reading ldap search results!")
        logging.exception(e)
        return False, None
def isObjectInGroup(objectDn, groupName):
    """
    Check if a given object is in a given group

    The check is done with a single LDAP search using the matching rule
    1.2.840.113556.1.4.1941 (Active Directory's LDAP_MATCHING_RULE_IN_CHAIN)
    on the ``member`` attribute of the group whose ``sAMAccountName`` equals
    ``groupName``. Both values are escaped before they are placed into the
    filter, so special characters in them are matched literally.

    :param objectDn: The DN of the object
    :type objectDn: str
    :param groupName: The name of the group
    :type groupName: str
    :return: True if it is a member, False otherwise (also when the search
        itself failed)
    :rtype: bool
    """
    # Local import: the module only imports ldap and ldap.sasl at the top
    from ldap.filter import escape_filter_chars

    logging.debug(f"= Testing if object {objectDn} is a member of group {groupName} =")

    # Characters such as "\", "(", ")" and "*" have a special meaning inside
    # an LDAP filter and would otherwise break or change the search.
    escapedDn = escape_filter_chars(str(objectDn))
    escapedGroupName = escape_filter_chars(str(groupName))

    rc, _ = searchOne(
        f"(&(member:1.2.840.113556.1.4.1941:={escapedDn})(sAMAccountName={escapedGroupName}))"
    )
    logging.debug(f"=> Result: {rc} =")
    return rc
# --------------------
# - Helper functions -
# --------------------
def _connect():
    """
    Makes sure a bound LDAP connection is stored in ``_currentLdapConnection``

    An already established connection is reused. Otherwise a new connection
    to ``serverUrl()`` is initialized and bound with the SASL mechanism
    GSSAPI. The module-level connection is only set once the bind succeeded.

    :return: True if a connection is available, False otherwise
    :rtype: bool
    """
    global _currentLdapConnection

    # Refuse only if user.isInAD() is false, user.isRoot() is false and
    # computer.isInAD() is true.
    if not user.isInAD() and not user.isRoot() and computer.isInAD():
        logging.warning("Cannot perform LDAP search: User is not in AD!")
        _currentLdapConnection = None
        return False

    if _currentLdapConnection is not None:
        return True

    try:
        url = serverUrl()
        if not isinstance(url, str):
            # serverUrl() could not read the network configuration; do not
            # hand a non-URL value to ldap.initialize().
            logging.error("Cannot connect to LDAP: the server URL could not be determined!")
            return False

        sasl_auth = ldap.sasl.sasl({}, 'GSSAPI')
        connection = ldap.initialize(url, trace_level=0)
        # TODO:
        # conn.set_option(ldap.OPT_X_TLS_CACERTFILE, '/path/to/ca.pem')
        # conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
        # conn.start_tls_s()
        connection.set_option(ldap.OPT_REFERRALS, 0)
        connection.protocol_version = ldap.VERSION3
        connection.sasl_interactive_bind_s("", sasl_auth)
    except Exception as e:
        _currentLdapConnection = None
        logging.error("Could not bind to ldap!")
        logging.exception(e)
        return False

    _currentLdapConnection = connection
    return True