草庐IT

java - 用于查找最常出现的列值的 Hive UDAF

coder 2024-01-07 原文

我正在尝试创建一个 Hive UDAF 来查找最常出现的列(字符串)值(不是单个字符或子字符串,使用精确的列值)。假设以下是我的名为 my_table 的表(破折号用于在视觉上分隔列)。

User_Id - Item  - Count
 1  - A - 1
 1  - B - 1
 1  - A - 1
 1  - A - 1
 1  - A - 1
 1  - C - 1
 2  - C - 1
 2  - C - 1
 2  - A - 1
 2  - C - 1

假设我调用以下脚本:

Select User_Id, findFrequent(*) from my_table group by User_Id

我应该得到以下输出,因为对于 User_Id=1,A 出现了 4 次而 B 和 C 只出现了一次。所以,User_Id=1最频繁的是A。同样,User_Id=2最频繁的是C。换句话说,每个唯一的User_Id应该只有一个最频繁的列值。

1 - A
2 - C

我按照这个例子创建了一个类 https://github.com/rathboma/hive-extension-examples/blob/master/src/main/java/com/matthewrathbone/example/TotalNumOfLettersGenericUDAF.java但到目前为止还没有运气。这是我的代码:

@Description(name = "FindMostCommonString", value = "_FUNC_(expr) - Returns most commonly found string of a column.")
public class FindMostCommonString extends AbstractGenericUDAFResolver {

@Override
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
        throws SemanticException {
    if (parameters.length != 1) {
        throw new UDFArgumentTypeException(parameters.length - 1,
                "Exactly one argument is expected.");
    }

    ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

    if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE){
        throw new UDFArgumentTypeException(0,
                        "Argument must be PRIMITIVE, but "
                        + oi.getCategory().name()
                        + " was passed.");
    }

    PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;

    if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){
        throw new UDFArgumentTypeException(0,
                        "Argument must be String, but "
                        + inputOI.getPrimitiveCategory().name()
                        + " was passed.");
    }

    return new MostCommonStringEvaluator();
}

public static class MostCommonStringEvaluator extends GenericUDAFEvaluator {

    PrimitiveObjectInspector inputOI;
    ObjectInspector outputOI;
    MapObjectInspector mapOI;

    HashMap<String, Integer> total = new HashMap<String, Integer>();


    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters)
            throws HiveException {

        assert (parameters.length == 1);
        super.init(m, parameters);

        // init input object inspectors

        if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
            inputOI = (PrimitiveObjectInspector) parameters[0];
        }
        else{
            mapOI =  (MapObjectInspector) parameters[0];
        }

        outputOI = ObjectInspectorFactory.getReflectionObjectInspector(String.class,
                ObjectInspectorOptions.JAVA);


        return outputOI;

    }


    static class StringCountAgg implements AggregationBuffer {
        HashMap<String, Integer> strCount; 
        void add(String str){

            if(strCount.containsKey(str)){
                strCount.put(str,strCount.get(str)+1);
            }
            else{
                strCount.put(str,1);
            }
        }

        StringCountAgg(){
            strCount = new HashMap<String, Integer>();
        }
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        StringCountAgg result = new StringCountAgg();
        return result;
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        StringCountAgg myagg = new StringCountAgg();
    }

    private boolean warned = false;

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters)
            throws HiveException {
        assert (parameters.length == 1);
        if (parameters[0] != null) {
            StringCountAgg myagg = (StringCountAgg) agg;
            Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
            myagg.add((String)p1);
        }
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        StringCountAgg myagg = (StringCountAgg) agg;
        appendToHashMap(total, myagg.strCount);
        return total;
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial)
            throws HiveException {
        if (partial != null) {

            StringCountAgg myagg1 = (StringCountAgg) agg;

            HashMap<String, Integer>  partialRes = (HashMap<String, Integer> ) mapOI.getMap(partial);

            appendToHashMap(myagg1.strCount, partialRes);
        }
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        StringCountAgg myagg = (StringCountAgg) agg;
        appendToHashMap(total, myagg.strCount);
        String result = null;
        int maxCount = 0;

        for(String key: total.keySet()){

            if(total.get(key) > maxCount){
                maxCount = total.get(key);
                result = key;
            }
        }

        return result;
    }


    private void appendToHashMap(HashMap<String, Integer> main, HashMap<String, Integer> strCount) {
        for(String key: strCount.keySet()){
            if(main.containsKey(key)){
                main.put(key,main.get(key)+strCount.get(key));
            }
            else{
                main.put(key, strCount.get(key));
            }
        }
    }

}
}

最佳答案

select User_Id,Item from HiveTable;
+---------+------+
| User_Id | Item |
+---------+------+
|       1 | A    |
|       1 | B    |
|       1 | A    |
|       1 | A    |
|       1 | A    |
|       1 | C    |
|       2 | C    |
|       2 | C    |
|       2 | C    |
|       2 | A    |
|       2 | C    |
+---------+------+

查询-

select User_Id, Item from 
(
select User_Id,count(*) as total,Item from HiveTable group by User_Id, Item order by total desc
)q3 group by User_Id;

输出

+---------+------+
| User_Id | Item |
+---------+------+
|       1 | A    |
|       2 | C    |
+---------+------+

希望对你有帮助

关于java - 用于查找最常出现的列值的 Hive UDAF,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38425719/

有关java - 用于查找最常出现的列值的 Hive UDAF的更多相关文章

  1. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  2. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  3. Ruby Sinatra 配置用于生产和开发 - 2

    我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm

  4. ruby - 当使用::指定模块时,为什么 Ruby 不在更高范围内查找类? - 2

    我刚刚被困在这个问题上一段时间了。以这个基地为例:moduleTopclassTestendmoduleFooendend稍后,我可以通过这样做在Foo中定义扩展Test的类:moduleTopmoduleFooclassSomeTest但是,如果我尝试通过使用::指定模块来最小化缩进:moduleTop::FooclassFailure这失败了:NameError:uninitializedconstantTop::Foo::Test这是一个错误,还是仅仅是Ruby解析变量名的方式的逻辑结果? 最佳答案 Isthisabug,or

  5. ruby - 查找字符串中的内容类型(数字、日期、时间、字符串等) - 2

    我正在尝试解析一个CSV文件并使用SQL命令自动为其创建一个表。CSV中的第一行给出了列标题。但我需要推断每个列的类型。Ruby中是否有任何函数可以找到每个字段中内容的类型。例如,CSV行:"12012","Test","1233.22","12:21:22","10/10/2009"应该产生像这样的类型['integer','string','float','time','date']谢谢! 最佳答案 require'time'defto_something(str)if(num=Integer(str)rescueFloat(s

  6. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  7. ruby - inverse_of 是否适用于 has_many? - 2

    当我使用has_one时,它​​工作得很好,但在has_many上却不行。在这里您可以看到object_id不同,因为它运行了另一个SQL来再次获取它。ruby-1.9.2-p290:001>e=Employee.create(name:'rafael',active:false)ruby-1.9.2-p290:002>b=Badge.create(number:1,employee:e)ruby-1.9.2-p290:003>a=Address.create(street:"123MarketSt",city:"SanDiego",employee:e)ruby-1.9.2-p290

  8. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  9. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  10. Observability:从零开始创建 Java 微服务并监控它 (二) - 2

    这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/

随机推荐