Preface
As the system keeps growing, the volume of logs grows with it; many services already produce more than 1 TB of logs per day. We therefore need to collect statistics on the oversized log entries and then optimize how they are written.
Python script
from elasticsearch import Elasticsearch
import datetime
import re
import json

index_name = "driver-order-service-pro-*"  # replace with your index name
fileName = "driverOrderBigLog.txt"

# replace with the time range you want to analyze
start_date = datetime.datetime(2024, 12, 9, 17, 0, 0)
end_date = datetime.datetime(2024, 12, 9, 18, 0, 0)
maxPages = 1000  # total number of pages to fetch; the time range plus the page count determines the sample

# @timestamp is stored in UTC, so shift the local (UTC+8) times back by 8 hours
start = (start_date - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
end = (end_date - datetime.timedelta(hours=8)).strftime("%Y-%m-%dT%H:%M:%S.%fZ")


def paginated_search(es_client, index_name, query, page_size=10, max_pages=None):
    """
    Page through Elasticsearch results and dump oversized messages to a file.

    :param es_client: Elasticsearch client instance
    :param index_name: name of the index to query
    :param query: query DSL (Domain Specific Language) dict
    :param page_size: number of records per page, default 10
    :param max_pages: maximum number of pages to fetch, default None (fetch every page)
    """
    page_number = 1
    search_after = None
    scroll_id = None
    while True:
        if search_after:
            search_body = {
                "query": query,
                "size": page_size,
                "search_after": search_after,
                "sort": [{"_id": {"order": "asc"}}]  # a stable sort field is required; _id is used here
            }
        else:
            search_body = {
                "query": query,
                "size": page_size,
                "from": (page_number - 1) * page_size,
                "_source": True  # optional: restrict which fields are returned
            }
        try:
            if scroll_id:
                response = es_client.scroll(scroll_id=scroll_id, scroll='2m')
                scroll_id = response['_scroll_id']
                hits = response['hits']['hits']
            else:
                response = es_client.search(index=index_name, body=search_body)
                hits = response['hits']['hits']
            if not hits:
                break
            for hit in hits:
                # print(hit['_source'])
                result = hit['_source']['message']
                # de-duplication could be added here, e.g. skip messages that share a prefix
                # or that contain a particular keyword
                if len(result) > 5000:
                    with open(fileName, 'a+', encoding='utf-8') as f2:
                        f2.writelines(result + "\n\n")
            # update the pagination state
            if search_after:
                search_after = hits[-1]['sort']
            else:
                page_number += 1
            # stop once the maximum page count is reached
            if max_pages and page_number > max_pages:
                break
        except Exception as e:
            print(f"Error occurred during search: {e}")
            break
        finally:
            # if a scroll context exists, remember to clean it up
            if scroll_id:
                try:
                    es_client.clear_scroll(scroll_id=scroll_id)
                except Exception as e:
                    print(f"Error occurred during clear scroll: {e}")
                scroll_id = None
    if scroll_id:
        try:
            es_client.clear_scroll(scroll_id=scroll_id)
        except Exception as e:
            print(f"Error occurred during final clear scroll: {e}")


# example usage
if __name__ == "__main__":
    es = Elasticsearch(hosts="http://X.X.X.X:9200/", http_auth=('userName', 'password'))  # replace with your Elasticsearch address
    query = {
        "bool": {
            "filter": [
                {
                    "range": {
                        "@timestamp": {
                            "format": "strict_date_optional_time",
                            "gte": start,
                            "lte": end
                        }
                    }
                }
            ],
            "must": [],
            "must_not": [],
            "should": []
        }
    }
    paginated_search(es, index_name, query, page_size=10, max_pages=maxPages)
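Once the script has run, the dump file can be summarized to see which log statements account for most of the oversized entries. The following is a minimal sketch, assuming the entries in driverOrderBigLog.txt are separated by blank lines exactly as the script above writes them; the 100-character prefix used for grouping is an arbitrary choice.

from collections import Counter

fileName = "driverOrderBigLog.txt"
prefix_len = 100  # arbitrary prefix length used to group similar messages

with open(fileName, encoding='utf-8') as f:
    # the collection script separates entries with a blank line ("\n\n")
    entries = [e.strip() for e in f.read().split("\n\n") if e.strip()]

counter = Counter(e[:prefix_len] for e in entries)

print(f"{len(entries)} oversized messages in total")
for prefix, count in counter.most_common(10):
    print(f"{count:6d}  {prefix}")

The prefixes that appear most often usually point straight at the log statements worth optimizing.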
Follow-up processing
The script above dumps the oversized log entries to a file. The next step is to handle those entries inside the service itself, for example by not logging them at all or by logging only the key information, which is enough to shrink the overall log volume.
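One possible approach is to truncate anything beyond a size threshold before it reaches the log output. The sketch below uses Python's standard logging module purely as an illustration; the 5000-character threshold mirrors the one in the collection script, and the actual service may use a different logging framework with an equivalent filter mechanism.

import logging

MAX_LEN = 5000  # same threshold as the collection script; adjust as needed

class TruncatingFilter(logging.Filter):
    """Truncate oversized messages instead of printing them in full."""
    def filter(self, record):
        msg = record.getMessage()
        if len(msg) > MAX_LEN:
            # keep only the head of the message and note how much was dropped
            record.msg = msg[:MAX_LEN] + f"... [truncated {len(msg) - MAX_LEN} chars]"
            record.args = ()
        return True

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("driver-order-service")
logger.addFilter(TruncatingFilter())

logger.info("x" * 20000)  # logged as a 5000-character prefix plus a truncation note

Dropping such messages entirely, or logging only the key fields instead of a whole payload, follows the same pattern: decide in the filter (or at the call site) before the message is ever written.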