我试图将大约 800 万条记录插入 Mongo,它似乎以每秒 1000 条记录的速度插入它们,这非常慢。
代码是用python写的,所以可能是python的问题,但我怀疑。这是代码:
def str2datetime(str):
return None if (not str or str == r'\N') else datetime.strptime(str, '%Y-%m-%d %H:%M:%S')
def str2bool(str):
return None if (not str or str == r'\N') else (False if str == '0' else True)
def str2int(str):
return None if (not str or str == r'\N') else int(str)
def str2float(str):
return None if (not str or str == r'\N') else float(str)
def str2float2int(str):
return None if (not str or str == r'\N') else int(float(str) + 0.5)
def str2latin1(str):
return unicode(str, 'latin-1')
_ = lambda x: x
converters_map = {
'test_id': str2int,
'android_device_id': str2int,
'android_fingerprint': _,
'test_date': str2datetime,
'client_ip_address': _,
'download_kbps': str2int,
'upload_kbps': str2int,
'latency': str2int,
'server_name': _,
'server_country': _,
'server_country_code': _,
'server_latitude': str2float,
'server_longitude': str2float,
'client_country': _,
'client_country_code': _,
'client_region_name': str2latin1,
'client_region_code': _,
'client_city': str2latin1,
'client_latitude': str2float,
'client_longitude': str2float,
'miles_between': str2float2int,
'connection_type': str2int,
'isp_name': _,
'is_isp': str2bool,
'network_operator_name': _,
'network_operator': _,
'brand': _,
'device': _,
'hardware': _,
'build_id': _,
'manufacturer': _,
'model': str2latin1,
'product': _,
'cdma_cell_id': str2int,
'gsm_cell_id': str2int,
'client_ip_id': str2int,
'user_agent': _,
'client_net_speed': str2int,
'iphone_device_id': str2int,
'carrier_name': _,
'iso_country_code': _,
'mobile_country_code': str2int,
'mobile_network_code': str2int,
'model': str2latin1,
'version': _,
'server_sponsor_name': _,
}
def read_csv_zip(path):
with ZipFile(path) as z:
with z.open(z.namelist()[0]) as input:
r = csv.reader(input)
header = r.next()
converters = tuple((title if title != 'test_id' else '_id', converters_map[title]) for title in header)
for row in r:
row = {converter[0]:converter[1](value) for converter, value in zip(converters, row)}
yield row
argv = [x for x in argv if not x == '']
if len(argv) == 1:
print("Usage: " + argv[0] + " zip-file")
exit(1)
zip_file = argv[1]
collection_name = zip_file[:zip_file.index('_')]
print("Populating " + collection_name + " with the data from " + zip_file)
with Connection() as connection:
db = connection.db
collection = db.__getattr__(collection_name)
i = 0;
try:
start = time()
for item in read_csv_zip(zip_file):
i += 1
if (i % 1000) == 0:
stdout.write("\r%d " % i)
stdout.flush()
try:
collection.insert(item)
except Exception as exc:
print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
print exc
print("Elapsed time = {0} seconds, {1} records.".format(time() - start, i))
raw_input("Press ENTER to exit")
except Exception as exc:
print("Failed at the record #{0} (id = {1})".format(i,item['_id']))
print exc
exit(1)
插入262796条记录(一个csv文件)需要350秒。
mongo 服务器在同一台机器上运行,没有人使用它。所以,如果有办法的话,我可以直接写入数据库文件。
我对分片不感兴趣,因为 800 万条记录不应该需要分片,不是吗?
我的问题是我做错了什么?也许我选择的数据库是错误的?典型的流程是每月刷新一次记录,然后仅对数据库进行查询。
谢谢。
编辑
事实证明,瓶颈不是 mongo,而是读取 zip 文件。我更改了代码,以 1000 行为一组读取 zip 文件,然后通过调用 Collection.insert 将它们提供给 mongo。它是 zip 文件,它需要所有时间。这是修改后的代码:
def insert_documents(collection, source, i, batch_size):
count = 0;
while True:
items = list(itertools.islice(source, batch_size))
if len(items) == 0:
break;
old_i = i
count += len(items)
i += len(items)
if (old_i / 1000) != (i / 1000):
sys.stdout.write("\r%d " % i)
sys.stdout.flush()
try:
collection.insert(items)
except Exception as exc:
print("Failed at some record between #{0} (id = {1}) and #{2} (id = {3})".format(old_i,items[0]['_id'],i,items[-1]['_id']))
print exc
return count
def main():
argv = [x for x in sys.argv if not x == '']
if len(argv) == 1:
print("Usage: " + argv[0] + " zip-file")
exit(1)
zip_file = argv[1]
collection_name = zip_file[:zip_file.index('_')]
print("Populating " + collection_name + " with the data from " + zip_file)
with Connection() as connection:
ookla = connection.ookla
collection = ookla.__getattr__(collection_name)
i = 0;
start = time()
count = insert_documents(collection, read_csv_zip(zip_file), i, 1000)
i += count
print("Elapsed time = {0} seconds, {1} records.".format(time() - start, count))
raw_input("Press ENTER to exit")
if __name__ == "__main__":
main()
事实证明,大部分时间都进入了 items = list(itertools.islice(source, batch_size))。
关于如何改进它有什么想法吗?
最佳答案
尽管您在评论中指出您不能使用 mongoimport,但您可以而且应该使用。可以完美导入日期以及您的 str2latin 转换。只需预处理您的 csv 使其与 mongoimport 兼容,您就大功告成了。
将日期转换为 {myDate:{$date: msSinceEpoch}} 并且 mongoimport 会理解它。因此,通过一个预处理步骤,您可以使用 mongoimport 并根据您的用例,我不明白为什么这会成为问题。
也就是说,mongoimport 不应比批量插入快一个数量级,尽管 1000/秒并不慢,但它肯定不符合我在简单的开发机器上获得的性能类型。如果我使用批量插入而不是单声道插入,我可以轻松达到 30k/sec 甚至更高,尤其是使用 safe=false 写入(在这种情况下应该没问题,因为您可以在导入后作为第二步进行验证)。你的瓶颈是什么资源? (检查 mongostat 和 top)
关于python - 如何有效地将大型压缩 csv 文件中的数百万条记录插入到 mongo 数据库中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9760242/
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
总的来说,我对ruby还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
我试图在一个项目中使用rake,如果我把所有东西都放到Rakefile中,它会很大并且很难读取/找到东西,所以我试着将每个命名空间放在lib/rake中它自己的文件中,我添加了这个到我的rake文件的顶部:Dir['#{File.dirname(__FILE__)}/lib/rake/*.rake'].map{|f|requiref}它加载文件没问题,但没有任务。我现在只有一个.rake文件作为测试,名为“servers.rake”,它看起来像这样:namespace:serverdotask:testdoputs"test"endend所以当我运行rakeserver:testid时
作为我的Rails应用程序的一部分,我编写了一个小导入程序,它从我们的LDAP系统中吸取数据并将其塞入一个用户表中。不幸的是,与LDAP相关的代码在遍历我们的32K用户时泄漏了大量内存,我一直无法弄清楚如何解决这个问题。这个问题似乎在某种程度上与LDAP库有关,因为当我删除对LDAP内容的调用时,内存使用情况会很好地稳定下来。此外,不断增加的对象是Net::BER::BerIdentifiedString和Net::BER::BerIdentifiedArray,它们都是LDAP库的一部分。当我运行导入时,内存使用量最终达到超过1GB的峰值。如果问题存在,我需要找到一些方法来更正我的代
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
Rails2.3可以选择随时使用RouteSet#add_configuration_file添加更多路由。是否可以在Rails3项目中做同样的事情? 最佳答案 在config/application.rb中:config.paths.config.routes在Rails3.2(也可能是Rails3.1)中,使用:config.paths["config/routes"] 关于ruby-on-rails-Rails3中的多个路由文件,我们在StackOverflow上找到一个类似的问题
给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚