我的编程空间,编程开发者的网络收藏夹
学习永远不晚

python 操作 openldap

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

python  操作 openldap

# -*- coding: utf-8 -*-
# author : s

import random,string
from ldap3 import Server,Connection,ALL,SUBTREE,ALL_ATTRIBUTES,MODIFY_REPLACE,MODIFY_ADD,MODIFY_DELETE
from passlib.hash import  ldap_salted_sha1 as ssha
from app.utils import  ErrMsg

class LdapOp(object):

    """
    Operation Dcouments: http://ldap3.readthedocs.io/

    """

    def __init__(self,ip,port,user,passwd):
        self._ip = ip
        self._port = port
        self._user = user
        self._passwd = passwd
        self.dn = self._user.split(',',1)[1]
        self.s = Server(self._ip,self._port,get_info=ALL)
        self._conn =  Connection(self.s, self._user, self._passwd, auto_bind=True)

    @property
    def conn(self):

        if not self._conn:
            print('ldap conn init ')
            self._conn =  Connection(self.s, self._user, self._passwd, auto_bind=True)

        return self._conn

    def searchAll(self,name='top'):
        entries = self.conn.extend.standard.paged_search(self.dn, '(objectClass=%s)' % name ,attributes=['cn', 'givenName','gidNumber','uidNumber','telephoneNumber','mail'])
        en = list(entries)

        ulist = [v for v in en if  'People' in v.get('dn','')  and  'uid' in v.get('dn','') ]

        l = []
        for v in ulist:
            udict = {}
            udict['dn'] = v['dn']
            udict['cn'] =  v['attributes'].get('cn','')[0]  if len(v['attributes'].get('cn','')) > 0 else v['attributes'].get('cn','')
            udict['gid'] = str(v['attributes'].get('gidNumber',''))
            udict['mail'] = v['attributes'].get('mail','')[0]  if len(v['attributes'].get('mail','')) > 0 else v['attributes'].get('mail','')
            udict['phone'] = v['attributes'].get('telephoneNumber','')[0]  if len(v['attributes'].get('telephoneNumber','')) > 0 else v['attributes'].get('telephoneNumber','')
            l.append(udict)

        # append user is groups
        groups = self.searchGroups()
        ##  添加 组 到 用户的连接

        for user in l:
            user['groups'] = []
            for group in groups:
                # group.get('memberUid',[])
                if user.get('cn', '') in group.get('memberUid', []):
                    user['groups'].append(group.get('cn'))
        #print(l)
        return l

    def getGroups(self):
        l = []

        return l


    def user_addGroup(self, group_dn, user_cn=[]):
        data = self.conn.modify(group_dn, {'memberuid': [(MODIFY_ADD, user_cn)]})
        return self.conn.entries

    def user_deleteGroup(self,group_dn,user_cn=[]):
        data = self.conn.modify(group_dn, {'memberuid': [(MODIFY_DELETE,user_cn)]})
        print(data)
        return self.conn.entries


    def addGroup(self,args):

        ouname = self.get_ouname('(|(ou=Groups)(ou=Group))')
        print('ouname=',ouname)
        groupname = args.get('name')
        groupid = args.get('groupid')
        #grouppassword = args.get('grouppassword')

        cndn = 'cn=%s,%s' % (groupname,ouname)
        print(cndn)
        attr = {
            'objectClass': ['posixGroup', 'top'],
            'cn' : groupname,
            'gidNumber': groupid,
            #'userPassword': grouppassword
        }
        sgroup = [ g for g in self.searchGroups() if g.get('cn','') == groupname or g.get('gidNumber','') == groupid ]
        if sgroup:
            return {'msg': 'err', 'data': u' 组名字或组ID重复,请检查后重新添加!'}
        try:
            data = self.conn.add(cndn, attributes=attr)
            msg = 'ok'
        except Exception as e:

            data  = e
            msg = 'err'
        finally:
            #self.conn.unbind()
            return {'msg': msg , 'data': data}


    def searchGroups(self,groupname=''):

        if groupname:
            #print(self.conn)
            rv = self.conn.search('%s' % self.dn, '( &(objectClass=posixGroup)(cn=%s))' %groupname,
               attributes=['sn', 'cn', 'objectclass', 'userPassword', 'gidNumber','memberUid'])
        else:
            rv = self.conn.search('%s' % self.dn, '(objectClass=posixGroup)',
               attributes=['sn', 'cn', 'objectclass', 'userPassword', 'gidNumber','memberUid'])
        data = []

        if rv:
            for en in  self.conn.entries:
                endict = en.entry_get_attributes_dict()
                suben = {
                    'cn': str(en['cn'][0]),
                    'gidNumber': str(en['gidNumber'][0]),
                    #'objectClass': ','.join(en['objectClass']),
                    #'userPassword': en['userPassword'][0],
                    'dn': str(en.entry_get_dn()),
                    'memberUid': endict.get('memberUid',[]),
                }
                data.append(suben)

        return data


    def deleteGroup(self,name=''):

        if not name:
            return {'msg':'err','data': 'Group Name not is None'}

        group = self.searchGroups(name)

        if len(group)== 1:

            try:

                self.conn.delete(group[0]['dn'])

                rv =  {'msg':'ok','data':''}
            except Exception as e:
                rv =  {'msg': 'err','data': e}

        elif len(group) == 0 :
            rv = {'msg': 'err', 'data': 'Not group Find'}

        else:
            rv = {'msg': 'err', 'data': 'Find %d groups '} %len(group)

        return rv

    def addUser(self,args):

        cndn = 'uid=%s,ou=People,%s' % ( args.get('name'),self.dn)
        #object_class = [ 'account','posixAccount', 'top', 'shadowAccount']

        if args.get('mail',''):
            mail = args.get('mail')
        else:
            if self.dn == 'dc=test,dc=com':
                dn = 'test.com'
                mail = '%s@%s' % (args.get('name'), dn)
            else:
                dn ='.'.join(map(lambda x: x[3:],self.dn.split(',')))
                mail = '%s@%s' % (args.get('name'),dn)
        attr = {
            'objectClass': ['account', 'posixAccount', 'top', 'shadowAccount'],
            'userPassword': args.get('passwd'),
            'uid': args.get('name'),
            'cn': args.get('name'),
            'shadowLastChange': '17038',
            'shadowMax': '99999',
            'shadowWarning': '7',
            'uidNumber': args.get('phonenumber'),
            'gidNumber': args.get('groupid'),
            'homeDirectory': '/home/%s' % args.get('name'),
            'mail': mail,
            'telephoneNumber': args.get('phonenumber'),
            'displayname': args.get('name'),
        }
        #print('*' *100)
        #print(cndn)
        #print(attr)
        try:
            data = self.conn.add(cndn, attributes=attr)
            if not data:
                
                attr['objectClass'] = ['posixAccount', 'shadowAccount', 'top', 'person', 'organizationalPerson','inetOrgPerson']
                attr['sn'] = args.get('name')
          
                data = self.conn.add(cndn, attributes=attr)
                if not data:
                    
                    msg = self.conn.result['message']
                    raise ValueError(msg)
            msg = 'ok'
        except Exception as e:

            data  = e
            msg = 'err'
        finally:
            self.conn.unbind()
        return {'msg': msg , 'data': data}

    def deleteUser(self,name=''):
        if not name:
            return {'msg':'err','data': 'Name not is None'}
        if 'dc' in name:
            dn = name
        else:
            dn = 'uid=%s,ou=People,%s' %(name,self.dn)
        print(dn)
        try:
            self.conn.delete(dn)
            rv =  {'msg':'ok','data':''}
        except Exception as e:
            rv =  {'msg': 'err','data': e}
        finally:
            self.conn.unbind()
        return  rv

    def searchUser(self,name):
        try:
            users = self.searchAll()
            if name:
                data = [ user for user in  users if name in  user.get('cn') ]
            else:
                data = users
            rv = 'ok'
        except Exception as e:
            data = e
            rv = 'err'
        finally:
            return {'msg':rv,'data':data}

    def searchSinUser(self,name):
        try:
            ga = self.conn.extend.standard.paged_search(search_base=self.dn,
                                           search_filter='(uid=%s)' %name,
                                           search_scope= SUBTREE,
                                           attributes=ALL_ATTRIBUTES)
            user = list(ga)
            if user:
                data = user[0].get('attributes')

                d = {
                    'user': str(data.get('cn')[0]),
                    'passwd': data.get('userPassword')[0].decode('utf-8'),
                    'gid': data.get('gidNumber'),
                    'mail': data.get('mail')[0],
                    'phonenumber': data.get('uidNumber')

                }
                
            else:
                d = {}
            users = self.searchAll()
            usingle = [d for d in users if d.get('cn') == name ]
            u = {}
            if usingle:
                u = usingle[0]
        except Exception as e:
            print(e)
            u = {'msg': ErrMsg()}

        finally:
            #print('user=',u)
            x = u.update(d)
            print('u=',u)
            return u

    def editUser(self,args):
        try:
            cn = 'uid=%s,ou=People,%s' % (args.get('user'), self.dn)
            print(cn)
            passwd = args.get('passwd','')

            user = self.searchSinUser(args.get('user'))
            data = ''
            if user.get('passwd') != passwd:
                if 'SSHA' not in passwd:
                    passwd = self.encodePasswd(passwd)
                   
                    data = self.conn.modify(cn, {'userPassword': (MODIFY_REPLACE,passwd)})
            if user.get('gid') != args.get('gid') and  args.get('gid') != 'default':
                data = self.conn.modify(cn,{'gidNumber': (MODIFY_REPLACE,args.get('gid')) })

        except Exception as e:
            ErrMsg()
            data = e
        finally:
            print('data', data)
            return data


    def setPasswd(self,**kwargs):
        try:
            cn = 'uid=%s,ou=People,%s' % (kwargs.get('user',''), self.dn)
            data = self.conn.modify(cn,{'userPassword':(MODIFY_REPLACE,kwargs.get('passwd', ''))})
        except Exception as e:
            data = ErrMsg()
        finally:
            return data

    @staticmethod
    def GenPasswd():
        return ''.join(random.sample(string.ascii_letters + string.digits, 8))

    def _searchUser_org(self,username):
        print('in-search-org')
        ga = self.conn.extend.standard.paged_search(search_base=self.dn,
                                       search_filter='(uid=%s)' %username,
                                       search_scope= SUBTREE,
                                       attributes=ALL_ATTRIBUTES)
        user = list(ga)
        print(user)
        entry = self.conn.response[0]
        dn = entry['dn']
        attr_dict = entry['attributes']

        return (dn,attr_dict)

    def authUser(self,username,password):
        
        try:
            cn = 'uid=%s,ou=People,%s' % (username,self.dn)
            conn2 = Connection(self._ip, user=cn, password=password,
                               check_names=True, lazy=False, raise_exceptions=False)
            conn2.bind()
            if conn2.result["description"] == "success":
                rv = 'ok'
                data = '认证成功'
            else:
                rv = 'err'
                data = '认证失败,用户名或密码错误!'
        except Exception as e:
            rv = 'err'
            data = ErrMsg()
        finally:
            return {'rv': rv,'data':data}

    @staticmethod
    def encodePasswd(password=''):
        """
        :param password:
        :return:
        """
        if password:
            return ssha.encrypt(password,salt_size=12)
        return ''

    def get_ouname(self,ouname):
        """
         ouname = '(|(ou=Groups)(ou=Group))'

        :param ouname:
        :return:
        """
        print(self.dn,ouname)
        ou = self.conn.search('%s' % self.dn, '%s' % ouname )
        print(ou)
        retry = self.conn.entries
        num = len(retry)
        if num == 0:
            return []

        else:
            return retry[0].entry_get_dn()


免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

python 操作 openldap

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

python 操作 openldap

# -*- coding: utf-8 -*-# author : simport random,stringfrom ldap3 import Server,Connection,ALL,SUBTREE,ALL_ATTRIBUTES,MO
2023-01-31

Python操作SQLLite(基本操作

SQLite 是一个软件库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是在世界上最广泛部署的 SQL 数据库引擎。SQLite 源代码不受版权限制。Python SQLITE数据库是一款非常小巧的嵌
2023-01-31

Python操作Elasticsearc

描述:ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。下面介绍了利用Python API接口进行数据查询,方便其他系统的调用。注:此文仅做笔记参考  
2023-01-31

Python操作MSSQL

Python连接SQL Server数据库 - pymssql使用基础:https://www.cnblogs.com/baiyangcao/p/pymssql_basic.html廖雪峰官网 之 Python 访问数据库(SQLLite
2023-01-31

python 操作outlook

# -*- coding: utf-8 -*-from win32com.client import constantsfrom win32com.client.gencache import EnsureDispatch as Dispa
2023-01-31

Python操作符

运算操作符+_*/% 取余// 除法取整** 幂运算例a = 3a += 2a → 5b = 4b -= 1b→310/8 → 1.2510//8 → 110 % 3 → 16 % 3 → 0逻辑操作符andornot
2023-01-31

python 操作excel

python 读写 excel 有好多选择,但是,方便操作的库不多,在我尝试了几个库之后,我觉得两个比较方便的库分别是 xlrd/xlwt、openpyxl。之所以推荐两个库是因为这两个库分别操作的是不同版本的 excel,xlrd 操作的
2023-01-31

Python 操作json

Json语法规则:数据在名称/值对中数据由逗号分隔花括号保存对象方括号保存数组Json字符串本质上是一个字符串,用单引号表示Json数据的书写格式名称--值对,包括名称字段(在双引号中),后面跟一个冒号,然后是值:“name”: ”zhan
2023-01-31

python操作mysqldb

下载安装MySQLdb<1>linux版本http://sourceforge.net/projects/mysql-python/ 下载,在安装是要先安装setuptools,然后在下载文件目录下,修改mysite.cfg,指定本地mys
2023-01-31

python 操作 libreoffic

最近研究了一个,用python来向libreoffice spreadsheet中的写数据,openoffice管方网站的资料也不是很清楚,在网上找到的几个比较有用的网站有:http://wiki.services.openoffice.o
2023-01-31

python操作solr

solr接收http请求,所以使用requests库操作solr就可以添加 data = {"add": {"doc": params, "commitWithin": 1000}} headers = {"Content-ty
2023-01-31

Python操作xml

XmlXML指可扩展标记语言(Extensible Markup Language)XML被设计用于结构化、存储和传输数据XML是一种标记语言,很类似于HTMLXML没有像HTML那样具有预定义标签,需要程序员自定义标签。XML被
2023-01-31

python操作sqlserver

# coding=gbkimport sys  import pymssql#尝试数据库连接try:    conn = pymssql.connect(host="192.168.1.43",user="sa",password="sa0
2023-01-31

Python操作redis

Python操作redispythonredis数据库searchimport首先确保redis已经正常启动。安装   可以去pypi上找到redis的Python模块:   http://pypi.python.org/pypi?%3Aa
2023-01-31

python操作mysql

# rpm -qa |grep MySQL-python 查询是否有mysqldb库MySQL-python-1.2.3-0.3.c1.1.el6.x86_64>>> import MySQLdb #导入mysqldb模块>>> conn
2023-01-31

python操作Excel

import xlrdimport xlwt import xlutils import win32com#xlrd#打开exceldata = xlrd.open_workbook("I+P.xls")#查看文件中包含sheet的名称sh
2023-01-31

python 操作kafka

https://pypi.python.org/pypi/pykafka最近项目中总是跟java配合,我一个写python的程序员,面对有复杂数据结构的java代码转换成python代码,确实是一大难题,有时候或多或少会留有一点坑,看来有空
2023-01-31

python excel操作

首先在python3.5里,我们可以使用新的python类库,来支持3.x之后的读写excel 针对 03版excel(xls结尾的),我们可以使用xlrd读,xlwt包来写 针对 07版excel(xlsx结尾的),我们可以使用openp
2023-01-31

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录