找回密码
 用户注册

QQ登录

只需一步,快速开始

查看: 5620|回复: 0

MongoDB研究和实践

[复制链接]
发表于 2011-6-14 15:15:30 | 显示全部楼层 |阅读模式
最近NoSQL炒的如火如荼,最近在为公司搞一个类似邮件的信息系统。BOSS需要大量用户用手机通过这个信息系统进行交流,并且还必须能实时查询。因为我的系统采用了ICE方式部署,可以有效的分解逻辑部分,于是在寻找一种能够配合分布式存储模式的数据存储模式。我的需求是,一种基于KV结构的信息系统,key永远是用户ID,而Value我需要是一个数据结构,并支持我对这个结构的修改和增删。最后经过多方比较,我决定选择MongoDB来实现我的需求。
说实在的,NoSQL只是一种数据存储手段,并不是万能的。更多的时候,我更喜欢DBMS,因为这样的结构适合很多关系型数据的遍历和查询,因为在数据初期我不可能预测到所有的查询需求,DBMS恰好能弥补我这方面的需求。NoSQL并不是万能的,它适合服务于写频繁,同时读取指定键值简单的数据关系,所谓分布式存储其实不是什么新概念,DBMS其实就有不少的分布式产品的例子,比如oracle。
在我看来,NoSQL还没有完全到达非常成熟的程度,比如对热点数据的负载分担,NoSQL算法还不是很完善。虽然有redis 这类的产品,可以自动分割"热"和"冷"的数据,通过换页的方法增加数据命中率,不过对单点数据的瞬时巨量访问还一直是一个问题,我希望NoSQL可以针对数据的“热”度,在达到一定程度的时候,对数据自动产生分布式冗余,因为过热的数据大多是读取操作多些(比如,明星微博发布一条消息)。而这部分冗余数据可以分散集中在一点的数据访问量。这部分功能目前主要还是开发者们自己去实现的。另外还有对线程安全的一些控制和操作。
所以我觉得,数据层采用什么方法存储,最好还是根据实际需求确定,切莫一概而论,容易反而给自己增加更多不必要的烦恼。
MongoDB优点有不少,写入快(其实并不是写入IO快,而是不写入IO先放在内存里面,后面有一个线程去搞定数据和IO的同步),对文档遍历查询很方便,支持一套类SQL的关系型查找规则,尤其对数组查询和修改提供的小语法非常实用,这里要赞一下。
先说MongoDB的安装,先去
http://www.mongodb.org/
下载一个符合你系统的MongoDB数据库。mongoDB还支持中文版的网页,看来使用者中国人也不少。
然后根据你的开发语言,去下载响应的客户端开发包。(比如我现在用的是Python,就下载一个pymongo)
在这里推荐一本书《MongoDB权威指南》,介绍了很多很实用的基础语法规则。
于是我开始构建我的数据库结构。
比如,我的数据结构如下:
  1. class CUserData():
  2.     def __init__(self):
  3.         self.nID     = 0   #唯一标记ID
  4.         self.nCount  = 0   #更新次数
  5.         self.strName = ""  #名称
  6.         self.objData = []  #CUerDataInfo数组
复制代码
这里的objData是一个数组,这个数组的单元是一个结构体,这样设计比较符合常规的设计模式(很多例子上操作数据一般都用字符串,数字之类的,不过我觉得这些例子实用价值不高,更多的是我们的数组里面有一个我们所规定的结构体,并对这个结构体进行访问)
我们姑且定义这个结构体如下:
  1. class CUerDataInfo():
  2.     def __init__(self):
  3.         self.nID     = 0    #当前ID
  4.         self.strText = ""   #数据内容
复制代码
好了,下面介绍一下mongoDB的基础插入,删除,更新,修改,查询动作是怎么做的。考虑到这些东东如果写在实际逻辑代码里实在是太难看,也没必要,以后修改维护起来太麻烦,于是我把它封装了一个类,提供外围调用使用之。
  1. import gridfsfrom pymongo import Connection
  2. #数据库操作类,包装mongoDB访问使用之
  3. class CMongoDBOP():
  4.     def __init__(self):
  5.         self.m_objConnection = None
  6.         
  7.     def __del__(self):
  8.         self.Close()
  9.         
  10.     def Connect(self, strDBIP, nPort):
  11.         try:
  12.             if self.m_objConnection is None:
  13.                 self.m_objConnection = Connection(strDBIP, nPort)
  14.             else:
  15.                 self.m_objConnection.disconnect()
  16.         except:
  17.             print "[CMongoDBOP::Connect]Connect error."
  18.             pass
  19.         
  20.     def Close(self):
  21.         if self.m_objConnection is not None:
  22.             self.m_objConnection.disconnect()
  23.             
  24.     def CreateCollection(self, strDBName, strTableName, nSize = 0):
  25.         if self.m_objConnection is None:
  26.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  27.             return False
  28.         
  29.         db = self.m_objConnection[strDBName]
  30.         if db is None:
  31.             return False
  32.         else:
  33.             objTBNames = db.collection_names()
  34.             for res in objTBNames:
  35.                 if res == strTableName:
  36.                     return False
  37.                
  38.             if nSize == 0:
  39.                 db.create_collection(strTableName)
  40.             else:
  41.                 db.create_collection(strTableName, {"capped":"true", "max":100})
  42.             return True
  43.             
  44.         
  45.     def Insert(self, strDBName, strTableName, objData):
  46.         if self.m_objConnection is None:
  47.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  48.             return False
  49.         else:
  50.             db = self.m_objConnection[strDBName]
  51.             if db is None:
  52.                 return False
  53.             
  54.             #{a:1,b:1}
  55.             db[strTableName].insert(objData)
  56.             return True
  57.         
  58.     def Update(self, strDBName, strTableName, objWhere, objData):
  59.         if self.m_objConnection is None:
  60.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  61.             return False
  62.         else:
  63.             db = self.m_objConnection[strDBName]
  64.             if db is None:
  65.                 return False
  66.             
  67.             #{b:'q'}, {$set:{a:1}}, false, true
  68.             db[strTableName].update(objWhere, objData, upsert=False, multi=True)
  69.             return True
  70.    
  71.     def Delete(self, strDBName, strTableName, objData):
  72.         if self.m_objConnection is None:
  73.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  74.             return False
  75.         else:
  76.             db = self.m_objConnection[strDBName]
  77.             if db is None:
  78.                 return False
  79.             
  80.             #{z:'abc'}
  81.             db[strTableName].remove(objData)
  82.             return True
  83.         
  84.     def Remove(self, strDBName, strTableName):
  85.         if self.m_objConnection is None:
  86.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  87.             return False
  88.         else:
  89.             db = self.m_objConnection[strDBName]
  90.             if db is None:
  91.                 return False
  92.             
  93.             #{z:'abc'}
  94.             db[strTableName].remove()
  95.             return True        
  96.         
  97.     def Select(self, strDBName, strTableName, objData):
  98.         if self.m_objConnection is None:
  99.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  100.             return None
  101.         else:
  102.             db = self.m_objConnection[strDBName]
  103.             if db is None:
  104.                 return None
  105.             
  106.             objResult = None
  107.             if objData is None:
  108.                 objResult = db[strTableName].find()
  109.             else:
  110.                 objResult = db[strTableName].find(objData)
  111.             return objResult
  112.         
  113.     def Count(self, strDBName, strTableName):
  114.         if self.m_objConnection is None:
  115.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  116.             return 0
  117.         else:
  118.             db = self.m_objConnection[strDBName]
  119.             if db is None:
  120.                 return 0
  121.             
  122.             return db[strTableName].count()
  123.         
  124.     def Limit(self, strDBName, strTableName, objData, objOrderBy, nLimit):
  125.         if self.m_objConnection is None:
  126.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  127.             return None
  128.         else:
  129.             db = self.m_objConnection[strDBName]
  130.             if db is None:
  131.                 return None
  132.             
  133.             objResult = None
  134.             if objData is None:
  135.                 #objResult = db[strTableName].find().sort(objOrderBy).limit(nLimit)
  136.                 objResult = db[strTableName].find().sort(objOrderBy, -1).limit(nLimit)
  137.             else:
  138.                 objResult = db[strTableName].find(objData).sort(objOrderBy).limit(nLimit)
  139.             return objResult
  140.         
  141.     def EnsureIndex(self, strDBName, strTableName, objEnsureIndex):
  142.         if self.m_objConnection is None:
  143.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  144.             return False
  145.         else:
  146.             db = self.m_objConnection[strDBName]
  147.             if db is None:
  148.                 return False
  149.             
  150.             db[strTableName].create_index(objEnsureIndex)
  151.             return True
  152.             
  153.             
  154.     def GridFS_Put(self, strDBName, strTableName, objFileContent, strFileName):
  155.         if self.m_objConnection is None:
  156.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  157.             return False
  158.         else:
  159.             db = self.m_objConnection[strDBName]
  160.             if db is None:
  161.                 return False
  162.         
  163.             fs = gridfs.GridFS(db)
  164.             nFileID = fs.put(objFileContent, filename=strFileName)
  165.             #print "[GridFS_Put]nFileID=" + str(nFileID)
  166.             return True
复制代码

mongoDB Client代码很简单,先建立一个Connect,然后基于这个Connect进行增删改操作。这个Connect实际就是一个标准的socket,当你insert的时候,MongoDB会在客户端调用Bson引擎把你的doc转换成Bson格式传输给数据库。这里其实是有优化空间的,我和andy测试的结果是Bson.encode()最耗时,能否把这部分剥离出来,让外面生成Bson数据直接Send呢?肯定是可以的,只不过需要稍微修改一下MongoDB的client代码,也不是很复杂。此话以后有时间大家有兴趣的话我专门说一下。
以上是一个标准的数据操作封装,有兴趣可以看一下里面的实现代码。strDBName指的是数据库名称,在MongoDB里面,一个strDBName对应一个在Data文件夹下的文件。你可以在一个strDBName里面建立N个collection,这个collection你完全可以理解成DBMS里面的数据表。strTableName就是这个集合的名字。
mongoDB很有趣,你可以预先不建立collection,直到你插入的时候,如果数据库发现你的表不存在,它会帮你直接建立。在很多时候这样的方式比较方便,不过,也会造成麻烦,比如有些一次性操作,这么做就比较复杂(比如建立索引)。你必须用单独的代码去做。更好玩的是Update操作,你可以通过设置Update的upsert=False, multi=True这两个参数实现,upsert参数是如果数据不存在,则会帮你建立(true的时候有效),multi意思是如果发现多行数据符合条件,是否一次修改多行?如果为False只会修改查找到的第一条记录。这个功能实在是太好使了,在DBMS实现这个可麻烦了,至少2,3条SQL语句才能做到。这里一个参数就搞定了。
mongoDB的魔力还不仅如此,我之所以想用它的根本原因,是他的数组修改器,这个小东东太好使了,再次攒一下。
试想一下,我有这么一个需求,我需要建立一个定长的数组,当达到上限,则自动顶出最早的数据。这在DBMS里面,实现起来又是一个繁复的过程,至少要判定当前数组个数是否达到上限,再去找最旧的数据,然后再删除,再插入。。。。!@#$%^,要累死我啊。。再看mongoDB,数组完全可以作为栈空间操作,直接一个语法$pop : { objData : -1 },搞定。简单吧。。。。我真没想到能如此简单。。
当然,给力的支持还不仅仅如此。$pull更能根据你给定的条件删除数据。再加上$push方便的插入数据,$addToSet能避免重复输入。无敌了!我只能这么说了。
好了,废话少说,看看我上面说的这些东东怎么通过代码实现。
  1. @exeTime
  2. def DBInsert():
  3.     objMongoDBOP = CMongoDBOP()
  4.     objMongoDBOP.Connect("127.0.0.1", 27017)
  5.     objUserData = CUserData()
  6.     objUserData.strName = "freeeyes"
  7.     for i in xrange(1, 100000):
  8.         obkDict = {}
  9.         obkDict["nID"]     = i
  10.         obkDict["strName"] = objUserData.strName
  11.         obkDict["objData"] = []
  12.         objMongoDBOP.Insert("MongoTestDB", "TestCollection", obkDict)
  13. @exeTime
  14. def DBUpdata():
  15.     objMongoDBOP = CMongoDBOP()
  16.     objMongoDBOP.Connect("127.0.0.1", 27017)
  17.     objUserData = CUserData()
  18.     objUserData.strName = "freeeyes"
  19.     for i in xrange(1, 100000):
  20.         obkDictKey = {}
  21.         obkDictKey["nID"]     = i
  22.         
  23.         obkUpdata = {"$inc" : {"nCount" : 1}, "$set" : {"strName" : "freedomeyes"}}
  24.         objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, obkUpdata)
  25.         
  26. @exeTime
  27. def DBEnsureIndex():
  28.     objMongoDBOP = CMongoDBOP()
  29.     objMongoDBOP.Connect("127.0.0.1", 27017)
  30.    
  31.     obkDictKey = "nID"
  32.     objMongoDBOP.EnsureIndex("MongoTestDB", "TestCollection", obkDictKey)
  33.         
  34. @exeTime
  35. def DBDelete():
  36.     objMongoDBOP = CMongoDBOP()
  37.     objMongoDBOP.Connect("127.0.0.1", 27017)
  38.     objUserData = CUserData()
  39.     objUserData.strName = "freeeyes"
  40.     for i in xrange(1, 100000):
  41.         obkDictKey = {}
  42.         obkDictKey["nID"]     = i
  43.         
  44.         objMongoDBOP.Delete("MongoTestDB", "TestCollection", obkDictKey)
  45.         
  46. @exeTime
  47. @exeTime
  48. def RemoveAll():
  49.     objMongoDBOP = CMongoDBOP()
  50.     objMongoDBOP.Connect("127.0.0.1", 27017)
  51.     objMongoDBOP.Remove("MongoTestDB", "TestCollection")
  52.     objMongoDBOP.Remove("MongoTestDB", "TestGFS")
  53.    
  54. @exeTime
  55. def DBInsertCell():
  56.     #测试数组
  57.     objMongoDBOP = CMongoDBOP()
  58.     objMongoDBOP.Connect("127.0.0.1", 27017)
  59.     for i in xrange(1, 100000):
  60.         obkDictKey = {}
  61.         obkDictKey["nID"]     = i
  62.         
  63.         objDict = {}
  64.         objDict["nID"]     = i
  65.         objDict["strText"] = "I'm a freeeyes"
  66.         
  67.         objData = {"objData" : objDict}
  68.         
  69.         objUpdate = {}
  70.         objUpdate["$push"] = objData        
  71.         
  72.         objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, objUpdate)
  73.         
  74. @exeTime
  75. def DBUpdateCell():
  76.     #测试数组
  77.     objMongoDBOP = CMongoDBOP()
  78.     objMongoDBOP.Connect("127.0.0.1", 27017)
  79.     for i in xrange(1, 100000):
  80.         obkDictKey = {}
  81.         obkDictKey["nID"]     = i
  82.         
  83.         objDict = {}
  84.         objDict["objData.0.strText"]     = "I'm a shiqiang."
  85.         
  86.         objUpdate = {}
  87.         objUpdate["$set"] = objDict        
  88.         
  89.         objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, objUpdate)
  90.         
  91. @exeTime
  92. def DBDeleteCell():
  93.     #测试数组
  94.     objMongoDBOP = CMongoDBOP()
  95.     objMongoDBOP.Connect("127.0.0.1", 27017)
  96.     for i in xrange(1, 100000):
  97.         obkDictKey = {}
  98.         obkDictKey["nID"]     = i
  99.         
  100.         objData = {"objData" : { "nID" : i }}
  101.         
  102.         objUpdate = {}
  103.         objUpdate["$pull"] = objData  
  104.         
  105.         objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, objUpdate)   
  106. @exeTime
  107. def DBGridFS():
  108.     objMongoDBOP = CMongoDBOP()
  109.     objMongoDBOP.Connect("127.0.0.1", 27017)
  110.    
  111.     for i in xrange(1, 10000):
  112.         objMongoDBOP.GridFS_Put("MongoTestDB", "TestGFS", "Hello freeeyes", "file_" + str(i) + ".txt")
复制代码

这里要说明一下,@exeTime是我写的一个装饰器:用于显示执行时间的,代码如下:
  1. def exeTime(func):
  2.     def newFunc(*args, **args2):
  3.         t0 = time.time()
  4.         #print "@%s, {%s} start" % (time.strftime("%X", time.localtime()), func.__name__)   
  5.         back = func(*args, **args2)
  6.         #print "@%s, {%s} end" % (time.strftime("%X", time.localtime()), func.__name__)   
  7.         print "*******@%.3fs taken for {%s}" % (time.time() - t0, func.__name__)
  8.         return back
  9.     return newFunc
复制代码

好了,我为了测试性能,模拟了10万次的插入,删除,修改动作(有索引的情况下),并进行了10万次的数组操作(插入,删除和修改),让我们开看看mongoDB完成这些操作用了多少时间?
测试代码如下:
  1. #单线程版本  
  2. print "[MongoDBTest]Single Thread."
  3. DBInsert()
  4. DBEnsureIndex()
  5. DBUpdata()
  6. DBDelete()
  7. DBInsertCell()
  8. DBUpdateCell()
  9. DBDeleteCell()
  10. RemoveAll()
复制代码

好了,看看mongoDB消耗了多少时间:
[MongoDBTest]Single Thread.
*******@10.532s taken for {DBInsert}
*******@0.062s taken for {DBEnsureIndex}
*******@16.891s taken for {DBUpdata}
*******@11.281s taken for {DBDelete}
*******@8.859s taken for {DBInsertCell}
*******@9.704s taken for {DBUpdateCell}
*******@8.171s taken for {DBDeleteCell}
*******@0.000s taken for {RemoveAll}
*******@0.000s taken for {newFunc}

呵呵,10万次操作,这个时间我还是比较满意的。
这里在着重说一下,mongoDB为了快速,默认是在unsafe模式下运行的,这个模式是指,操作并不会等数据真正执行成功便会返回。这是一个双刃剑,对于有些不重要的数据,这么做是没有问题的,但是对于交易数据,最好还是把safe关键字选择true。
主要稍微修改一下insert方法
  1.     def Insert(self, strDBName, strTableName, objData):
  2.         if self.m_objConnection is None:
  3.             print "[CMongoDBOP::Set]self.m_objConnection is None."
  4.             return False
  5.         else:
  6.             db = self.m_objConnection[strDBName]
  7.             if db is None:
  8.                 return False
  9.             
  10.             #{a:1,b:1}
  11.             db[strTableName].insert(objData, safe=True)
  12.             return True
复制代码

不过safe模式会大大降低系统性能,最好在需要肯定数据必须需要同步的时候去做。这就是属于数据设计的问题了。呵呵。
好了,如果你有兴趣,可以跑跑这段代码,看看mongoDB在你机器上的表现。(此测试代码在python2.6+winXP下测试通过)

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?用户注册

×
您需要登录后才可以回帖 登录 | 用户注册

本版积分规则

Archiver|手机版|小黑屋|ACE Developer ( 京ICP备06055248号 )

GMT+8, 2024-5-6 00:11 , Processed in 0.018398 second(s), 5 queries , Redis On.

Powered by Discuz! X3.5

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表