freeeyes 发表于 2011-6-14 15:15:30

MongoDB研究和实践

最近NoSQL炒的如火如荼,最近在为公司搞一个类似邮件的信息系统。BOSS需要大量用户用手机通过这个信息系统进行交流,并且还必须能实时查询。因为我的系统采用了ICE方式部署,可以有效的分解逻辑部分,于是在寻找一种能够配合分布式存储模式的数据存储模式。我的需求是,一种基于KV结构的信息系统,key永远是用户ID,而Value我需要是一个数据结构,并支持我对这个结构的修改和增删。最后经过多方比较,我决定选择MongoDB来实现我的需求。
说实在的,NoSQL只是一种数据存储手段,并不是万能的。更多的时候,我更喜欢DBMS,因为这样的结构适合很多关系型数据的遍历和查询,因为在数据初期我不可能预测到所有的查询需求,DBMS恰好能弥补我这方面的需求。NoSQL并不是万能的,它适合服务于写频繁,同时读取指定键值简单的数据关系,所谓分布式存储其实不是什么新概念,DBMS其实就有不少的分布式产品的例子,比如oracle。
在我看来,NoSQL还没有完全到达非常成熟的程度,比如对热点数据的负载分担,NoSQL算法还不是很完善。虽然有redis 这类的产品,可以自动分割"热"和"冷"的数据,通过换页的方法增加数据命中率,不过对单点数据的瞬时巨量访问还一直是一个问题,我希望NoSQL可以针对数据的“热”度,在达到一定程度的时候,对数据自动产生分布式冗余,因为过热的数据大多是读取操作多些(比如,明星微博发布一条消息)。而这部分冗余数据可以分散集中在一点的数据访问量。这部分功能目前主要还是开发者们自己去实现的。另外还有对线程安全的一些控制和操作。
所以我觉得,数据层采用什么方法存储,最好还是根据实际需求确定,切莫一概而论,容易反而给自己增加更多不必要的烦恼。
MongoDB优点有不少,写入快(其实并不是写入IO快,而是不写入IO先放在内存里面,后面有一个线程去搞定数据和IO的同步),对文档遍历查询很方便,支持一套类SQL的关系型查找规则,尤其对数组查询和修改提供的小语法非常实用,这里要赞一下。
先说MongoDB的安装,先去
http://www.mongodb.org/
下载一个符合你系统的MongoDB数据库。mongoDB还支持中文版的网页,看来使用者中国人也不少。
然后根据你的开发语言,去下载响应的客户端开发包。(比如我现在用的是Python,就下载一个pymongo)
在这里推荐一本书《MongoDB权威指南》,介绍了很多很实用的基础语法规则。
于是我开始构建我的数据库结构。
比如,我的数据结构如下:
class CUserData():
    def __init__(self):
      self.nID   = 0   #唯一标记ID
      self.nCount= 0   #更新次数
      self.strName = ""#名称
      self.objData = []#CUerDataInfo数组
这里的objData是一个数组,这个数组的单元是一个结构体,这样设计比较符合常规的设计模式(很多例子上操作数据一般都用字符串,数字之类的,不过我觉得这些例子实用价值不高,更多的是我们的数组里面有一个我们所规定的结构体,并对这个结构体进行访问)
我们姑且定义这个结构体如下:
class CUerDataInfo():
    def __init__(self):
      self.nID   = 0    #当前ID
      self.strText = ""   #数据内容好了,下面介绍一下mongoDB的基础插入,删除,更新,修改,查询动作是怎么做的。考虑到这些东东如果写在实际逻辑代码里实在是太难看,也没必要,以后修改维护起来太麻烦,于是我把它封装了一个类,提供外围调用使用之。import gridfsfrom pymongo import Connection
#数据库操作类,包装mongoDB访问使用之
class CMongoDBOP():
    def __init__(self):
      self.m_objConnection = None
      
    def __del__(self):
      self.Close()
      
    def Connect(self, strDBIP, nPort):
      try:
            if self.m_objConnection is None:
                self.m_objConnection = Connection(strDBIP, nPort)
            else:
                self.m_objConnection.disconnect()
      except:
            print "Connect error."
            pass
      
    def Close(self):
      if self.m_objConnection is not None:
            self.m_objConnection.disconnect()
            
    def CreateCollection(self, strDBName, strTableName, nSize = 0):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return False
      
      db = self.m_objConnection
      if db is None:
            return False
      else:
            objTBNames = db.collection_names()
            for res in objTBNames:
                if res == strTableName:
                  return False
               
            if nSize == 0:
                db.create_collection(strTableName)
            else:
                db.create_collection(strTableName, {"capped":"true", "max":100})
            return True
            
      
    def Insert(self, strDBName, strTableName, objData):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return False
      else:
            db = self.m_objConnection
            if db is None:
                return False
            
            #{a:1,b:1}
            db.insert(objData)
            return True
      
    def Update(self, strDBName, strTableName, objWhere, objData):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return False
      else:
            db = self.m_objConnection
            if db is None:
                return False
            
            #{b:'q'}, {$set:{a:1}}, false, true
            db.update(objWhere, objData, upsert=False, multi=True)
            return True
   
    def Delete(self, strDBName, strTableName, objData):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return False
      else:
            db = self.m_objConnection
            if db is None:
                return False
            
            #{z:'abc'}
            db.remove(objData)
            return True
      
    def Remove(self, strDBName, strTableName):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return False
      else:
            db = self.m_objConnection
            if db is None:
                return False
            
            #{z:'abc'}
            db.remove()
            return True      
      
    def Select(self, strDBName, strTableName, objData):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return None
      else:
            db = self.m_objConnection
            if db is None:
                return None
            
            objResult = None
            if objData is None:
                objResult = db.find()
            else:
                objResult = db.find(objData)
            return objResult
      
    def Count(self, strDBName, strTableName):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return 0
      else:
            db = self.m_objConnection
            if db is None:
                return 0
            
            return db.count()
      
    def Limit(self, strDBName, strTableName, objData, objOrderBy, nLimit):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return None
      else:
            db = self.m_objConnection
            if db is None:
                return None
            
            objResult = None
            if objData is None:
                #objResult = db.find().sort(objOrderBy).limit(nLimit)
                objResult = db.find().sort(objOrderBy, -1).limit(nLimit)
            else:
                objResult = db.find(objData).sort(objOrderBy).limit(nLimit)
            return objResult
      
    def EnsureIndex(self, strDBName, strTableName, objEnsureIndex):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return False
      else:
            db = self.m_objConnection
            if db is None:
                return False
            
            db.create_index(objEnsureIndex)
            return True
            
            
    def GridFS_Put(self, strDBName, strTableName, objFileContent, strFileName):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return False
      else:
            db = self.m_objConnection
            if db is None:
                return False
      
            fs = gridfs.GridFS(db)
            nFileID = fs.put(objFileContent, filename=strFileName)
            #print "nFileID=" + str(nFileID)
            return True

mongoDB Client代码很简单,先建立一个Connect,然后基于这个Connect进行增删改操作。这个Connect实际就是一个标准的socket,当你insert的时候,MongoDB会在客户端调用Bson引擎把你的doc转换成Bson格式传输给数据库。这里其实是有优化空间的,我和andy测试的结果是Bson.encode()最耗时,能否把这部分剥离出来,让外面生成Bson数据直接Send呢?肯定是可以的,只不过需要稍微修改一下MongoDB的client代码,也不是很复杂。此话以后有时间大家有兴趣的话我专门说一下。
以上是一个标准的数据操作封装,有兴趣可以看一下里面的实现代码。strDBName指的是数据库名称,在MongoDB里面,一个strDBName对应一个在Data文件夹下的文件。你可以在一个strDBName里面建立N个collection,这个collection你完全可以理解成DBMS里面的数据表。strTableName就是这个集合的名字。
mongoDB很有趣,你可以预先不建立collection,直到你插入的时候,如果数据库发现你的表不存在,它会帮你直接建立。在很多时候这样的方式比较方便,不过,也会造成麻烦,比如有些一次性操作,这么做就比较复杂(比如建立索引)。你必须用单独的代码去做。更好玩的是Update操作,你可以通过设置Update的upsert=False, multi=True这两个参数实现,upsert参数是如果数据不存在,则会帮你建立(true的时候有效),multi意思是如果发现多行数据符合条件,是否一次修改多行?如果为False只会修改查找到的第一条记录。这个功能实在是太好使了,在DBMS实现这个可麻烦了,至少2,3条SQL语句才能做到。这里一个参数就搞定了。
mongoDB的魔力还不仅如此,我之所以想用它的根本原因,是他的数组修改器,这个小东东太好使了,再次攒一下。
试想一下,我有这么一个需求,我需要建立一个定长的数组,当达到上限,则自动顶出最早的数据。这在DBMS里面,实现起来又是一个繁复的过程,至少要判定当前数组个数是否达到上限,再去找最旧的数据,然后再删除,再插入。。。。!@#$%^,要累死我啊。。再看mongoDB,数组完全可以作为栈空间操作,直接一个语法$pop : { objData : -1 },搞定。简单吧。。。。我真没想到能如此简单。。
当然,给力的支持还不仅仅如此。$pull更能根据你给定的条件删除数据。再加上$push方便的插入数据,$addToSet能避免重复输入。无敌了!我只能这么说了。
好了,废话少说,看看我上面说的这些东东怎么通过代码实现。
@exeTime
def DBInsert():
    objMongoDBOP = CMongoDBOP()
    objMongoDBOP.Connect("127.0.0.1", 27017)
    objUserData = CUserData()
    objUserData.strName = "freeeyes"
    for i in xrange(1, 100000):
      obkDict = {}
      obkDict["nID"]   = i
      obkDict["strName"] = objUserData.strName
      obkDict["objData"] = []
      objMongoDBOP.Insert("MongoTestDB", "TestCollection", obkDict)

@exeTime
def DBUpdata():
    objMongoDBOP = CMongoDBOP()
    objMongoDBOP.Connect("127.0.0.1", 27017)
    objUserData = CUserData()
    objUserData.strName = "freeeyes"
    for i in xrange(1, 100000):
      obkDictKey = {}
      obkDictKey["nID"]   = i
      
      obkUpdata = {"$inc" : {"nCount" : 1}, "$set" : {"strName" : "freedomeyes"}}
      objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, obkUpdata)
      
@exeTime
def DBEnsureIndex():
    objMongoDBOP = CMongoDBOP()
    objMongoDBOP.Connect("127.0.0.1", 27017)
   
    obkDictKey = "nID"
    objMongoDBOP.EnsureIndex("MongoTestDB", "TestCollection", obkDictKey)
      
@exeTime
def DBDelete():
    objMongoDBOP = CMongoDBOP()
    objMongoDBOP.Connect("127.0.0.1", 27017)
    objUserData = CUserData()
    objUserData.strName = "freeeyes"
    for i in xrange(1, 100000):
      obkDictKey = {}
      obkDictKey["nID"]   = i
      
      objMongoDBOP.Delete("MongoTestDB", "TestCollection", obkDictKey)
      
@exeTime

@exeTime
def RemoveAll():
    objMongoDBOP = CMongoDBOP()
    objMongoDBOP.Connect("127.0.0.1", 27017)
    objMongoDBOP.Remove("MongoTestDB", "TestCollection")
    objMongoDBOP.Remove("MongoTestDB", "TestGFS")
   
@exeTime
def DBInsertCell():
    #测试数组
    objMongoDBOP = CMongoDBOP()
    objMongoDBOP.Connect("127.0.0.1", 27017)
    for i in xrange(1, 100000):
      obkDictKey = {}
      obkDictKey["nID"]   = i
      
      objDict = {}
      objDict["nID"]   = i
      objDict["strText"] = "I'm a freeeyes"
      
      objData = {"objData" : objDict}
      
      objUpdate = {}
      objUpdate["$push"] = objData      
      
      objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, objUpdate)
      
@exeTime
def DBUpdateCell():
    #测试数组
    objMongoDBOP = CMongoDBOP()
    objMongoDBOP.Connect("127.0.0.1", 27017)
    for i in xrange(1, 100000):
      obkDictKey = {}
      obkDictKey["nID"]   = i
      
      objDict = {}
      objDict["objData.0.strText"]   = "I'm a shiqiang."
      
      objUpdate = {}
      objUpdate["$set"] = objDict      
      
      objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, objUpdate)
      
@exeTime
def DBDeleteCell():
    #测试数组
    objMongoDBOP = CMongoDBOP()
    objMongoDBOP.Connect("127.0.0.1", 27017)
    for i in xrange(1, 100000):
      obkDictKey = {}
      obkDictKey["nID"]   = i
      
      objData = {"objData" : { "nID" : i }}
      
      objUpdate = {}
      objUpdate["$pull"] = objData
      
      objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, objUpdate)   

@exeTime
def DBGridFS():
    objMongoDBOP = CMongoDBOP()
    objMongoDBOP.Connect("127.0.0.1", 27017)
   
    for i in xrange(1, 10000):
      objMongoDBOP.GridFS_Put("MongoTestDB", "TestGFS", "Hello freeeyes", "file_" + str(i) + ".txt")

这里要说明一下,@exeTime是我写的一个装饰器:用于显示执行时间的,代码如下:
def exeTime(func):
    def newFunc(*args, **args2):
      t0 = time.time()
      #print "@%s, {%s} start" % (time.strftime("%X", time.localtime()), func.__name__)   
      back = func(*args, **args2)
      #print "@%s, {%s} end" % (time.strftime("%X", time.localtime()), func.__name__)   
      print "*******@%.3fs taken for {%s}" % (time.time() - t0, func.__name__)
      return back
    return newFunc

好了,我为了测试性能,模拟了10万次的插入,删除,修改动作(有索引的情况下),并进行了10万次的数组操作(插入,删除和修改),让我们开看看mongoDB完成这些操作用了多少时间?
测试代码如下:
#单线程版本

print "Single Thread."
DBInsert()
DBEnsureIndex()
DBUpdata()
DBDelete()
DBInsertCell()
DBUpdateCell()
DBDeleteCell()
RemoveAll()

好了,看看mongoDB消耗了多少时间:
Single Thread.
*******@10.532s taken for {DBInsert}
*******@0.062s taken for {DBEnsureIndex}
*******@16.891s taken for {DBUpdata}
*******@11.281s taken for {DBDelete}
*******@8.859s taken for {DBInsertCell}
*******@9.704s taken for {DBUpdateCell}
*******@8.171s taken for {DBDeleteCell}
*******@0.000s taken for {RemoveAll}
*******@0.000s taken for {newFunc}

呵呵,10万次操作,这个时间我还是比较满意的。
这里在着重说一下,mongoDB为了快速,默认是在unsafe模式下运行的,这个模式是指,操作并不会等数据真正执行成功便会返回。这是一个双刃剑,对于有些不重要的数据,这么做是没有问题的,但是对于交易数据,最好还是把safe关键字选择true。
主要稍微修改一下insert方法
    def Insert(self, strDBName, strTableName, objData):
      if self.m_objConnection is None:
            print "self.m_objConnection is None."
            return False
      else:
            db = self.m_objConnection
            if db is None:
                return False
            
            #{a:1,b:1}
            db.insert(objData, safe=True)
            return True

不过safe模式会大大降低系统性能,最好在需要肯定数据必须需要同步的时候去做。这就是属于数据设计的问题了。呵呵。
好了,如果你有兴趣,可以跑跑这段代码,看看mongoDB在你机器上的表现。(此测试代码在python2.6+winXP下测试通过)
页: [1]
查看完整版本: MongoDB研究和实践