OSDN Git Service

Added blockchain struct.
[bytom/bytom.git] / database / sinkdb / sinkdb.go
1 // Package sinkdb provides a strongly consistent key-value store.
2 package sinkdb
3
4 import (
5         "context"
6         "net/http"
7         "sort"
8         "sync"
9         "time"
10
11         "github.com/golang/protobuf/proto"
12
13         "chain/database/sinkdb/internal/sinkpb"
14         "chain/errors"
15         "chain/net/raft"
16 )
17
18 // ErrConflict is returned by Exec when an instruction was
19 // not completed because its preconditions were not met.
20 var ErrConflict = errors.New("transaction conflict")
21
22 // Open initializes the key-value store and returns a database handle.
23 func Open(laddr, dir string, httpClient *http.Client) (*DB, error) {
24         state := newState()
25         sv, err := raft.Start(laddr, dir, httpClient, state)
26         if err != nil {
27                 return nil, err
28         }
29         db := &DB{state: state, raft: sv}
30         return db, nil
31 }
32
33 // DB provides access to an opened kv store.
34 type DB struct {
35         mu     sync.Mutex
36         closed bool
37
38         state *state
39         raft  *raft.Service
40 }
41
42 // Ping peforms an empty write to verify the connection to
43 // the rest of the cluster.
44 func (db *DB) Ping() error {
45         const timeout = 5 * time.Second
46         ctx, cancel := context.WithTimeout(context.Background(), timeout)
47         defer cancel()
48
49         _, err := db.raft.Exec(ctx, db.state.EmptyWrite())
50         return err
51 }
52
53 // Close closes the database handle releasing its resources. It is
54 // the caller's responsibility to ensure that there are no concurrent
55 // database operations in flight. Close is idempotent.
56 //
57 // All other methods have undefined behavior on a closed DB.
58 func (db *DB) Close() error {
59         db.mu.Lock()
60         defer db.mu.Unlock()
61         if db.closed { // make Close idempotent
62                 return nil
63         }
64         db.closed = true
65         return db.raft.Stop()
66 }
67
68 // Exec executes the provided operations
69 // after combining them with All.
70 func (db *DB) Exec(ctx context.Context, ops ...Op) error {
71         all := All(ops...)
72         if all.err != nil {
73                 return all.err
74         }
75
76         // Disallow multiple writes to the same key.
77         sort.Slice(all.effects, func(i, j int) bool {
78                 return all.effects[i].Key < all.effects[j].Key
79         })
80         var lastKey string
81         for _, e := range all.effects {
82                 if e.Key == lastKey {
83                         err := errors.New("duplicate write")
84                         return errors.Wrap(err, e.Key)
85                 }
86                 lastKey = e.Key
87         }
88
89         encoded, err := proto.Marshal(&sinkpb.Instruction{
90                 Conditions: all.conds,
91                 Operations: all.effects,
92         })
93         if err != nil {
94                 return err
95         }
96         satisfied, err := db.raft.Exec(ctx, encoded)
97         if err != nil {
98                 return err
99         }
100         if !satisfied {
101                 return ErrConflict
102         }
103         return nil
104 }
105
106 // Get performs a linearizable read of the provided key. The
107 // read value is unmarshalled into v.
108 func (db *DB) Get(ctx context.Context, key string, v proto.Message) (Version, error) {
109         err := db.raft.WaitRead(ctx)
110         if err != nil {
111                 return Version{}, err
112         }
113         buf, ver := db.state.get(key)
114         return ver, proto.Unmarshal(buf, v)
115 }
116
117 // GetStale performs a non-linearizable read of the provided key.
118 // The value may be stale. The read value is unmarshalled into v.
119 func (db *DB) GetStale(key string, v proto.Message) (Version, error) {
120         buf, ver := db.state.get(key) // read directly from state
121         return ver, proto.Unmarshal(buf, v)
122 }
123
124 // RaftService returns the raft service used for replication.
125 func (db *DB) RaftService() *raft.Service {
126         return db.raft
127 }