草庐IT

日志=》kafka》ELK

antyyy123 2023-08-07 原文

kELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana;
Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能;它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志

logstach(日志收集)->Elasticsearch(日志存储和搜索)->Kibana(查看日志,可视化)

为什么要使用elk?
        ELK 组件在海量日志系统的运维中,可用于解决以下主要问题:- 分布式日志数据统一收集,实现集中式查询和管理
故障排查
安全信息和事件管理
报表功能

我们为什么用kafka,一定要通过kafka吗 
    不是,可以直接logback到ELK的,但是为什么使用kafka接收日志呢,是为了减少logstash对于日志进入时的压力。kafka的特性使用过的人应该都清楚,拥有这10W级别每秒的单机吞吐量,所以很适合作为数据来源缓冲区。

logback.xml

 <!-- kafkaAppender 输出日志到kafka -->
    <appender name="kafkaAppender" 
         class="com.td.ai.frame.uni.platform.oaudit.unify.config.KafkaAppender">
        <bootstrapServers>kafka-servers</bootstrapServers>
        <topic>kafka-topic</topic>
    </appender>

 <!-- 要输出日志的类 -->
    <logger name="logKafka" level="info">
        <appender-ref ref="kafkaAppender"/>
    </logger>
    <!-- 异步传递策略,建议选择异步,不然连接kafka失败,会阻挡服务启动 -->
    <appender name="Async" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="kafkaAppender"/>
    </appender>
public class KafkaAppender extends AppenderBase<ILoggingEvent> {

    private static Logger logger = LoggerFactory.getLogger(KafkaAppender.class);

    private String topic = "***";



  
    private Producer<String, String> producer;



    @Override
    public void start() {
        super.start();
        if (producer == null) {
            Properties props = new Properties();
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "SCRAM-SHA-512");
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule\n" +
                    "required username=\"***\"" +
                    "password=\"****\";");
            props.put("bootstrap.servers", topic);
            //判断是否成功,我们指定了“1”将会阻塞消息
            props.put("acks", "1");
            props.put("retries", 3);
            props.put("batch.size", 262144);
            //延迟10s,10s内数据会缓存进行发送\
            props.put("linger.ms", 10);
            props.put("buffer.memory", 67108864);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("metric.reporters", "com.ctg.kafka.clients.reporter.KafkaClientMetricsReporter");
            props.put("client.id", ""***);
            producer = new KafkaProducer<String, String>(props);


        }

    }


    @Override
    protected void append(ILoggingEvent iLoggingEvent) {
        String msg = iLoggingEvent.getFormattedMessage();
        String message = "";
        InetAddress localHost = null;
        try {
            localHost = Inet4Address.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        String hostIp = localHost.getHostAddress();
        String hostName = localHost.getHostName();
        Date date = new Date();
        SimpleDateFormat sdformat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");//24小时制
        String datetime = sdformat.format(date);
        JSONObject json = new JSONObject();
       
        json.put("podIP", hostIp);
        json.put("podName", hostName);
        message = json.toString();
//        System.out.println("向kafka推送日志开始:" + message);
        //key为null  2.4之前为轮询策略
        // 如果key值为null,并且使用了默认的分区器,Kafka会根据轮询(Random Robin)策略将消息均匀地分布到各个分区上。
        // 之后为粘性策略
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, null, message);
        //同步发动消息-改-异步发送消息
        try {
            Future<RecordMetadata> result = producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // 执行错误逻辑处理//否则就是成功喽
                        exception.printStackTrace();
                    }
                }
            });
//            System.out.println("分区:" + result.get().partition() + ",offset: " + result.get().offset());
        } catch (Exception e) {
            e.printStackTrace();
        }


        producer.flush();



    }



}

服务器安装logstash 


1. 查看一下路径
pwd
应该显示/app/logstash或者/data/logstash

2. 将tar包上传

3. 执行以下命令
tar -zxvf logstash-7.5.2.tar.gz(自己的版本号)

cd logstash-7.5.2

mkdir config/conf
mkdir config/certs
mkdir logs

cd config/conf
上传js-sysname.conf

input {
    kafka {
		topics_pattern  => "kafkatopic"
        consumer_threads => 4
        group_id => "***-consumer" # kafka 消费组
        type => "kafka"
        security_protocol => "SASL_PLAINTEXT"
        sasl_mechanism => "SCRAM-SHA-512"
        jaas_path => "/home/crmapp/logstash-7.5.2/config/certs/kafka_client_jaas.conf"
        bootstrap_servers => "*****"
        codec => "json"
    }
}
 
filter {
  ruby{
    code => "event.set('index_day',event.get('@timestamp').time.localtime('+08:00').strftime('%Y.%m.%d'))"
  }
 
}

output {
    
        elasticsearch {
       hosts => ["*****"]
       index => "***a-log-%{index_day}"
            user => "**"
            password => "这里写es的密码"
        }
    
}

cd ../certs
上传kafka_client_jaas.conf

KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="**"
password="****";
};

cd ..
vim pipelines.yml
将下面的加到"# Example of two pipelines:"这一行下面
 - pipeline.id:js-sysname
   pipeline.workers: 2
   path.config: "/app/logstash/logstash-7.5.2/config/conf/-js-sysname.conf"

cd /app/logstash/logstash-7.5.2/
nohup bin/logstash -r true --config.reload.automatic >> logs/logstash.log &

4. 查看日志
tail -100f logs/logstash.log
启动需要时间,如果没有erorr日志,没有提示连不上kafka或者elasricsearch即可

有关日志=》kafka》ELK的更多相关文章

  1. ruby - Sinatra 中的全局救援和日志记录异常 - 2

    如何在出现异常时指定全局救援,如果您将Sinatra用于API或应用程序,您将如何处理日志记录? 最佳答案 404可以在not_found方法的帮助下处理,例如:not_founddo'Sitedoesnotexist.'end500s可以通过调用带有block的错误方法来处理,例如:errordo"Applicationerror.Plstrylater."end错误的详细信息可以通过request.env中的sinatra.error访问,如下所示:errordo'Anerroroccured:'+request.env['si

  2. ruby-on-rails - 使用 Ruby 标准 Logger 每天只创建一个日志 - 2

    我正在使用ruby​​标准记录器,我想要每天轮换一次,所以在我的代码中我有:Logger.new("#{$ROOT_PATH}/log/errors.log",'daily')它运行完美,但它创建了两个文件errors.log.20130217和errors.log.20130217.1。如何强制它每天只创建一个文件? 最佳答案 您的代码对于长时间运行的应用程序是正确的。发生的事情是您在给定的一天多次运行代码。第一次运行时,Ruby会创建一个日志文件“errors.log”。当日期改变时,Ruby将文件重命名为“errors.log

  3. ruby - Cucumber/Savon 省略或删除日志输出 - 2

    在运行Cucumber测试时,我得到(除了测试结果)大量调试/日志相关的输出形式:D,[2013-03-06T12:21:38.911829#49031]DEBUG--:SOAPrequest:D,[2013-03-06T12:21:38.911919#49031]DEBUG--:Pragma:no-cache,SOAPAction:"",Content-Type:text/xml;charset=UTF-8,Content-Length:1592W,[2013-03-06T12:21:38.912360#49031]WARN--:HTTPIexecutesHTTPPOSTusingt

  4. ruby-on-rails - faraday如何设置日志级别 - 2

    我最近将我的http客户端切换到faraday,一切都按预期工作。我有以下代码来创建连接:@connection=Faraday.new(:url=>base_url)do|faraday|faraday.useCustim::Middlewarefaraday.request:url_encoded#form-encodePOSTparamsfaraday.request:jsonfaraday.response:json,:content_type=>/\bjson$/faraday.response:loggerfaraday.adapterFaraday.default_ada

  5. 网站日志分析软件--让网站日志分析工作变得更简单 - 2

    网站的日志分析,是seo优化不可忽视的一门功课,但网站越大,每天产生的日志就越大,大站一天都可以产生几个G的网站日志,如果光靠肉眼去分析,那可能看到猴年马月都看不完,因此借助网站日志分析工具去分析网站日志,那将会使网站日志分析工作变得更简单。下面推荐两款网站日志分析软件。第一款:逆火网站日志分析器逆火网站日志分析器是一款功能全面的网站服务器日志分析软件。通过分析网站的日志文件,不仅能够精准的知道网站的访问量、网站的访问来源,网站的广告点击,访客的地区统计,搜索引擎关键字查询等,还能够一次性分析多个网站的日志文件,让你轻松管理网站。逆火网站日志分析器下载地址:https://pan.baidu.

  6. ruby - 如何更改 Sinatra 中的日志级别 - 2

    我正在使用此代码在我的Sinatra应用程序中启用日志记录:log_file=File.new('my_log_file.log',"a")$stdout.reopen(log_file)$stderr.reopen(log_file)$stdout.sync=true$stderr.sync=true实际的日志记录是使用:logger.debug("Startingcall.Params=#{params.inspect}")事实证明,只有INFO或更高级别的日志消息被记录,而DEBUG消息没有被记录。我正在寻找一种将日志级别设置为DEBUG的方法。 最佳

  7. ruby - 带有 grep 远程日志文件的 tail - 2

    我有这段代码来跟踪远程日志文件:defdo_tail(session,file)session.open_channeldo|channel|channel.on_datado|ch,data|puts"[#{file}]->#{data}"endchannel.exec"tail-f#{file}"endNet::SSH.start("host","user",:password=>"passwd")do|session|do_tailsession,"/path_to_log/file.log"session.loop我只想在file.log中检索带有ERROR字符串的行,我正在尝

  8. Ruby 守护进程日志轮换 - 2

    当我为Daemons(1.1.0)gem设置日志记录参数时,我将如何实现与此行类似的行为?logger=Logger.new('foo.log',10,1024000)守护进程选项:options={:ARGV=>['start'],:dir_mode=>:normal,:dir=>log_dir,:multiple=>false,:ontop=>false:mode=>:exec,:backtrace=>true,:log_output=>true} 最佳答案 不幸的是,Daemonsgem不使用Logger。它将STDOUT和S

  9. ruby-on-rails - 在 Rails 应用程序的前端获取实时日志 - 2

    在Rails3.x应用程序中,我正在使用net::ssh并向远程pc运行一些命令。我想向用户的浏览器显示实时日志。比如,如果两个命令在net中运行::ssh执行即echo"Hello",echo"Bye"被传递然后"Hello"应该在执行后立即显示在浏览器中。这是代码我在ruby​​onrails应用程序中使用ssh连接和运行命令Net::SSH.start(@servers['local'],@machine_name,:password=>@machine_pwd,:timeout=>30)do|ssh|ssh.open_channeldo|channel|channel.requ

  10. ruby - gem 应该在哪里存储日志文件? - 2

    我正在构建一个应该输出日志文件的ruby​​gem。将日志文件存储在哪里是一个好习惯?我正在从我正在构建的Rails网站中提取此功能,我可以在那里简单地登录到log/目录。 最佳答案 理想情况下,使路径可配置(.rc文件、交换机、rails/rack配置等)。如果它是一个Rack中间件,添加在构造函数的参数中指定它的可能性。如果没有提供日志路径,回退到检测日志目录。(我依稀记得它是Rails中的config.paths['log'],但如果可以的话,请确保config在你的gem中使用之前确实指向某些东西在Rails之外使用。)如果

随机推荐