草庐IT

使用分页导入的方式把大量数据从mysql导入es

专吃海绵宝宝菠萝屋的派大星 2023-08-11 原文

1、首先要有分页功能的代码 

如何使用mybatis-plus实现分页,可参考

http://t.csdn.cn/ddnlk

2、要创建feign远程调用模块

可以参考

http://t.csdn.cn/gshFw

3、在feign模块中声明远程调用接口

1.在feign模块中创建一个接口,名字可以是你要调用的服务名+client

 2.接口中的代码为要调用的方法,也就是分页方法

package com.hmall.config;

import com.hmall.common.dto.Item;
import com.hmall.common.dto.PageDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

/**
 * 商品模块的远程调用
 *
 * @author ning
 * @since 2022/12/9 18:39
 */
//表示对应的是itemservice服务器
@FeignClient("itemservice")
public interface ItemClient {

    //分页查询
    //Item为数据库的实体类,需要复制一份到Feign模块,
    //注意,复制过来的实体类,只需要属性和构造方法,其他的不需要,否则会报错
    @GetMapping("/item/list")
    public PageDTO<Item> list(@RequestParam("page") Integer page, @RequestParam("size") Integer size);
}

 实体类:

 4、在es对应的模块加入ItemClient依赖

例如:

 5、创建启动类

package com.hmall.search;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;

/**
 * @author ning
 * @since 2022/12/9 20:03
 */

//开启Feign客户端
//basePackages 指定需要扫描的包
@EnableFeignClients(basePackages = "com.hmall.client")
@SpringBootApplication
public class SearchApplication {

    public static void main(String[] args) {
        SpringApplication.run(SearchApplication.class, args);
    }
}

6、创建es索引库对应的实体类itemDoc

package com.hmall.search.pojo;

import com.hmall.common.dto.Item;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.beans.BeanUtils;

import java.util.ArrayList;
import java.util.List;

/**
 * es的实体类
 *
 * @author ning
 * @since 2022/12/9 20:12
 */

@NoArgsConstructor
@Data
public class ItemDoc {
    private Long id;//商品id
    private String name;//商品名称
    private Long price;//价格(分)
    private String image;//商品图片
    private String category;//分类名称
    private String brand;//品牌名称
    private Integer sold;//销量
    private Integer commentCount;//评论数
    private Boolean isAD;//商品状态 1-正常,2-下架
    private List<String> suggestion = new ArrayList<>(2);

    //把从数据查出来的参数复制到这个es的实体类
    public ItemDoc(Item item) {
        //复制属性
        BeanUtils.copyProperties(item,this);
        //自动补全字段
        //品牌
        suggestion.add(item.getBrand());
        //分类
        suggestion.add(item.getCategory());
    }
}

7、修改配置类(也可以不设置)

ribbon超时设置 (防止数据库读取时间长时,feign远程调用失败)

默认是3秒,查询如果超过3秒,就失败了

这是改成了5秒

ribbon:
  ConnectTimeout: 5000
  ReadTimeout: 5000

8、编写数据导入的测试方法

import com.alibaba.fastjson.JSON;
import com.hmall.client.ItemClient;
import com.hmall.common.dto.Item;
import com.hmall.common.dto.PageDTO;
import com.hmall.search.pojo.ItemDoc;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.IOException;
import java.util.List;

/**
 * 使用分页把数据从mysql导入es
 *
 * @author ning
 * @since 2022/12/9 20:32
 */

@Slf4j
@SpringBootTest
public class FeignTest {

    //注入远程调用分页方法的接口
    @Autowired
    private ItemClient itemClient;

    //注入es的组件操作索引库的增删改查
    @Autowired
    private RestHighLevelClient client;


    /**
     * 测试:分页接口是否正常
     * 建议:在正式运行下边的数据导入的代码之前,先运行以下代码,确保远程调用分页接口正常
     */
    @Test
    void testItemClient() {
        PageDTO<Item> pageDTO = itemClient.list(1, 5);
        List<Item> itemList = pageDTO.getList();
        Long total = pageDTO.getTotal();
        log.info("total:::" + total);
        for (Item item : itemList) {
            System.out.println(item);
        }
    }

    
    /**
     * 数据导入(从mysql导入es)
     */
    @Test
    void testDataSync() {
        //使用分页查询数据库
        //(当前页和每页显示几条数据可以随便写,目的是获取总记录数)
        PageDTO<Item> pageDTO = itemClient.list(1, 1);
        //获取总记录数
        Long total = pageDTO.getTotal();
        System.out.println("total:" + total);
        //设置每页有1000条数据
        int size = 1000;
        //计算页数
        //总记录数和1000做模运算,如果为0,总页数就是total / size的值,否则就是total / size + 1
        Long page = total % size == 0 ? total / size : total / size + 1;
        //根据页数循环,把每一页的数据复制到es
        for (int i = 1; i <= page; i++) {
            //使用分页方法获取每页的数据
            pageDTO = itemClient.list(i, size);
            //创建一个批量请求
            BulkRequest bulkRequest = new BulkRequest();
            for (Item item : pageDTO.getList()) {
                //判断商品的状态,只有是可售卖的状态才可以复制到es
                if (item.getStatus() == 1) {
                    //创建es的实体类对象,并赋值数据库查出当页数据赋值
                    ItemDoc itemDoc = new ItemDoc(item);
                    //把封装之后的es的实体类对象转成json格式
                    String jsonString = JSON.toJSONString(itemDoc);
                    //System.out.println(itemDoc.getId());
                    //生成添加文档的请求
                    bulkRequest.add(                    //并把添加文档的请求存入批量请求中
                            new IndexRequest("item")    //创建一个添加文档的请求对象,item为添加到哪个索引库
                                    .id(itemDoc.getId().toString())//新添加数据的id
                                    .source(jsonString, XContentType.JSON)//添加的数据,声明数据格式是json
                    );
                }
            }
            try {
                //发送请求
                //第一个参数:创建的请求,第二个参数:是否还有其他执行的选项,一般选DEFAULT
                client.bulk(bulkRequest, RequestOptions.DEFAULT);
                System.out.printf("第%d页,本页总条数:%d,导入完毕\r\n", i, pageDTO.getList().size());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

我出现的问题:

以上步骤执行完毕之后,我的代码报了一个错:

 

 意思是,拒绝连接:没有进一步的信息

我的解决方案是:在yml文件中配置以下信息,问题就可以解决

spring:
  data:
    elasticsearch:
      repositories:
        enabled: true
    # 异常处理
  elasticsearch:
    rest:
      uris: 192.168.177.132:9200

但是,我水品有限,没有明白什么原因,还有这个配置文件中的内容也不是很清楚,如果有路过的大佬,原因耽误宝贵的时间,给小弟解释一下,小弟不胜感激!!!!

有关使用分页导入的方式把大量数据从mysql导入es的更多相关文章

  1. ruby - 如何使用 Nokogiri 的 xpath 和 at_xpath 方法 - 2

    我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div

  2. ruby - 使用 RubyZip 生成 ZIP 文件时设置压缩级别 - 2

    我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看ruby​​zip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d

  3. ruby - 为什么我可以在 Ruby 中使用 Object#send 访问私有(private)/ protected 方法? - 2

    类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc

  4. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  5. ruby - 在 Ruby 中使用匿名模块 - 2

    假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于

  6. ruby - 使用 ruby​​ 和 savon 的 SOAP 服务 - 2

    我正在尝试使用ruby​​和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我

  7. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  8. ruby-on-rails - 'compass watch' 是如何工作的/它是如何与 rails 一起使用的 - 2

    我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t

  9. ruby - 使用 ruby​​ 将 HTML 转换为纯文本并维护结构/格式 - 2

    我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h

  10. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

随机推荐