elasticsearch 安装/配置/开发/搜索全记录

因为公司业务的需要,要进行复杂的查询,同时还要带上地理位置的查询,而项目本身使用的是 MySQL 作为存储数据库,
复杂的查询条件和地理位置查询让 MySQL 显得力不从心,所以我转而去寻找其它搜索工具。

以前的项目中,使用过 sphinx 的汉化版 coreseek 来做查询,coreseek 满足了我当时的查询需求,但是 coreseek 有两个缺点,
一个是查询条件的组合不好用,更多的返回是id,而不是全文内容,并且地理位置查询似乎不支持;另外就是 coreseek 已经停止开发很久了。
经过一些搜索和考察,我决定使用 elasticsearch 作为搜索工具。使用 elasticsearch 的过程,也是一个学习的过程,下面就把我的使用学习
过程和心得记录下来。

本文主要包含以下四个部分:

  1. 安装
  2. 配置
  3. 数据同步
  4. 搜索

1. 安装

在 elasticsearch 的官网上,提供了两类主要 linux 发行版的安装方法,根据官网的指示按步骤操作即可,主要要注意的是因为 elasticsearch 是
java 开发的,所以需要安装 jre 包。

elasticsearch 安装教程

安装好后,可以根据需要,安装一个可视化的监控工具 marvel,它里面带的调试器非常方便调试、查看、搜索我们导入的数据以及数据的属性等信息。

1
2
3
4
# marvel 安装方法
# 需要注意的是,要切换到 elasticsearch 的安装主目录,一般是 /usr/share/elasticsearch
./bin/plugin -i elasticsearch/marvel/latest

安装好后,可以通过这里来直观的对服务器的运行状态进行查看,也可以使用它的 sense 功能,直观的编写 dsl 命令查询数据,访问地址是:

http://host_ip:9200/_plugin/marvel/sense/index.html

因为 elasticsearch 是用 java 开发的,所以,要求安装 java 运行环境,在 ubuntu 中,可以参考: How To Install Java on Ubuntu with Apt-Get

简单来说,就是 apt-get install openjdk-7-jre

默认情况下,elasticsearch 对外提供 restful api 服务的端口是 9200,所以,可以直接使用 http://host:9200/_search 进行查看。

至此,基本上 elasticsearch 就安装完毕了,安装的过程应该是很就顺畅的。

2. 配置

本部分主要参考 《elasticsearch 权威指南》 的相关信息,更多信息可以前往围观。 elasticsearch 权威指南

3.数据导入与同步更新

数据同步是一个大难题。网络上搜索出来的信息,更多的是使用 elasticsearch-river-jdbc 这个 river 插件,但是随着 elasticsearch 官方将 river 接口给废弃掉,
elasticsearch-river-jdbc 这个插件也被作者更名为 elasticsearch-jdbc,但是不管是 1.7.1 还是 1.6.2 之类的版本,我都没法运行起来这个插件,让他从 mysql 同步
数据到 elasticsearch 中,总是报错。网络上的相关信息太少了,并且也较陈旧,很多示例还是 1.0 rc 这这样的,而现在已经 1.7.1 了,所以经过一段时间的折腾后,我最终放弃了
这个插件,改为自己使用 python 开发程序来进行数据同步。

我的方法比较粗暴、野蛮、傻,基本不可复用,但是可以解决问题。一个孕妇十个月生个娃儿,那么十个人一个月生个娃;这不是最悲剧的,最悲剧的是,一个人的要一个月生个娃
只有一周的时间,要对 elasticsearch 从一点不了解到能上生产使用,真是程序员的悲哀。活着有罪!

因为需要自己解决数据同步的问题,就要解决三个问题:

  1. 已有数据导入 elasticsearch
  2. 新数据能进入 elasticsearch
  3. 更新的数据,能同步在 elasticsearch 中进行更新

so,对于第一个问题,已有数据。我的方法就是,写一些初始化脚本,将 MySQL 的数据重新进行整理、组合后,使用 elasticsearch 提供个接口导入进入。这里在导入数据
到 elasticsearch 中时,根据 elasticsearch 的设计和使用思维,将某一类数据存储到一个 type 下,这样,我在导入数据时,就需要将从 MySQL 读取的数据重新进行组合
分配,然后再带入,这样的带来的好处是,搜索的便捷和返回的搜索结果的方便性。

elasticsearch 与关系型数据库的结构对应关系

使用 elasticsearch 时,第一个要明白的问题就是,数据和结构的映射关系。很多文章会有一个 http://localhost:9200/xxx/_search?pretty=true 这样的示例,特别是在
使用 elasticsearch-river-jdbc 的介绍文章中,这样的示例会让人感觉神秘又无所适从。似乎很强大,但是又不知道从哪里下手。所以,第一件要明白的事情就是对应关系
和路径的意义。

这里借用权威指南的中的介绍

1
2
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields

从上面我们可以看出来,MySQL 的数据库名对应着 elasticsearch 的 index,表名对应着 type,这里是最重要的两点。
elasticsearch 提供的 restful api 中,在路径中,依次对应着 index 和 type,比如假设我们有一个数据库,数据库名是 website,里面有个表是 user,
那么它提供的查询 url 就是 http://localhost:9200/website/user,这里,路径的层次,首先是 index,也就是数据库名,其次是 type,也就是表名,
可以做这样的划分,但是在实际操作中,可能有一些区别,这里主要指的是数据的区别,比如说这里 type 对应着表名,但是在从 MySQL 导入数据到 elasticsearch 中时,
可以将几个关联表的数据导入到这个 type 名下,这里就出现了不一致的地方,因为这里的 type 是一个名字,但是它对应的数据却是从几个表中来的。

明白了上面的对应关系后,就可以进行后续的操作了。

地理位置的配置

在 elasticsearch 中,所有的数据都有一个类型,什么样的类型,就可以在其上做一些对应类型的特殊操作,在 elasticsearch 中把这样的属性关系叫做 mappings,一般使用
http://host:9200/index/_mapping 即可查看这个 index 下面的所有 type 的 mappings 信息,比如下面这样的就是一个这样的映射关系的片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
{
"younixue": {
"mappings": {
"orgs": {
"properties": {
"address": {
"type": "string"
},
"cats": {
"properties": {
"cat_id": {
"type": "long"
},
"cat_name": {
"type": "string"
},
"id": {
"type": "long"
},
"org_id": {
"type": "long"
},
"version": {
"type": "long"
}
}
},
"city": {
"type": "string"
},
"comments_num": {
"type": "long"
},
"course_rate": {
"type": "double"
},
"created_at": {
"type": "date",
"format": "dateOptionalTime"
},
"fav_num": {
"type": "long"
},
"location": {
"type": "geo_point"
},
"org_desc": {
"type": "string"
},
```
说这个的目的是,如果我们要对我们的数据库的某些字段进行某些操作,那么就要考虑 elasticsearch 在进行数据导入映射时,是否将这个数据处理成了对应的数据类型,同时还有一点就是,
对于某些字段数据,是否要对其进行分析,如何分析,可以添加更多的 mappings 参数。
mappings 参数既可以在数据导入时,elasticsearch 自动分析生成,也可以根据业务需要手动指定,在很多特殊使用场景下,都需要进行手动指定 mappings 信息。比如说地理位置
相关的数据。只有先设定好了特殊的 mappings 信息 ,后续的数据导入才会有期望的类型。
在 elasticsearch 中,对于经纬度来说,要想使用 elasticsearch 提供的地理位置查询相关的功能,就需要构造一个结构,并且将其类型属性设置为
`geo_point`,这个结构的一般是:
```python
# geo 相关功能需要的数据结构
"location": {
"lat":120.123,
"lon": 32.456
}

那么对应的 mappings 设置就是:

1
2
3
4
5
6
7
8
9
10
11
12
# "type" 指的是 type,即对应数据库中的表名
{
"mappings": {
"type": {
"properties": {
"location": {
"type": "geo_point"
}
}
}
}
}

通过 elasticsearch 提供的相关创建接口,即可创建对应的数据类型,这里需要注意的一点是,要先创建数据类型,后再导入数据,如果对已有的数据类型进行修改,
将是一个很复杂的过程,对于使用者来说,若无必要,还是先创建特殊的字段类型,再导入数据。不要瞎折腾。下面的代码是我写的一个创建地理位置相关的类型的代码,
这里使用的是 python 语言。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 忽略掉已存在等错误 ,这里setings.es_host 和 settings.es_port 指的是运行 elasticsearch 的地址和端口
def create_location_mappings(index='younixue', doc_type="orgs", ignore=400):
es = Elasticsearch([{"host": settings.es_host, "port": settings.es_port}])
cl = client.IndicesClient(es)
body = {"mappings": {doc_type: {"properties": {"location": {"type": "geo_point"}}}}}
cl.create(index=index, body=body, ignore=400)
```
关于 python 操作 elasticsearch 的库,可以使用 elasticsearch-py,它的文档有很详细的说明,[elasticsearch-py docs](https://elasticsearch-py.readthedocs.org/en/master/)
**数据的初始化导入**
前面说过,`elasticsearch-river-jdbc` 即 `elasticsearch-jdbc` 在我这里不好使,搞了一两天都没有办法运行起来,为了不太耽误工程进度,决定采取取巧的方式。在数据的初始化导入
这里,我自己写了个程序进行导入。具体的实现方式是,根据业务需求,按照搜索需要,将对应的一些表的数据构造在一个 type 里面,然后使用 sql 语句查询出来要导入 elasticsearch 的数据,
拼接后导入 elasticsearch 中,elasticsearch 提供的 index 接口,对于不存在的数据,创建索引,对于存在的数据,进行更新,并对 version 字段记录值加 1。下面是我的导入数据的一个示例代码,
为了保证初始化导入的效果,在导入之前,先运行了删除方法,将已有的 index 和 type 删除掉。
*数据初始化时,删除旧的数据*
```python
# 删除旧的 index
def delete_index():
es = Elasticsearch([{"host": settings.es_host, "port": settings.es_port}])
es.indices.delete(index='younixue', ignore=[400, 404])

数据初始化时,创建需要特殊指定的 mappings

1
2
3
4
5
6
7
8
9
10
11
def create_location_mappings(index='younixue', doc_type="orgs", ignore=400):
es = Elasticsearch([{"host": settings.es_host, "port": settings.es_port}])
cl = client.IndicesClient(es)
body = {"mappings": {doc_type: {"properties": {"location": {"type": "geo_point"}}}}}
cl.create(index=index, body=body, ignore=ignore)
def put_location_mappings(index='younixue', doc_type='courses'):
es = Elasticsearch([{"host": settings.es_host, "port": settings.es_port}])
cl = client.IndicesClient(es)
body = {doc_type: {"properties": {"location": {"type": "geo_point"}}}}
cl.put_mapping(doc_type=doc_type, body=body, index=index)

这里使用了两个方法, createput_mapping。原因是,当创建一个 index 后,如果要对新的 type 指定某些特殊的 mapping 设置,这个时候,
需要使用的就是更新,而不是创建了。根据我的业务需求,有两个地方需要指定对应的数据映射为 geo 类型,所以,在第第一个 type 初始化时,采用的是
create,而第二个就需要 put_mapping 了,这些都可以从官方文档中看到哈。

数据初始化时,执行数据导入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# 这里以导入 type 名为 orgs 为例
def init_orgs_index(index='younixue', doc_type='orgs'):
create_location_mappings(index, doc_type)
es = Elasticsearch([{"host": settings.es_host, "port": settings.es_port}])
tdb = torndb.Connection(host=settings.DBHOST, database=settings.DATABASE, user=settings.DBUSER, password=settings.DBPWD)
sql = "select * from orgs where status='A'"
ret = tdb.query(sql)
if not ret:
return
# 数据需要拼接、将 longitude,latitude 写为 lon,lat, 并且存放在 location 字段中 {"location": {"lat":xx, "lon": yy}}
# 同时增加 org_teachers,org_students 信息
for org in ret:
lon = org.pop('longitude')
lat = org.pop('latitude')
location = {"lon": lon, "lat": lat}
org['location'] = location
teachers = tdb.query("select * from org_teachers where org_id=%s and is_valid='Y'", org.org_id)
students = tdb.query("select * from org_students where org_id=%s and is_valid='Y'", org.org_id)
cats = tdb.query("select * from org_official_cat where org_id=%s", org.org_id)
org['org_teachers'] = teachers
org['org_students'] = students
org['cats'] = cats
# 给数据创建 elasticsearch 索引
es.index(index=index, doc_type=doc_type, id=org.org_id, body=org)
```
这里就是从 MySQL 数据库表中导入几个表的数据,拼接成新的 type 数据,然后执行 index 方法,即可导入。
`create_location_mappings(index, doc_type)` 这里使用这个方法的原因是,这里是整个 index 的创建,所以使用了 create 方法。
如果还有其他的表需要同步数据到其它的 type 名中,执行和上述差不多的代码逻辑即可完成这个过程。
**数据的同步更新**
如果上面导入数据的方法,数据的同步更新实现也是很 ugly 的。
采用的方法是当有数据更新时,就将对应的数据对应的 type 的那条记录整个 update 一下,这样的实现,就会要求程序开发实现两个功能:
1. 数据更新方法;传递一个能唯一标识一条数据的字段,对这条数据进行更新;
1. 在有数据 insert/update/delete 的地方,要调用上面实现的方法;
在具体的实现上,可以采用直接在数据变动的地方直接调用数据更新方法;也可以通过队列的方法,将能标识变动数据的信息抛给队列,比如 celery,
celery 再去调用执行相应的数据更新方法。
在我这里,考虑到数据变动的频繁性和程序开发的耗时,采用了在数据变更的地方同步调用数据更新的方法,下面是数据更新的方法示例:
*数据更新示例,分为数据更新和数据删除*
```python
# self.geo 即 Elasticsearch 实例
def update_org_es_data(self, org_id):
sql = "select * from orgs where status='A' and org_id=%s"
org = self.db.get(sql, org_id)
if not org:
return
lon = org.pop('longitude')
lat = org.pop('latitude')
location = {"lon": lon, "lat": lat}
org['location'] = location
teachers = self.db.query("select * from org_teachers where org_id=%s and is_valid='Y'", org.org_id)
students = self.db.query("select * from org_students where org_id=%s and is_valid='Y'", org.org_id)
cats = self.db.query("select * from org_official_cat where org_id=%s", org.org_id)
org['org_teachers'] = teachers
org['org_students'] = students
org['cats'] = cats
# 给数据创建/更新 elasticsearch 索引
self.geo.index(index='younixue', doc_type='orgs', id=org_id, body=org)
self.reqlog.info('ES UPDATED orgs [{org_id}]'.format(org_id=org_id))
def delete_org_es_data(self, org_id):
self.geo.delete(index='younixue', doc_type='orgs', id=org_id)
self.reqlog.info('ES DELETED orgs [{org_id}]'.format(org_id=org_id))

至此,整个数据的导入和更新都解决了,虽然很 ugly,但是还是能很好的工作。

不够优雅?想什么呢,要什么自行车呢!

4. 搜索

使用 elasticsearch 的目的是为了使用它提供的强大的搜索功能。上面我们解决了输入问题,下面我们就关注输出的问题。
elasticsearch 有自己的一套搜索语法,通过提供的 restful api,在查询体里面包装上查询语句,可以做到复杂的查询。

最简单的查询就是 GET /index_name/type_name/_search?pretty=true,为了和 MySQL 对应起来,并且默认 MySQL 的数据库和表名
在这里映射后不会更改,那么前面的语句的意思就是:从数据库名为 index_name,表名为 type_name 的数据中查询数据,并优雅的输出。

如果是精确查询就是 GET /index_name/type_name/id_number 这个就是指定了对应的 id 来标识一条数据,这样的效果就是精确定位一条数据;

更多的查询语法可以参考:Elasticsearch权威指南(中文版)

需要注意的是,搜索包含三部分:

  1. 查询内容;
  2. 对查询的内容进行筛选;
  3. 对最终结果进行排序;

通常而言,一个查询会包含上述三部分内容,输入的参数是从客户端获取的,获取到了查询参数后,根据查询参数,动态拼接构造查询语句。在 elasticsearch 中,
有一些默认的查询规则,比如说 filter 要包含在 query 中,这些具体的细节,可以查阅文档,这里不再展开了。

通常而言,一个默认空白的查询框架如下:

1
2
3
4
5
6
query = {"match_all": {}}
default_body = {
"query": query,
"from": page,
"size": size
}

fromsize 指定了分页的起始点和一页的大小;

上面的 default_body 就是一个查询所有内容的查询体。下面我们将根据参数的来进行补充,这里以如下需求为例:

在北京,以经度100,纬度40,方圆2000km内,查找课程名包含“为明”,分类是“早教”,适合年龄为 “7-12”的课程,并以评论数、收藏数的倒排序为排序标准,并以最大相关性为排序标准,
返回第一页的最多20条数据

那么我们接收到的参数有:

city_id=102, longitude=102.3212, latitude=24.1243, radius=30km, keyword=英语, cat_id=402, ages_id=7, comments_num=desc, fav_num=desc,
from=0, size=20

对于这些参数,我们首先要进行的是分类,按照上面说的三种类别查询、筛选、排序来对最终的查询语句体进行构造。
通过分析,我们这里可以看出来查询的是 keyword;筛选的是 city_id, longitude, latitude, radius, cat_id, ages_id;排序的是 comments_num, fav_num
下一步就是构造查询语句了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
match = []
if city:
match.append({"match: {"city_id":city_id}})
if keyword:
match.append({"match": {"shop_name":keyword}})
if cat_name:
match.append({"match": {"cats.cat_id": cat_id}})
if match:
query = {
"bool": {
"must": match
}
}
sort = [{'_score': {'order': 'desc'}}]
if longitude:
# 组合经纬度和按经纬度远近排序
geo_filter = {
"geo_distance": {
"distance": radius,
"location": {
"lat": latitude,
"lon": longitude,
}
}
}
geo_sort = {
"_geo_distance": {
"location": {
"lat": latitude,
"lon": longitude
},
"order": "asc",
"unit": "km"
}
}
sort.append(geo_sort)
if fav_num and fav_num in ('asc', 'desc'):
fav_sort = {"fav_num": {"order": fav_num}}
sort.append(fav_sort)
if comments_num and comments_num in ('asc', 'desc'):
comments_sort = {"comments_num": {"order": comments_num}}
sort.append(comments_sort)
#if views_num and views_num in ('asc', 'desc'):
#views_sort = {"views_num": {"order": views_num}}
#sort.append(views_sort)
# 开始拼接查询结构体
if sort:
default_body['sort'] = sort
if match and not longitude: # 仅有关键词/城市等数据,无经纬度
default_body['query'] = query
elif match and longitude: # 既有关键词,又有经纬度等信息
filtered = {"query": query, "filter": geo_filter}
default_body['query'] = {'filtered': filtered}
elif longitude and not match: # 仅有经纬度,无城市或关键词等信息
filtered = {"query": query, "filter": geo_filter}
default_body['query'] = {'filtered': filtered}
body = default_body
ret = self.geo.search(index='younixue', doc_type='orgs', body=body)

上面就是根据条件拼接出来的查询语句,那么最终是个什么结果呢?如果参数较全的话,就是下面这样的:

这里的示例和上面的描述不一定完全对得起来,但是作为例子,也可以说明根本性的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
{
"query": {
"filtered": {
"query": {
"bool": {
"must": [
{
"multi_match": {
"query": "潜能",
"type": "phrase",
"fields": ["cat_name", "cat_name_1"]
}
},
{"match_phrase": {"org.city": "北京"}}
]
}
},
"filter": {
"geo_distance": {
"distance": "120000km",
"location": {
"lat": 40,
"lon": 100
}
}
}
}
},
"sort": [
{
"_geo_distance": {
"location": {
"lat": 40,
"lon": 100
},
"order": "asc",
"unit": "km"
}
},
{"fav_num": {"order": "desc"}},
{"comments_num": {"order": "desc"}}
]
}

通过上面的方法,就构造了一个查询了。最终的效果,还要根据业务需求,进行细致的调整和测试。

结语

elasticsearch 很强大,在复杂的查询条件下,能够用较清晰的语法描述清楚,我这里也是赶鸭子上架,看了文档就开始干,刚开始纠结于如何导入数据
到 elasticsearch 中,后面实在是无解了,就还是先把功能做出来再说吧。

我这里用到的主要功能是地理位置的查询,其它的更多功能,暂时还未用上,这里记录下折腾得过程,如果能够帮到你,我就太高兴了。

时间会让文章变得失效或者错误,所以读者要有自己的判断,比如网上很多着他那个 river jdbc 那个,就是失效的。