草庐IT

迭代器模式 实现ES大量数据查询

法毅的博客 2023-07-11 原文

目录

项目需求 

要求

普通策略

升级策略:使用迭代器模式

迭代器模式组成

代码实现

查询实体

返回实体

实现类

代码测试

mock的ES返回结果json数据

第一次返回结果

第二次返回结果

第三次返回结果

postMan请求, 控制台打印结果


项目需求 

数据从Mysql 迁移到 Es,  Es查询数据默认fetch Size最大为10000条,如果查询超过1万条,需要通过scroll形式进行查询

要求

  1. 安全问题考虑,查询需要连接ES-ip:9200,不可使用第三方j a r
  2. 由于目前项目的查询方式是基于Mysql,为了减少改动,暂时使用sql进行查询
  3. 需要将结果以stream的形式进行返回,避免内存占用过大,以及瞬时的网络带宽问题

普通策略

  1. 进行第一次访问,然后取columns, rows 和 cursor
  2. 转化我们的第一次结果为map形式的json
  3. 拿第一次的corsor id 进行第二次访问
  4. 用第一次记录的columns 组装第二次的rows结果为map形式的json
  5. 以此类推

升级策略:使用迭代器模式

  1. 将我们访问封装到我们的迭代器中
  2. 第一次访问以迭代器的构造函数访问,初始化我们的columns, 供后续使用, 将第一次的结果转化为map 形式的json供迭代器使用 hasNext里面进行后续的多次访问

迭代器模式组成

  • iterator 抽象迭代器:负责定义访问和遍历元素的接口
  • concreteIterator 具体迭代器:实现迭代器接口,完成容器元素的遍历
  • aggreate抽象容器:容器角色负责提供创建决堤迭代器的角色接口,提供一个类似createiterator()这样的方法, 在Java中一般是 iterator()方法
  • concreteaggreate 具体容器:实现容器接口定义的方法

代码实现

查询实体


/**
 * 第一次查询用query和fetchSize, 后续用返回结果中的游标cursor
 */
@JsonIgnoreProperties
public class EsSqlQuery {
    /**
     * 调用ES的查询的sql
     */
    private String query;
    /**
     * 取出的条数
     */
    private Long fetchSize;
    /**
     * ES返回的游标
     */
    private String cursor;
    
    public EsSqlQuery(String cursor) {
        this.cursor = cursor;
    }

    public EsSqlQuery(String query, Long fetchSize) {
        this.query = query;
        this.fetchSize = fetchSize;
    }

    public String getQuery() {
        return query;
    }

    public void setQuery(String query) {
        this.query = query;
    }

    public Long getFetchSize() {
        return fetchSize;
    }

    public void setFetchSize(Long fetchSize) {
        this.fetchSize = fetchSize;
    }

    public String getCursor() {
        return cursor;
    }

    public void setCursor(String cursor) {
        this.cursor = cursor;
    }
}

返回实体


public class EsSqlResult {
    private List<Map<String,String>> columns;
    private List<List<Object>> rows;
    private String cursor;

    public List<Map<String, String>> getColumns() {
        return columns;
    }

    public void setColumns(List<Map<String, String>> columns) {
        this.columns = columns;
    }

    public List<List<Object>> getRows() {
        return rows;
    }

    public void setRows(List<List<Object>> rows) {
        this.rows = rows;
    }

    public String getCursor() {
        return cursor;
    }

    public void setCursor(String cursor) {
        this.cursor = cursor;
    }
}

实现类

@Component
public class EsQueryProcessor {

    // 1.用stream返回,节省内存
    public Stream<Map<String, Object>> scrollEsStream(String query, Long fetchSize) {
        return StreamSupport.stream(Spliterators
                .spliteratorUnknownSize(new ScrollIterator(query, fetchSize), 0), false);
    }

    // 2.用迭代器模式
    private class ScrollIterator implements Iterator<Map<String, Object>> {

        /**
         * 游标
         */
        private String scrollId;
        /**
         * 字段名集合
         */
        private List<String> columns;
        /**
         * 迭代元素
         */
        Iterator<Map<String, Object>> iterator;
        /**
         * 模拟访问次数
         */
        private int i = 1;

        // 2.1构造函数进行第一次查询, 初始化后续需要使用的columns和iterator,scrollId
        public ScrollIterator(String query, Long fetchSize) {
            // 模拟根据query和fetchSize从ES第一次获取数据
            String jsonName = "es1.json";
            EsSqlResult result = this.getEsSqlResult(jsonName);
            columns = CollStreamUtil.toList(result.getColumns(), u -> u.get("name"));
            this.scrollId = result.getCursor();
            this.iterator = convert(columns, result).iterator();

        }

        // 2.2根据 scrollId 是否为null进行后续访问,直到scrollId为null
        @Override
        public boolean hasNext() {
            return iterator.hasNext() || scrollNext();
        }

        private boolean scrollNext() {
            if (iterator == null || this.scrollId == null) {
                return false;
            }
            i++;
            // 模拟根据query和fetchSize从ES第i次获取数据
            String jsonName = "es" + i + ".json";
            EsSqlResult result = this.getEsSqlResult(jsonName);
            this.scrollId = result.getCursor();
            this.iterator = convert(columns, result).iterator();
            return iterator.hasNext();
        }

        @Override
        public Map<String, Object> next() {
            return iterator.next();
        }

        // 模拟从ES获取查询结果
        private EsSqlResult getEsSqlResult(String jsonName) {
            if (StrUtil.isBlank(jsonName)) {
                return null;
            }
            EsSqlResult result = null;
            List<File> fileList = FileUtil.loopFiles(ResourceUtil.getResource("json").getFile());
            for (File file : fileList) {
                if (jsonName.equals(file.getName())) {
                    String json = FileUtil.readString(file, Charset.forName("UTF-8"));
                    result = JSONObject.parseObject(json, EsSqlResult.class);
                }
            }
            return result;
        }

    }

    // 3.返回结果传统一点 List<Map>
    private List<Map<String, Object>> convert(List<String> columns, EsSqlResult result) {
        List<Map<String, Object>> results = new ArrayList<>();
        for (List<Object> row : result.getRows()) {
            Map<String, Object> map = new HashMap<>();
            for (int i = 0; i < columns.size(); i++) {
                map.put(columns.get(i), row.get(i));
            }
            results.add(map);
        }
        return results;
    }


}

代码测试

@RestController
public class EsController {

    @Autowired
    private EsService esService;

    @GetMapping("/es")
    public Boolean suggestRequirement() {
        return esService.query();
    }


}


public class EsService {

    @Autowired
    private EsQueryProcessor processor;

    public Boolean query() {
        Stream<Map<String, Object>> mapStream = processor.scrollEsStream(null, null);
        mapStream.forEach(x -> System.out.println(x));
        return true;
    }
}

mock的ES返回结果json数据

第一次返回结果

{
  "cursor": "safajfkajskjkasg",
  "columns": [
    {
      "name": "username",
      "type": "String"
    },
    {
      "name": "age",
      "type": "String"
    }
  ],
  "rows": [
    [
      "zhao",
      "11"
    ],
    [
      "qian",
      "22"
    ]
  ]
}

第二次返回结果

{
  "cursor": "safajfkajskjkasg",
  "rows": [
    [
      "sun",
      "33"
    ],
    [
      "li",
      "44"
    ]
  ]
}

第三次返回结果

{
  "rows": [
    [
      "zhou",
      "55"
    ],
    [
      "wu",
      "66"
    ]
  ]
}

postMan请求, 控制台打印结果

有关迭代器模式 实现ES大量数据查询的更多相关文章

  1. ruby-on-rails - Rails - 子类化模型的设计模式是什么? - 2

    我有一个模型:classItem项目有一个属性“商店”基于存储的值,我希望Item对象对特定方法具有不同的行为。Rails中是否有针对此的通用设计模式?如果方法中没有大的if-else语句,这是如何干净利落地完成的? 最佳答案 通常通过Single-TableInheritance. 关于ruby-on-rails-Rails-子类化模型的设计模式是什么?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.co

  2. ruby - ECONNRESET (Whois::ConnectionError) - 尝试在 Ruby 中查询 Whois 时出错 - 2

    我正在用Ruby编写一个简单的程序来检查域列表是否被占用。基本上它循环遍历列表,并使用以下函数进行检查。require'rubygems'require'whois'defcheck_domain(domain)c=Whois::Client.newc.query("google.com").available?end程序不断出错(即使我在google.com中进行硬编码),并打印以下消息。鉴于该程序非常简单,我已经没有什么想法了-有什么建议吗?/Library/Ruby/Gems/1.8/gems/whois-2.0.2/lib/whois/server/adapters/base.

  3. ruby - 解析 RDFa、微数据等的最佳方式是什么,使用统一的模式/词汇(例如 schema.org)存储和显示信息 - 2

    我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i

  4. ruby - 如何在续集中重新加载表模式? - 2

    鉴于我有以下迁移:Sequel.migrationdoupdoalter_table:usersdoadd_column:is_admin,:default=>falseend#SequelrunsaDESCRIBEtablestatement,whenthemodelisloaded.#Atthispoint,itdoesnotknowthatusershaveais_adminflag.#Soitfails.@user=User.find(:email=>"admin@fancy-startup.example")@user.is_admin=true@user.save!ende

  5. ruby - 如何根据特征实现 FactoryGirl 的条件行为 - 2

    我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden

  6. ruby-on-rails - 在 Rails 和 ActiveRecord 中查询时忽略某些字段 - 2

    我知道我可以指定某些字段来使用pluck查询数据库。ids=Item.where('due_at但是我想知道,是否有一种方法可以指定我想避免从数据库查询的某些字段。某种反拔?posts=Post.where(published:true).do_not_lookup(:enormous_field) 最佳答案 Model#attribute_names应该返回列/属性数组。您可以排除其中一些并传递给pluck或select方法。像这样:posts=Post.where(published:true).select(Post.attr

  7. ruby - Ruby 有 `Pair` 数据类型吗? - 2

    有时我需要处理键/值数据。我不喜欢使用数组,因为它们在大小上没有限制(很容易不小心添加超过2个项目,而且您最终需要稍后验证大小)。此外,0和1的索引变成了魔数(MagicNumber),并且在传达含义方面做得很差(“当我说0时,我的意思是head...”)。散列也不合适,因为可能会不小心添加额外的条目。我写了下面的类来解决这个问题:classPairattr_accessor:head,:taildefinitialize(h,t)@head,@tail=h,tendend它工作得很好并且解决了问题,但我很想知道:Ruby标准库是否已经带有这样一个类? 最佳

  8. ruby - 为什么 Ruby 的 each 迭代器先执行? - 2

    我在用Ruby执行简单任务时遇到了一件奇怪的事情。我只想用每个方法迭代字母表,但迭代在执行中先进行:alfawit=("a".."z")puts"That'sanalphabet:\n\n#{alfawit.each{|litera|putslitera}}"这段代码的结果是:(缩写)abc⋮xyzThat'sanalphabet:a..z知道为什么它会这样工作或者我做错了什么吗?提前致谢。 最佳答案 因为您的each调用被插入到在固定字符串之前执行的字符串文字中。此外,each返回一个Enumerable,实际上您甚至打印它。试试

  9. ruby - 是否有用于序列化和反序列化各种格式的对象层次结构的模式? - 2

    给定一个复杂的对象层次结构,幸运的是它不包含循环引用,我如何实现支持各种格式的序列化?我不是来讨论实际实现的。相反,我正在寻找可能会派上用场的设计模式提示。更准确地说:我正在使用Ruby,我想解析XML和JSON数据以构建复杂的对象层次结构。此外,应该可以将该层次结构序列化为JSON、XML和可能的HTML。我可以为此使用Builder模式吗?在任何提到的情况下,我都有某种结构化数据-无论是在内存中还是文本中-我想用它来构建其他东西。我认为将序列化逻辑与实际业务逻辑分开会很好,这样我以后就可以轻松支持多种XML格式。 最佳答案 我最

  10. ruby - 我如何添加二进制数据来遏制 POST - 2

    我正在尝试使用Curbgem执行以下POST以解析云curl-XPOST\-H"X-Parse-Application-Id:PARSE_APP_ID"\-H"X-Parse-REST-API-Key:PARSE_API_KEY"\-H"Content-Type:image/jpeg"\--data-binary'@myPicture.jpg'\https://api.parse.com/1/files/pic.jpg用这个:curl=Curl::Easy.new("https://api.parse.com/1/files/lion.jpg")curl.multipart_form_

随机推荐