使用Golang利用ectd实现一个分布式锁
etcd
是随着CoreOS
项目一起成长起来的,随着Golang和CoreOS等项目在开源社区日益火热,
etcd
作为一个高可用、强一致性的分布式Key-Value存储系统被越来越多的开发人员关注和使用。
这篇文章全方位介绍了etcd的应用场景,这里简单摘要如下:
- 服务发现(Service Discovery)
- 消息发布与订阅
- 负载均衡
- 分布式通知与协调
- 分布式锁
- 分布式队列
- 集群监控与Leader竞选
- 为什么用etcd而不用ZooKeeper
本文重点介绍如何利用ectd
实现一个分布式锁。
锁的概念大家都熟悉,当我们希望某一事件在同一时间点只有一个线程(goroutine)在做,或者某一个资源在同一时间点只有一个服务能访问,这个时候我们就需要用到锁。
例如我们要实现一个分布式的id生成器,多台服务器之间的协调就非常麻烦。分布式锁就正好派上用场。
其基本实现原理为:
- 在ectd系统里创建一个key
- 如果创建失败,key存在,则监听该key的变化事件,直到该key被删除,回到1
- 如果创建成功,则认为我获得了锁
具体代码如下:
package etcdsync
import (
"fmt"
"io"
"os"
"sync"
"time"
"github.com/coreos/etcd/client"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
)
const (
defaultTTL = 60
defaultTry = 3
deleteAction = "delete"
expireAction = "expire"
)
// A Mutex is a mutual exclusion lock which is distributed across a cluster.
type Mutex struct {
key string
id string // The identity of the caller
client client.Client
kapi client.KeysAPI
ctx context.Context
ttl time.Duration
mutex *sync.Mutex
logger io.Writer
}
// New creates a Mutex with the given key which must be the same
// across the cluster nodes.
// machines are the ectd cluster addresses
func New(key string, ttl int, machines []string) *Mutex {
cfg := client.Config{
Endpoints: machines,
Transport: client.DefaultTransport,
HeaderTimeoutPerRequest: time.Second,
}
c, err := client.New(cfg)
if err != nil {
return nil
}
hostname, err := os.Hostname()
if err != nil {
return nil
}
if len(key) == 0 || len(machines) == 0 {
return nil
}
if key[0] != '/' {
key = "/" + key
}
if ttl < 1 {
ttl = defaultTTL
}
return &Mutex{
key: key,
id: fmt.Sprintf("%v-%v-%v", hostname, os.Getpid(), time.Now().Format("20060102-15:04:05.999999999")),
client: c,
kapi: client.NewKeysAPI(c),
ctx: context.TODO(),
ttl: time.Second * time.Duration(ttl),
mutex: new(sync.Mutex),
}
}
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() (err error) {
m.mutex.Lock()
for try := 1; try <= defaultTry; try++ {
if m.lock() == nil {
return nil
}
m.debug("Lock node %v ERROR %v", m.key, err)
if try < defaultTry {
m.debug("Try to lock node %v again", m.key, err)
}
}
return err
}
func (m *Mutex) lock() (err error) {
m.debug("Trying to create a node : key=%v", m.key)
setOptions := &client.SetOptions{
PrevExist:client.PrevNoExist,
TTL: m.ttl,
}
resp, err := m.kapi.Set(m.ctx, m.key, m.id, setOptions)
if err == nil {
m.debug("Create node %v OK [%q]", m.key, resp)
return nil
}
m.debug("Create node %v failed [%v]", m.key, err)
e, ok := err.(client.Error)
if !ok {
return err
}
if e.Code != client.ErrorCodeNodeExist {
return err
}
// Get the already node's value.
resp, err = m.kapi.Get(m.ctx, m.key, nil)
if err != nil {
return err
}
m.debug("Get node %v OK", m.key)
watcherOptions := &client.WatcherOptions{
AfterIndex : resp.Index,
Recursive:false,
}
watcher := m.kapi.Watcher(m.key, watcherOptions)
for {
m.debug("Watching %v ...", m.key)
resp, err = watcher.Next(m.ctx)
if err != nil {
return err
}
m.debug("Received an event : %q", resp)
if resp.Action == deleteAction || resp.Action == expireAction {
return nil
}
}
}
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() (err error) {
defer m.mutex.Unlock()
for i := 1; i <= defaultTry; i++ {
var resp *client.Response
resp, err = m.kapi.Delete(m.ctx, m.key, nil)
if err == nil {
m.debug("Delete %v OK", m.key)
return nil
}
m.debug("Delete %v falied: %q", m.key, resp)
e, ok := err.(client.Error)
if ok && e.Code == client.ErrorCodeKeyNotFound {
return nil
}
}
return err
}
func (m *Mutex) debug(format string, v ...interface{}) {
if m.logger != nil {
m.logger.Write([]byte(m.id))
m.logger.Write([]byte(" "))
m.logger.Write([]byte(fmt.Sprintf(format, v...)))
m.logger.Write([]byte("\n"))
}
}
func (m *Mutex) SetDebugLogger(w io.Writer) {
m.logger = w
}
其实类似的实现有很多,但目前都已经过时,使用的都是被官方标记为deprecated
的项目。且大部分接口都不如上述代码简单。
使用上,跟Golang官方sync包的Mutex接口非常类似,先New()
,然后调用Lock()
,使用完后调用Unlock()
,就三个接口,就是这么简单。示例代码如下:
package main
import (
"github.com/zieckey/etcdsync"
"log"
)
func main() {
//etcdsync.SetDebug(true)
log.SetFlags(log.Ldate|log.Ltime|log.Lshortfile)
m := etcdsync.New("/etcdsync", "123", []string{"http://127.0.0.1:2379"})
if m == nil {
log.Printf("etcdsync.NewMutex failed")
}
err := m.Lock()
if err != nil {
log.Printf("etcdsync.Lock failed")
} else {
log.Printf("etcdsync.Lock OK")
}
log.Printf("Get the lock. Do something here.")
err = m.Unlock()
if err != nil {
log.Printf("etcdsync.Unlock failed")
} else {
log.Printf("etcdsync.Unlock OK")
}
}