如何从映射器中访问 Cassandra 列族?具体来说,如何将 map() 方法的参数转换回我期望的 java 类型?
Key {logType} -> {列名:timeUUID,列值:csv log line,ttl:1year}
package com.xxx.hadoop;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.SortedMap;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.thrift.TBaseHelper;
import com.xxx.parser.LogParser;
import com.netflix.astyanax.serializers.StringSerializer;
public class LogTypeCounterByDate extends Configured implements Tool {
private static final String KEYSPACE = "LogKS";
private static final String COLUMN_FAMILY = "LogBlock";
private static final String JOB_NAME = "LOG_LINE_COUNT";
private static final String INPUT_PARTITIONER = "org.apache.cassandra.dht.RandomPartitioner";
private static final String INPUT_RPC_PORT = "9160";
private static final String INPUT_INITIAL_ADDRESS = "192.168.1.21";
private static final String OUTPUT_PATH = "/logOutput/results";
@Override
public int run(String[] args) throws Exception {
//Configuration conf = new Configuration();
Job job = new Job(getConf(), JOB_NAME);
job.setJarByClass(LogTypeCounterByDate.class);
job.setMapperClass(LogTypeCounterByDateMapper.class);
job.setReducerClass(LogTypeCounterByDateReducer.class);
job.setInputFormatClass(ColumnFamilyInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
ConfigHelper.setRangeBatchSize(getConf(), 1000);
/*SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[0]),
ByteBuffer.wrap(new byte[0]), true, 1));*/
SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]),
ByteBuffer.wrap(new byte[0]), true, 1000);
SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(sliceRange);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
ConfigHelper.setInputRpcPort(job.getConfiguration(), INPUT_RPC_PORT);
ConfigHelper.setInputInitialAddress(job.getConfiguration(), INPUT_INITIAL_ADDRESS);
ConfigHelper.setInputPartitioner(job.getConfiguration(), INPUT_PARTITIONER);
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), slicePredicate);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
public static void main(String[] args) throws Exception{
ToolRunner.run(new Configuration(), new LogTypeCounterByDate(), args);
System.exit(0);
}
public static class LogTypeCounterByDateMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
{
@SuppressWarnings("rawtypes")
@Override
protected void setup(Mapper.Context context){
}
@SuppressWarnings({ })
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException{
//String[] lines = columns.;
String rowkey = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(key));
Iterator<ByteBuffer> iter = columns.keySet().iterator();
IColumn column;
String line;
LogParser lp = null;
while(iter.hasNext()){
column = columns.get(iter.next());
line = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(column.value()));
lp = new LogParser(line);
context.write(new Text(rowkey + "\t" + "LineCount"), new LongWritable(1L));
context.write(new Text(rowkey + "\t" + "Minutes"), new LongWritable(lp.getTotalDuration()));
}
}
}
public static class LogTypeCounterByDateReducer extends Reducer<Text, LongWritable, Text, LongWritable>
{
public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException{
long total = 0;
for(LongWritable val : values){
total += val.get();
}
context.write(key, new LongWritable(total));
}
}
}
ConfigHelper.setRangeBatchSize(getConf(), 1000);
/*SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[0]),
ByteBuffer.wrap(new byte[0]), true, 1));*/
SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]),
ByteBuffer.wrap(new byte[0]), true, 1000);
SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(sliceRange);
上面的代码只为每一行提供 1000 列映射器,因为我想每次以 1000 列的批处理为每一行提供所有列。
请有人帮助我。
最佳答案
给定参数:
ByteBuffer key;
SortedMap<ByteBuffer, IColumn> columns;
你会使用:
String rowkey = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(key))
获取反序列化的键值。请注意,这里假设行键是一个字符串。如果是其他类型,则必须使用适当的序列化程序类。
要获取列值,请执行以下操作:
Iterator<ByteBuffer> = columns.keySet().iterator();
while (iter.hasNext()) {
IColumn col = columns.get(iter.next());
xxx colVal = xxxSerializer.get().fromByteBuffer(TBaseHelper.rightSize(col.value()));
}
其中xxx是列值的Java类型,xxxSerializer是对应的序列化器。
顺便说一句,TBaseHelper 类用于将内部字节数组中的值的偏移量更正为零,强制执行序列化程序实现所做的假设。
那么还有一件事......如果您要检索时间序列,那么每一列都是它自己的时间序列值,您需要包含适当的映射器逻辑(比如某种数学运算和写入上下文)在列上的迭代循环内。相反,如果您有一个更静态的列族(更像传统的 sql 表),那么您可能会对整行的上下文进行一次写入。
关于java - 用于时间序列数据的 Cassandra Map Reduce,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13417785/
大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm
我需要检查DateTime是否采用有效的ISO8601格式。喜欢:#iso8601?我检查了ruby是否有特定方法,但没有找到。目前我正在使用date.iso8601==date来检查这个。有什么好的方法吗?编辑解释我的环境,并改变问题的范围。因此,我的项目将使用jsapiFullCalendar,这就是我需要iso8601字符串格式的原因。我想知道更好或正确的方法是什么,以正确的格式将日期保存在数据库中,或者让ActiveRecord完成它们的工作并在我需要时间信息时对其进行操作。 最佳答案 我不太明白你的问题。我假设您想检查
有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳
这个问题在这里已经有了答案:Railsformattingdate(4个答案)关闭4年前。我想格式化Time.Now函数以显示YYYY-MM-DDHH:MM:SS而不是:“2018-03-0909:47:19+0000”该函数需要放在时间中.现在功能。require‘roo’require‘roo-xls’require‘byebug’file_name=ARGV.first||“Template.xlsx”excel_file=Roo::Spreadsheet.open(“./#{file_name}“,extension::xlsx)xml=Nokogiri::XML::Build
我正在尝试解析一个CSV文件并使用SQL命令自动为其创建一个表。CSV中的第一行给出了列标题。但我需要推断每个列的类型。Ruby中是否有任何函数可以找到每个字段中内容的类型。例如,CSV行:"12012","Test","1233.22","12:21:22","10/10/2009"应该产生像这样的类型['integer','string','float','time','date']谢谢! 最佳答案 require'time'defto_something(str)if(num=Integer(str)rescueFloat(s
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最