草庐IT

logstash+elasticsearch+Kibana(ELK)日志收集

呆萌小新@渊洁 2023-04-18 原文

文章目录


不要一股脑执行以下语句,请观察修改要修改的地方
注意给logstash,elasticsearch,kibana释放端口,云服务器提供商和系统的端口

一.安装elasticsearch

# 安装es
docker pull elasticsearch:7.17.3
mkdir -p /mydata/elasticsearch/config
mkdir -p /mydata/elasticsearch/data
echo "http.host: 0.0.0.0" >> /mydata/elasticsearch/config/elasticsearch.yml
chmod -R 777 /mydata/elasticsearch/

docker run --name elasticsearch -p 9200:9200 -p 9300:9300 \
-e "discovery.type=single-node" \
-e ES_JAVA_OPTS="-Xms512m -Xmx512m" \
--restart=always --privileged=true \
-v /mydata/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
-v /mydata/elasticsearch/data:/usr/share/elasticsearch/data \
-v /mydata/elasticsearch/plugins:/usr/share/elasticsearch/plugins \
-d elasticsearch:7.17.3

1.1进入到es挂载目录elasticsearch.yml的挂载目录,添加以下内容

http.host: 0.0.0.0
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization
xpack.security.enabled: true
# Enable encryption and mutual authentication between cluster nodes
xpack.security.transport.ssl.enabled: true
# Enable encryption for HTTP API client connections, such as Kibana, Logstash, and Agents
xpack.security.http.ssl.enabled: false

1.2 重启es容器并进入es容器
1.3 进入容器后执行以下命令 傻瓜式设置账号密码

./bin/elasticsearch-setup-passwords interactive

1.4 重启es容器


二. 安装kibana

# 安装es可视化工具kibana
docker pull kibana:7.17.3

docker run -d -p 5601:5601 \
--restart=always \
-v /mydata/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml \
--name kibana \
kibana:7.4.2

2.1 配置kibana连接es
进入挂载文件 /mydata/kibana/kibana.yml


server.port: 5601 
#主机地址,可以是ip,主机名
server.host: 0.0.0.0
elasticsearch.hosts: ["http://127.0.0.1:9200"]
xpack.monitoring.ui.container.elasticsearch.enabled: true
elasticsearch.username: "es账号"
elasticsearch.password: "es密码"
i18n.locale: "zh-CN"

2.2 访问 http://ip:9200,http://ip:5601看es,kibana是否安装完成

三.配置logstash

# logstash安装
 docker run -d --name logstash \
 -p 5043:5043 -p 5044:5044  --privileged=true \
 -v /mydata/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
 -v /mydata/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml \
 logstash:7.17.3

3.1 进入logstash容器执行以下命令:

docker exec -it logstash /bin/bash

/usr/share/logstash/bin/logstash-plugin install logstash-codec-json_lines

3.2 重启logstash
3.3 进入logstash容器的挂载目录logstash.conf

input {
  stdin { }

  tcp {
      mode => "server"
      host => "0.0.0.0"
      port => 5043
      codec => json_lines
  }
}

output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => "192.168.1.1:9200"
    index => "springboot-%{+YYYY.MM.dd}"
    user => "es账号"
    password => "es密码"
  }
}

这里认证明文不安全.可以参考如下链接: 密文

四.springboot整合logstash

1.pom.xml

<dependency>
   <groupId>ch.qos.logback</groupId>
   <artifactId>logback-classic</artifactId>
</dependency>
<dependency>
   <groupId>net.logstash.logback</groupId>
   <artifactId>logstash-logback-encoder</artifactId>
   <version>7.3</version>
</dependency>

2. application.yml

server:
  port: 10500

spring:
  profiles:
      active: dev
  application:
    name: vector-search
  thymeleaf:
    cache: false
  # jackson时间格式化
  jackson:
    time-zone: GMT+8
  # elasticsearch
    date-format: yyyy-MM-dd HH:mm:ss

logging:
  level:
    root: info

3.application-dev.yml

logstash:
  host: 127.0.0.1
  port: 5043
  # 定义日志文件的存储地址 勿在 LogBack 的配置中使用相对路径
  path: /mydata/logstash
es:
  host: 127.0.0.1
  port: 9200
  username: 11111
  password: 11111

4.在resources文件夹下,创建logback-spring.xml而不是logback.xml.该配置放在公共模块,当依赖导入其他微服务模块即可实现多模块收集日志到logstash中.

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <include resource="org/springframework/boot/logging/logback/base.xml"/>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>

    <!--获取springboot的yml配置信息-->
    <springProperty scope="context" name="applicationName" source="spring.application.name" defaultValue="default"/>
    <springProperty scope="context" name="LOGSTASH_HOST" source="logstash.host" defaultValue="default"/>
    <springProperty scope="context" name="LOGSTASH_PORT" source="logstash.port" defaultValue="default"/>
    <springProperty scope="context" name="LOG_HOME" source="logstash.path" defaultValue="default"/>

    <!--输出到控制台-->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>INFO</level>
        </filter>
        <withJansi>false</withJansi>
        <encoder>
            <!--<pattern>%d %p (%file:%line)- %m%n</pattern>-->
            <!--格式化输出:%d:表示日期    %thread:表示线程名     %-5level:级别从左显示5个字符宽度  %msg:日志消x`x`息    %n:是换行符-->
            <pattern>%d{yyyy-MM-dd HH:mm:ss} %highlight(%-5level) -- %boldMagenta([%thread]) %boldCyan(%logger) :
                %msg%n
            </pattern>
            <charset>UTF-8</charset>
        </encoder>
    </appender>

    <springProfile name="dev,test">
        <!--  日志发送至logstash  -->
        <appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
            <destination>${LOGSTASH_HOST:- }:${LOGSTASH_PORT:- }</destination>
            <!-- encoder is required -->
            <encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder">
                <!-- 在elasticsearch的index中追加applicationName字段  -->
                <customFields>{"applicationName":"${applicationName}"}</customFields>

            </encoder>
        </appender>
    </springProfile>

    <!-- 按照每天生成日志文件 -->
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!--日志文件输出的文件名-->
            <FileNamePattern>${LOG_HOME}/TestWeb.log.%d{yyyy-MM-dd}.log</FileNamePattern>
            <!--日志文件保留天数-->
            <MaxHistory>30</MaxHistory>
        </rollingPolicy>
        <encoder charset="UTF-8" class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符-->
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
        </encoder>
        <!--日志文件最大的大小-->
        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <MaxFileSize>10MB</MaxFileSize>
        </triggeringPolicy>
    </appender>

    <!-- 日志输出级别 -->
    <!-- 线上环境,日志配置 -->
    <springProfile name="prod">
        <root level="INFO">
            <appender-ref ref="LOGSTASH"/>
        </root>
    </springProfile>

    <!-- 本地、开发环境,日志配置 可以写logback支持的所有节点 -->
    <springProfile name="dev,test">
        <root level="INFO">
            <appender-ref ref="LOGSTASH"/>
            <appender-ref ref="console"/>
        </root>
    </springProfile>

</configuration>

五.spring整合Elastic Search

详见本篇文章

六. 定时清理

使用es新版客户端

@Component
@Slf4j
public class SystemLog {

    private ElasticsearchClient elasticsearchClient;
    private static final String SYSTEM_LOG_INDEX_PREFIX_MATCH = SystemLogConstants.SYSTEM_LOG_INDEX_PREFIX + "*";
    public SystemLog(ElasticsearchClient elasticsearchClient) {
        this.elasticsearchClient = elasticsearchClient;
    }

    /**
     * 每月1号删除一个月前的日志
     *
     * @throws IOException
     */
    @Scheduled(cron = "0 0 0 1 * ?")
    public void systemLog() throws IOException {
        Set<String> systemLogIndexLists = searchSystemLogIndex();
        if(systemLogIndexLists.isEmpty()){
            log.info("没有找到索引{}", systemLogIndexLists);
            return;
        }
        LocalDateTime oneMonthBefore = LocalDateTime.now().minusMonths(1);
        StringBuilder oneMonthFormat = new StringBuilder(oneMonthBefore.format(DateTimeFormatter.ISO_DATE));
        oneMonthFormat.insert(0, SystemLogConstants.SYSTEM_LOG_INDEX_PREFIX);
        //删除一个月前的索引
        List<String> ids = systemLogIndexLists.stream()
                .filter(currentIndexDate -> {
                    // 2020-03-25 > 2020-02-25 true
                    return oneMonthFormat.toString().compareTo(currentIndexDate) > 0;
                })
                .collect(Collectors.toList());

        if (ids.isEmpty()) {
            log.info("没有需要删除的索引{}", ids);
            return;
        }
        DeleteIndexResponse result = elasticsearchClient.indices().delete(DeleteIndexRequest.of(id -> id
                .index(ids)));
        log.info("删除索引{},结果:{}", ids, result.toString());
    }

    /**
     * 查询所有SystemLog索引
     *
     * @return
     * @throws IOException
     */
    private Set<String> searchSystemLogIndex() throws IOException {
        // 查询yiqichang-*的索引
        GetIndexResponse response = elasticsearchClient.indices()
                .get(GetIndexRequest.of(idx -> idx.index(SYSTEM_LOG_INDEX_PREFIX_MATCH)));
        Set<String> result = response.result().keySet();
        log.info("查询到的索引:{}", result);
        return result;
    }

}

有关logstash+elasticsearch+Kibana(ELK)日志收集的更多相关文章

  1. ruby - Sinatra 中的全局救援和日志记录异常 - 2

    如何在出现异常时指定全局救援,如果您将Sinatra用于API或应用程序,您将如何处理日志记录? 最佳答案 404可以在not_found方法的帮助下处理,例如:not_founddo'Sitedoesnotexist.'end500s可以通过调用带有block的错误方法来处理,例如:errordo"Applicationerror.Plstrylater."end错误的详细信息可以通过request.env中的sinatra.error访问,如下所示:errordo'Anerroroccured:'+request.env['si

  2. ruby-on-rails - 使用 Ruby 标准 Logger 每天只创建一个日志 - 2

    我正在使用ruby​​标准记录器,我想要每天轮换一次,所以在我的代码中我有:Logger.new("#{$ROOT_PATH}/log/errors.log",'daily')它运行完美,但它创建了两个文件errors.log.20130217和errors.log.20130217.1。如何强制它每天只创建一个文件? 最佳答案 您的代码对于长时间运行的应用程序是正确的。发生的事情是您在给定的一天多次运行代码。第一次运行时,Ruby会创建一个日志文件“errors.log”。当日期改变时,Ruby将文件重命名为“errors.log

  3. ruby - Cucumber/Savon 省略或删除日志输出 - 2

    在运行Cucumber测试时,我得到(除了测试结果)大量调试/日志相关的输出形式:D,[2013-03-06T12:21:38.911829#49031]DEBUG--:SOAPrequest:D,[2013-03-06T12:21:38.911919#49031]DEBUG--:Pragma:no-cache,SOAPAction:"",Content-Type:text/xml;charset=UTF-8,Content-Length:1592W,[2013-03-06T12:21:38.912360#49031]WARN--:HTTPIexecutesHTTPPOSTusingt

  4. ruby-on-rails - faraday如何设置日志级别 - 2

    我最近将我的http客户端切换到faraday,一切都按预期工作。我有以下代码来创建连接:@connection=Faraday.new(:url=>base_url)do|faraday|faraday.useCustim::Middlewarefaraday.request:url_encoded#form-encodePOSTparamsfaraday.request:jsonfaraday.response:json,:content_type=>/\bjson$/faraday.response:loggerfaraday.adapterFaraday.default_ada

  5. ruby-on-rails - Logstash 可以在 Rails 上使用 Ruby 吗? - 2

    有没有人得到Logstash在Rails上使用ruby​​?我的客户告诉我将Logstash用于日志收集器等。我正在使用ruby​​onrails技术。大部分都快完成了。但要求是将日志记录到logstash中。请让我知道这可能吗? 最佳答案 我为此编写了一个gem-logstasher.它将Rails日志写入一个单独的文件,采用纯json格式,无需任何处理即可由logstash使用。查看我的blog有关如何设置Logstash和Kibana的完整说明 关于ruby-on-rails-Lo

  6. ruby - 将 Logstash 中的时间戳时区转换为输出索引名称 - 2

    在我的场景中,Logstash收到的系统日志行的“时间戳”是UTC,我们在Elasticsearch输出中使用事件“时间戳”:output{elasticsearch{embedded=>falsehost=>localhostport=>9200protocol=>httpcluster=>'elasticsearch'index=>"syslog-%{+YYYY.MM.dd}"}}我的问题是,在UTC午夜,Logstash在外时区(GMT-4=>America/Montreal)结束前将日志发送到不同的索引,并且索引在20小时(晚上8点)之后没有日志,因为“时间戳”是UTC。我们已

  7. ruby - Rails Elasticsearch 聚合 - 2

    不知何故,我似乎无法获得包含我的聚合的响应...使用curl它按预期工作:HBZUMB01$curl-XPOST"http://localhost:9200/contents/_search"-d'{"size":0,"aggs":{"sport_count":{"value_count":{"field":"dwid"}}}}'我收到回复:{"took":4,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":90,"max_score":0.0,"hits":[]},"a

  8. 网站日志分析软件--让网站日志分析工作变得更简单 - 2

    网站的日志分析,是seo优化不可忽视的一门功课,但网站越大,每天产生的日志就越大,大站一天都可以产生几个G的网站日志,如果光靠肉眼去分析,那可能看到猴年马月都看不完,因此借助网站日志分析工具去分析网站日志,那将会使网站日志分析工作变得更简单。下面推荐两款网站日志分析软件。第一款:逆火网站日志分析器逆火网站日志分析器是一款功能全面的网站服务器日志分析软件。通过分析网站的日志文件,不仅能够精准的知道网站的访问量、网站的访问来源,网站的广告点击,访客的地区统计,搜索引擎关键字查询等,还能够一次性分析多个网站的日志文件,让你轻松管理网站。逆火网站日志分析器下载地址:https://pan.baidu.

  9. elasticsearch源码关于TransportSearchAction【阶段三】 - 2

    1.回顾.TransportServicepublicclassTransportServiceextendsAbstractLifecycleComponentTransportService:方法:1publicfinalTextendsTransportResponse>voidsendRequest(finalTransport.Connectionconnection,finalStringaction,finalTransportRequestrequest,finalTransportRequestOptionsoptions,TransportResponseHandlerT>

  10. ruby - 在 Ruby 数组中收集重复项的最快/单行方法? - 2

    像这样转换数组的最快/单行方法是什么:[1,1,1,1,2,2,3,5,5,5,8,13,21,21,21]...进入像这样的对象数组:[{1=>4},{2=>2},{3=>1},{5=>3},{8=>1},{13=>1},{21=>3}] 最佳答案 要获得所需的格式,您可以附加一个调用以映射到您的解决方案:array.inject({}){|h,v|h[v]||=0;h[v]+=1;h}.map{|k,v|{k=>v}}虽然它仍然是单行的,但它开始变得凌乱了。 关于ruby-在Ruby

随机推荐