微医搜索系统实时索引架构设计实战

内容纲要

前言

随着互联网数据规模的爆炸式增长, 如何从海量的历史, 实时数据中快速获取有用的信息, 变得越来越有挑战性。 一个中等的互联网公司, 每天都要产生百万条原始数据, 上亿条用户行为数据。这些数据一般存储在以下3种数据系统中:

  • 关系型数据库, 大多数互联网公司会选用mysql作为关数据库的主选, 用于存储商品, 用户信息等数据. 关系型数据库对于事务性非常高的OLTP操作(比如订单, 结算等)支持良好。
  • hadoop生态, hadoop是数据仓库主要的载体, 除了备份关系型数据库的所有版本, 还存储用户行为, 点击, 曝光, 互动等海量日志数据, hadoop对于数据分析, 数据挖掘等OLAP支持比关系型数据库更加具有扩展性和稳定性。
  • 搜索引擎, 以elasticsearch和solr为代表. 搜索引擎是获取信息最高效的途径, 几乎成为各类网站, 应用的基础标配设施(地位仅次于数据库)。

在这里,我们仅探讨一下搜索引擎中的索引数据构建的场景,但溯本归源,其他场景也可借鉴。

索引构建

在业务初期,数据量较少,简单的全量索引构建可能就已经满足需求了;随着业务的发展,数据的增长越来越快,全量构建一次需要的时间也越来越长,增量+全量也可以满足需求了;

业务进一步膨胀,对数据的实时性提出要求,如何提升索引数据的实时性,成为了新的挑战,这时候就需要构建另外一套索引流程,虽然我们能够加快增量的执行频率,但这可能依然不够快,也会加大对业务库的压力,扩展空间有限。
基于数据在写入数据库的时间线,总结了以下方式:

因为增量和全量的实现方式比较简单,本文就不做赘述,主要介绍一下实时索引的实现方式。

基于AOP的实时索引

AOP相信使用JAVA的同学不会陌生,以前面试的时候经常被问的一个问题就是:举例一个AOP的使用场景!经典答案应该是:日志记录!在这里,我们也可以通过AOP实现搜索引擎的实时索引构建,大致流程如下:

应用端利用AOP在数据成功写入数据库后发送消息到kafka,搜索引擎端消费kafka消息构建索引文件。
个人认为小型公司比较适用,最大的优点是实现非常简单,最大的缺点是侵入性太高,维护复杂。

基于BINLOG的实时索引

基于BINLOG的实时索引构建是目前业界比较主流且成熟的一种方案。主要有以下三个步骤:

  1. BINLOG日志的拉取和解析
  2. 解析数据的存储通道
  3. 获取数据并构建索引

系统建设架构图如下:

binlog日志的拉取和解析

我们首先来聊聊binlog日志这个东西。
binlog是一个二进制格式的文件,用于记录用户对数据库更新的SQL语句信息,例如更改数据库表和更改内容的SQL语句都会记录到binlog里,但是对库表等内容的查询不会记录。简单的理解,就是mysql在执行操作前,会先把DML(排除SELECT)和DDL语句记录到一个日志文件中,这个日志文件就是binlog文件。它的主要作用是mysql数据库之间的主从同步和数据增量恢复。
在大概了解了binlog文件之后,我们回到今天的主题,如何拉取和解析binlog日志。因为binlog文件是一个二进制的,所以不能用常规的方式来解析它,并且由于是mysql的内置日志文件,我们也不能随意的对这个文件进行操作,想要很好的实现这些功能,想想还是有些头疼的,幸运的是我们有很多可选的开源组件可供选择,比如:canal,maxwell,mypipe,databus等。这些组件的原理基本相同,不同的是实现细节和功能模块。这里以canal为例:

  1. canal server模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送(slave拉取,不是master主动push给slaves)binary log给slave(也就是canal)
  3. canal server解析binary log对象(原始为byte流)
  4. canal client接收server端的数据进行处理并发送到数据通道
    file

maxwell实时抓取mysql数据的原理也是基于binlog,和canal相比,maxwell更像是canal server + 实时client(数据抽取 + 数据转换)。maxwell集成了kafka producer,直接从binlog获取数据更新并解析后写入kafka,而canal则需要自己开发实时client将canal读取和解析的binlog内容写入kafka中。

解析数据的存储通道

目前比较流行的是使用消息队列,例如:kafka,rabbitmq等,当然也可以使用redis等缓存。

获取数据并构建索引

我们从binlog中拿到的数据,并且发送到了数据通道中,最后我们需要从数据通道中获取数据并且构建索引文件。看到这里可能有同学想问了,为什么我们不在解析binlog后直接构建索引文件,要从数据通道这里饶一次,数据通道存在的意义是什么呢,我个人的理解是提供数据缓冲和架构解耦。
在设计索引构建系统时,会涉及到kafka(因为主流使用kafka,所以以下设计都是基于kafka作为数据通道)的topic设计,这里有两种不同的方案:

  1. 一张表一个topic: 优点是数据隔离,缺点是topic膨胀,服务资源利用率低,横向扩展时需要修改代码。

  2. 仅仅一个topic:优点是横向扩展方便,缺点是消费端需要做逻辑路由。

topic的设计
一张表一个topic

file

表和topic一一对应,一张表的消息固定发送到一个topic中,不同表的消息发送到不同的topic中。
如上文所说,这种设计方式的好处是数据隔离,一个topic中只存放了一张表的数据,没有其他表的数据产生干扰。但是如果有上千张表的时候就需要上千个topic,如此多的topic,管理成本就很高了。另外,一个consumer需要占用一个线程/进程,topic越多,partition越多,占用的线程/进程资源就越多,同时也存在数据热点问题,可能很多表的变更极不频繁,但是这些表对应的topic、partition、consumer却一直占用着系统资源(和28定律相契合)。当系统进行横向扩展时,这些topic也会成为一个麻烦,我们需要修改每一台服务器上的配置文件,给其分配需要消费的topic。在使用index的线程/进程时,也存在同样的问题。

一个topic

file
和库或者表无关,所有的binlog消息全部发送到同一个topic中。
从两张架构图中,我们可以很清晰的看出差别来。当只有一个topic时,我们需要增加topic的partition(总数加起来会比上面的少),因为所有的数据放到一个topic中了。流程中增加了一个路由的角色,我大致描述一下这个路由是如何工作:

  • 首先我们需要有一份配置,用来描述每一张表所对应的要执行的index脚本,称为表-index映射
  • 然后预解析从kafka中消费到的消息,拿到消息中的表信息,然后根据表-index映射找到index脚本的执行实例
  • 将执行实例放入线程/进程池中执行。

为什么我们说这种方式横向扩展方便?
当数据膨胀较快时,我们只需要横向增加kafka partition的数量,消费端也同样的横向增加consumer数量即可,无需修改任何一行代码。

将所有的执行实例放入一个线程/进程池的好处是什么?
这个设计是基于大多数表的变更不频繁的原则来设计的。例如:table1的数据变更很频繁,而table2,table3,table4的数据变更很不频繁,如果我们给每一个执行实例配置一个进程/线程池(假设数量都是3),那么对于table1的执行实例来说,有点太少了,但是对于其他table的执行实例来说,实在是太浪费了。当然,我们可以给table1的执行实例配置更多的进程/线程数量。但如果我们让所有的执行实例共用一个线程/进程池呢?,我们线程/进程池的数量将会是12,当table1的数据变更频繁时,它最多可拥有12个线程/进程可用,资源的利用率是不是一下子就变的高起来了。当然,这个设计也有它的缺点,如果有某个执行实例比较慢时可能会造成阻塞。对于这个,我们可以将重要的执行实例单独设置线程/进程池;不重要的共用一个。

批量更新怎么办

在系统运行过程中,难免会碰到业务数据库批量更新数据的场景,会给实时索引系统产生巨大的压力。如果是大量表的批量更新,这个还是需要从业务端想办法来解决了。如果是少量表的批量更新,在共用一个topic的场景,能够在一定程度上缓解这个压力(不讨论搜索引擎端的压力)。
一般批量更新都是在晚上进行,这时候业务数据一般较少产生,实时索引系统的压力一般也较少。因为共用线程/进程池(比单独设置线程/进程池拥有更多的处理线程/进程),所以当某个表因为批量更新而产生大量数据时,能够很快速的消化掉。
另外我们可以在路由中增加一个处理队列,所有的消息都先进入处理队列(一张表对应一个队列),当队列中的消息数量达到阈值时,可以采用批量提交或者直接拒绝,也可以使用降级策略,比如队列可以写入,每隔1s将队列的消息一次写出等。

发表评论

电子邮件地址不会被公开。 必填项已用*标注