源码阅读-悟空搜索引擎

本文 http://blog.codeg.cn/2016/02/02/wukong-source-code-reading/ 是作者zieckey在研究和学习相关内容时所做的笔记,欢迎广大朋友指正和交流! 版权所有,欢迎转载和分享,但请保留此段声明。

一个最简单的例子

我们还是从一个最简单的示例代码开始:

package main

import (
	"github.com/huichen/wukong/engine"
	"github.com/huichen/wukong/types"
	"log"
)

var (
// searcher是协程安全的
	searcher = engine.Engine{}
)

func main() {
	// 初始化
	searcher.Init(types.EngineInitOptions{
		SegmenterDictionaries: "./data/dictionary.txt"})
	defer searcher.Close()

	// 将文档加入索引
	searcher.IndexDocument(0, types.DocumentIndexData{Content: "此次百度收购将成中国互联网最大并购"})
	searcher.IndexDocument(1, types.DocumentIndexData{Content: "百度宣布拟全资收购91无线业务"})
	searcher.IndexDocument(2, types.DocumentIndexData{Content: "百度是中国最大的搜索引擎"})

	// 等待索引刷新完毕
	searcher.FlushIndex()

	// 搜索输出格式见types.SearchResponse结构体
	res := searcher.Search(types.SearchRequest{Text:"百度中国"})
	log.Printf("num=%d ", res.NumDocs)
	for _, d := range res.Docs {
		log.Printf("docId=%d", d.DocId)
		log.Print("\tscore:", d.Scores)
		log.Print("\tTokenLocations:", d.TokenLocations)
		log.Print("\tTokenSnippetLocations:", d.TokenSnippetLocations)
	}
}

悟空搜索引擎不是一个完整的搜索引擎,我们可以把它当做一个搜索引擎基础库来使用。上面的示例代码是一个最简单的例子,展示了如何使用这个库,非常简单,三步即可完成:

  1. 初始化引擎: searcher.Init
  2. 将文档加入索引列表中: searcher.IndexDocument
  3. 执行搜索任务:searcher.Search

悟空搜索引擎内部整体框架图

引擎中处理用户请求、分词、索引和排序分别由不同的协程(goroutines)完成。

  1. 主协程,用于收发用户请求
  2. 分词器(segmenter)协程,负责分词
  3. 索引器(indexer)协程,负责建立和查找索引表
  4. 排序器(ranker)协程,负责对文档评分排序

引擎初始化过程

从上面最简单的那个例子可以看出,我们所有的操作都是基于searcher对象(engine.Engine类型),初始化引擎、将文档加入索引列表中、Flush索引列表、执行搜索任务。下面我们详细分析一下初始化过程:

加载分词词典

有一个参数NotUsingSegmenter可以控制是否加载分词词典。小小吐槽一下:这里没有使用正语义,导致我脑袋需要非非转换,(⊙o⊙)… ,我相信如果使用UsingSegmenter参数的话,应该更好理解一点。

	if !options.NotUsingSegmenter {
		// 载入分词器词典
		engine.segmenter.LoadDictionary(options.SegmenterDictionaries)

		// 初始化停用词
		engine.stopTokens.Init(options.StopTokenFile)
	}

分词词典的内部加载过程,可以详细参考 https://github.com/huichen/sego 这个项目,这个可以单独来分析,在这里就不在展开说了。

初始化索引器和排序器

	for shard := 0; shard < options.NumShards; shard++ {
		engine.indexers = append(engine.indexers, core.Indexer{})
		engine.indexers[shard].Init(*options.IndexerInitOptions)

		engine.rankers = append(engine.rankers, core.Ranker{})
		engine.rankers[shard].Init()
	}

options.NumShards 参数可以设置shard(分片,项目作者称之为裂分)个数,根据shard个数来初始化索引器(Indexer)、排序器(Rander)的个数。这里是为了方便并行处理,每一个shard都有一个索引器(Indexer)和排序器(Rander),并提前初始化好。

初始化分词器通道

	engine.segmenterChannel = make(
		chan segmenterRequest, options.NumSegmenterThreads)

初始化索引器通道

	engine.indexerAddDocumentChannels = make(
		[]chan indexerAddDocumentRequest, options.NumShards)
	engine.indexerRemoveDocChannels = make(
		[]chan indexerRemoveDocRequest, options.NumShards)
	engine.indexerLookupChannels = make(
		[]chan indexerLookupRequest, options.NumShards)
	for shard := 0; shard < options.NumShards; shard++ {
		engine.indexerAddDocumentChannels[shard] = make(
			chan indexerAddDocumentRequest,
			options.IndexerBufferLength)
		engine.indexerRemoveDocChannels[shard] = make(
			chan indexerRemoveDocRequest,
			options.IndexerBufferLength)
		engine.indexerLookupChannels[shard] = make(
			chan indexerLookupRequest,
			options.IndexerBufferLength)
	}

从这里可以看出索引器(Indexer)有三个功能:将一个文档添加到索引中、将一个文档从索引中移除、从索引中查找一个文档。每一个shard都有独立的channel,互不冲突。

初始化排序器通道

	engine.rankerAddDocChannels = make(
		[]chan rankerAddDocRequest, options.NumShards)
	engine.rankerRankChannels = make(
		[]chan rankerRankRequest, options.NumShards)
	engine.rankerRemoveDocChannels = make(
		[]chan rankerRemoveDocRequest, options.NumShards)
	for shard := 0; shard < options.NumShards; shard++ {
		engine.rankerAddDocChannels[shard] = make(
			chan rankerAddDocRequest,
			options.RankerBufferLength)
		engine.rankerRankChannels[shard] = make(
			chan rankerRankRequest,
			options.RankerBufferLength)
		engine.rankerRemoveDocChannels[shard] = make(
			chan rankerRemoveDocRequest,
			options.RankerBufferLength)
	}

与上面类似,从这里可以看出排序器(Rander)有三个功能:将一个文档添加到排序器中、在排序器中进行排序、将一个文档从排序器中移除。每一个shard都有独立的channel,互不冲突。

初始化持久化存储通道

	if engine.initOptions.UsePersistentStorage {
		engine.persistentStorageIndexDocumentChannels =
			make([]chan persistentStorageIndexDocumentRequest,
				engine.initOptions.PersistentStorageShards)
		for shard := 0; shard < engine.initOptions.PersistentStorageShards; shard++ {
			engine.persistentStorageIndexDocumentChannels[shard] = make(
				chan persistentStorageIndexDocumentRequest)
		}
		engine.persistentStorageInitChannel = make(
			chan bool, engine.initOptions.PersistentStorageShards)
	}

注意:PersistentStorageShards持久化存储的分片数目是独立参数控制的。

启动各个功能协程goroutine

  1. 启动分词器协程
  2. 启动索引器和排序器协程
  3. 启动持久化存储工作协程

至此,所有初始化工作完毕。

索引过程分析

下面我们来分析索引过程。

// 将文档加入索引
//
// 输入参数:
// 	docId	标识文档编号,必须唯一
//	data	见DocumentIndexData注释
//
// 注意:
//      1. 这个函数是线程安全的,请尽可能并发调用以提高索引速度
// 	2. 这个函数调用是非同步的,也就是说在函数返回时有可能文档还没有加入索引中,因此
//         如果立刻调用Search可能无法查询到这个文档。强制刷新索引请调用FlushIndex函数。
func (engine *Engine) IndexDocument(docId uint64, data types.DocumentIndexData) {
	engine.internalIndexDocument(docId, data)

	hash := murmur.Murmur3([]byte(fmt.Sprint("%d", docId))) % uint32(engine.initOptions.PersistentStorageShards)
	if engine.initOptions.UsePersistentStorage {
		engine.persistentStorageIndexDocumentChannels[hash] <- persistentStorageIndexDocumentRequest{docId: docId, data: data}
	}
}

func (engine *Engine) internalIndexDocument(docId uint64, data types.DocumentIndexData) {
	if !engine.initialized {
		log.Fatal("必须先初始化引擎")
	}

	atomic.AddUint64(&engine.numIndexingRequests, 1)
	hash := murmur.Murmur3([]byte(fmt.Sprint("%d%s", docId, data.Content)))
	engine.segmenterChannel <- segmenterRequest{
		docId: docId, hash: hash, data: data}
}

这里需要注意的是,docId参数需要调用者从外部传入,而不是在内部自己创建,这给搜索引擎的实现者更大的自由。 将文档交给分词器处理,然后根据murmur3计算的hash值模PersistentStorageShards,选择合适的shard写入持久化存储中。

索引过程分析:分词协程处理过程

分词器协程的逻辑代码在这里:segmenter_worker.go:func (engine *Engine) segmenterWorker()

分词器协程的逻辑是一个死循环,不停的从channel engine.segmenterChannel中读取数据,针对每一次读取的数据:

  1. 计算shard
  2. 将文档分词
  3. 根据分词结果,构造indexerAddDocumentRequestrankerAddDocRequest
  4. indexerAddDocumentRequest投递到channel engine.indexerAddDocumentChannels[shard]
  5. rankerAddDocRequest投递到channel engine.rankerAddDocChannels[shard]

补充一句:这里shard号的计算过程如下:

// 从文本hash得到要分配到的shard
func (engine *Engine) getShard(hash uint32) int {
	return int(hash - hash/uint32(engine.initOptions.NumShards)*uint32(engine.initOptions.NumShards))
}

为什么不是直接取模呢?

索引过程分析:索引器协程处理过程

首先介绍一下倒排索引表,这是搜索引擎的核心数据结构。

// 索引器
type Indexer struct {
	// 从搜索键到文档列表的反向索引
	// 加了读写锁以保证读写安全
	tableLock struct {
		sync.RWMutex
		table map[string]*KeywordIndices
		docs  map[uint64]bool
	}

	initOptions types.IndexerInitOptions
	initialized bool

	// 这实际上是总文档数的一个近似
	numDocuments uint64

	// 所有被索引文本的总关键词数
	totalTokenLength float32

	// 每个文档的关键词长度
	docTokenLengths map[uint64]float32
}

// 反向索引表的一行,收集了一个搜索键出现的所有文档,按照DocId从小到大排序。
type KeywordIndices struct {
	// 下面的切片是否为空,取决于初始化时IndexType的值
	docIds      []uint64  // 全部类型都有
	frequencies []float32 // IndexType == FrequenciesIndex
	locations   [][]int   // IndexType == LocationsIndex
}

table map[string]*KeywordIndices这个是核心:一个关键词,对应一个KeywordIndices结构。该结构的docIds字段记录了所有包含这个关键词的文档id。 如果 IndexType == FrequenciesIndex ,则同时记录这个关键词在该文档中出现次数。 如果 IndexType == LocationsIndex ,则同时记录这个关键词在该文档中出现的所有位置的起始偏移。

下面是索引的主函数代码:

func (engine *Engine) indexerAddDocumentWorker(shard int) {
	for {
		request := <-engine.indexerAddDocumentChannels[shard]
		engine.indexers[shard].AddDocument(request.document)
		atomic.AddUint64(&engine.numTokenIndexAdded,
			uint64(len(request.document.Keywords)))
		atomic.AddUint64(&engine.numDocumentsIndexed, 1)
	}
}

其主要逻辑又封装在func (indexer *Indexer) AddDocument(document *types.DocumentIndex)函数中实现。其逻辑如下:

  1. 将倒排索引表加锁
  2. 更新文档关键词的长度加在一起的总和
  3. 查找关键词在倒排索引表中是否存在
  4. 如果不存在,则直接加入到table map[string]*KeywordIndices
  5. 如果存在KeywordIndices,则使用二分查找该关键词对应的docId是否已经在KeywordIndices.docIds中存在。分两种情况: 1) docId存在,则更新原有的数据结构。 2) docId不存在,则插入到KeywordIndices.docIds数组中,同时保持升序排列。
  6. 更新索引过的文章总数

索引过程分析:排序器协程处理过程

在新索引文档的过程,排序器的主逻辑如下:

func (engine *Engine) rankerAddDocWorker(shard int) {
	for {
		request := <-engine.rankerAddDocChannels[shard]
		engine.rankers[shard].AddDoc(request.docId, request.fields)
	}
}

进而调用下面的函数

// 给某个文档添加评分字段
func (ranker *Ranker) AddDoc(docId uint64, fields interface{}) {
	if ranker.initialized == false {
		log.Fatal("排序器尚未初始化")
	}

	ranker.lock.Lock()
	ranker.lock.fields[docId] = fields
	ranker.lock.docs[docId] = true
	ranker.lock.Unlock()
}

上述函数非常简单,只是将应用层自定义的数据加入到ranker中。

至此索引过程就完成了。简单来讲就是下面两个过程:

  1. 将文档分词,得到一堆关键词
  2. 将 关键词->docId 的对应关系加入到全局的map中(实际上是分了多个shard)

搜索过程分析

下面我们来分析一下搜索的过程。首先构造一个SearchRequest对象。一般情况下只需提供SearchRequest.Text即可。

type SearchRequest struct {
	// 搜索的短语(必须是UTF-8格式),会被分词
	// 当值为空字符串时关键词会从下面的Tokens读入
	Text string

	// 关键词(必须是UTF-8格式),当Text不为空时优先使用Text
	// 通常你不需要自己指定关键词,除非你运行自己的分词程序
	Tokens []string

	// 文档标签(必须是UTF-8格式),标签不存在文档文本中,但也属于搜索键的一种
	Labels []string

	// 当不为nil时,仅从这些DocIds包含的键中搜索(忽略值)
	DocIds map[uint64]bool

	// 排序选项
	RankOptions *RankOptions

	// 超时,单位毫秒(千分之一秒)。此值小于等于零时不设超时。
	// 搜索超时的情况下仍有可能返回部分排序结果。
	Timeout int

	// 设为true时仅统计搜索到的文档个数,不返回具体的文档
	CountDocsOnly bool

	// 不排序,对于可在引擎外部(比如客户端)排序情况适用
	// 对返回文档很多的情况打开此选项可以有效节省时间
	Orderless bool
}

从本文一开始那段示例代码的搜索语句读起:searcher.Search(types.SearchRequest{Text:"百度中国"})。进入到 Search 函数内部,其逻辑如下:

设置一些搜索选项

例如排序选项RankOptions, 分数计算条件ScoringCriteria等等

将搜索词进行分词

	// 收集关键词
	tokens := []string{}
	if request.Text != "" {
		querySegments := engine.segmenter.Segment([]byte(request.Text))
		for _, s := range querySegments {
			token := s.Token().Text()
			if !engine.stopTokens.IsStopToken(token) {
				tokens = append(tokens, s.Token().Text())
			}
		}
	} else {
		for _, t := range request.Tokens {
			tokens = append(tokens, t)
		}
	}

这里的”百度中国”会分词得到两个词:百度中国

向索引器发送查找请求

	// 建立排序器返回的通信通道
	rankerReturnChannel := make(
		chan rankerReturnRequest, engine.initOptions.NumShards)

	// 生成查找请求
	lookupRequest := indexerLookupRequest{
		countDocsOnly:       request.CountDocsOnly,
		tokens:              tokens,
		labels:              request.Labels,
		docIds:              request.DocIds,
		options:             rankOptions,
		rankerReturnChannel: rankerReturnChannel,
		orderless:           request.Orderless,
	}

	// 向索引器发送查找请求
	for shard := 0; shard < engine.initOptions.NumShards; shard++ {
		engine.indexerLookupChannels[shard] <- lookupRequest
	}

这里是否可以进行优化? 1) 只向特定的shard分发请求,避免无谓的indexer查找过程。2) rankerReturnChannel是否不用每次都创建新的?

读取索引器的返回结果然后排序

上面已经建立了结果的返回通道rankerReturnChannel,直接从个channel中读取返回数据,并加入到数组rankOutput中。 注意,如果设置了超时,就在超时之前能读取多少就读多少。 然后调用排序算法进行排序。排序算法直接调用Golang自带的sort包的排序算法。

下面我们深入到索引器,看看索引器是如何进行搜索的。其核心代码在这里func (engine *Engine) indexerLookupWorker(shard int),它的主逻辑是一个死循环,不断的从engine.indexerLookupChannels[shard]读取搜索请求。

针对每一个搜索请求,会将请求分发到索引器去,调用func (indexer *Indexer) Lookup(tokens []string, labels []string, docIds map[uint64]bool, countDocsOnly bool) (docs []types.IndexedDocument, numDocs int)方法。其主要逻辑如下:

  1. 将分词和标签合并在一起进行搜索
  2. 合并搜索到的docId,并进行初步排序,将docId大的排在前面(实际上是认为docId越大,时间越近,时效性越好)
  3. 然后进行排序,BM25算法
  4. 最后返回数据

参考文献

  1. 悟空搜索引擎项目源码:https://github.com/huichen/wukong
  2. 悟空引擎入门教程
  3. Code reading: Wukong full-text search engine