|
最近NoSQL炒的如火如荼,最近在为公司搞一个类似邮件的信息系统。BOSS需要大量用户用手机通过这个信息系统进行交流,并且还必须能实时查询。因为我的系统采用了ICE方式部署,可以有效的分解逻辑部分,于是在寻找一种能够配合分布式存储模式的数据存储模式。我的需求是,一种基于KV结构的信息系统,key永远是用户ID,而Value我需要是一个数据结构,并支持我对这个结构的修改和增删。最后经过多方比较,我决定选择MongoDB来实现我的需求。
说实在的,NoSQL只是一种数据存储手段,并不是万能的。更多的时候,我更喜欢DBMS,因为这样的结构适合很多关系型数据的遍历和查询,因为在数据初期我不可能预测到所有的查询需求,DBMS恰好能弥补我这方面的需求。NoSQL并不是万能的,它适合服务于写频繁,同时读取指定键值简单的数据关系,所谓分布式存储其实不是什么新概念,DBMS其实就有不少的分布式产品的例子,比如oracle。
在我看来,NoSQL还没有完全到达非常成熟的程度,比如对热点数据的负载分担,NoSQL算法还不是很完善。虽然有redis 这类的产品,可以自动分割"热"和"冷"的数据,通过换页的方法增加数据命中率,不过对单点数据的瞬时巨量访问还一直是一个问题,我希望NoSQL可以针对数据的“热”度,在达到一定程度的时候,对数据自动产生分布式冗余,因为过热的数据大多是读取操作多些(比如,明星微博发布一条消息)。而这部分冗余数据可以分散集中在一点的数据访问量。这部分功能目前主要还是开发者们自己去实现的。另外还有对线程安全的一些控制和操作。
所以我觉得,数据层采用什么方法存储,最好还是根据实际需求确定,切莫一概而论,容易反而给自己增加更多不必要的烦恼。
MongoDB优点有不少,写入快(其实并不是写入IO快,而是不写入IO先放在内存里面,后面有一个线程去搞定数据和IO的同步),对文档遍历查询很方便,支持一套类SQL的关系型查找规则,尤其对数组查询和修改提供的小语法非常实用,这里要赞一下。
先说MongoDB的安装,先去
http://www.mongodb.org/
下载一个符合你系统的MongoDB数据库。mongoDB还支持中文版的网页,看来使用者中国人也不少。
然后根据你的开发语言,去下载响应的客户端开发包。(比如我现在用的是Python,就下载一个pymongo)
在这里推荐一本书《MongoDB权威指南》,介绍了很多很实用的基础语法规则。
于是我开始构建我的数据库结构。
比如,我的数据结构如下:- class CUserData():
- def __init__(self):
- self.nID = 0 #唯一标记ID
- self.nCount = 0 #更新次数
- self.strName = "" #名称
- self.objData = [] #CUerDataInfo数组
复制代码 这里的objData是一个数组,这个数组的单元是一个结构体,这样设计比较符合常规的设计模式(很多例子上操作数据一般都用字符串,数字之类的,不过我觉得这些例子实用价值不高,更多的是我们的数组里面有一个我们所规定的结构体,并对这个结构体进行访问)
我们姑且定义这个结构体如下:- class CUerDataInfo():
- def __init__(self):
- self.nID = 0 #当前ID
- self.strText = "" #数据内容
复制代码 好了,下面介绍一下mongoDB的基础插入,删除,更新,修改,查询动作是怎么做的。考虑到这些东东如果写在实际逻辑代码里实在是太难看,也没必要,以后修改维护起来太麻烦,于是我把它封装了一个类,提供外围调用使用之。- import gridfsfrom pymongo import Connection
- #数据库操作类,包装mongoDB访问使用之
- class CMongoDBOP():
- def __init__(self):
- self.m_objConnection = None
-
- def __del__(self):
- self.Close()
-
- def Connect(self, strDBIP, nPort):
- try:
- if self.m_objConnection is None:
- self.m_objConnection = Connection(strDBIP, nPort)
- else:
- self.m_objConnection.disconnect()
- except:
- print "[CMongoDBOP::Connect]Connect error."
- pass
-
- def Close(self):
- if self.m_objConnection is not None:
- self.m_objConnection.disconnect()
-
- def CreateCollection(self, strDBName, strTableName, nSize = 0):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return False
-
- db = self.m_objConnection[strDBName]
- if db is None:
- return False
- else:
- objTBNames = db.collection_names()
- for res in objTBNames:
- if res == strTableName:
- return False
-
- if nSize == 0:
- db.create_collection(strTableName)
- else:
- db.create_collection(strTableName, {"capped":"true", "max":100})
- return True
-
-
- def Insert(self, strDBName, strTableName, objData):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return False
- else:
- db = self.m_objConnection[strDBName]
- if db is None:
- return False
-
- #{a:1,b:1}
- db[strTableName].insert(objData)
- return True
-
- def Update(self, strDBName, strTableName, objWhere, objData):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return False
- else:
- db = self.m_objConnection[strDBName]
- if db is None:
- return False
-
- #{b:'q'}, {$set:{a:1}}, false, true
- db[strTableName].update(objWhere, objData, upsert=False, multi=True)
- return True
-
- def Delete(self, strDBName, strTableName, objData):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return False
- else:
- db = self.m_objConnection[strDBName]
- if db is None:
- return False
-
- #{z:'abc'}
- db[strTableName].remove(objData)
- return True
-
- def Remove(self, strDBName, strTableName):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return False
- else:
- db = self.m_objConnection[strDBName]
- if db is None:
- return False
-
- #{z:'abc'}
- db[strTableName].remove()
- return True
-
- def Select(self, strDBName, strTableName, objData):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return None
- else:
- db = self.m_objConnection[strDBName]
- if db is None:
- return None
-
- objResult = None
- if objData is None:
- objResult = db[strTableName].find()
- else:
- objResult = db[strTableName].find(objData)
- return objResult
-
- def Count(self, strDBName, strTableName):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return 0
- else:
- db = self.m_objConnection[strDBName]
- if db is None:
- return 0
-
- return db[strTableName].count()
-
- def Limit(self, strDBName, strTableName, objData, objOrderBy, nLimit):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return None
- else:
- db = self.m_objConnection[strDBName]
- if db is None:
- return None
-
- objResult = None
- if objData is None:
- #objResult = db[strTableName].find().sort(objOrderBy).limit(nLimit)
- objResult = db[strTableName].find().sort(objOrderBy, -1).limit(nLimit)
- else:
- objResult = db[strTableName].find(objData).sort(objOrderBy).limit(nLimit)
- return objResult
-
- def EnsureIndex(self, strDBName, strTableName, objEnsureIndex):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return False
- else:
- db = self.m_objConnection[strDBName]
- if db is None:
- return False
-
- db[strTableName].create_index(objEnsureIndex)
- return True
-
-
- def GridFS_Put(self, strDBName, strTableName, objFileContent, strFileName):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return False
- else:
- db = self.m_objConnection[strDBName]
- if db is None:
- return False
-
- fs = gridfs.GridFS(db)
- nFileID = fs.put(objFileContent, filename=strFileName)
- #print "[GridFS_Put]nFileID=" + str(nFileID)
- return True
复制代码
mongoDB Client代码很简单,先建立一个Connect,然后基于这个Connect进行增删改操作。这个Connect实际就是一个标准的socket,当你insert的时候,MongoDB会在客户端调用Bson引擎把你的doc转换成Bson格式传输给数据库。这里其实是有优化空间的,我和andy测试的结果是Bson.encode()最耗时,能否把这部分剥离出来,让外面生成Bson数据直接Send呢?肯定是可以的,只不过需要稍微修改一下MongoDB的client代码,也不是很复杂。此话以后有时间大家有兴趣的话我专门说一下。
以上是一个标准的数据操作封装,有兴趣可以看一下里面的实现代码。strDBName指的是数据库名称,在MongoDB里面,一个strDBName对应一个在Data文件夹下的文件。你可以在一个strDBName里面建立N个collection,这个collection你完全可以理解成DBMS里面的数据表。strTableName就是这个集合的名字。
mongoDB很有趣,你可以预先不建立collection,直到你插入的时候,如果数据库发现你的表不存在,它会帮你直接建立。在很多时候这样的方式比较方便,不过,也会造成麻烦,比如有些一次性操作,这么做就比较复杂(比如建立索引)。你必须用单独的代码去做。更好玩的是Update操作,你可以通过设置Update的upsert=False, multi=True这两个参数实现,upsert参数是如果数据不存在,则会帮你建立(true的时候有效),multi意思是如果发现多行数据符合条件,是否一次修改多行?如果为False只会修改查找到的第一条记录。这个功能实在是太好使了,在DBMS实现这个可麻烦了,至少2,3条SQL语句才能做到。这里一个参数就搞定了。
mongoDB的魔力还不仅如此,我之所以想用它的根本原因,是他的数组修改器,这个小东东太好使了,再次攒一下。
试想一下,我有这么一个需求,我需要建立一个定长的数组,当达到上限,则自动顶出最早的数据。这在DBMS里面,实现起来又是一个繁复的过程,至少要判定当前数组个数是否达到上限,再去找最旧的数据,然后再删除,再插入。。。。!@#$%^,要累死我啊。。再看mongoDB,数组完全可以作为栈空间操作,直接一个语法$pop : { objData : -1 },搞定。简单吧。。。。我真没想到能如此简单。。
当然,给力的支持还不仅仅如此。$pull更能根据你给定的条件删除数据。再加上$push方便的插入数据,$addToSet能避免重复输入。无敌了!我只能这么说了。
好了,废话少说,看看我上面说的这些东东怎么通过代码实现。- @exeTime
- def DBInsert():
- objMongoDBOP = CMongoDBOP()
- objMongoDBOP.Connect("127.0.0.1", 27017)
- objUserData = CUserData()
- objUserData.strName = "freeeyes"
- for i in xrange(1, 100000):
- obkDict = {}
- obkDict["nID"] = i
- obkDict["strName"] = objUserData.strName
- obkDict["objData"] = []
- objMongoDBOP.Insert("MongoTestDB", "TestCollection", obkDict)
- @exeTime
- def DBUpdata():
- objMongoDBOP = CMongoDBOP()
- objMongoDBOP.Connect("127.0.0.1", 27017)
- objUserData = CUserData()
- objUserData.strName = "freeeyes"
- for i in xrange(1, 100000):
- obkDictKey = {}
- obkDictKey["nID"] = i
-
- obkUpdata = {"$inc" : {"nCount" : 1}, "$set" : {"strName" : "freedomeyes"}}
- objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, obkUpdata)
-
- @exeTime
- def DBEnsureIndex():
- objMongoDBOP = CMongoDBOP()
- objMongoDBOP.Connect("127.0.0.1", 27017)
-
- obkDictKey = "nID"
- objMongoDBOP.EnsureIndex("MongoTestDB", "TestCollection", obkDictKey)
-
- @exeTime
- def DBDelete():
- objMongoDBOP = CMongoDBOP()
- objMongoDBOP.Connect("127.0.0.1", 27017)
- objUserData = CUserData()
- objUserData.strName = "freeeyes"
- for i in xrange(1, 100000):
- obkDictKey = {}
- obkDictKey["nID"] = i
-
- objMongoDBOP.Delete("MongoTestDB", "TestCollection", obkDictKey)
-
- @exeTime
- @exeTime
- def RemoveAll():
- objMongoDBOP = CMongoDBOP()
- objMongoDBOP.Connect("127.0.0.1", 27017)
- objMongoDBOP.Remove("MongoTestDB", "TestCollection")
- objMongoDBOP.Remove("MongoTestDB", "TestGFS")
-
- @exeTime
- def DBInsertCell():
- #测试数组
- objMongoDBOP = CMongoDBOP()
- objMongoDBOP.Connect("127.0.0.1", 27017)
- for i in xrange(1, 100000):
- obkDictKey = {}
- obkDictKey["nID"] = i
-
- objDict = {}
- objDict["nID"] = i
- objDict["strText"] = "I'm a freeeyes"
-
- objData = {"objData" : objDict}
-
- objUpdate = {}
- objUpdate["$push"] = objData
-
- objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, objUpdate)
-
- @exeTime
- def DBUpdateCell():
- #测试数组
- objMongoDBOP = CMongoDBOP()
- objMongoDBOP.Connect("127.0.0.1", 27017)
- for i in xrange(1, 100000):
- obkDictKey = {}
- obkDictKey["nID"] = i
-
- objDict = {}
- objDict["objData.0.strText"] = "I'm a shiqiang."
-
- objUpdate = {}
- objUpdate["$set"] = objDict
-
- objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, objUpdate)
-
- @exeTime
- def DBDeleteCell():
- #测试数组
- objMongoDBOP = CMongoDBOP()
- objMongoDBOP.Connect("127.0.0.1", 27017)
- for i in xrange(1, 100000):
- obkDictKey = {}
- obkDictKey["nID"] = i
-
- objData = {"objData" : { "nID" : i }}
-
- objUpdate = {}
- objUpdate["$pull"] = objData
-
- objMongoDBOP.Update("MongoTestDB", "TestCollection", obkDictKey, objUpdate)
- @exeTime
- def DBGridFS():
- objMongoDBOP = CMongoDBOP()
- objMongoDBOP.Connect("127.0.0.1", 27017)
-
- for i in xrange(1, 10000):
- objMongoDBOP.GridFS_Put("MongoTestDB", "TestGFS", "Hello freeeyes", "file_" + str(i) + ".txt")
复制代码
这里要说明一下,@exeTime是我写的一个装饰器:用于显示执行时间的,代码如下:- def exeTime(func):
- def newFunc(*args, **args2):
- t0 = time.time()
- #print "@%s, {%s} start" % (time.strftime("%X", time.localtime()), func.__name__)
- back = func(*args, **args2)
- #print "@%s, {%s} end" % (time.strftime("%X", time.localtime()), func.__name__)
- print "*******@%.3fs taken for {%s}" % (time.time() - t0, func.__name__)
- return back
- return newFunc
复制代码
好了,我为了测试性能,模拟了10万次的插入,删除,修改动作(有索引的情况下),并进行了10万次的数组操作(插入,删除和修改),让我们开看看mongoDB完成这些操作用了多少时间?
测试代码如下:- #单线程版本
- print "[MongoDBTest]Single Thread."
- DBInsert()
- DBEnsureIndex()
- DBUpdata()
- DBDelete()
- DBInsertCell()
- DBUpdateCell()
- DBDeleteCell()
- RemoveAll()
复制代码
好了,看看mongoDB消耗了多少时间:
[MongoDBTest]Single Thread.
*******@10.532s taken for {DBInsert}
*******@0.062s taken for {DBEnsureIndex}
*******@16.891s taken for {DBUpdata}
*******@11.281s taken for {DBDelete}
*******@8.859s taken for {DBInsertCell}
*******@9.704s taken for {DBUpdateCell}
*******@8.171s taken for {DBDeleteCell}
*******@0.000s taken for {RemoveAll}
*******@0.000s taken for {newFunc}
呵呵,10万次操作,这个时间我还是比较满意的。
这里在着重说一下,mongoDB为了快速,默认是在unsafe模式下运行的,这个模式是指,操作并不会等数据真正执行成功便会返回。这是一个双刃剑,对于有些不重要的数据,这么做是没有问题的,但是对于交易数据,最好还是把safe关键字选择true。
主要稍微修改一下insert方法- def Insert(self, strDBName, strTableName, objData):
- if self.m_objConnection is None:
- print "[CMongoDBOP::Set]self.m_objConnection is None."
- return False
- else:
- db = self.m_objConnection[strDBName]
- if db is None:
- return False
-
- #{a:1,b:1}
- db[strTableName].insert(objData, safe=True)
- return True
复制代码
不过safe模式会大大降低系统性能,最好在需要肯定数据必须需要同步的时候去做。这就是属于数据设计的问题了。呵呵。
好了,如果你有兴趣,可以跑跑这段代码,看看mongoDB在你机器上的表现。(此测试代码在python2.6+winXP下测试通过)
|
本帖子中包含更多资源
您需要 登录 才可以下载或查看,没有账号?用户注册
×
|