7 "github.com/golang/protobuf/proto"
9 "chain/database/sinkdb/internal/sinkpb"
14 nextNodeID = "raft/nextNodeID"
15 allowedMemberPrefix = "/raft/allowed"
18 // state is a general-purpose data store designed to accumulate
19 // and apply replicated updates from a raft log.
22 state map[string][]byte
23 peers map[uint64]string // id -> addr
25 version map[string]uint64 //key -> value index
28 // newState returns a new State.
29 func newState() *state {
31 state: map[string][]byte{nextNodeID: []byte("2")},
32 peers: make(map[uint64]string),
33 version: make(map[string]uint64),
37 // SetAppliedIndex sets the applied index to the provided index.
38 func (s *state) SetAppliedIndex(index uint64) {
41 s.appliedIndex = index
44 // Peers returns the current set of peer nodes. The returned
45 // map must not be modified.
46 func (s *state) Peers() map[uint64]string {
52 // SetPeerAddr sets the address for the given peer.
53 func (s *state) SetPeerAddr(id uint64, addr string) {
54 newPeers := make(map[uint64]string)
57 for nodeID, addr := range s.peers {
58 newPeers[nodeID] = addr
64 // RemovePeerAddr deletes the current address for the given peer if it exists.
65 func (s *state) RemovePeerAddr(id uint64) {
66 newPeers := make(map[uint64]string)
69 for nodeID, addr := range s.peers {
73 newPeers[nodeID] = addr
78 // RestoreSnapshot decodes data and overwrites the contents of s.
79 // It should be called with the retrieved snapshot
80 // when bootstrapping a new node from an existing cluster
81 // or when recovering from a file on disk.
82 func (s *state) RestoreSnapshot(data []byte, index uint64) error {
86 s.appliedIndex = index
87 //TODO (ameets): think about having sinkpb in state for restore
88 snapshot := &sinkpb.Snapshot{}
89 err := proto.Unmarshal(data, snapshot)
90 s.peers = snapshot.Peers
91 s.state = snapshot.State
92 s.version = snapshot.Version
93 return errors.Wrap(err)
96 // Snapshot returns an encoded copy of s suitable for RestoreSnapshot.
97 func (s *state) Snapshot() ([]byte, uint64, error) {
101 data, err := proto.Marshal(&sinkpb.Snapshot{
106 return data, s.appliedIndex, errors.Wrap(err)
109 // Apply applies a raft log entry payload to s. For conditional operations, it
110 // returns whether the condition was satisfied.
111 func (s *state) Apply(data []byte, index uint64) (satisfied bool) {
115 if index < s.appliedIndex {
116 panic(errors.New("entry already applied"))
118 instr := &sinkpb.Instruction{}
119 err := proto.Unmarshal(data, instr)
121 // An error here indicates a malformed update
122 // was written to the raft log. We do version
123 // negotiation in the transport layer, so this
124 // should be impossible; by this point, we are
125 // all speaking the same version.
129 s.appliedIndex = index
130 for _, cond := range instr.Conditions {
134 case sinkpb.Cond_NOT_KEY_EXISTS:
137 case sinkpb.Cond_KEY_EXISTS:
138 if _, ok := s.state[cond.Key]; ok != y {
141 case sinkpb.Cond_NOT_VALUE_EQUAL:
144 case sinkpb.Cond_VALUE_EQUAL:
145 if ok := bytes.Equal(s.state[cond.Key], cond.Value); ok != y {
148 case sinkpb.Cond_NOT_INDEX_EQUAL:
151 case sinkpb.Cond_INDEX_EQUAL:
152 if ok := (s.version[cond.Key] == cond.Index); ok != y {
156 panic(errors.New("unknown condition type"))
159 for _, op := range instr.Operations {
162 s.state[op.Key] = op.Value
163 s.version[op.Key] = index
164 case sinkpb.Op_DELETE:
165 delete(s.state, op.Key)
166 delete(s.version, op.Key)
168 panic(errors.New("unknown operation type"))
174 // get performs a provisional read operation.
175 func (s *state) get(key string) ([]byte, Version) {
179 b, ok := s.state[key]
181 return b, Version{key, ok, n}
184 // AppliedIndex returns the raft log index (applied index) of current state
185 func (s *state) AppliedIndex() uint64 {
189 return s.appliedIndex
192 // NextNodeID generates an ID for the next node to join the cluster.
193 func (s *state) NextNodeID() (id, version uint64) {
197 id, n := proto.DecodeVarint(s.state[nextNodeID])
199 panic("raft: cannot decode nextNodeID")
201 return id, s.version[nextNodeID]
204 func (s *state) IsAllowedMember(addr string) bool {
205 _, ver := s.get(allowedMemberPrefix + "/" + addr)
209 func (s *state) IncrementNextNodeID(oldID uint64, index uint64) (instruction []byte) {
210 instruction, _ = proto.Marshal(&sinkpb.Instruction{
211 Conditions: []*sinkpb.Cond{{
212 Type: sinkpb.Cond_INDEX_EQUAL,
216 Operations: []*sinkpb.Op{{
219 Value: proto.EncodeVarint(oldID + 1),
225 func (s *state) EmptyWrite() (instruction []byte) {
226 instruction, _ = proto.Marshal(&sinkpb.Instruction{
227 Operations: []*sinkpb.Op{{