I have a Cassandra table "articles" with 400,000 rows and
primary key (source, created_at desc)
When I query our data with:
select * from articles where source = 'abc' and created_at <= '2016-01-01 00:00:00'
reading 110,000 rows takes 8 minutes.
That is very slow, and I don't know where the problem is.
I would like to read 100,000 rows within 10 seconds. Not sure whether that is even possible?
Here are more details:
I have 3 nodes, replication factor = 2, strategy = SimpleStrategy, 4 CPUs, 32 GB RAM.
I am using cassandra-driver 3.0.0.
I am not sure whether the problem comes from Python or from Cassandra, since we are also using Python.
Here is my CQL schema:
CREATE TABLE crawler.articles (
    source text,
    created_at timestamp,
    id text,
    category text,
    channel text,
    last_crawled timestamp,
    text text,
    thumbnail text,
    title text,
    url text,
    PRIMARY KEY (source, created_at, id)
) WITH CLUSTERING ORDER BY (created_at DESC, id ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"ALL"}'
    AND comment = ''
    AND compaction = {'sstable_size_in_mb': '160', 'enabled': 'true', 'unchecked_tombstone_compaction': 'false', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'class': 'org.apache.cassandra.db.compaction.LeveledCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 604800
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

CREATE INDEX articles_id_idx ON crawler.articles (id);
CREATE INDEX articles_url_idx ON crawler.articles (url);
Edit:
I want to query new articles from the last few days, so my query is:
SELECT * FROM articles WHERE source = 'any source'
AND created_at >= '2016-01-08 00:00:00'
A sample insert would be:
INSERT INTO articles (source,created_at,id,category,channel,last_crawled,text,thumbnail,title,url)
VALUES ('money',1452417991000,'1290141063','news_video_top','',1452418260000,'','http://inews.gtimg.com/newsapp_ls/0/143487758_150120/0','article title','http://view.inews.qq.com/a/VID2016011002195801');
Client code:
import sys
import logging
from cassandra import ConsistencyLevel

timespan = int(sys.argv[1])
source = str(sys.argv[2])

logging.basicConfig(filename='statistics-%d.log' % (timespan), format='%(asctime)-15s %(filename)s %(name)-8s %(message)s', level=logging.INFO)

class Whitelist(logging.Filter):
    def __init__(self, *whitelist):
        self.whitelist = [logging.Filter(name) for name in whitelist]

    def filter(self, record):
        return any(f.filter(record) for f in self.whitelist)

for handler in logging.root.handlers:
    handler.addFilter(Whitelist('statistics'))

log = logging.getLogger('statistics')

try:
    from datetime import datetime, timedelta

    if __name__ == '__main__':
        pass

    from cassandra.cluster import Cluster

    log.info('[%d] connecting cassandra...' % (timespan))
    cluster = Cluster(['xxx', 'xxx', 'xxx'])
    session = cluster.connect('crawler')
    cluster = Cluster(['xxx', 'xxx', 'xxx'])
    session_statis = cluster.connect('statistics')

    created_at = datetime.utcnow() + timedelta(hours=-timespan)
    print "[%s] FINDING ..." % (datetime.utcnow().isoformat())

    statuses = {}
    stmt = session.prepare("select * from articles where source = ? and created_at >= ? ")
    category_stmt = session.prepare('SELECT category FROM channels WHERE source = ? and id = ?')
    rows = session.execute(stmt, [source, created_at])
    for row in rows:
        try:
            if row.channel and source != 'toutiao':
                category = session.execute(category_stmt, ['zhihu' if row.source == 'zhihuzero' else row.source, row.channel])
                statuses[row.id] = {'source': row.source, 'timespan': str(timespan), 'id': row.id, 'title': row.title, 'thumbnail': row.thumbnail, 'url': row.url, 'text': row.text, 'created_at': row.created_at, 'category': category[0].category, 'author': '', 'genre': row.category}
            else:
                statuses[row.id] = {'source': row.source, 'timespan': str(timespan), 'id': row.id, 'title': row.title, 'thumbnail': row.thumbnail, 'url': row.url, 'text': row.text, 'created_at': row.created_at, 'category': row.category, 'author': '', 'genre': ''}
        except Exception, e:
            continue

    print "%s weibos ..." % (len(statuses))
    print "[%s] CALCULATING ..." % (datetime.utcnow().isoformat())

    stmt = session.prepare('SELECT article, MAX(comments) AS comments, MAX(likes) AS likes, MAX(reads) AS reads, MAX(shares) AS shares FROM axes WHERE article = ? AND at >= ?')
    for statuses_id, status in statuses.iteritems():
        rows = session.execute(stmt, [statuses_id, datetime.utcnow() + timedelta(hours=-timespan)])
        for row in rows:
            if source == 'toutiao':
                if row.article is not None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    status['speed'] = row.comments
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'weibohao':
                if row.article is not None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.shares
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'tencent':
                if row.article is not None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.comments
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'zhihu':
                if row.article is not None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.likes
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'buluo':
                if row.article is not None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.reads
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0
            elif source == 'zhihuzero':
                if row.article is not None:
                    status['reads'] = row.reads
                    status['likes'] = row.likes
                    status['shares'] = row.shares
                    status['comments'] = row.comments
                    # status['speed'] = row.comments - row.comments_1
                    status['speed'] = row.likes
                else:
                    status['reads'] = 0
                    status['likes'] = 0
                    status['shares'] = 0
                    status['comments'] = 0
                    status['speed'] = 0

    statuses = sorted(statuses.iteritems(), key=lambda (k, v): (v['speed'], k), reverse=True)[:1000]

    print "[%s] TRUNCATING ..." % (datetime.utcnow().isoformat())
    session_statis.execute('DELETE FROM statistics WHERE source = %s AND timespan = %s', (source, str(timespan)))  # , consistency_level=ConsistencyLevel.QUORUM

    print "[%s] UPDATING ..." % (datetime.utcnow().isoformat())
    for i, status in statuses:
        if status['speed'] > 0:
            session_statis.execute('insert into statistics.statistics(source,timespan,id,title,thumbnail,url,text,created_at,category,genre,author,reads,likes,comments,shares,speed) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)', (status['source'], status['timespan'], status['id'], status['title'], status['thumbnail'], status['url'], status['text'], status['created_at'], status['category'], status['genre'], status['author'], status['reads'], status['likes'], status['comments'], status['shares'], status['speed']))
        else:
            print status['id'], status['url']

    print "[%s] DONE ..." % (datetime.utcnow().isoformat())
    log.info('[%d] done' % (timespan))
except Exception, e:
    print 'except ===:', e
Thanks for your replies!
Best Answer
Your use case is a little unusual. Cassandra is better suited to transactional operations on small numbers of rows, not the kind of bulk processing you would do in Hadoop.
The way you are running the query hits a single partition on a single node and ships 100K rows back to your client. That is a lot of data to move across the network, and I am not sure why you would want to do it that way. You are also doing everything sequentially, so you get no parallelism and no benefit from having three nodes.
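If the rows really must come back to the client, one way to get at least client-side parallelism is to split the time range into buckets and run the sub-queries concurrently with the driver's concurrency helper. This is a minimal sketch, assuming the schema above; the source value, look-back window, bucket size, and concurrency are illustrative, and because source alone is the partition key, every sub-query still lands on the same partition and its replicas:
from datetime import datetime, timedelta
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')

source = 'money'  # illustrative, as in the sample insert above
timespan = 72     # hours to look back (illustrative)

# One-hour buckets over the look-back window, queried concurrently.
stmt = session.prepare('SELECT * FROM articles WHERE source = ? AND created_at >= ? AND created_at < ?')
now = datetime.utcnow()
buckets = [(source, now - timedelta(hours=h + 1), now - timedelta(hours=h))
           for h in range(timespan)]

# Each entry is (success, ResultSet-or-exception); keep the rows that succeeded.
results = execute_concurrent_with_args(session, stmt, buckets, concurrency=16)
rows = [row for success, result in results if success for row in result]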
Usually, if you want to do bulk processing of a lot of rows in Cassandra, you would use Spark to do distributed processing on each node rather than sequentially pulling large amounts of data back to a client.
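With the Spark Cassandra connector, each worker reads the token ranges local to its node instead of funnelling everything through one client. A rough PySpark sketch of how that could look here; the connector package coordinates, host, and filter values are assumptions, not from the original post:
# Launched with, for example:
# spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 stats_job.py
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName('articles-stats')
         .config('spark.cassandra.connection.host', 'xxx')
         .getOrCreate())

articles = (spark.read
            .format('org.apache.spark.sql.cassandra')
            .options(keyspace='crawler', table='articles')
            .load())

# The source/created_at predicates are pushed down to Cassandra where possible;
# the rest of the processing runs distributed across the executors.
recent = articles.filter("source = 'money' AND created_at >= '2016-01-08 00:00:00'")
print(recent.count())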
Also, the two indexes you are creating do not look like they will work well. Cassandra secondary indexes are suited to low-cardinality fields, but you appear to be indexing high-cardinality fields. Cassandra indexes are very different from indexes in a relational database.
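If lookups by id are genuinely needed, the usual alternative to a high-cardinality secondary index is a small denormalized lookup table maintained alongside articles. A sketch, not part of the original schema:
CREATE TABLE crawler.articles_by_id (
    id text PRIMARY KEY,
    source text,
    created_at timestamp
);
-- Write this table together with articles on every insert; to look an
-- article up by id, read articles_by_id first, then fetch the full row
-- from articles using the recovered (source, created_at, id) key.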
I would have to see your client code to know whether you are doing something inefficient there. In general, fetching a large number of rows triggers paging, so I am not sure how you are handling that.
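On the Python driver side, paging is governed by the fetch size; the driver defaults to 5,000 rows per page, so 110K rows means roughly 22 synchronous round trips while you iterate. A small sketch of tuning it, assuming the same contact points as the script above (10,000 is just an illustrative value):
from datetime import datetime, timedelta
from cassandra.cluster import Cluster

cluster = Cluster(['xxx', 'xxx', 'xxx'])
session = cluster.connect('crawler')
session.default_fetch_size = 10000  # pages of 10K rows instead of the 5K default

stmt = session.prepare('SELECT * FROM articles WHERE source = ? AND created_at >= ?')
rows = session.execute(stmt, ['money', datetime.utcnow() - timedelta(days=3)])
for row in rows:
    # further pages are fetched transparently as the iteration crosses them
    pass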
About "Why is my Cassandra database so slow to read data? Want to read 100,000 rows in 10 seconds", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34691462/