1 // Package sinkdb provides a strongly consistent key-value store.
11 "github.com/golang/protobuf/proto"
13 "chain/database/sinkdb/internal/sinkpb"
18 // ErrConflict is returned by Exec when an instruction was
19 // not completed because its preconditions were not met.
20 var ErrConflict = errors.New("transaction conflict")
22 // Open initializes the key-value store and returns a database handle.
23 func Open(laddr, dir string, httpClient *http.Client) (*DB, error) {
25 sv, err := raft.Start(laddr, dir, httpClient, state)
29 db := &DB{state: state, raft: sv}
33 // DB provides access to an opened kv store.
42 // Ping performs an empty write to verify the connection to
43 // the rest of the cluster.
44 func (db *DB) Ping() error {
45 const timeout = 5 * time.Second
46 ctx, cancel := context.WithTimeout(context.Background(), timeout)
49 _, err := db.raft.Exec(ctx, db.state.EmptyWrite())
53 // Close closes the database handle releasing its resources. It is
54 // the caller's responsibility to ensure that there are no concurrent
55 // database operations in flight. Close is idempotent.
57 // All other methods have undefined behavior on a closed DB.
58 func (db *DB) Close() error {
61 if db.closed { // make Close idempotent
68 // Exec executes the provided operations
69 // after combining them with All.
70 func (db *DB) Exec(ctx context.Context, ops ...Op) error {
76 // Disallow multiple writes to the same key.
77 sort.Slice(all.effects, func(i, j int) bool {
78 return all.effects[i].Key < all.effects[j].Key
81 for _, e := range all.effects {
83 err := errors.New("duplicate write")
84 return errors.Wrap(err, e.Key)
89 encoded, err := proto.Marshal(&sinkpb.Instruction{
90 Conditions: all.conds,
91 Operations: all.effects,
96 satisfied, err := db.raft.Exec(ctx, encoded)
106 // Get performs a linearizable read of the provided key. The
107 // read value is unmarshalled into v.
108 func (db *DB) Get(ctx context.Context, key string, v proto.Message) (Version, error) {
109 err := db.raft.WaitRead(ctx)
111 return Version{}, err
113 buf, ver := db.state.get(key)
114 return ver, proto.Unmarshal(buf, v)
117 // GetStale performs a non-linearizable read of the provided key.
118 // The value may be stale. The read value is unmarshalled into v.
119 func (db *DB) GetStale(key string, v proto.Message) (Version, error) {
120 buf, ver := db.state.get(key) // read directly from state
121 return ver, proto.Unmarshal(buf, v)
124 // RaftService returns the raft service used for replication.
125 func (db *DB) RaftService() *raft.Service {