草庐IT

java - Mapreduce java 程序搜索 QuadTree 索引并运行 GeometryEngine.contains 以使用 wkt 文件确认多边形中的点

coder 2024-01-10 原文

这篇文章是针对我之前的问题建议的 map reduce 实现:“How to optimize scan of 1 huge file / table in Hive to confirm/check if lat long point is contained in a wkt geometry shape

我不太会写map-reduce的java程序,主要使用Hive或者Pig或者spark在Hadoop生态系统中开发。给出手头任务的背景:我试图将每个纬度/经度 ping 关联到相应的 ZIP 邮政编码。我有一个包含所有 zip 信息的 WKT 多边形形状文件 (500 MB)。我已经将它加载到 Hive 中,并且可以使用 ST_Contains(polygon, point) 进行连接。但是,需要很长时间才能完成。为了克服这个瓶颈,我尝试利用 ESRI 中的示例(“https://github.com/Esri/gis-tools-for-hadoop/tree/master/samples/point-in-polygon-aggregation-mr”)构建四叉树索引来搜索从多边形中的经纬度派生的点。

我设法编写了代码,但它阻塞了集群的 Java 堆内存。任何关于改进代码或寻找不同方法的建议将不胜感激: 错误信息: 错误:Java 堆空间 容器被 ApplicationMaster 杀死。 根据要求杀死容器。退出代码为 143 容器以非零退出代码 143 退出

我的代码:

public class MapperClass extends Mapper<LongWritable, Text, Text, IntWritable> {

    // column indices for values in the text file
    int longitudeIndex;
    int latitudeIndex;
    int wktZip; 
    int wktGeom;
    int wktLineCount;
    int wktStateID;

    // in boundaries.wkt, the label for the polygon is "wkt"
    //creating ArrayList to hold details of the file
    ArrayList<ZipPolyClass> nodes = new ArrayList<ZipPolyClass>();

    String labelAttribute;
    EsriFeatureClass featureClass;
    SpatialReference spatialReference;
    QuadTree quadTree;
    QuadTreeIterator quadTreeIter;
    BufferedReader csvWkt;

    // class to store all the values from wkt file and calculate geometryFromWKT 
    public class ZipPolyClass {

        public String zipCode;
        public String wktPoly;
        public String stateID;
        public int indexJkey;
        public Geometry wktGeomObj; 

        public ZipPolyClass(int ijk, String z, String w, String s ){
            zipCode = z;
            wktPoly = w;
            stateID = s;
            indexJkey = ijk;
            wktGeomObj = GeometryEngine.geometryFromWkt(wktPoly, 0, Geometry.Type.Unknown);
        }

    }


    //building quadTree Index from WKT multiPolygon and creating an iterator
    private void buildQuadTree(){
        quadTree = new QuadTree(new Envelope2D(-180, -90, 180, 90), 8);

        Envelope envelope = new Envelope();

        int j=0;

        while(j<nodes.size()){
            nodes.get(j).wktGeomObj.queryEnvelope(envelope);
            quadTree.insert(j, new Envelope2D(envelope.getXMin(), envelope.getYMin(), envelope.getXMax(), envelope.getYMax()));
        }

        quadTreeIter = quadTree.getIterator();
    }


    /**
     * Query the quadtree for the feature containing the given point
     * 
     * @param pt point as longitude, latitude
     * @return index to feature in featureClass or -1 if not found
     */
    private int queryQuadTree(Point pt)
    {
        // reset iterator to the quadrant envelope that contains the point passed
        quadTreeIter.resetIterator(pt, 0);

        int elmHandle = quadTreeIter.next();

        while (elmHandle >= 0){
            int featureIndex = quadTree.getElement(elmHandle);

            // we know the point and this feature are in the same quadrant, but we need to make sure the feature
            // actually contains the point
            if (GeometryEngine.contains(nodes.get(featureIndex).wktGeomObj, pt, spatialReference)){
                return featureIndex;
            }

            elmHandle = quadTreeIter.next();
        }

        // feature not found
        return -1;
    }


    /**
     * Sets up mapper with filter geometry provided as argument[0] to the jar
     */
    @Override
    public void setup(Context context)
    {
        Configuration config = context.getConfiguration();

        spatialReference = SpatialReference.create(4326);

        // first pull values from the configuration     
        String featuresPath = config.get("sample.features.input");
        //get column reference from driver class 
        wktZip = config.getInt("sample.features.col.zip", 0);
        wktGeom = config.getInt("sample.features.col.geometry", 18);
        wktStateID = config.getInt("sample.features.col.stateID", 3);
        latitudeIndex = config.getInt("samples.csvdata.columns.lat", 5);
        longitudeIndex = config.getInt("samples.csvdata.columns.long", 6);

        FSDataInputStream iStream = null;

        try {
            // load the text WKT file provided as argument 0
            FileSystem hdfs = FileSystem.get(config);
            iStream = hdfs.open(new Path(featuresPath));
            BufferedReader br = new BufferedReader(new InputStreamReader(iStream));
            String wktLine ;
            int i=0;

            while((wktLine = br.readLine()) != null){
                String [] val = wktLine.split("\\|");
                String qtZip = val[wktZip];
                String poly = val[wktGeom];
                String stID = val[wktStateID];
                ZipPolyClass zpc = new ZipPolyClass(i, qtZip, poly, stID);
                nodes.add(i,zpc);
                i++; // increment in the loop before end
                }

        } 
        catch (Exception e)
        {
            e.printStackTrace();
        } 
        finally
        {
            if (iStream != null)
            {
                try {
                    iStream.close();
                } catch (IOException e) { }
            }
        }

        // build a quadtree of our features for fast queries
        if (!nodes.isEmpty()) {
            buildQuadTree();
        }
    }

    @Override
    public void map(LongWritable key, Text val, Context context)
            throws IOException, InterruptedException {

        /* 
         * The TextInputFormat we set in the configuration, by default, splits a text file line by line.
         * The key is the byte offset to the first character in the line.  The value is the text of the line.
         */

        String line = val.toString();
        String [] values = line.split(",");

        // get lat long from file and convert to float
        float latitude = Float.parseFloat(values[latitudeIndex]);
        float longitude = Float.parseFloat(values[longitudeIndex]);

        // Create our Point directly from longitude and latitude
        Point point = new Point(longitude, latitude);


        int featureIndex = queryQuadTree(point);

        // Each map only processes one record at a time, so we start out with our count 
                // as 1. Since we have a distinct record file we will not run reducer
                IntWritable one = new IntWritable(1);

        if (featureIndex >= 0){

            String zipTxt =nodes.get(featureIndex).zipCode;
            String stateIDTxt = nodes.get(featureIndex).stateID;
            String latTxt = values[latitudeIndex];
            String longTxt = values[longitudeIndex];
            String pointTxt = point.toString();
            String name;
            name = zipTxt+"\t"+stateIDTxt+"\t"+latTxt+"\t"+longTxt+ "\t" +pointTxt;

            context.write(new Text(name), one);
        } else {
            context.write(new Text("*Outside Feature Set"), one);
        }
    }
}

最佳答案

我能够通过将 arrayList < classobject=""> 修改为仅包含 arrayList < geometry=""> 类型来解决内存不足问题。

创建一个类对象(大约 50k)来保存文本文件的每一行,消耗了所有的 java 堆内存。进行此更改后,即使在 1 节点虚拟沙箱中,代码也能正常运行。我能够在大约 6 分钟内处理大约 4000 万行。

关于java - Mapreduce java 程序搜索 QuadTree 索引并运行 GeometryEngine.contains 以使用 wkt 文件确认多边形中的点,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39253652/

有关java - Mapreduce java 程序搜索 QuadTree 索引并运行 GeometryEngine.contains 以使用 wkt 文件确认多边形中的点的更多相关文章

  1. ruby - 如何从 ruby​​ 中的字符串运行任意对象方法? - 2

    总的来说,我对ruby​​还比较陌生,我正在为我正在创建的对象编写一些rspec测试用例。许多测试用例都非常基础,我只是想确保正确填充和返回值。我想知道是否有办法使用循环结构来执行此操作。不必为我要测试的每个方法都设置一个assertEquals。例如:describeitem,"TestingtheItem"doit"willhaveanullvaluetostart"doitem=Item.new#HereIcoulddotheitem.name.shouldbe_nil#thenIcoulddoitem.category.shouldbe_nilendend但我想要一些方法来使用

  2. ruby - 如何每月在 Heroku 运行一次 Scheduler 插件? - 2

    在选择我想要运行操作的频率时,唯一的选项是“每天”、“每小时”和“每10分钟”。谢谢!我想为我的Rails3.1应用程序运行调度程序。 最佳答案 这不是一个优雅的解决方案,但您可以安排它每天运行,并在实际开始工作之前检查日期是否为当月的第一天。 关于ruby-如何每月在Heroku运行一次Scheduler插件?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/8692687/

  3. ruby-on-rails - 如何在 ruby​​ 中使用两个参数异步运行 exe? - 2

    exe应该在我打开页面时运行。异步进程需要运行。有什么方法可以在ruby​​中使用两个参数异步运行exe吗?我已经尝试过ruby​​命令-system()、exec()但它正在等待过程完成。我需要用参数启动exe,无需等待进程完成是否有任何ruby​​gems会支持我的问题? 最佳答案 您可以使用Process.spawn和Process.wait2:pid=Process.spawn'your.exe','--option'#Later...pid,status=Process.wait2pid您的程序将作为解释器的子进程执行。除

  4. ruby - 无法运行 Rails 2.x 应用程序 - 2

    我尝试运行2.x应用程序。我使用rvm并为此应用程序设置其他版本的ruby​​:$rvmuseree-1.8.7-head我尝试运行服务器,然后出现很多错误:$script/serverNOTE:Gem.source_indexisdeprecated,useSpecification.Itwillberemovedonorafter2011-11-01.Gem.source_indexcalledfrom/Users/serg/rails_projects_terminal/work_proj/spohelp/config/../vendor/rails/railties/lib/r

  5. ruby - Sinatra:运行 rspec 测试时记录噪音 - 2

    Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/

  6. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  7. ruby-on-rails - 无法让 rspec、spork 和调试器正常运行 - 2

    GivenIamadumbprogrammerandIamusingrspecandIamusingsporkandIwanttodebug...mmm...let'ssaaay,aspecforPhone.那么,我应该把“require'ruby-debug'”行放在哪里,以便在phone_spec.rb的特定点停止处理?(我所要求的只是一个大而粗的箭头,即使是一个有挑战性的程序员也能看到:-3)我已经尝试了很多位置,除非我没有正确测试它们,否则会发生一些奇怪的事情:在spec_helper.rb中的以下位置:require'rubygems'require'spork'

  8. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  9. ruby-on-rails - before_filter 运行多个方法 - 2

    是否有可能:before_filter:authenticate_user!||:authenticate_admin! 最佳答案 before_filter:do_authenticationdefdo_authenticationauthenticate_user!||authenticate_admin!end 关于ruby-on-rails-before_filter运行多个方法,我们在StackOverflow上找到一个类似的问题: https://

  10. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

随机推荐