博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python通过LDAP验证、查找用户(class,logging)
阅读量:6565 次
发布时间:2019-06-24

本文共 8720 字,大约阅读时间需要 29 分钟。

定义一个类,用于初始化ldap连接,验证、查找用户等功能

# -*- coding: UTF-8 -*-import sys reload(sys) sys.setdefaultencoding('utf-8') import ldap,logging,timelogfile = 'e:\\a.txt'# logging.basicConfig(filename=logfile,level=logging.INFO)# logging.basicConfig(format='%(time.asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')logging.basicConfig(level=logging.INFO,                      #format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s', #返回值:Thu, 26 May 2016 15:09:31 t11.py[line:92] INFO                     format='%(asctime)s %(levelname)s %(message)s',                     #datefmt='%a, %d %b %Y %H:%M:%S',                      #datefmt='%Y/%m/%d %I:%M:%S %p', #返回2016/05/26 03:12:56 PM                    datefmt='%Y-%m-%d %H:%M:%S', #返回2016/05/26 03:12:56 PM                    filename=logfile#,                      #filemode='a' #默认为a                   ) #logging输出结果:#2016-05-26 15:22:29 INFO liu1 valid passed.#2016-05-26 15:22:37 INFO liu1 valid passed.class ldapc:    def __init__(self,ldap_path,baseDN,domainname,ldap_authuser,ldap_authpass):        self.baseDN = baseDN        self.ldap_error = None        ldap_authduser = '%s\%s' %(domainname,ldap_authuser)        self.l=ldap.initialize(ldap_path)        self.l.protocol_version = ldap.VERSION3        try:            self.l.simple_bind_s(ldap_authduser,ldap_authpass)        except ldap.LDAPError,err:            self.ldap_error = 'Connect to %s failed, Error:%s.' %(ldap_path,err.message['desc'])            print self.ldap_error        # finally:        #     self.l.unbind_s()        #     del self.l    def search_users(self,username): #模糊查找,返回一个list,使用search_s()        if self.ldap_error is None:            try:                searchScope = ldap.SCOPE_SUBTREE                searchFiltername = "sAMAccountName" #通过samaccountname查找用户                retrieveAttributes = None                searchFilter = '(' + searchFiltername + "=" + username +'*)'                ldap_result =self.l.search_s(self.baseDN, searchScope, searchFilter, retrieveAttributes)                if len(ldap_result) == 0: #ldap_result is a list.                    return "%s doesn't exist." %username                else:                    # result_type, result_data = self.l.result(ldap_result, 0)                      # return result_type, ldap_result                    return ldap_result            except ldap.LDAPError,err:                return err    def search_user(self,username): #精确查找,返回值为list,使用search()        if self.ldap_error is None:            try:                searchScope = ldap.SCOPE_SUBTREE                searchFiltername = "sAMAccountName" #通过samaccountname查找用户                retrieveAttributes = None                searchFilter = '(' + searchFiltername + "=" + username +')'                ldap_result_id =self.l.search(self.baseDN, searchScope, searchFilter, retrieveAttributes)                result_type, result_data = self.l.result(ldap_result_id, 0)                if result_type == ldap.RES_SEARCH_ENTRY:                    return result_data                else:                    return "%s doesn't exist." %username            except ldap.LDAPError,err:                return err    def search_userDN(self,username): #精确查找,最后返回该用户的DN值        if self.ldap_error is None:            try:                searchScope = ldap.SCOPE_SUBTREE                searchFiltername = "sAMAccountName" #通过samaccountname查找用户                retrieveAttributes = None                searchFilter = '(' + searchFiltername + "=" + username +')'                ldap_result_id =self.l.search(self.baseDN, searchScope, searchFilter, retrieveAttributes)                result_type, result_data = self.l.result(ldap_result_id, 0)                if result_type == ldap.RES_SEARCH_ENTRY:                    return result_data[0][0] #list第一个值为用户的DN,第二个值是一个dict,包含了用户属性信息                else:                    return "%s doesn't exist." %username            except ldap.LDAPError,err:                return err    def valid_user(self,username,userpassword): #验证用户密码是否正确        if self.ldap_error is None:            target_user = self.search_userDN(username) #使用前面定义的search_userDN函数获取用户的DN            if target_user.find("doesn't exist") == -1:                try:                    self.l.simple_bind_s(target_user,userpassword)                    logging.info('%s valid passed.\r'%(username)) #logging会自动在每行log后面添加"\000"换行,windows下未自动换行                    return True                except ldap.LDAPError,err:                    return err            else:                return target_user    def update_pass(self,username,oldpassword,newpassword): #####未测试#########        if self.ldap_error is None:            target_user = self.search_userDN(username)             if target_user.find("doesn't exist") == -1:                try:                    self.l.simple_bind_s(target_user,oldpassword)                    self.l.passwd_s(target_user,oldpassword,newpassword)                    return 'Change password success.'                except ldap.LDAPError,err:                    return err            else:                return target_userldap_authuser='liu1'ldap_authpass='pass'domainname='uu'ldappath='ldap://192.168.200.25:389'baseDN='OU=优优,DC=uu,DC=yuu,DC=com' #ldap_authuser在连接到LDAP的时候不会用到baseDN,在验证其他用户的时候才需要使用username = 'liu1' #要查找/验证的用户p=ldapc(ldappath,baseDN,domainname,ldap_authuser,ldap_authpass)print p.valid_user('Lily','lpass') #调用valid_user()方法验证用户是否为合法用户

 

遍历OU下的用户函数:

def search_OU(self): #精确查找,最后返回该用户的DN值        if self.ldap_error is None:            try:                searchScope = ldap.SCOPE_SUBTREE                #searchFiltername = "sAMAccountName" #通过samaccountname查找用户                retrieveAttributes = None                searchFilter = '(&(objectClass=person))'                ldap_result =self.l.search_s(self.baseDN, searchScope, searchFilter, retrieveAttributes)                if ldap_result is not None:                    udict = {}                    usersinfor = []                    for pinfor in ldap_result:                    #pinfor是一个tuple,第一个元素是该用户的CN,第二个元素是一个dict,包含有用户的所有属性                        if pinfor[1]:                            p=pinfor[1]                            sAMAccountName = p['sAMAccountName'][0] #返回值是一个list                            displayName = p['displayName'][0]                            #如果用户的某个属性为空,则dict中不会包含有相应的key                            if 'department' in p:                                department = p['department'][0]                            else:                                department = None                            #print sAMAccountName,displayName,department                            udict['sAMAccountName'] = sAMAccountName                            udict['displayName'] = displayName                            udict['department'] = department                            usersinfor.append(udict)                            # print udict                    return usersinfor            except ldap.LDAPError,err:                return err            finally:                self.l.unbind_s()                del self.l
 

 

baseDN='OU=Admin,DC=u,DC=y,DC=com' #需要遍历的OU

p=ldapc(ldappath,baseDN,domainname,ldap_authuser,ldap_authpass)
users = p.search_OU()
print users[0]['department']

 

#retrieveAttributes = NonesearchFilter = '(&(objectClass=person))'attrs = ['cn','uid','mail']ldap_result =self.l.search_s(self.baseDN, searchScope, searchFilter, attrs) #只过滤attrs属性,如果为*,则过滤所有属性

 

分页返回LDAP查询结果:

import ldap  from ldap.controls import SimplePagedResultsControl    l = ldap.initialize('ldap://1.1.1.16')  l.simple_bind_s('user', 'Password')  baseDN=unicode('OU=集团,DC=uxin,DC=youxinpai,DC=com','utf8')PAGE_SIZE = 500  #设置每页返回的条数  ATTRLIST = ['sAMAccountName','name', 'mail','department','title']  #设置返回的属性值  # ATTRLIST = None #设置为None则返回用户的所有属性searchFilter =  '(&(objectCategory=person)(objectClass=user)(!(userAccountControl:1.2.840.113556.1.4.803:=2)))' #查询Enabled的用户pg_ctrl = SimplePagedResultsControl(True, size=PAGE_SIZE, cookie="")  userdata = []    while True:      msgid = l.search_ext(baseDN,ldap.SCOPE_SUBTREE, searchFilter, ATTRLIST, serverctrls=[pg_ctrl])      _a, res_data, _b, srv_ctrls = l.result3(msgid)      # print 'res_data', len(res_data) ,msgid    userdata.extend(res_data)      cookie = srv_ctrls[0].cookie      if cookie:          pg_ctrl.cookie = cookie      else:          break    print 'totalnum:', len(userdata)  print userdata[0]

 

 

ObjectClass类型如下:详细参考:http://www.cnblogs.com/dreamer-fish/p/5832735.html(LDAP查询过滤语法)http://www.morgantechspace.com/2013/05/ldap-search-filter-examples.html

objectClass: person

objectClass: organizationalPerson
objectClass: inetOrgPerson

 

转载于:https://www.cnblogs.com/dreamer-fish/p/5531339.html

你可能感兴趣的文章
NVisionXR_iOS教程一 —— NVisionXR从零搭建一个AR项目
查看>>
oracle 12c ins-30131 执行安装程序验证所需的初始设置失败
查看>>
SNAT与DNAT
查看>>
BGP十三条规则
查看>>
Linux 修改密码“ Authentication token manipulation err”
查看>>
openstack
查看>>
【顶】(与同事合作的快乐)技术人员也需要先学会做人,再学会做事,再是能成事,最后是成名得利...
查看>>
Lync Server 2013 安装体验(一)
查看>>
Hadoop2.6.0学习笔记(五)自定义InputFormat和RecordReader
查看>>
EBB-24、DNS2
查看>>
监控web是否正常
查看>>
zabbix监控交换机
查看>>
css3做的nav
查看>>
汇编笔记
查看>>
在线枚举内核模块函数及地址(win64位)
查看>>
deploy nginx using saltstack
查看>>
rsync实现文件传输
查看>>
控件联动(三级联动)
查看>>
shell编程学习
查看>>
点击qq、点击邮箱01
查看>>