应用双缓冲技术完美解决资源数据优雅无损的热加载问题

本文 http://blog.codeg.cn/2016/01/27/double-buffering/ 是作者zieckey在研究和学习相关内容时所做的笔记,欢迎广大朋友指正和交流! 版权所有,欢迎转载和分享,但请保留此段声明。

简介

在一个网络服务器不间断运行过程中,有一些资源数据需要实时更新,例如需要及时更新一份白名单列表,怎么做才能做到优雅无损的更新到服务的进程空间内?这里我们提出一种叫“双缓冲”的技术来解决这种问题。

这里的双缓冲技术是借鉴了计算机屏幕绘图领域的概念。双缓冲技术绘图即在内存中创建一个与屏幕绘图区域一致的对象,先将图形绘制到内存中的这个对象上,再一次性将这个对象上的图形拷贝到屏幕上,这样能大大加快绘图的速度。

问题抽象

假设我们有一个查询服务,为了方便描述,我们将数据加密传输等一些不必要的细节都省去后,请求报文可以抽象成两个参数:一个是id,用来唯一标识一台设备(例如手机或电脑);另一个查询主体query。服务端业务逻辑是通过query查询数据库/NoSQL等数据引擎然后返回相应的数据,同时记录一条请求日志。

用Golang来实现这个逻辑如下:

package main

import (
    "net/http"
    "log"
    "os"
    "fmt"
)

func Query(r *http.Request) string {
    id := r.FormValue("id")
    query := r.FormValue("query")

    //参数合法性检查

    //具体的业务逻辑,查询数据库/NoSQL等数据引擎,然后做逻辑计算,然后合并结果
    //这里简单抽象,直接返回欢迎语
    result := fmt.Sprintf("hello, %v", id)

    // 记录一条查询日志,用于离线统计和分析
    log.Printf("<id=%v><query=%v><result=%v><ip=%v>", id, query, result, r.RemoteAddr)

    return result
}

func Handler(w http.ResponseWriter, r *http.Request) {
    r.ParseForm()
    result := Query(r)
    w.Write([]byte(result))
}

func main() {
    http.HandleFunc("/q", Handler)
    hostname, _ := os.Hostname()
    log.Printf("start http://%s:8091/q", hostname)
    log.Fatal(http.ListenAndServe(":8091", nil))
}

服务上线一段时间后,通过日志分析发现有一些id发起的请求异常,每天的请求量远远高于其他id,我们有理由怀疑这些请求是竞争对手在抓我们的数据。这个时候就开始进入攻防阶段了。

有几种攻防策略可供选择:

  1. 直接封IP,这种策略有可能会误杀一些正常用户。
  2. 将id加入黑名单

假设我们将策略1放到前端接入服务处(例如Nginx)进行拦截,策略2在我们自己的业务逻辑中实现,即在Query函数中加入对id的判断即可。现在的完整代码如下:

package main

import (
    "net/http"
    "log"
    "os"
    "fmt"
    "io/ioutil"
    "bytes"
    "strings"
    "io"
)

var blackIDs map[string]int
func LoadBlackIDs(filepath string) error {
    // 加载黑名单列表文件,每行一个
    b, err := ioutil.ReadFile(filepath)
    if err != nil {
        return err
    }
    r := bytes.NewBuffer(b)
    for {
        id, err := r.ReadString('\n')
        if err == io.EOF || err == nil {
            id = strings.TrimSpace(id)
            if len(id) > 0 {
                blackIDs[id] = 1
            }
        }

        if err != nil {
            break
        }
    }

    return nil
}

func IsBlackID(id string) bool {
    _, exist := blackIDs[id]
    return exist
}

func Query(r *http.Request) (string, error) {
    id := r.FormValue("id")
    query := r.FormValue("query")

    //参数合法性检查

    if IsBlackID(id) {
        return "ERROR", fmt.Errorf("ERROR id")
    }

    //具体的业务逻辑,查询数据库/NoSQL等数据引擎,然后做逻辑计算,然后合并结果
    //这里简单抽象,直接返回欢迎语
    result := fmt.Sprintf("hello, %v", id)

    // 记录一条查询日志,用于离线统计和分析
    log.Printf("<id=%v><query=%v><result=%v><ip=%v>", id, query, result, r.RemoteAddr)

    return result, nil
}

func Handler(w http.ResponseWriter, r *http.Request) {
    r.ParseForm()
    result, err := Query(r)
    if err == nil {
        w.Write([]byte(result))
    } else {
        w.WriteHeader(403)
        w.Write([]byte(result))
    }
}

func main() {
    blackIDs = make(map[string]int)
    if len(os.Args) == 2 {
        err := LoadBlackIDs(os.Args[1])
        if err != nil {
            panic(err)
        }
    }

    http.HandleFunc("/q", Handler)
    hostname, _ := os.Hostname()
    log.Printf("start http://%s:8091/q", hostname)
    log.Fatal(http.ListenAndServe(":8091", nil))
}

经过上述努力,终于将一些异常请求屏蔽掉了,一看时间都凌晨了,恩,好好回家碎个叫,累死哥了。

解决思路

又过了一些日子,产品妹子还是找过来了,说我们的最新数据又被竞争对手抓走了,肿么回事? 我们只能做一个离线流程将恶意id实时过滤出来,然后及时反馈到在线服务中去, 一开始想到可以通过重启进程的方式来加载这份black_id.txt,这就要求我们的程序对reload要做到足够优雅, 例如不能丢请求、reload过程中要足够平滑,短时间做到这一点还有些困难。另外,整个程序reload过程所消耗的CPU/IO资源较多,例如一些不需更新的资源也需要reload。 如果能做到按需加载就更好了,即:哪个资源有变化,我们就只加载那个资源。 然后我们就想到了本文所提到的双缓冲技术。

这里的双缓冲技术是指对black_id.txt文件的加载过程是在后台独立加载,等加载完毕之后,再与当前正在使用的对象直接交换一下,即可完成新文件的加载。 这里有几个细节需要讨论一下:

  1. black_id.txt在内存中是一个map结构,有人说,等有更新时,直接将增量更新进map即可,这就需要对该map结构上锁,且所有用到的地方都加锁,锁粒度有点粗
  2. 一个简单直接的办法是对black_id.txt整体重新生成一个新的map结构,使用的时候直接拿到这个map的指针替换掉原来的指针即可
  3. 新老替换后,老的资源什么释放?在Golang中,一般情况下可以通过其自身的GC来释放即可。但有时候,有一些资源是需要我们自己主动释放的,GC这一点做不到,例如通过CGO方式嵌入进来的C扩展对象的释放工作。这里我们通过引用计数技术来解决。

双缓冲技术Golang实现

直接上代码。

import (
    "sync"
    "io/ioutil"
    "crypto/md5"
    "fmt"
    "time"
    "sync/atomic"
)

type DoubleBufferingTarget interface {
    Initialize(conf string) bool // 初始化,继承类可以在此做一些初始化的工作
    Close() // 继承类如果在Initialize函数中申请了一些资源,可以在这里将这些资源进行回收
}

type DoubleBufferingTargetCreator func() DoubleBufferingTarget

type DoubleBufferingTargetRef struct {
    Target DoubleBufferingTarget
    ref    *int32
}

type DoubleBuffering struct {
    creator         DoubleBufferingTargetCreator

    mutex           sync.Mutex
    refTarget       DoubleBufferingTargetRef

    reloadTimestamp int64
    md5h            string
}


func newDoubleBuffering(f DoubleBufferingTargetCreator) *DoubleBuffering {
    d := new(DoubleBuffering)
    d.creator = f
    d.reloadTimestamp = 0
    return d
}

func (d *DoubleBuffering) reload(conf string) bool {
    t := d.creator()
    if t.Initialize(conf) == false {
        return false
    }

    content, err := ioutil.ReadFile(conf)
    if err != nil {
        content = []byte(conf)
    }
    d.md5h = fmt.Sprint("%x", md5.Sum(content))
    d.reloadTimestamp = time.Now().Unix()

    d.mutex.Lock()
    defer d.mutex.Unlock()
    d.refTarget.Release() // 将老对象释放掉

    d.refTarget.Target = t
    d.refTarget.ref = new(int32)
    *d.refTarget.ref = 1 // 初始设置为1,由DoubleBuffering代为管理

    return true
}

// ReloadTimestamp return the latest timestamp when the DoubleBuffering reloaded at the last time
func (d *DoubleBuffering) ReloadTimestamp() int64 {
    return d.reloadTimestamp
}

// LatestConfMD5 return the latest config's md5
func (d *DoubleBuffering) LatestConfMD5() string {
    return d.md5h
}

// Get return the target this DoubleBuffering manipulated.
// You should call DoubleBufferingTargetRef.Release() function after you have used it.
func (d *DoubleBuffering) Get() DoubleBufferingTargetRef {
    d.mutex.Lock()
    defer d.mutex.Unlock()
    atomic.AddInt32(d.refTarget.ref, 1)
    return d.refTarget
}

func (d DoubleBufferingTargetRef) Release() {
    if d.ref != nil && atomic.AddInt32(d.ref, -1) == 0 {
        d.Target.Close()
    }
}

func (d DoubleBufferingTargetRef) Ref() int32 {
    if d.ref != nil {
        return *d.ref
    }

    return 0
}

type DoubleBufferingMap map[string/*name*/]*DoubleBuffering
type DoubleBufferingManager struct {
    targets DoubleBufferingMap
    mutex sync.Mutex
}

func NewDoubleBufferingManager() *DoubleBufferingManager {
    m := new(DoubleBufferingManager)
    m.targets = make(DoubleBufferingMap)
    return m
}

func (m *DoubleBufferingManager) Add(name string, conf string, f DoubleBufferingTargetCreator) bool {
    d := newDoubleBuffering(f)
    if d.reload(conf) {
        m.targets[name] = d
        return true
    }

    return false
}

func (m *DoubleBufferingManager) Get(name string) *DoubleBuffering {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    if t, ok := m.targets[name]; ok {
        return t
    }

    //panic("cannot find this kind of DoubleBuffering")
    return nil
}

func (m *DoubleBufferingManager) Reload(name, conf string) bool {
    d := m.Get(name)
    if d == nil {
        return false
    }

    return d.reload(conf)
}

使用DoubleBuffering改造最开始那个抽象问题

package main

import (
    "net/http"
    "log"
    "os"
    "fmt"
    "io/ioutil"
    "bytes"
    "strings"
    "io"
)

type BlackIDDict struct {
    blackIDs map[string]int
}

func NewBlackIDDict() DoubleBufferingTarget {
    d := &BlackIDDict{
        blackIDs: make(map[string]int),
    }
    return d
}

var dbm *DoubleBufferingManager

func (d *BlackIDDict) Initialize(conf string) bool {
    filepath := conf

    // 加载黑名单列表文件,每行一个
    b, err := ioutil.ReadFile(filepath)
    if err != nil {
        return false
    }
    r := bytes.NewBuffer(b)
    for {
        id, err := r.ReadString('\n')
        if err == io.EOF || err == nil {
            id = strings.TrimSpace(id)
            if len(id) > 0 {
                d.blackIDs[id] = 1
            }
        }

        if err != nil {
            break
        }
    }

    return true
}

func (d *BlackIDDict) Close() {
    // 在这里做一些资源释放工作
    // 当前这个例子没有资源需要我们手工释放
}

func (d *BlackIDDict) IsBlackID(id string) bool {
    _, exist := d.blackIDs[id]
    return exist
}

func Query(r *http.Request) (string, error) {
    id := r.FormValue("id")
    query := r.FormValue("query")

    //TODO 参数合法性检查

    d := dbm.Get("black_id")
    tg := d.Get()
    defer tg.Release()
    dict := tg.Target.(*BlackIDDict)  // 转换为具体的Dict对象
    if dict == nil {
        return "", fmt.Errorf("ERROR, Convert DoubleBufferingTarget to Dict failed")
    }

    if dict.IsBlackID(id) {
        return "ERROR", fmt.Errorf("ERROR id")
    }

    //具体的业务逻辑,查询数据库/NoSQL等数据引擎,然后做逻辑计算,然后合并结果
    //这里简单抽象,直接返回欢迎语
    result := fmt.Sprintf("hello, %v", id)

    // 记录一条查询日志,用于离线统计和分析
    log.Printf("<id=%v><query=%v><result=%v><ip=%v>", id, query, result, r.RemoteAddr)

    return result, nil
}

func Handler(w http.ResponseWriter, r *http.Request) {
    r.ParseForm()
    result, err := Query(r)
    if err == nil {
        w.Write([]byte(result))
    } else {
        w.WriteHeader(403)
        w.Write([]byte(result))
    }
}

func Reload(w http.ResponseWriter, r *http.Request) {
    // 这里简化处理,直接重新加载black_id。如果有多个,可以从url参数中获取资源名称
    if dbm.Reload("black_id", os.Args[1]) {
        w.Write([]byte("OK"))
    } else {
        w.Write([]byte("FAILED"))
    }
}

func main() {
    if len(os.Args) != 2 {
        panic("Not specify black_id.txt")
    }

    dbm = NewDoubleBufferingManager()
    rc := dbm.Add("black_id", os.Args[1], NewBlackIDDict)
    if rc == false {
        panic("black_id initialize failed")
    }

    http.HandleFunc("/q", Handler)
    http.HandleFunc("/admin/reload", Reload) // 管理接口,用于重新加载black_id.txt。如果有多个这种资源,可以增加一些参数来说区分不同的资源
    hostname, _ := os.Hostname()
    log.Printf("start http://%s:8091/q", hostname)
    log.Fatal(http.ListenAndServe(":8091", nil))
}

程序启动之后,使用black_id.txt里面的id请求时,都会返回403,如果有新增的black_id,我们也加入到black_id.txt文件中,然后调用 /admin/reload 接口使之生效即可。

C++版本实现

//TODO

参考文献

  1. 双缓冲技术介绍
  2. Golang实现的示例源码在这里 https://github.com/zieckey/dbuf