草庐IT

ElasticSearch索引的快照、清理策略脚本

跨行菜鸟运维 2023-04-22 原文

一、简介

当使用ES存储接入的应用日志时,日志索引会日益增多。而真实的日志查询需求一般是要求半月可查,存储半年到一年。应用每天产生的日志普遍会存到对应ES中以当天日期命名的索引中。查询时根据需求,最多查询半月对应索引里的数据。至于超过半月以上的日志索引数据,可快照文件,存储到文件系统中。在有特殊场景需求时进行快照恢复进行查询。这样减小ES的索引压力,提高查询效率。

需求

将半月以上的日志索引快照成文件,存储到快照仓库中
删除已快照的日志索引
定时检测创建超过15天的索引并快照、清理
钉钉通知快照后删除的索引名称
脚本执行错误时告警

二、基于API的Shell脚本

1、Shell脚本

通过环境变量设置参数
脚本执行完成后发送钉钉通知,显示脚本涉及到的ES索引
集成Sentry告警,每当脚本执行出错时将时间发送至Sentry,再由Sentry进行邮件告警
可使用Linux cron工具或K8S上的cornjob定时每天早上1点执行该脚本

#!/bin/bash
export SENTRY_DSN=http://*****@sentry@example.com/28
eval "$(sentry-cli bash-hook)"

elasticsearch_host=${ELASTICSEARCH_HOST:192.168.10.22}
elasticsearch_username=${ELASTICSEARCH_USERNAME:cronjob}
elasticsearch_password=${ELASTICSEARCH_PASSWORD:******}
elasticsearch_index_expiry_day=${ELASTICSEARCH_INDEX_EXPIRY_DAY:15}
elasticsearch_exclude_index=${ELASTICSEARCH_EXCLUDE_INDEX:.*}
elasticsearch_snapshots_repository=${ELASTICSEARCH_SNAPSHOTS_REPOSITORY:***}

elasticsearch_index_expiry_sec=$((elasticsearch_index_expiry_day*86400))
elasticsearch_url="http://${elasticsearch_host}:9200"
allIndex=`curl -s -u ${elasticsearch_username}:${elasticsearch_password} -XGET "${elasticsearch_url}/_cat/indices/_all?h=index"`
excludeIndex=`curl -s -u ${elasticsearch_username}:${elasticsearch_password} -XGET "${elasticsearch_url}/_cat/indices/${elasticsearch_exclude_index}/?h=i"`
indices=`echo -e "$allIndex\n""$excludeIndex" |sort -n |uniq -u`

for i in $indices ;
do
  createdateincludemesc=`curl -s -u ${elasticsearch_username}:${elasticsearch_password} -XGET "${elasticsearch_url}/_cat/indices/$i?h=cd"` ;
  createdate=$((createdateincludemesc/1000))
  currentdate=`date +%s`
  durationtime=$((currentdate-createdate)) ;
  if [ $durationtime -gt $elasticsearch_index_expiry_sec ] ;then
     snapshotsIndices=$i"\n"${snapshotsIndices}
  fi
done

for i in `echo -e $snapshotsIndices` ;
do
  if [ `curl -o /dev/null -w "%{http_code}\n" -s -u ${elasticsearch_username}:${elasticsearch_password} -XPUT "${elasticsearch_url}/_snapshot/${elasticsearch_snapshots_repository}/%3C$i-%7Bnow%2Fd%7D%3E?wait_for_completion=true" -H 'Content-Type: application/json' -d'{"indices": "'$i'","ignore_unavailable": true,"include_global_state": false}'` = 200 ] ;then
    if [ `curl -o /dev/null -w "%{http_code}\n" -s -u ${elasticsearch_username}:${elasticsearch_password} -XDELETE "${elasticsearch_url}/$i"` = 200 ] ;then
      echo -e "The Index $i \t have been snapshoted to repository and deleted !" ;
    else
      echo "$i failed to delete " ;
    fi
  else
    echo "$i failed to snapshot " ;
  fi
done

curl -s -o /dev/null 'https://oapi.dingtalk.com/robot/send?access_token=*****' \
  -H 'Content-Type: application/json' \
  -d '{"msgtype": "text",
       "text": {"content": "已成功将以下'"$elasticsearch_index_expiry_day"'天之前的索引进行了快照:\n'"$snapshotsIndices"'"}
  }'

2、脚本部署执行

①Linux的Cronjob

echo "0 1 * * * /opt/es-index-snapshots.sh" > /etc/crontab

②Kubernetes的cronjob

1、构建Cronjob镜像

Dockerfile

FROM centos:7
RUN curl -sL https://sentry.io/get-cli/ | bash
ADD ./es-index-snapshots.sh /usr/sbin/es-index-snapshots.sh
Entrypoint ["/bin/sh","-c"]
CMD ["/usr/sbin/es-index-snapshots.sh"]
docker build -t es-index-snapshots:v1 .

2、k8s资源声明文件

es-index-snapshots-cronjob.yml

apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: es-index-snapshots-cronjob
  namespace: logging
spec:
  concurrencyPolicy: Allow
  failedJobsHistoryLimit: 1
  schedule: 0 1 * * *
  startingDeadlineSeconds: 200
  successfulJobsHistoryLimit: 3
  suspend: false
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - env:
            - name: TZ
              value: Asia/Shanghai
            - name: ELASTICSEARCH_HOST
              value: *.logging.svc
            - name: ELASTICSEARCH_USERNAME
              value: cronjob
            - name: ELASTICSEARCH_PASSWORD
              value: "***""
            - name: ELASTICSEARCH_INDEX_EXPIRY_DAY
              value: "15"
            - name: ELASTICSEARCH_EXCLUDE_INDEX
              value: .*
            - name: ELASTICSEARCH_SNAPSHOTS_REPOSITORY
              value: "***"
            image: es-index-snapshots:v1
            imagePullPolicy: Always
            name: es-index-snapshots-cronjob
            resources:
              limits:
                cpu: 600m
                memory: 800Mi
              requests:
                cpu: 300m
                memory: 500Mi
            terminationMessagePath: /dev/termination-log
            terminationMessagePolicy: File
          dnsPolicy: ClusterFirst
          imagePullSecrets:
          - name: harbor-secrets
          restartPolicy: OnFailure
          schedulerName: default-scheduler
          securityContext: {}
          terminationGracePeriodSeconds: 30
kubectl apply -f es-index-snapshots-cronjob.yml

三、基于 Python版本SDK的脚本

1、Python脚本

脚本依据索引的创建时间进行处理的。例如设置快照删除15天以前的索引,判断计算的期限是以索引的创建时间算起的15天
定时检测将指定期限以上的索引快照成文件,存储到快照仓库中,然后删除
钉钉通知进行处理的索引名称
脚本执行错误时进行Sentry告警

#!/usr/bin/python3
# -*- coding: UTF-8 -*-

import json,time,requests,sentry_sdk
from elasticsearch import Elasticsearch

es = Elasticsearch(
    ["192.168.10.60","192.168.10.70"],
    # es用户角色权限要求:集群权限:monitor、create_snapshot 索引权限:*(所有索引) monitor、delete_index
    http_auth=('es用户名', 'es用户密码'),
    scheme="http",
    port=9200,
    http_compress=True
)

app_index_retain_day=15
nginx_index_retain_day=15

# Sentry DSN
sentry_sdk.init(dsn='http://*****:*****@sentry.example.com/12')

# 钉钉机器人Token
dingding_webhook_token="*****"

# 获取所有索引
def getAllIndex():
    return es.cat.indices('*', h='index,cd', format='json', s='index')

# 将获取到的所有索引去除"."开头的、名字异常的或想排除的
def getExcludeSystemAndAberrantIndex():
    return list(filter(lambda x: (not ( x['index'].startswith('.') or '%{[app]}' in x['index'] or x['index'].startswith('gitlab-production') or x['index'].startswith('jaeger') )), getAllIndex()))

# 获取应用日志索引
def getAppIndex():
    return list(filter(lambda x: ( not ('nginx' in x['index'] or 'mysql-slowlog' in x['index'] )), getExcludeSystemAndAberrantIndex()))

# 获取Nginx日志索引
def getNginxIndex():
    return list(filter(lambda x: ( 'nginx' in x['index'] ), getExcludeSystemAndAberrantIndex()))

# 获取MySQL慢日志索引
def getMysqlSlowQueryLogIndex():
    return list(filter(lambda x: ('mysql-slowlog' in x['index']), getAllIndex()))

# Snapshots索引
def snapshotIndex(index):
    index_body = {"indices": index }
    print(index)
    return es.snapshot.create(body=index_body,repository='NAS-NFS-Snapshots-Repository', wait_for_completion='true', request_timeout=300, snapshot= index+'-snapshoted-'+ time.strftime('%m-%d') )

# 删除索引
def deleteIndex(index):
    es.indices.delete(index=index)

# 钉钉通知
def dingdingNotification(token,msg,day):

    url = "https://oapi.dingtalk.com/robot/send?access_token="+token
    headers = { "Content-Type": "application/json", "Charset": "UTF-8" }
    # 构建请求数据,post请求
    data = {
        "msgtype": "text",
        "text": {
            "content": msg+"\n"
        },
        "at": {
            "isAtAll": 'true'
        }
    }

    if not requests.post(url, data=json.dumps(data), headers=headers) :
        print("发送钉钉通知失败!")
        sentry_sdk.capture_exception(Exception("发送钉钉通知失败!"))
# 将创建日志超过指定天数的日志索引快照到存储仓库中,然后删除
def snapshotAndDeleteAppIndex(type,day):

    if type == 'app' :
        snapshoted_deleted_app_indices=[]
        for i in getAppIndex():
            cts=time.time()
            if ( (cts - int(i["cd"])/1000) ) > day*86400 :
                 if 'SUCCESS' in snapshotIndex(i["index"])['snapshot']["state"]:
                    deleteIndex(i['index'])
                    print(i['index']+ "已在ES中快照并删除!")
                    snapshoted_deleted_app_indices.append(i['index'])
                 else:
                    print("应用日志索引:"+i['index']+"快照失败")
                    sentry_sdk.capture_exception(Exception("应用日志索引:"+i['index']+"快照失败"))
                    continue
        if snapshoted_deleted_app_indices :
            Notification_Context="[索引快照清理任务]\n成功将以下"+str(day)+"天之前的应用日志索引进行了快照\n"+"\n".join(str(i) for i in  snapshoted_deleted_app_indices)
            dingdingNotification(dingding_webhook_token,Notification_Context,day)
        else:
            Notification_Context = "[索引快照清理任务]\n没有超过"+ str(day)+"天的应用日志索引需要被快照删除!"
            dingdingNotification(dingding_webhook_token, Notification_Context, day)
    elif type == 'nginx' :
        snapshoted_deleted_nginx_indices = []
        for i in getNginxIndex():
            cts=time.time()
            if ( (cts - int(i["cd"])/1000) ) > day*86400 :
                if 'SUCCESS' in snapshotIndex(i["index"])['snapshot']["state"]:
                    deleteIndex(i['index'])
                    print(i['index'] + "已在ES中快照并删除!")
                    snapshoted_deleted_nginx_indices.append(i['index'])
                else:
                    print("Nginx日志索引:" + i['index'] + "快照失败")
                    sentry_sdk.capture_exception(Exception("Nginx日志索引:" + i['index'] + "快照失败"))
                    continue
        if snapshoted_deleted_nginx_indices:
            Notification_Context = "[索引快照清理任务]\n成功将以下" + str(day) + "天之前的应用Nginx索引进行了快照\n" + "\n".join(str(i) for i in snapshoted_deleted_nginx_indices)
            dingdingNotification(dingding_webhook_token, Notification_Context, day)
        else:
            Notification_Context = "[索引快照清理任务]\n没有超过" + str(day) + "天的应用Nginx日志索引需要被快照删除!"
            dingdingNotification(dingding_webhook_token, Notification_Context, day)

    elif type == 'mysqlslowlog' :
        snapshoted_deleted_mysqlslowlog_indices = []
        for i in getMysqlSlowQueryLogIndex():
            cts = time.time()
            if ((cts - int(i["cd"]) / 1000)) > day * 86400:
                if 'SUCCESS' in snapshotIndex(i["index"])['snapshot']["state"]:
                    deleteIndex(i['index'])
                    print(i['index'] + "已在ES中快照并删除!")
                    snapshoted_deleted_mysqlslowlog_indices.append(i['index'])
                else:
                    print("MySQL慢查询日志索引:" + i['index'] + "快照失败")
                    sentry_sdk.capture_exception(Exception("MySQL慢查询日志索引:" + i['index'] + "快照失败"))
                    continue
        if snapshoted_deleted_mysqlslowlog_indices :
            Notification_Context = "[索引快照清理任务]\n成功将以下" + str(day) + "天之前的MySQL慢查询日志索引进行了快照\n" + "\n".join(str(i) for i in snapshoted_deleted_mysqlslowlog_indices)
            dingdingNotification(dingding_webhook_token, Notification_Context, day)
        else:
            Notification_Context = "[索引快照清理任务]\n没有超过" + str(day) + "天的MySQL慢查询日志索引需要被快照删除!"
            dingdingNotification(dingding_webhook_token, Notification_Context, day)

def main():
    print("====================="+time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())+"开始清理es中的索引=====================")
    print("................开始清理"+app_index_retain_day+"天前应用相关索引................")
    # 快照删除指定期限之前的应用索引
    snapshotAndDeleteAppIndex('app',app_index_retain_day)
    print("................开始清理"+nginx_index_retain_day+"天前nginx相关索引................")
    # 快照删除指定期限之前Nginx索引
    snapshotAndDeleteAppIndex('nginx',nginx_index_retain_day)

    exit(0)

if __name__ == "__main__" :
    main()

2、requirements.txt

elasticsearch==7.0.0
pyyaml
requests
sentry_sdk

3、操作步骤

Python版本:3

默认清理策略

快照删除指定日期前的应用日记索引
快照删除指定日期前的应用Nginx日记索引 (索引名包含Nginx关键字的)
安装依赖

  pip3 install -r requierements.txt

执行脚本


PYTHONIOENCODING=utf-8 python3 es-index-snapshots-clean.py

Crontab定时执行脚本:每天凌晨1点执行

  0 0 1 * * ? python3 es-index-snapshots-clean.py

四、官方的curator索引管理工具

https://www.elastic.co/guide/en/elasticsearch/client/curator/5.8/index.html

五、ES自带的ILM(index lifecycle management)功能

https://www.elastic.co/guide/en/elasticsearch/reference/7.17/index-lifecycle-management.html

有关ElasticSearch索引的快照、清理策略脚本的更多相关文章

  1. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  2. ruby-on-rails - 独立 ruby​​ 脚本的配置文件 - 2

    我有一个在Linux服务器上运行的ruby​​脚本。它不使用rails或任何东西。它基本上是一个命令行ruby​​脚本,可以像这样传递参数:./ruby_script.rbarg1arg2如何将参数抽象到配置文件(例如yaml文件或其他文件)中?您能否举例说明如何做到这一点?提前谢谢你。 最佳答案 首先,您可以运行一个写入YAML配置文件的独立脚本:require"yaml"File.write("path_to_yaml_file",[arg1,arg2].to_yaml)然后,在您的应用中阅读它:require"yaml"arg

  3. postman——集合——执行集合——测试脚本——pm对象简单示例02 - 2

    //1.验证返回状态码是否是200pm.test("Statuscodeis200",function(){pm.response.to.have.status(200);});//2.验证返回body内是否含有某个值pm.test("Bodymatchesstring",function(){pm.expect(pm.response.text()).to.include("string_you_want_to_search");});//3.验证某个返回值是否是100pm.test("Yourtestname",function(){varjsonData=pm.response.json

  4. ruby - 确定 ruby​​ 脚本是否已经在运行 - 2

    有没有一种简单的方法可以判断ruby​​脚本是否已经在运行,然后适本地处理它?例如:我有一个名为really_long_script.rb的脚本。我让它每5分钟运行一次。当它运行时,我想看看之前运行的是否还在运行,然后停止第二个脚本的执行。有什么想法吗? 最佳答案 ps是一种非常糟糕的方法,并且可能会出现竞争条件。传统的Unix/Linux方法是将PID写入文件(通常在/var/run中)并在启动时检查该文件是否存在。例如pid文件位于/var/run/myscript.pid然后你会在运行程序之前检查它是否存在。有一些技巧可以避免

  5. ruby-on-rails - 协会的 Rails 索引 - 2

    我发现自己需要这个。假设cart是一个包含用户列表的模型。defindex_of_itemcart.users.each_with_indexdo|u,i|ifu==current_userreturniendend获取此类关联索引的更简单方法是什么? 最佳答案 indexArray上的方法与您的index_of_item方法相同,例如cart.users.index(current_user)返回数组中第一个对象的索引==给obj。如果未找到匹配项,则返回nil。 关于ruby-on-

  6. ruby - ruby 脚本可以预编译成二进制文件吗? - 2

    我正在开发一个Ruby脚本,需要在没有Ruby解释器的情况下部署到系统上。它将需要在使用ELF格式的FreeBSD系统上运行。我知道有一个ruby​​2exe项目可以编译在Windows上运行的ruby​​脚本,但是在其他操作系统上这样做容易吗?甚至可能吗? 最佳答案 您是否检查过Rubinius或JRuby是否允许您预编译您的代码? 关于ruby-ruby脚本可以预编译成二进制文件吗?,我们在StackOverflow上找到一个类似的问题: https://

  7. ruby - Rails -- :id attribute? 所需的数据库索引 - 2

    因此,当我遵循MichaelHartl的RubyonRails教程时,我注意到在用户表中,我们为:email属性添加了一个唯一索引,以提高find的效率方法,因此它不会逐行搜索。到目前为止,我们一直在根据情况使用find_by_email和find_by_id进行搜索。然而,我们从未为:id属性设置索引。:id是否自动索引,因为它在默认情况下是唯一的并且本质上是顺序的?或者情况并非如此,我应该为:id搜索添加索引吗? 最佳答案 大多数数据库(包括sqlite,这是RoR中的默认数据库)会自动索引主键,对于RailsMigration

  8. ruby-on-rails - solr 清理查询 - 2

    我在Rails上使用带有ruby​​的solr。一切正常,我只需要知道是否有任何现有代码来清理用户输入,比如以?开头的查询。或* 最佳答案 我不知道执行此操作的任何代码,但理论上可以通过查看parsingcodeinLucene来完成并搜索thrownewParseException(只有16个匹配!)。在实践中,我认为您最好只捕获代码中的任何solr异常并显示“无效查询”消息或类似信息。编辑:这里有几个“sanitizer”:http://pivotallabs.com/users/zach/blog/articles/937-s

  9. ruby - 引用具有指定索引的枚举器值 - 2

    假设我有一个可枚举对象enum,现在我想获取第三个项目。我知道一种通用方法是转换成数组,然后使用索引访问,如:enum.to_a[2]但这种方式会创建一个临时数组,效率可能很低。现在我使用:enum.each_with_index{|v,i|breakvifi==2}但这非常丑陋和多余。执行此操作最有效的方法是什么? 最佳答案 你可以使用take剥离前三个元素,然后剥离last从take给你的数组中获取第三个元素:third=enum.take(3).last如果您根本不想生成任何数组,那么也许:#Ifenumisn'tanEnum

  10. ruby-on-rails - 从对 super 的调用中清理一个 block - 2

    我正在使用ActiveAttr,它为您提供了很好的通过block选项进行初始化:person=Person.new()do|p|p.first_name='test'p.last_name='man'end但是,在包含ActiveAttr::Model的特定类中,我想绕过此功能,因为我想将该block用于其他用途。所以我们开始吧:classImperator::CommandincludeActiveAttr::ModelendclassMyCommand这失败得很惨,因为该block仍然向上传递到链中,并最终在ActiveAttr内部运行此代码:definitialize(*)sup

随机推荐