草庐IT

java - 在 map 调用中获取 Spark 上的行

coder 2024-01-07 原文

我尝试从 HDFS 中的文件聚合数据。 我需要从那些对 hbase 中的特定表具有值(value)的数据中添加一些详细信息。

但我有异常(exception):

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
    at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
    at ......
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation
Serialization stack:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)

我知道问题发生在我们尝试在 map 函数期间访问 hbase 时。

我的问题是:如何使用 hbase 表中包含的值来完成我的 RDD。

例如: hdfs 中的文件是 csv:

Name;Number1;Number2
toto;1;2

在 hbase 中,我们有与名称 toto 关联的数据。

我需要检索数字 1 和数字 2 的总和(这是最简单的部分) 并与表中的数据聚合。 例如:

reducer 的键将是 tata 并通过获取 hbase 表中的行键 toto 来检索。

有什么建议吗?

最佳答案

最后一位同事做到了,多亏了你的建议:

所以这是允许使用 hbase 表中的数据聚合文件的映射代码。

private final Logger LOGGER = LoggerFactory.getLogger(AbtractGetSDMapFunction.class);




/**
 * Namespace name
 */
public static final String NAMESPACE = "NameSpace";
private static final String ID = "id";
private Connection connection = null;
private static final String LINEID = "l";
private static final String CHANGE_LINE_ID = "clid";
private static final String CHANGE_LINE_DATE = "cld";
private String constClientPortHBase;
private String constQuorumHBase;
private int constTimeOutHBase;
private String constZnodeHBase;
public void initConnection() {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("timeout", constTimeOutHBase);
    conf.set("hbase.zookeeper.quorum", constQuorumHBase);
    conf.set("hbase.zookeeper.property.clientPort", constClientPortHBase);
    conf.set("zookeeper.znode.parent", constZnodeHBase);
    try {
        connection = HConnectionManager.createConnection(conf);
    } catch (Exception e) {
        LOGGER.error("Error in the configuration of the connection with HBase.", e);
    }
}

 public Tuple2<String, myInput> call(String row) throws Exception {
//this is where you need to init the connection for hbase to avoid serialization problem
    initConnection();

....do your work 
State state = getCurrentState(myInput.getKey());
....do your work 
}

public AbtractGetSDMapFunction( String constClientPortHBase, String constQuorumHBase, String constZnodeHBase, int constTimeOutHBase) {
    this.constClientPortHBase = constClientPortHBase;
    this.constQuorumHBase = constQuorumHBase;
    this.constZnodeHBase = constZnodeHBase;
    this.constTimeOutHBase = constTimeOutHBase;
}

/***************************************************************************/
/**
 * Table Name
 */
public static final String TABLE_NAME = "Table";

public state getCurrentState(String key) throws TechnicalException {
    LOGGER.debug("start key {}", key);
    String buildRowKey = buildRowKey(key);
    State currentState = new State();
    String columnFamily = State.getColumnFamily();
    if (!StringUtils.isEmpty(buildRowKey) && null != columnFamily) {
        try {
            Get scan = new Get(Bytes.toBytes(buildRowKey));
            scan.addFamily(Bytes.toBytes(columnFamily));
            addColumnsToScan(scan, columnFamily, ID);                
            Result result = getTable().get(scan);
            currentState.setCurrentId(getLong(result, columnFamily, ID));              
        } catch (IOException ex) {
            throw new TechnicalException(ex);
        }
        LOGGER.debug("end ");
    }
    return currentState;
}

/***********************************************************/

private Table getTable() throws IOException, TechnicalException {
    Connection connection = getConnection();
    // Table retrieve
    if (connection != null) {
        Table table = connection.getTable(TableName.valueOf(NAMESPACE, TABLE_NAME));


        return table;
    } else {
        throw new TechnicalException("Connection to Hbase not available");
    }
}

/****************************************************************/




private Long getLong(Result result, String columnFamily, String qualifier) {
    Long toLong = null;
    if (null != columnFamily && null != qualifier) {
        byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
        toLong = (value != null ? Bytes.toLong(value) : null);
    }
    return toLong;
}

private String getString(Result result, String columnFamily, String qualifier) {
    String toString = null;
    if (null != columnFamily && null != qualifier) {
        byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
        toString = (value != null ? Bytes.toString(value) : null);
    }
    return toString;
}


public Connection getConnection() {
    return connection;
}

public void setConnection(Connection connection) {
    this.connection = connection;
}



private void addColumnsToScan(Get scan, String family, String qualifier) {
    if (org.apache.commons.lang.StringUtils.isNotEmpty(family) && org.apache.commons.lang.StringUtils.isNotEmpty(qualifier)) {
        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
    }
}

private String buildRowKey(String key) throws TechnicalException {
    StringBuilder rowKeyBuilder = new StringBuilder();
    rowKeyBuilder.append(HashFunction.makeSHA1Hash(key));
    return rowKeyBuilder.toString();
}

关于java - 在 map 调用中获取 Spark 上的行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41985552/

有关java - 在 map 调用中获取 Spark 上的行的更多相关文章

  1. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  2. ruby-on-rails - date_field_tag,如何设置默认日期? [ rails 上的 ruby ] - 2

    我想设置一个默认日期,例如实际日期,我该如何设置?还有如何在组合框中设置默认值顺便问一下,date_field_tag和date_field之间有什么区别? 最佳答案 试试这个:将默认日期作为第二个参数传递。youcorrectlysetthedefaultvalueofcomboboxasshowninyourquestion. 关于ruby-on-rails-date_field_tag,如何设置默认日期?[rails上的ruby],我们在StackOverflow上找到一个类似的问

  3. ruby-on-rails - openshift 上的 rails 控制台 - 2

    我将我的Rails应用程序部署到OpenShift,它运行良好,但我无法在生产服务器上运行“Rails控制台”。它给了我这个错误。我该如何解决这个问题?我尝试更新ruby​​gems,但它也给出了权限被拒绝的错误,我也无法做到。railsc错误:Warning:You'reusingRubygems1.8.24withSpring.UpgradetoatleastRubygems2.1.0andrun`gempristine--all`forbetterstartupperformance./opt/rh/ruby193/root/usr/share/rubygems/rubygems

  4. ruby-on-rails - 相关表上的范围为 "WHERE ... LIKE" - 2

    我正在尝试从Postgresql表(table1)中获取数据,该表由另一个相关表(property)的字段(table2)过滤。在纯SQL中,我会这样编写查询:SELECT*FROMtable1JOINtable2USING(table2_id)WHEREtable2.propertyLIKE'query%'这工作正常:scope:my_scope,->(query){includes(:table2).where("table2.property":query)}但我真正需要的是使用LIKE运算符进行过滤,而不是严格相等。然而,这是行不通的:scope:my_scope,->(que

  5. ruby - 简单获取法拉第超时 - 2

    有没有办法在这个简单的get方法中添加超时选项?我正在使用法拉第3.3。Faraday.get(url)四处寻找,我只能先发起连接后应用超时选项,然后应用超时选项。或者有什么简单的方法?这就是我现在正在做的:conn=Faraday.newresponse=conn.getdo|req|req.urlurlreq.options.timeout=2#2secondsend 最佳答案 试试这个:conn=Faraday.newdo|conn|conn.options.timeout=20endresponse=conn.get(url

  6. 使用 ACL 调用 upload_file 时出现 Ruby S3 "Access Denied"错误 - 2

    我正在尝试编写一个将文件上传到AWS并公开该文件的Ruby脚本。我做了以下事情:s3=Aws::S3::Resource.new(credentials:Aws::Credentials.new(KEY,SECRET),region:'us-west-2')obj=s3.bucket('stg-db').object('key')obj.upload_file(filename)这似乎工作正常,除了该文件不是公开可用的,而且我无法获得它的公共(public)URL。但是当我登录到S3时,我可以正常查看我的文件。为了使其公开可用,我将最后一行更改为obj.upload_file(file

  7. ruby - 从 Ruby 中的主机名获取 IP 地址 - 2

    我有一个存储主机名的Ruby数组server_names。如果我打印出来,它看起来像这样:["hostname.abc.com","hostname2.abc.com","hostname3.abc.com"]相当标准。我想要做的是获取这些服务器的IP(可能将它们存储在另一个变量中)。看起来IPSocket类可以做到这一点,但我不确定如何使用IPSocket类遍历它。如果它只是尝试像这样打印出IP:server_names.eachdo|name|IPSocket::getaddress(name)pnameend它提示我没有提供服务器名称。这是语法问题还是我没有正确使用类?输出:ge

  8. ruby - 获取模块中定义的所有常量的值 - 2

    我想获取模块中定义的所有常量的值:moduleLettersA='apple'.freezeB='boy'.freezeendconstants给了我常量的名字:Letters.constants(false)#=>[:A,:B]如何获取它们的值的数组,即["apple","boy"]? 最佳答案 为了做到这一点,请使用mapLetters.constants(false).map&Letters.method(:const_get)这将返回["a","b"]第二种方式:Letters.constants(false).map{|c

  9. ruby-on-rails - 获取 inf-ruby 以使用 ruby​​ 版本管理器 (rvm) - 2

    我安装了ruby​​版本管理器,并将RVM安装的ruby​​实现设置为默认值,这样'哪个ruby'显示'~/.rvm/ruby-1.8.6-p383/bin/ruby'但是当我在emacs中打开inf-ruby缓冲区时,它使用安装在/usr/bin中的ruby​​。有没有办法让emacs像shell一样尊重ruby​​的路径?谢谢! 最佳答案 我创建了一个emacs扩展来将rvm集成到emacs中。如果您有兴趣,可以在这里获取:http://github.com/senny/rvm.el

  10. c# - 如何在 ruby​​ 中调用 C# dll? - 2

    如何在ruby​​中调用C#dll? 最佳答案 我能想到几种可能性:为您的DLL编写(或找人编写)一个COM包装器,如果它还没有,则使用Ruby的WIN32OLE库来调用它;看看RubyCLR,其中一位作者是JohnLam,他继续在Microsoft从事IronRuby方面的工作。(估计不会再维护了,可能不支持.Net2.0以上的版本);正如其他地方已经提到的,看看使用IronRuby,如果这是您的技术选择。有一个主题是here.请注意,最后一篇文章实际上来自JohnLam(看起来像是2009年3月),他似乎很自在地断言RubyCL

随机推荐