Elasticsearch 实战教程


什么是Elasticsearch?

Elasticsearch是一个分布式的开源搜索和分析引擎,基于Apache Lucene构建。它能够快速地存储、搜索和分析大量数据,常用于全文搜索、结构化搜索、日志分析、实时分析等场景。

Elasticsearch的主要特点:

  • 分布式:支持水平扩展,能够处理PB级别的数据
  • 实时搜索:近实时的搜索和分析能力
  • RESTful API:提供简单易用的RESTful接口
  • 全文搜索:强大的全文搜索和相关性评分
  • 灵活的数据模型:支持JSON格式的文档存储
  • 聚合分析:强大的数据分析和聚合功能

1. Elasticsearch安装与配置

1.1 Docker方式安装

# 拉取Elasticsearch镜像
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.12.0

# 运行Elasticsearch容器(单节点)
docker run -d --name elasticsearch \
  -p 9200:9200 -p 9300:9300 \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  -e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
  docker.elastic.co/elasticsearch/elasticsearch:8.12.0

# 检查Elasticsearch状态
curl -X GET "localhost:9200/"

1.2 Docker Compose方式安装

# docker-compose.yml
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es_data:/usr/share/elasticsearch/data

volumes:
  es_data:

1.3 配置文件详解

# config/elasticsearch.yml
cluster.name: my-application
node.name: node-1
path.data: /usr/share/elasticsearch/data
path.logs: /usr/share/elasticsearch/logs

# 网络配置
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300

# 集群发现
discovery.type: single-node

# 内存配置
bootstrap.memory_lock: true
indices.fielddata.cache.size: 40%
indices.query.cache.size: 10%

# 安全配置(禁用x-pack安全功能)
xpack.security.enabled: false

2. Elasticsearch基础概念

2.1 核心概念

概念说明类比关系型数据库
Index文档的集合Database
Type文档的类型(7.x后废弃)Table
Document搜索的基本单位Row
Field文档的属性Column
Shard分片,用于水平扩展Horizontal partition
Replica副本,用于高可用Backup

2.2 REST API基本操作

# 检查集群健康状态
GET /_cluster/health

# 查看集群节点信息
GET /_cat/nodes?v

# 查看所有索引
GET /_cat/indices?v

# 创建索引
PUT /my_index

# 删除索引
DELETE /my_index

# 查看索引映射
GET /my_index/_mapping

# 查看索引设置
GET /my_index/_settings

3. 索引管理

3.1 创建索引与映射

# 创建带有映射的索引
PUT /blog_posts
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1,
    "analysis": {
      "analyzer": {
        "my_analyzer": {
          "type": "custom",
          "tokenizer": "standard",
          "filter": ["lowercase", "stop"]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "title": {
        "type": "text",
        "analyzer": "my_analyzer",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "content": {
        "type": "text",
        "analyzer": "my_analyzer"
      },
      "author": {
        "type": "keyword"
      },
      "publish_date": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      },
      "tags": {
        "type": "keyword"
      },
      "views": {
        "type": "integer"
      },
      "rating": {
        "type": "float"
      }
    }
  }
}

3.2 字段类型详解

{
  "mappings": {
    "properties": {
      // 文本类型
      "full_text": {
        "type": "text",
        "analyzer": "standard"
      },
      
      // 关键词类型
      "category": {
        "type": "keyword"
      },
      
      // 数值类型
      "price": {
        "type": "double"
      },
      "quantity": {
        "type": "integer"
      },
      
      // 日期类型
      "created_at": {
        "type": "date",
        "format": "strict_date_optional_time||epoch_millis"
      },
      
      // 布尔类型
      "is_active": {
        "type": "boolean"
      },
      
      // 对象类型
      "address": {
        "type": "object",
        "properties": {
          "street": { "type": "text" },
          "city": { "type": "keyword" },
          "zipcode": { "type": "keyword" }
        }
      },
      
      // 嵌套类型
      "comments": {
        "type": "nested",
        "properties": {
          "author": { "type": "keyword" },
          "content": { "type": "text" },
          "date": { "type": "date" }
        }
      },
      
      // 地理位置类型
      "location": {
        "type": "geo_point"
      },
      
      // IP类型
      "ip_address": {
        "type": "ip"
      }
    }
  }
}

3.3 索引设置优化

{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1,
    "refresh_interval": "30s",
    "index.translog.durability": "async",
    "index.merge.policy.max_merge_at_once": 30,
    "index.merge.policy.segments_per_tier": 30,
    "index.store.type": "niofs"
  }
}

4. 文档操作

4.1 索引文档

# 索引文档(自动生成ID)
POST /blog_posts/_doc
{
  "title": "Elasticsearch入门指南",
  "content": "Elasticsearch是一个强大的搜索引擎...",
  "author": "张三",
  "publish_date": "2026-04-10",
  "tags": ["elasticsearch", "search", "tutorial"],
  "views": 100,
  "rating": 4.5
}

# 索引文档(指定ID)
PUT /blog_posts/_doc/1
{
  "title": "Python与Elasticsearch集成",
  "content": "如何在Python项目中使用Elasticsearch...",
  "author": "李四",
  "publish_date": "2026-04-09",
  "tags": ["python", "elasticsearch", "integration"],
  "views": 200,
  "rating": 4.8
}

# 批量操作
POST /_bulk
{"index":{"_index":"blog_posts","_id":"2"}}
{"title":"Elasticsearch性能优化","content":"优化技巧和最佳实践","author":"王五","publish_date":"2026-04-08","tags":["elasticsearch","performance"],"views":150,"rating":4.6}
{"update":{"_index":"blog_posts","_id":"1"}}
{"doc":{"views":105}}
{"delete":{"_index":"blog_posts","_id":"temp_doc"}}

# 检查文档是否存在
HEAD /blog_posts/_doc/1

4.2 获取文档

# 获取单个文档
GET /blog_posts/_doc/1

# 获取多个文档
GET /blog_posts/_mget
{
  "ids": ["1", "2"]
}

# 获取文档的部分字段
GET /blog_posts/_doc/1?_source=title,author,publish_date

# 检查文档是否存在
HEAD /blog_posts/_doc/1

4.3 更新文档

# 完全替换文档
PUT /blog_posts/_doc/1
{
  "title": "更新后的标题",
  "content": "更新后的内容...",
  "author": "张三",
  "publish_date": "2026-04-10",
  "tags": ["elasticsearch", "search", "tutorial"],
  "views": 110,
  "rating": 4.5
}

# 部分更新
POST /blog_posts/_update/1
{
  "doc": {
    "views": 115,
    "last_updated": "2026-04-10T20:00:00"
  }
}

# 脚本更新
POST /blog_posts/_update/1
{
  "script": {
    "source": "ctx._source.views += params.increment",
    "params": {
      "increment": 5
    }
  }
}

# 条件更新
POST /blog_posts/_update/1
{
  "script": {
    "source": "if (ctx._source.views < params.threshold) { ctx._source.views += params.increment; } else { ctx.op = 'noop'; }",
    "params": {
      "increment": 1,
      "threshold": 1000
    }
  }
}

5. 搜索操作

5.1 简单搜索

# 查询所有文档
GET /blog_posts/_search
{
  "query": {
    "match_all": {}
  }
}

# 精确匹配
GET /blog_posts/_search
{
  "query": {
    "term": {
      "author.keyword": "张三"
    }
  }
}

# 全文搜索
GET /blog_posts/_search
{
  "query": {
    "match": {
      "content": "搜索引擎"
    }
  }
}

# 多字段搜索
GET /blog_posts/_search
{
  "query": {
    "multi_match": {
      "query": "elasticsearch",
      "fields": ["title", "content", "tags"]
    }
  }
}

# 布尔查询
GET /blog_posts/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "content": "elasticsearch"
          }
        }
      ],
      "filter": [
        {
          "range": {
            "publish_date": {
              "gte": "2026-01-01"
            }
          }
        },
        {
          "term": {
            "author.keyword": "张三"
          }
        }
      ]
    }
  }
}

5.2 高级搜索

# 短语搜索
GET /blog_posts/_search
{
  "query": {
    "match_phrase": {
      "content": "搜索引擎 技术"
    }
  }
}

# 通配符搜索
GET /blog_posts/_search
{
  "query": {
    "wildcard": {
      "author.keyword": "张*"
    }
  }
}

# 正则表达式搜索
GET /blog_posts/_search
{
  "query": {
    "regexp": {
      "author.keyword": "zhang.*"
    }
  }
}

# 模糊搜索
GET /blog_posts/_search
{
  "query": {
    "fuzzy": {
      "title": {
        "value": "elasticsearh",
        "fuzziness": "AUTO"
      }
    }
  }
}

# 范围搜索
GET /blog_posts/_search
{
  "query": {
    "range": {
      "views": {
        "gte": 100,
        "lte": 1000
      }
    }
  }
}

# 嵌套查询
GET /blog_posts/_search
{
  "query": {
    "nested": {
      "path": "comments",
      "query": {
        "bool": {
          "must": [
            {
              "match": {
                "comments.author": "评论者"
              }
            },
            {
              "range": {
                "comments.date": {
                  "gte": "2026-01-01"
                }
              }
            }
          ]
        }
      }
    }
  }
}

5.3 搜索结果排序

# 按字段排序
GET /blog_posts/_search
{
  "query": {
    "match_all": {}
  },
  "sort": [
    {
      "publish_date": {
        "order": "desc"
      }
    },
    {
      "_score": {
        "order": "desc"
      }
    }
  ]
}

# 按相关性评分排序
GET /blog_posts/_search
{
  "query": {
    "function_score": {
      "query": {
        "match": {
          "content": "elasticsearch"
        }
      },
      "functions": [
        {
          "field_value_factor": {
            "field": "views",
            "factor": 1.2,
            "modifier": "sqrt",
            "missing": 1
          }
        }
      ],
      "boost_mode": "multiply"
    }
  },
  "sort": [
    {
      "_score": {
        "order": "desc"
      }
    }
  ]
}

6. Python与Elasticsearch集成

6.1 安装客户端

pip install elasticsearch
pip install elastic-transport  # 可选:更好的传输层支持

6.2 基本连接与操作

from elasticsearch import Elasticsearch
from datetime import datetime
import json

# 连接Elasticsearch
es = Elasticsearch(
    hosts=['localhost:9200'],
    basic_auth=('username', 'password'),  # 如果启用了安全功能
    verify_certs=False  # 生产环境中应为True
)

# 检查连接
if es.ping():
    print("Connected to Elasticsearch")
else:
    print("Could not connect to Elasticsearch")

# 创建索引
def create_blog_index():
    mapping = {
        "mappings": {
            "properties": {
                "title": {
                    "type": "text",
                    "analyzer": "standard",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                },
                "content": {
                    "type": "text",
                    "analyzer": "standard"
                },
                "author": {
                    "type": "keyword"
                },
                "publish_date": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                },
                "tags": {
                    "type": "keyword"
                },
                "views": {
                    "type": "integer"
                },
                "rating": {
                    "type": "float"
                }
            }
        }
    }
    
    if not es.indices.exists(index="blog_posts"):
        es.indices.create(index="blog_posts", body=mapping)
        print("Index created successfully")

# 索引文档
def index_blog_post(post_id, post_data):
    response = es.index(
        index="blog_posts",
        id=post_id,
        document=post_data
    )
    return response

# 获取文档
def get_blog_post(post_id):
    try:
        response = es.get(index="blog_posts", id=post_id)
        return response['_source']
    except Exception as e:
        print(f"Error retrieving document: {e}")
        return None

# 搜索文档
def search_blog_posts(query_string, size=10, from_=0):
    query_body = {
        "query": {
            "multi_match": {
                "query": query_string,
                "fields": ["title", "content", "tags"]
            }
        },
        "size": size,
        "from": from_,
        "highlight": {
            "fields": {
                "title": {},
                "content": {}
            }
        }
    }
    
    response = es.search(index="blog_posts", body=query_body)
    return response

# 删除文档
def delete_blog_post(post_id):
    response = es.delete(index="blog_posts", id=post_id)
    return response

6.3 高级搜索功能

# 聚合搜索
def aggregate_by_author():
    aggregation_query = {
        "aggs": {
            "authors": {
                "terms": {
                    "field": "author.keyword",
                    "size": 10
                }
            },
            "avg_rating": {
                "avg": {
                    "field": "rating"
                }
            },
            "publish_date_range": {
                "date_histogram": {
                    "field": "publish_date",
                    "calendar_interval": "month"
                }
            }
        }
    }
    
    response = es.search(index="blog_posts", body=aggregation_query)
    return response

# 复杂布尔查询
def advanced_search(author=None, tags=None, date_from=None, date_to=None, min_views=0):
    must_conditions = []
    filter_conditions = []
    
    if author:
        must_conditions.append({
            "term": {
                "author.keyword": author
            }
        })
    
    if tags:
        must_conditions.append({
            "terms": {
                "tags.keyword": tags
            }
        })
    
    if date_from or date_to:
        date_range = {}
        if date_from:
            date_range["gte"] = date_from
        if date_to:
            date_range["lte"] = date_to
        filter_conditions.append({
            "range": {
                "publish_date": date_range
            }
        })
    
    if min_views > 0:
        filter_conditions.append({
            "range": {
                "views": {
                    "gte": min_views
                }
            }
        })
    
    query_body = {
        "query": {
            "bool": {
                "must": must_conditions,
                "filter": filter_conditions
            }
        }
    }
    
    response = es.search(index="blog_posts", body=query_body)
    return response

# 批量操作
def bulk_index_posts(posts):
    from elasticsearch.helpers import bulk
    
    actions = []
    for post in posts:
        action = {
            "_index": "blog_posts",
            "_source": post
        }
        actions.append(action)
    
    success_count, failed_docs = bulk(es, actions)
    return success_count, failed_docs

# 更新文档
def update_post_views(post_id, increment=1):
    script = {
        "script": {
            "source": "ctx._source.views += params.increment",
            "params": {
                "increment": increment
            }
        }
    }
    
    response = es.update(index="blog_posts", id=post_id, body=script)
    return response

6.4 实际应用场景

6.4.1 搜索引擎实现

class BlogSearchEngine:
    def __init__(self, es_client):
        self.es = es_client
        self.index_name = "blog_posts"
    
    def create_index(self):
        """创建索引"""
        mapping = {
            "mappings": {
                "properties": {
                    "title": {
                        "type": "text",
                        "analyzer": "standard",
                        "fields": {
                            "suggest": {
                                "type": "completion",
                                "analyzer": "simple"
                            }
                        }
                    },
                    "content": {
                        "type": "text",
                        "analyzer": "standard"
                    },
                    "author": {
                        "type": "keyword"
                    },
                    "publish_date": {
                        "type": "date",
                        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                    },
                    "tags": {
                        "type": "keyword"
                    },
                    "views": {
                        "type": "integer"
                    },
                    "rating": {
                        "type": "float"
                    }
                }
            }
        }
        
        if not self.es.indices.exists(index=self.index_name):
            self.es.indices.create(index=self.index_name, body=mapping)
    
    def index_document(self, doc_id, document):
        """索引文档"""
        return self.es.index(index=self.index_name, id=doc_id, document=document)
    
    def search(self, query, page=1, size=10, filters=None):
        """搜索文档"""
        search_body = {
            "query": {
                "bool": {
                    "should": [
                        {
                            "multi_match": {
                                "query": query,
                                "fields": ["title^3", "content", "tags^2"],
                                "type": "best_fields",
                                "fuzziness": "AUTO"
                            }
                        },
                        {
                            "match_phrase_prefix": {
                                "title": {
                                    "query": query,
                                    "boost": 2
                                }
                            }
                        }
                    ]
                }
            },
            "highlight": {
                "fields": {
                    "title": {
                        "fragment_size": 150,
                        "number_of_fragments": 1
                    },
                    "content": {
                        "fragment_size": 150,
                        "number_of_fragments": 3
                    }
                }
            },
            "sort": [
                {"_score": {"order": "desc"}},
                {"publish_date": {"order": "desc"}}
            ],
            "from": (page - 1) * size,
            "size": size
        }
        
        # 添加过滤条件
        if filters:
            if 'author' in filters:
                search_body['query']['bool'].setdefault('filter', []).append({
                    'term': {'author.keyword': filters['author']}
                })
            if 'tags' in filters:
                search_body['query']['bool'].setdefault('filter', []).append({
                    'terms': {'tags.keyword': filters['tags']}
                })
            if 'date_range' in filters:
                search_body['query']['bool'].setdefault('filter', []).append({
                    'range': {'publish_date': filters['date_range']}
                })
        
        response = self.es.search(index=self.index_name, body=search_body)
        return response
    
    def suggest(self, prefix):
        """搜索建议"""
        suggest_body = {
            "suggest": {
                "title_suggest": {
                    "prefix": prefix,
                    "completion": {
                        "field": "title.suggest",
                        "size": 10
                    }
                }
            }
        }
        
        response = self.es.search(index=self.index_name, body=suggest_body)
        suggestions = []
        for suggestion in response['suggest']['title_suggest'][0]['options']:
            suggestions.append(suggestion['text'])
        
        return suggestions

# 使用示例
search_engine = BlogSearchEngine(es)
search_engine.create_index()

# 索引示例数据
sample_posts = [
    {
        "title": "Elasticsearch入门指南",
        "content": "Elasticsearch是一个强大的开源搜索引擎,基于Lucene构建...",
        "author": "张三",
        "publish_date": "2026-04-10",
        "tags": ["elasticsearch", "search", "tutorial"],
        "views": 100,
        "rating": 4.5
    },
    {
        "title": "Python与Elasticsearch集成",
        "content": "如何在Python项目中集成Elasticsearch进行全文搜索...",
        "author": "李四",
        "publish_date": "2026-04-09",
        "tags": ["python", "elasticsearch", "integration"],
        "views": 200,
        "rating": 4.8
    }
]

for i, post in enumerate(sample_posts):
    search_engine.index_document(i+1, post)

# 执行搜索
results = search_engine.search("elasticsearch", page=1, size=5)
print(json.dumps(results, indent=2, default=str))

6.4.2 日志分析系统

class LogAnalyzer:
    def __init__(self, es_client):
        self.es = es_client
        self.index_name = "application_logs"
    
    def create_log_index(self):
        """创建日志索引"""
        mapping = {
            "mappings": {
                "properties": {
                    "timestamp": {
                        "type": "date"
                    },
                    "level": {
                        "type": "keyword"
                    },
                    "service": {
                        "type": "keyword"
                    },
                    "message": {
                        "type": "text"
                    },
                    "trace_id": {
                        "type": "keyword"
                    },
                    "user_id": {
                        "type": "keyword"
                    },
                    "ip_address": {
                        "type": "ip"
                    },
                    "response_time": {
                        "type": "float"
                    }
                }
            }
        }
        
        if not self.es.indices.exists(index=self.index_name):
            self.es.indices.create(index=self.index_name, body=mapping)
    
    def index_log(self, log_entry):
        """索引日志条目"""
        log_entry['timestamp'] = datetime.now().isoformat()
        return self.es.index(index=self.index_name, document=log_entry)
    
    def get_error_logs(self, hours_back=24):
        """获取错误日志"""
        query = {
            "query": {
                "bool": {
                    "should": [
                        {"term": {"level.keyword": "ERROR"}},
                        {"term": {"level.keyword": "FATAL"}}
                    ],
                    "filter": [
                        {
                            "range": {
                                "timestamp": {
                                    "gte": f"now-{hours_back}h"
                                }
                            }
                        }
                    ]
                }
            },
            "sort": [{"timestamp": {"order": "desc"}}],
            "size": 100
        }
        
        return self.es.search(index=self.index_name, body=query)
    
    def analyze_response_times(self, service_name, hours_back=24):
        """分析响应时间"""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"service.keyword": service_name}},
                        {"exists": {"field": "response_time"}}
                    ],
                    "filter": [
                        {
                            "range": {
                                "timestamp": {
                                    "gte": f"now-{hours_back}h"
                                }
                            }
                        }
                    ]
                }
            },
            "aggs": {
                "avg_response_time": {
                    "avg": {
                        "field": "response_time"
                    }
                },
                "percentiles_response_time": {
                    "percentiles": {
                        "field": "response_time",
                        "percents": [50, 95, 99]
                    }
                },
                "response_time_over_time": {
                    "date_histogram": {
                        "field": "timestamp",
                        "calendar_interval": "minute"
                    }
                }
            }
        }
        
        return self.es.search(index=self.index_name, body=query)

# 使用示例
log_analyzer = LogAnalyzer(es)
log_analyzer.create_log_index()

# 模拟记录日志
sample_logs = [
    {
        "level": "INFO",
        "service": "user-service",
        "message": "User login successful",
        "user_id": "12345",
        "ip_address": "192.168.1.100",
        "response_time": 120.5
    },
    {
        "level": "ERROR",
        "service": "payment-service",
        "message": "Payment processing failed",
        "trace_id": "abc123",
        "response_time": 2500.0
    }
]

for log in sample_logs:
    log_analyzer.index_log(log)

7. 聚合分析

7.1 基础聚合

# 词项聚合
GET /blog_posts/_search
{
  "size": 0,
  "aggs": {
    "authors": {
      "terms": {
        "field": "author.keyword",
        "size": 10
      }
    }
  }
}

# 指标聚合
GET /blog_posts/_search
{
  "size": 0,
  "aggs": {
    "avg_rating": {
      "avg": {
        "field": "rating"
      }
    },
    "total_views": {
      "sum": {
        "field": "views"
      }
    },
    "max_views": {
      "max": {
        "field": "views"
      }
    },
    "min_views": {
      "min": {
        "field": "views"
      }
    }
  }
}

# 日期直方图聚合
GET /blog_posts/_search
{
  "size": 0,
  "aggs": {
    "posts_over_time": {
      "date_histogram": {
        "field": "publish_date",
        "calendar_interval": "day"
      }
    }
  }
}

# 范围聚合
GET /blog_posts/_search
{
  "size": 0,
  "aggs": {
    "view_ranges": {
      "range": {
        "field": "views",
        "ranges": [
          {"to": 100},
          {"from": 100, "to": 500},
          {"from": 500, "to": 1000},
          {"from": 1000}
        ]
      }
    }
  }
}

7.2 高级聚合

# 嵌套聚合
GET /blog_posts/_search
{
  "size": 0,
  "aggs": {
    "authors": {
      "terms": {
        "field": "author.keyword",
        "size": 10
      },
      "aggs": {
        "avg_rating": {
          "avg": {
            "field": "rating"
          }
        },
        "total_views": {
          "sum": {
            "field": "views"
          }
        },
        "post_count": {
          "value_count": {
            "field": "_id"
          }
        }
      }
    }
  }
}

# 管道聚合
GET /blog_posts/_search
{
  "size": 0,
  "aggs": {
    "monthly_posts": {
      "date_histogram": {
        "field": "publish_date",
        "calendar_interval": "month"
      },
      "aggs": {
        "avg_views": {
          "avg": {
            "field": "views"
          }
        }
      }
    },
    "avg_monthly_views": {
      "avg_bucket": {
        "buckets_path": "monthly_posts>avg_views"
      }
    }
  }
}

# 百分位数聚合
GET /blog_posts/_search
{
  "size": 0,
  "aggs": {
    "view_percentiles": {
      "percentiles": {
        "field": "views",
        "percents": [25, 50, 75, 90, 95, 99]
      }
    }
  }
}

8. 性能优化

8.1 索引优化

# 优化索引设置(用于批量索引)
PUT /optimized_index
{
  "settings": {
    "number_of_replicas": 0,
    "refresh_interval": "-1",
    "index.translog.durability": "async",
    "index.merge.policy.max_merge_at_once": 30,
    "index.merge.policy.segments_per_tier": 30
  }
}

# 索引完成后恢复正常设置
PUT /optimized_index/_settings
{
  "number_of_replicas": 1,
  "refresh_interval": "30s"
}

8.2 查询优化

# 优化查询结构
def optimized_search(query_string, filters=None):
    query_body = {
        "query": {
            "bool": {
                "filter": [],  # 使用filter上下文提高性能
                "must": []
            }
        },
        "_source": ["title", "author", "publish_date"],  # 只返回需要的字段
        "size": 20,
        "track_total_hits": 10000  # 限制总数统计
    }
    
    # 将精确匹配放入filter
    if filters:
        if 'author' in filters:
            query_body['query']['bool']['filter'].append({
                'term': {'author.keyword': filters['author']}
            })
        if 'min_views' in filters:
            query_body['query']['bool']['filter'].append({
                'range': {'views': {'gte': filters['min_views']}}
            })
    
    # 将全文搜索放入must
    if query_string:
        query_body['query']['bool']['must'].append({
            'multi_match': {
                'query': query_string,
                'fields': ['title^3', 'content^1'],
                'type': 'best_fields'
            }
        })
    
    return es.search(index="blog_posts", body=query_body)

8.3 内存和缓存优化

# jvm.options - JVM调优
-Xms2g
-Xmx2g

# elasticsearch.yml - 缓存配置
indices.fielddata.cache.size: 40%
indices.query.cache.size: 10%
indices.requests.cache.size: 2%

9. 集群与高可用

9.1 集群配置

# 集群节点1 (node-1)
cluster.name: my-cluster
node.name: node-1
network.host: 0.0.0.0
http.port: 9200
transport.port: 9300
node.roles: ["master", "data", "ingest"]

# 集群发现
cluster.initial_master_nodes: ["node-1", "node-2", "node-3"]
discovery.seed_hosts: ["node1:9300", "node2:9300", "node3:9300"]

# 集群节点2 (node-2)
cluster.name: my-cluster
node.name: node-2
network.host: 0.0.0.0
http.port: 9201
transport.port: 9301
node.roles: ["data", "ingest"]

# 集群节点3 (node-3)
cluster.name: my-cluster
node.name: node-3
network.host: 0.0.0.0
http.port: 9202
transport.port: 9302
node.roles: ["data", "ingest"]

9.2 Python客户端集群配置

from elasticsearch import Elasticsearch

# 配置集群连接
es_cluster = Elasticsearch(
    [
        {'host': 'node1', 'port': 9200},
        {'host': 'node2', 'port': 9200},
        {'host': 'node3', 'port': 9200}
    ],
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60,
    retry_on_timeout=True,
    max_retries=10,
    request_timeout=30
)

10. 监控与运维

10.1 集群监控

# 集群健康状态
GET /_cluster/health

# 节点统计信息
GET /_nodes/stats

# 索引统计信息
GET /blog_posts/_stats

# 段信息
GET /blog_posts/_segments

# 任务列表
GET /_tasks

# 慢查询日志
GET /_cluster/settings
{
  "transient": {
    "logger.org.elasticsearch.index.search.slowlog": "DEBUG"
  }
}

10.2 性能调优

# 查看热线程
GET /_nodes/hot_threads

# 查看索引统计
GET /_stats

# 查看索引分段
GET /blog_posts/_segments

# 强制合并索引(谨慎使用)
POST /blog_posts/_forcemerge?max_num_segments=1

11. 最佳实践

11.1 索引设计最佳实践

"""
索引设计最佳实践:
1. 合理设置分片数量:避免过多或过少的分片
2. 为写入优化的设置:批量索引时临时调整设置
3. 选择合适的映射类型:根据使用场景选择text/keyword
4. 使用别名:便于索引滚动和维护
5. 数据生命周期管理:定期清理过期数据
"""

# 索引别名使用
def create_index_with_alias(alias_name, index_suffix=None):
    import time
    
    if not index_suffix:
        index_suffix = int(time.time())
    
    index_name = f"{alias_name}_{index_suffix}"
    
    # 创建新索引
    mapping = {
        "mappings": {
            "properties": {
                "title": {"type": "text"},
                "content": {"type": "text"},
                "timestamp": {"type": "date"}
            }
        }
    }
    
    es.indices.create(index=index_name, body=mapping)
    
    # 添加别名
    es.indices.put_alias(index=index_name, name=alias_name)
    
    return index_name

# 使用索引别名
def index_with_alias(alias_name, document):
    return es.index(index=alias_name, document=document)

11.2 安全配置

# 安全配置
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.http.ssl.enabled: true

# 用户角色配置
PUT /_security/user/my_user
{
  "password" : "my_password",
  "roles" : [ "my_role" ],
  "full_name" : "My User"
}

# 角色权限
PUT /_security/role/my_role
{
  "cluster": ["monitor"],
  "indices": [
    {
      "names": ["blog_posts"],
      "privileges": ["read", "write"]
    }
  ]
}

11.3 应用场景总结

场景说明推荐配置
全文搜索博客、文档、产品搜索text类型,合适的分析器
日志分析应用日志、系统监控keyword类型,时间序列优化
实时分析业务指标、用户行为适当聚合,索引优化
电商搜索商品搜索、筛选多字段匹配,建议器
地理搜索位置服务、LBSgeo_point类型

总结

Elasticsearch是一个功能强大、可扩展的搜索引擎,适用于各种搜索和分析场景。通过合理的设计索引结构、优化查询性能、配置集群,可以构建高性能的搜索和分析系统。掌握Elasticsearch的核心概念和最佳实践,能够帮助开发者构建出色的搜索体验和数据分析平台。