OSDN Git Service

Added blockchain struct.
[bytom/bytom.git] / database / sinkdb / state.go
1 package sinkdb
2
3 import (
4         "bytes"
5         "sync"
6
7         "github.com/golang/protobuf/proto"
8
9         "chain/database/sinkdb/internal/sinkpb"
10         "chain/errors"
11 )
12
13 const (
14         nextNodeID          = "raft/nextNodeID"
15         allowedMemberPrefix = "/raft/allowed"
16 )
17
18 // state is a general-purpose data store designed to accumulate
19 // and apply replicated updates from a raft log.
20 type state struct {
21         mu           sync.Mutex
22         state        map[string][]byte
23         peers        map[uint64]string // id -> addr
24         appliedIndex uint64
25         version      map[string]uint64 //key -> value index
26 }
27
28 // newState returns a new State.
29 func newState() *state {
30         return &state{
31                 state:   map[string][]byte{nextNodeID: []byte("2")},
32                 peers:   make(map[uint64]string),
33                 version: make(map[string]uint64),
34         }
35 }
36
37 // SetAppliedIndex sets the applied index to the provided index.
38 func (s *state) SetAppliedIndex(index uint64) {
39         s.mu.Lock()
40         defer s.mu.Unlock()
41         s.appliedIndex = index
42 }
43
44 // Peers returns the current set of peer nodes. The returned
45 // map must not be modified.
46 func (s *state) Peers() map[uint64]string {
47         s.mu.Lock()
48         defer s.mu.Unlock()
49         return s.peers
50 }
51
52 // SetPeerAddr sets the address for the given peer.
53 func (s *state) SetPeerAddr(id uint64, addr string) {
54         newPeers := make(map[uint64]string)
55         s.mu.Lock()
56         defer s.mu.Unlock()
57         for nodeID, addr := range s.peers {
58                 newPeers[nodeID] = addr
59         }
60         newPeers[id] = addr
61         s.peers = newPeers
62 }
63
64 // RemovePeerAddr deletes the current address for the given peer if it exists.
65 func (s *state) RemovePeerAddr(id uint64) {
66         newPeers := make(map[uint64]string)
67         s.mu.Lock()
68         defer s.mu.Unlock()
69         for nodeID, addr := range s.peers {
70                 if nodeID == id {
71                         continue
72                 }
73                 newPeers[nodeID] = addr
74         }
75         s.peers = newPeers
76 }
77
78 // RestoreSnapshot decodes data and overwrites the contents of s.
79 // It should be called with the retrieved snapshot
80 // when bootstrapping a new node from an existing cluster
81 // or when recovering from a file on disk.
82 func (s *state) RestoreSnapshot(data []byte, index uint64) error {
83         s.mu.Lock()
84         defer s.mu.Unlock()
85
86         s.appliedIndex = index
87         //TODO (ameets): think about having sinkpb in state for restore
88         snapshot := &sinkpb.Snapshot{}
89         err := proto.Unmarshal(data, snapshot)
90         s.peers = snapshot.Peers
91         s.state = snapshot.State
92         s.version = snapshot.Version
93         return errors.Wrap(err)
94 }
95
96 // Snapshot returns an encoded copy of s suitable for RestoreSnapshot.
97 func (s *state) Snapshot() ([]byte, uint64, error) {
98         s.mu.Lock()
99         defer s.mu.Unlock()
100
101         data, err := proto.Marshal(&sinkpb.Snapshot{
102                 Version: s.version,
103                 State:   s.state,
104                 Peers:   s.peers,
105         })
106         return data, s.appliedIndex, errors.Wrap(err)
107 }
108
109 // Apply applies a raft log entry payload to s. For conditional operations, it
110 // returns whether the condition was satisfied.
111 func (s *state) Apply(data []byte, index uint64) (satisfied bool) {
112         s.mu.Lock()
113         defer s.mu.Unlock()
114
115         if index < s.appliedIndex {
116                 panic(errors.New("entry already applied"))
117         }
118         instr := &sinkpb.Instruction{}
119         err := proto.Unmarshal(data, instr)
120         if err != nil {
121                 // An error here indicates a malformed update
122                 // was written to the raft log. We do version
123                 // negotiation in the transport layer, so this
124                 // should be impossible; by this point, we are
125                 // all speaking the same version.
126                 panic(err)
127         }
128
129         s.appliedIndex = index
130         for _, cond := range instr.Conditions {
131                 y := true
132                 switch cond.Type {
133
134                 case sinkpb.Cond_NOT_KEY_EXISTS:
135                         y = false
136                         fallthrough
137                 case sinkpb.Cond_KEY_EXISTS:
138                         if _, ok := s.state[cond.Key]; ok != y {
139                                 return false
140                         }
141                 case sinkpb.Cond_NOT_VALUE_EQUAL:
142                         y = false
143                         fallthrough
144                 case sinkpb.Cond_VALUE_EQUAL:
145                         if ok := bytes.Equal(s.state[cond.Key], cond.Value); ok != y {
146                                 return false
147                         }
148                 case sinkpb.Cond_NOT_INDEX_EQUAL:
149                         y = false
150                         fallthrough
151                 case sinkpb.Cond_INDEX_EQUAL:
152                         if ok := (s.version[cond.Key] == cond.Index); ok != y {
153                                 return false
154                         }
155                 default:
156                         panic(errors.New("unknown condition type"))
157                 }
158         }
159         for _, op := range instr.Operations {
160                 switch op.Type {
161                 case sinkpb.Op_SET:
162                         s.state[op.Key] = op.Value
163                         s.version[op.Key] = index
164                 case sinkpb.Op_DELETE:
165                         delete(s.state, op.Key)
166                         delete(s.version, op.Key)
167                 default:
168                         panic(errors.New("unknown operation type"))
169                 }
170         }
171         return true
172 }
173
174 // get performs a provisional read operation.
175 func (s *state) get(key string) ([]byte, Version) {
176         s.mu.Lock()
177         defer s.mu.Unlock()
178
179         b, ok := s.state[key]
180         n := s.version[key]
181         return b, Version{key, ok, n}
182 }
183
184 // AppliedIndex returns the raft log index (applied index) of current state
185 func (s *state) AppliedIndex() uint64 {
186         s.mu.Lock()
187         defer s.mu.Unlock()
188
189         return s.appliedIndex
190 }
191
192 // NextNodeID generates an ID for the next node to join the cluster.
193 func (s *state) NextNodeID() (id, version uint64) {
194         s.mu.Lock()
195         defer s.mu.Unlock()
196
197         id, n := proto.DecodeVarint(s.state[nextNodeID])
198         if n == 0 {
199                 panic("raft: cannot decode nextNodeID")
200         }
201         return id, s.version[nextNodeID]
202 }
203
204 func (s *state) IsAllowedMember(addr string) bool {
205         _, ver := s.get(allowedMemberPrefix + "/" + addr)
206         return ver.Exists()
207 }
208
209 func (s *state) IncrementNextNodeID(oldID uint64, index uint64) (instruction []byte) {
210         instruction, _ = proto.Marshal(&sinkpb.Instruction{
211                 Conditions: []*sinkpb.Cond{{
212                         Type:  sinkpb.Cond_INDEX_EQUAL,
213                         Key:   nextNodeID,
214                         Index: index,
215                 }},
216                 Operations: []*sinkpb.Op{{
217                         Type:  sinkpb.Op_SET,
218                         Key:   nextNodeID,
219                         Value: proto.EncodeVarint(oldID + 1),
220                 }},
221         })
222         return instruction
223 }
224
225 func (s *state) EmptyWrite() (instruction []byte) {
226         instruction, _ = proto.Marshal(&sinkpb.Instruction{
227                 Operations: []*sinkpb.Op{{
228                         Type:  sinkpb.Op_SET,
229                         Key:   "/dummyWrite",
230                         Value: []byte(""),
231                 }}})
232         return instruction
233 }