一. 前言
在Python中使用Elasticsearch(ES)通常涉及安装Elasticsearch的Python客户端库,然后通过该库与Elasticsearch集群进行交互。
二. 基本使用
1. 安装Elasticsearch Python客户端库
首先,你需要安装elasticsearch库。你可以使用pip来安装它:
pip install elasticsearch
2. 连接到Elasticsearch集群
在Python中,你可以通过创建一个Elasticsearch对象来连接到Elasticsearch集群。
from elasticsearch import Elasticsearch

# Create the Elasticsearch client instance.
es = Elasticsearch(['http://localhost:9200'])

# Check whether the connection succeeded and report the outcome.
print(
    "Successfully connected to Elasticsearch!"
    if es.ping()
    else "Could not connect to Elasticsearch"
)
3. 执行索引操作
创建索引
在Elasticsearch中,索引类似于关系型数据库中的表。可以使用客户端实例的indices.create()方法来创建一个新的索引。
# Request body for index creation (a minimal example; real ones are usually
# more elaborate).
index_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1,
    },
    "mappings": {
        "properties": {
            "field1": {"type": "text"},
            "field2": {"type": "integer"},
        },
    },
}

# Create the index.
es.indices.create(index='my_index', body=index_body)
添加文档
可以使用index()方法来向索引中添加文档。
# Request body of the document to add.
doc_body = {
    "field1": "value1",
    "field2": 123,
}

# Add the document, naming both the target index and the document ID.
es.index(index='my_index', id=1, body=doc_body)
4. 执行搜索操作
使用search()方法来执行搜索查询。
# Query DSL.
query_body = {
    "query": {
        "match": {
            "field1": "value1",
        },
    },
}

# Run the search.
response = es.search(index='my_index', body=query_body)

# Handle the response: print the stored source of every hit.
for hit in response['hits']['hits']:
    print(hit['_source'])
三. 整合封装成一个类来使用
import json
import time
import traceback
import uuid
from datetime import datetime, timedelta

from elasticsearch import Elasticsearch

from base.common.time_format import get_isoformat_time
from configure.configure import config
from configure.server_config import logger

es_conf = config['elasticsearch']


class ElasticSearchService(Elasticsearch):
    """Elasticsearch client bound to a single index, with logging CRUD helpers.

    The instance is a regular ``Elasticsearch`` client (it subclasses it), so
    every native client method stays available. The helpers defined here all
    operate on ``self.index_name``.
    """

    es_service = None
    # Field mapping applied to a freshly created index.
    mappings = {
        "properties": {
            # "id": {"type": "keyword"},
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart",
            },
            "time": {"type": "date", "format": "yyyy-MM-dd'T'HH:mm:ss"},
            "qst_id": {"type": "keyword"},
            "reply_type": {"type": "integer"},
        }
    }

    def __init__(self, index_name, addrs, *args, **kwargs):
        """Connect to the cluster and make sure ``index_name`` exists.

        :param index_name: name of the index every helper operates on.
        :param addrs: list of node URLs handed to the underlying client.
        :param args: accepted for compatibility; not forwarded.
        :param kwargs: accepted for compatibility; not forwarded.
        :raises ConnectionError: when the cluster does not answer ``ping()``.
        """
        self.max_result_window = es_conf['max_result_window']
        self.username = es_conf['username']
        self.password = es_conf['password']
        self.index_name = index_name
        self.addrs = addrs
        super().__init__(self.addrs, basic_auth=(self.username, self.password), request_timeout=3600)
        # Verify connectivity before touching the index so that a dead
        # cluster is never reported as a successful connection.
        if not self.ping():
            logger.error(f"ElasticSearchHandler Connection failed")
            raise ConnectionError(f"Elasticsearch ping failed, addrs:{addrs}")
        # Create the index (with mapping and settings) when it is missing.
        self.create_index(self.index_name)
        logger.info(f"Connect to ElasticService successfully!!! addrs:{addrs}, index_name:{self.index_name}")

    def create_index(self, index_name):
        """Create ``index_name`` with the class mapping and settings if absent.

        :param index_name: name of the index to create.
        :return: ``True`` when the index exists after the call, ``False`` when
            the cluster did not acknowledge the creation.
        """
        if self.indices.exists(index=index_name):
            logger.info(f"Index [{index_name}] already exists, skip creation.")
            return True
        response = self.indices.create(index=index_name, body={})
        # Only report success once the cluster has acknowledged the request.
        if not response.get('acknowledged'):
            logger.error(f"Failed to create index '{index_name}'. Response: {response}")
            return False
        logger.info(f"Index [{index_name}] created successfully!")
        # Apply mapping and settings to the index that was just created,
        # which is not necessarily ``self.index_name``.
        self.create_mapping_session_history(index_name)
        self.create_index_setting(index_name)
        if response.get('shards_acknowledged'):
            logger.info(f"Index '{index_name}' All shards are acknowledged.")
        else:
            logger.info(f"Index '{index_name}' Not all shards are acknowledged.")
        return True

    def create_mapping_session_history(self, index_name=None):
        """Put the class-level ``mappings`` on an index.

        :param index_name: target index; defaults to ``self.index_name``.
        """
        target = index_name or self.index_name
        mapping = ElasticSearchService.mappings
        response = self.indices.put_mapping(index=target, body=mapping)
        if response.get('acknowledged'):
            logger.info(f"Index '{target}' created successfully with mapping.")
        else:
            logger.error(f"Failed to create index '{target}'. Response: {response}")

    def create_index_setting(self, index_name=None):
        """Set ``number_of_replicas`` to 0 on an index.

        :param index_name: target index; defaults to ``self.index_name``.
        """
        target = index_name or self.index_name
        setting = {"number_of_replicas": "0"}
        response = self.indices.put_settings(index=target, body=setting)
        if response.get('acknowledged'):
            logger.info(f"Index '{target}' created successfully with setting.")
        else:
            logger.error(f"Failed to create index setting'{target}'. Response: {response}")

    def delete_index(self, index_name):
        """Delete ``index_name`` and return the client response."""
        res = self.indices.delete(index=index_name)
        logger.info(f"Index [{index_name}] deleted successfully!, res: {res}")
        return res

    def insert_doc(self, hist_hash_id: str, doc_body: dict):
        """Index one document and refresh the index.

        :param hist_hash_id: document ID.
        :param doc_body: document source.
        :return: the refresh response, or ``None`` when indexing failed.
        """
        try:
            self.index(index=self.index_name, id=hist_hash_id, body=doc_body)
            # Refresh so the document is immediately visible to searches.
            res = self.indices.refresh(index=self.index_name)
            logger.info(f"Document hist_hash_id:[{hist_hash_id}] indexed successfully!")
            return res
        except Exception as e:
            logger.error(f"Failed to index document hist_hash_id:[{hist_hash_id}]: {e}")
            return None

    def bulk_insert_docs(self, session_histories: list):
        """Index many documents through the bulk API, 1000 per request.

        :param session_histories: list of dicts; ``hist_hash_id`` (optional)
            is used as the document ID.
        :return: list of bulk result items whose status is neither 200 nor
            201, or ``None`` when a request raised.
        """
        failed_list = []
        if not session_histories:
            # Nothing to send; avoid issuing a bulk request with an empty body.
            logger.info(f"Elasticsearch 批量新增完成,failed_list:{failed_list}")
            return failed_list
        batch_count = 1000
        try:
            for start in range(0, len(session_histories), batch_count):
                # Start every batch from an empty list so earlier batches are
                # not sent again.
                bulk_actions = []
                for item in session_histories[start:start + batch_count]:
                    doc = {
                        # "id": item.get('id', 0),
                        "you_feild": item.get('you_feild', ''),
                        # Add the remaining fields of your document here.
                    }
                    meta = {"_index": self.index_name}
                    doc_id = item.get('hist_hash_id', '')
                    if doc_id:
                        # Omit an empty ID so it is not sent as the document ID.
                        meta["_id"] = doc_id
                    # The bulk body alternates an action line and a source line.
                    bulk_actions.append({"index": meta})
                    bulk_actions.append(doc)
                    print(f"insert data -> {item}")
                response = self.bulk(index=self.index_name, body=bulk_actions)
                # 201 means created, 200 means an existing document was
                # overwritten; anything else is a failure.
                for result in response['items']:
                    if result['index']['status'] not in (200, 201):
                        failed_list.append(result)
            logger.info(f"Elasticsearch 批量新增完成,failed_list:{failed_list}")
            # Refresh so the documents are immediately visible to searches.
            self.indices.refresh(index=self.index_name)
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch bulk insert doc error:{e}")
            return None

    def delete_doc_by_id(self, doc_ids):
        """Delete documents one by one.

        :param doc_ids: iterable of document IDs.
        :return: list of IDs that could not be deleted.
        """
        failed_list = []
        for doc_id in doc_ids:
            try:
                response = self.delete(index=self.index_name, id=doc_id)
            except Exception as e:
                # One failing ID (for example a missing document) must not
                # abort the remaining deletions.
                traceback.print_exc()
                logger.error(f"Elasticsearch delete_doc error:{e}")
                failed_list.append(doc_id)
                continue
            if response.meta.status != 200:
                failed_list.append(doc_id)
                logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted failed!")
            else:
                logger.info(f"Document with ID {doc_id} in index {self.index_name} was deleted successfully.")
        return failed_list

    def delete_docs_by_query_body(self, query_body):
        """Delete every document matching ``query_body`` (delete-by-query API).

        Passing ``{"query": {"match_all": {}}}`` removes all documents.

        :param query_body: Query DSL body selecting the documents.
        :return: number of deleted documents, or ``None`` on error.
        """
        try:
            response = self.delete_by_query(index=self.index_name, body=query_body)
            # The response reports the count under "deleted".
            deleted = response['deleted']
            logger.info(f"Deleted documents: {deleted}")
            return deleted
        except Exception as e:
            logger.error(f"Deleted docs error: {e}")
            return None

    def update_doc(self, datas: list):
        """Update documents by re-indexing them under their existing ID.

        :param datas: list of ``{'doc_id': ..., 'doc': {...}}`` dicts.
        :return: list of entries whose status was neither 200 nor 201, or
            ``None`` when a request raised.
        """
        try:
            failed_list = []
            for data in datas:
                response = self.index(index=self.index_name, id=data['doc_id'], body=data['doc'])
                logger.info(f"Update Response: {response}")
                if response.meta.status not in (200, 201):
                    failed_list.append(data)
            # Refresh so the changes are immediately visible to searches.
            self.indices.refresh(index=self.index_name)
            logger.info(f"Elasticsearch update_doc finished! failed_list -> {failed_list}")
            return failed_list
        except Exception as e:
            traceback.print_exc()
            logger.error(f"Elasticsearch update_doc error: {e}")
            return None

    def get_doc(self, doc_id):
        """Return the ``_source`` of ``doc_id``, or ``None`` on any error."""
        try:
            return self.get(index=self.index_name, id=doc_id)['_source']
        except Exception as e:
            logger.error(f"Error retrieving document {doc_id}: {e}")
            return None

    def _run_search(self, query_body, caller, **search_kwargs):
        """Shared body of the search helpers; ``caller`` names the public method in error logs."""
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},query_body={query_body}')
            response = self.search(index=self.index_name, body=query_body, **search_kwargs)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            shards = response.get('_shards', {})
            # Only trust the result when every shard answered successfully.
            if shards.get('successful') != shards.get('total'):
                return None
            total = response['hits']['total']['value']
            logger.info(f"Search index successful! total count={total}, match count={len(response['hits']['hits'])}")
            return response if total > 0 else None
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService {caller} error:{e}")
            return None

    def search_index(self, query_body):
        """Search the index.

        :param query_body: Query DSL body.
        :return: the response when all shards succeeded and at least one
            document matched, otherwise ``None``.
        """
        return self._run_search(query_body, 'search_index')

    def search_index_by_scroll_api(self, query_body):
        """Search the index with a one-minute scroll context.

        :param query_body: Query DSL body.
        :return: the first page response when all shards succeeded and at
            least one document matched, otherwise ``None``.
        """
        return self._run_search(query_body, 'search_index_by_scroll_api', scroll='1m')

    def search_by_sql(self, sql_body):
        """Run an SQL query and return the rows as dicts keyed by column name.

        :param sql_body: request body for the SQL query API.
        :return: list of row dicts; empty on a non-200 status or on error.
        """
        try:
            logger.info(f'elasticsearch search index_name={self.index_name},sql_body={sql_body}')
            response = self.sql.query(body=sql_body)
            logger.info(f'elasticsearch search [{self.index_name}] response.meta.status={response.meta.status}')
            if response.meta.status != 200:
                return []
            column_names = [col['name'] for col in response.get('columns')]
            # Pair every row value with the column name at the same position.
            result_dicts = [dict(zip(column_names, row)) for row in response.get('rows')]
            logger.info(f"Search index successful! match count={len(result_dicts)}")
            return result_dicts
        except Exception as e:
            traceback.print_exc()
            logger.error(f"ElasticService search_by_sql error:{e}")
            return []


def get_elastic_instance(index_name, addrs):
    """Build an ``ElasticSearchService``, retrying up to 5 times, 10 seconds apart.

    :param index_name: index the service is bound to.
    :param addrs: list of node URLs.
    :return: the connected service, or ``None`` when every attempt failed.
    """
    _try_count = 5
    _interval_seconds = 10
    for i in range(_try_count):
        _es_service = None
        try:
            _es_service = ElasticSearchService(index_name, addrs)
            logger.info(f"ElasticService initial successfully!")
            print(f"ElasticService initial successfully!")
            return _es_service
        except Exception as e:
            traceback.print_exc()
            if i < _try_count - 1:
                logger.warning(f"初始化ElasticService失败,结果:{_es_service}, 异常原因:{str(e)}, 应用将在{_interval_seconds}秒后重新尝试.")
                time.sleep(_interval_seconds)
            else:
                # Last attempt: no retry follows, so do not sleep or promise one.
                logger.error(f"ElasticService initialization failed after {_try_count} attempts: {e}")
    return None


es_service = None
# Build the node address list from the configured host and port.
port = es_conf['port']
host = es_conf['host']
addrs = [f"http://{host}:{port}", ]

# Connect at import time only when Elasticsearch is enabled in the config.
if config['elasticsearch']['enabled']:
    index_name = config['elasticsearch']['session_history_index']
    es_service = get_elastic_instance(index_name, addrs)
else:
    logger.info(f"[elasticsearch] 未启用! enabled -> {config['elasticsearch']['enabled']}")

if __name__ == '__main__':
    # Reuse the client created above when there is one; connect otherwise.
    if es_service is None:
        index_name = config['elasticsearch']['session_history_index']
        es_service = get_elastic_instance(index_name, addrs)
    if es_service is None:
        # Every connection attempt failed; the demo cannot run without a client.
        raise SystemExit("Elasticsearch is unavailable, abort the demo.")

    # Documents to add.
    docs = [
        {
            # "id": i + 1,
            "you_feild": "",
            # Add the remaining fields of your document here.
        }
        for _ in range(5)
    ]
    # Insert data.
    # es_service.insert_doc('2', doc)
    print(es_service.bulk_insert_docs(docs))

    # Delete the index.
    # print(es_service.delete_index(index_name))

    # Fetch a document.
    # print(es_service.get_doc('c2b27b31-80f8-4cf6-b3f2-36683b60d7da'))
    # logger.info(es_service.get_doc('2'))

    # Delete documents.
    # logger.info(es_service.delete_doc_by_id(['f554d0e5-e4cc-4556-952b-b12cdc640fe56']))
    # query_body = {"query": {"match_all": {}}}
    # logger.info(es_service.delete_docs_by_query_body(query_body))

    # Update data.
    # datas = [{'doc_id': 'c2b27b31-80f8-4cf6-b3f2-36683b60d7da', 'doc': {'qst_content': 'qqq'}}]
    # print(es_service.update_doc(datas))

    # Query data.
    keyword = "缴清"
    query_body = {
        "query": {
            "multi_match": {
                "query": keyword,
                "fields": ["reply_content", "qst_content", "standard_qst"],
            }
        },
        "from": 0,
        "size": 10,
        "sort": [{"chat_qst_time": "desc"}],
    }
    # print(es_service.search_index(query_body))
四. 总结
以上是使用Python与Elasticsearch进行交互的基本步骤。可以根据实际需求扩展这些操作,例如处理更复杂的查询、使用聚合、批量操作等。Elasticsearch的Python客户端库提供了丰富的API,可以满足大多数与Elasticsearch交互的需求。
希望对你有所帮助!