From 6a875604d5b513fb890e04e924c874d8c8aa7f21 Mon Sep 17 00:00:00 2001 From: AhaanKanaujia <kanaujia.ahaan@gmail.com> Date: Sat, 12 Apr 2025 21:18:13 -0500 Subject: [PATCH] initial commit leader election working --- go.mod | 3 + labgob/labgob.go | 177 +++++++++++++ labgob/test_test.go | 172 +++++++++++++ labrpc/labrpc.go | 511 +++++++++++++++++++++++++++++++++++++ labrpc/test_test.go | 597 ++++++++++++++++++++++++++++++++++++++++++++ raft/config.go | 461 ++++++++++++++++++++++++++++++++++ raft/raft.go | 570 ++++++++++++++++++++++++++++++++++++++++++ raft/test_test.go | 551 ++++++++++++++++++++++++++++++++++++++++ raft/util.go | 13 + 9 files changed, 3055 insertions(+) create mode 100644 go.mod create mode 100644 labgob/labgob.go create mode 100644 labgob/test_test.go create mode 100644 labrpc/labrpc.go create mode 100644 labrpc/test_test.go create mode 100644 raft/config.go create mode 100644 raft/raft.go create mode 100644 raft/test_test.go create mode 100644 raft/util.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0190c42 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module raft + +go 1.22.0 diff --git a/labgob/labgob.go b/labgob/labgob.go new file mode 100644 index 0000000..22cb91a --- /dev/null +++ b/labgob/labgob.go @@ -0,0 +1,177 @@ +package labgob + +// +// trying to send non-capitalized fields over RPC produces a range of +// misbehavior, including both mysterious incorrect computation and +// outright crashes. so this wrapper around Go's encoding/gob warns +// about non-capitalized field names. +// + +import "encoding/gob" +import "io" +import "reflect" +import "fmt" +import "sync" +import "unicode" +import "unicode/utf8" + +var mu sync.Mutex +var errorCount int // for TestCapital +var checked map[reflect.Type]bool + +type LabEncoder struct { + gob *gob.Encoder +} + +func NewEncoder(w io.Writer) *LabEncoder { + enc := &LabEncoder{} + enc.gob = gob.NewEncoder(w) + return enc +} + +func (enc *LabEncoder) Encode(e interface{}) error { + checkValue(e) + return enc.gob.Encode(e) +} + +func (enc *LabEncoder) EncodeValue(value reflect.Value) error { + checkValue(value.Interface()) + return enc.gob.EncodeValue(value) +} + +type LabDecoder struct { + gob *gob.Decoder +} + +func NewDecoder(r io.Reader) *LabDecoder { + dec := &LabDecoder{} + dec.gob = gob.NewDecoder(r) + return dec +} + +func (dec *LabDecoder) Decode(e interface{}) error { + checkValue(e) + checkDefault(e) + return dec.gob.Decode(e) +} + +func Register(value interface{}) { + checkValue(value) + gob.Register(value) +} + +func RegisterName(name string, value interface{}) { + checkValue(value) + gob.RegisterName(name, value) +} + +func checkValue(value interface{}) { + checkType(reflect.TypeOf(value)) +} + +func checkType(t reflect.Type) { + k := t.Kind() + + mu.Lock() + // only complain once, and avoid recursion. 
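+	// the checked map both de-duplicates warnings and prevents
+	// infinite recursion on self-referential struct types.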
+ if checked == nil { + checked = map[reflect.Type]bool{} + } + if checked[t] { + mu.Unlock() + return + } + checked[t] = true + mu.Unlock() + + switch k { + case reflect.Struct: + for i := 0; i < t.NumField(); i++ { + f := t.Field(i) + rune, _ := utf8.DecodeRuneInString(f.Name) + if unicode.IsUpper(rune) == false { + // ta da + fmt.Printf("labgob error: lower-case field %v of %v in RPC or persist/snapshot will break your Raft\n", + f.Name, t.Name()) + mu.Lock() + errorCount += 1 + mu.Unlock() + } + checkType(f.Type) + } + return + case reflect.Slice, reflect.Array, reflect.Ptr: + checkType(t.Elem()) + return + case reflect.Map: + checkType(t.Elem()) + checkType(t.Key()) + return + default: + return + } +} + +// +// warn if the value contains non-default values, +// as it would if one sent an RPC but the reply +// struct was already modified. if the RPC reply +// contains default values, GOB won't overwrite +// the non-default value. +// +func checkDefault(value interface{}) { + if value == nil { + return + } + checkDefault1(reflect.ValueOf(value), 1, "") +} + +func checkDefault1(value reflect.Value, depth int, name string) { + if depth > 3 { + return + } + + t := value.Type() + k := t.Kind() + + switch k { + case reflect.Struct: + for i := 0; i < t.NumField(); i++ { + vv := value.Field(i) + name1 := t.Field(i).Name + if name != "" { + name1 = name + "." + name1 + } + checkDefault1(vv, depth+1, name1) + } + return + case reflect.Ptr: + if value.IsNil() { + return + } + checkDefault1(value.Elem(), depth+1, name) + return + case reflect.Bool, + reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, + reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, + reflect.Uintptr, reflect.Float32, reflect.Float64, + reflect.String: + if reflect.DeepEqual(reflect.Zero(t).Interface(), value.Interface()) == false { + mu.Lock() + if errorCount < 1 { + what := name + if what == "" { + what = t.Name() + } + // this warning typically arises if code re-uses the same RPC reply + // variable for multiple RPC calls, or if code restores persisted + // state into variable that already have non-default values. + fmt.Printf("labgob warning: Decoding into a non-default variable/field %v may not work\n", + what) + } + errorCount += 1 + mu.Unlock() + } + return + } +} diff --git a/labgob/test_test.go b/labgob/test_test.go new file mode 100644 index 0000000..f6d9f4e --- /dev/null +++ b/labgob/test_test.go @@ -0,0 +1,172 @@ +package labgob + +import "testing" + +import "bytes" + +type T1 struct { + T1int0 int + T1int1 int + T1string0 string + T1string1 string +} + +type T2 struct { + T2slice []T1 + T2map map[int]*T1 + T2t3 interface{} +} + +type T3 struct { + T3int999 int +} + +// +// test that we didn't break GOB. 
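+// labgob should behave exactly like encoding/gob for types whose fields
+// are all exported; this test round-trips ints, strings, nested structs,
+// slices, maps, and a registered interface value.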
+// +func TestGOB(t *testing.T) { + e0 := errorCount + + w := new(bytes.Buffer) + + Register(T3{}) + + { + x0 := 0 + x1 := 1 + t1 := T1{} + t1.T1int1 = 1 + t1.T1string1 = "6.824" + t2 := T2{} + t2.T2slice = []T1{T1{}, t1} + t2.T2map = map[int]*T1{} + t2.T2map[99] = &T1{1, 2, "x", "y"} + t2.T2t3 = T3{999} + + e := NewEncoder(w) + e.Encode(x0) + e.Encode(x1) + e.Encode(t1) + e.Encode(t2) + } + data := w.Bytes() + + { + var x0 int + var x1 int + var t1 T1 + var t2 T2 + + r := bytes.NewBuffer(data) + d := NewDecoder(r) + if d.Decode(&x0) != nil || + d.Decode(&x1) != nil || + d.Decode(&t1) != nil || + d.Decode(&t2) != nil { + t.Fatalf("Decode failed") + } + + if x0 != 0 { + t.Fatalf("wrong x0 %v\n", x0) + } + if x1 != 1 { + t.Fatalf("wrong x1 %v\n", x1) + } + if t1.T1int0 != 0 { + t.Fatalf("wrong t1.T1int0 %v\n", t1.T1int0) + } + if t1.T1int1 != 1 { + t.Fatalf("wrong t1.T1int1 %v\n", t1.T1int1) + } + if t1.T1string0 != "" { + t.Fatalf("wrong t1.T1string0 %v\n", t1.T1string0) + } + if t1.T1string1 != "6.824" { + t.Fatalf("wrong t1.T1string1 %v\n", t1.T1string1) + } + if len(t2.T2slice) != 2 { + t.Fatalf("wrong t2.T2slice len %v\n", len(t2.T2slice)) + } + if t2.T2slice[1].T1int1 != 1 { + t.Fatalf("wrong slice value\n") + } + if len(t2.T2map) != 1 { + t.Fatalf("wrong t2.T2map len %v\n", len(t2.T2map)) + } + if t2.T2map[99].T1string1 != "y" { + t.Fatalf("wrong map value\n") + } + t3 := (t2.T2t3).(T3) + if t3.T3int999 != 999 { + t.Fatalf("wrong t2.T2t3.T3int999\n") + } + } + + if errorCount != e0 { + t.Fatalf("there were errors, but should not have been") + } +} + +type T4 struct { + Yes int + no int +} + +// +// make sure we check capitalization +// labgob prints one warning during this test. +// +func TestCapital(t *testing.T) { + e0 := errorCount + + v := []map[*T4]int{} + + w := new(bytes.Buffer) + e := NewEncoder(w) + e.Encode(v) + data := w.Bytes() + + var v1 []map[T4]int + r := bytes.NewBuffer(data) + d := NewDecoder(r) + d.Decode(&v1) + + if errorCount != e0+1 { + t.Fatalf("failed to warn about lower-case field") + } +} + +// +// check that we warn when someone sends a default value over +// RPC but the target into which we're decoding holds a non-default +// value, which GOB seems not to overwrite as you'd expect. +// +// labgob does not print a warning. +// +func TestDefault(t *testing.T) { + e0 := errorCount + + type DD struct { + X int + } + + // send a default value... + dd1 := DD{} + + w := new(bytes.Buffer) + e := NewEncoder(w) + e.Encode(dd1) + data := w.Bytes() + + // and receive it into memory that already + // holds non-default values. + reply := DD{99} + + r := bytes.NewBuffer(data) + d := NewDecoder(r) + d.Decode(&reply) + + if errorCount != e0+1 { + t.Fatalf("failed to warn about decoding into non-default value") + } +} diff --git a/labrpc/labrpc.go b/labrpc/labrpc.go new file mode 100644 index 0000000..7d1d974 --- /dev/null +++ b/labrpc/labrpc.go @@ -0,0 +1,511 @@ +package labrpc + +// +// channel-based RPC, for 824 labs. +// +// simulates a network that can lose requests, lose replies, +// delay messages, and entirely disconnect particular hosts. +// +// we will use the original labrpc.go to test your code for grading. +// so, while you can modify this code to help you debug, please +// test against the original before submitting. +// +// adapted from Go net/rpc/server.go. +// +// sends labgob-encoded values to ensure that RPCs +// don't include references to program objects. +// +// net := MakeNetwork() -- holds network, clients, servers. 
+// end := net.MakeEnd(endname) -- create a client end-point, to talk to one server. +// net.AddServer(servername, server) -- adds a named server to network. +// net.DeleteServer(servername) -- eliminate the named server. +// net.Connect(endname, servername) -- connect a client to a server. +// net.Enable(endname, enabled) -- enable/disable a client. +// net.Reliable(bool) -- false means drop/delay messages +// +// end.Call("Raft.AppendEntries", &args, &reply) -- send an RPC, wait for reply. +// the "Raft" is the name of the server struct to be called. +// the "AppendEntries" is the name of the method to be called. +// Call() returns true to indicate that the server executed the request +// and the reply is valid. +// Call() returns false if the network lost the request or reply +// or the server is down. +// It is OK to have multiple Call()s in progress at the same time on the +// same ClientEnd. +// Concurrent calls to Call() may be delivered to the server out of order, +// since the network may re-order messages. +// Call() is guaranteed to return (perhaps after a delay) *except* if the +// handler function on the server side does not return. +// the server RPC handler function must declare its args and reply arguments +// as pointers, so that their types exactly match the types of the arguments +// to Call(). +// +// srv := MakeServer() +// srv.AddService(svc) -- a server can have multiple services, e.g. Raft and k/v +// pass srv to net.AddServer() +// +// svc := MakeService(receiverObject) -- obj's methods will handle RPCs +// much like Go's rpcs.Register() +// pass svc to srv.Addrvice() +// + +import "raft/labgob" +import "bytes" +import "reflect" +import "sync" +import "log" +import "strings" +import "math/rand" +import "time" +import "sync/atomic" + +type reqMsg struct { + endname interface{} // name of sending ClientEnd + svcMeth string // e.g. "Raft.AppendEntries" + argsType reflect.Type + args []byte + replyCh chan replyMsg +} + +type replyMsg struct { + ok bool + reply []byte +} + +type ClientEnd struct { + endname interface{} // this end-point's name + ch chan reqMsg // copy of Network.endCh + done chan struct{} // closed when Network is cleaned up +} + +// send an RPC, wait for the reply. +// the return value indicates success; false means that +// no reply was received from the server. +func (e *ClientEnd) Call(svcMeth string, args interface{}, reply interface{}) bool { + req := reqMsg{} + req.endname = e.endname + req.svcMeth = svcMeth + req.argsType = reflect.TypeOf(args) + req.replyCh = make(chan replyMsg) + + qb := new(bytes.Buffer) + qe := labgob.NewEncoder(qb) + qe.Encode(args) + req.args = qb.Bytes() + + // + // send the request. + // + select { + case e.ch <- req: + // the request has been sent. + case <-e.done: + // entire Network has been destroyed. + return false + } + + // + // wait for the reply. 
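+	// this blocks until processReq() delivers either the real reply or a
+	// synthesized failure; per the comment above, the only way to block
+	// forever is a server handler that never returns.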
+ // + rep := <-req.replyCh + if rep.ok { + rb := bytes.NewBuffer(rep.reply) + rd := labgob.NewDecoder(rb) + if err := rd.Decode(reply); err != nil { + log.Fatalf("ClientEnd.Call(): decode reply: %v\n", err) + } + return true + } else { + return false + } +} + +type Network struct { + mu sync.Mutex + reliable bool + longDelays bool // pause a long time on send on disabled connection + longReordering bool // sometimes delay replies a long time + ends map[interface{}]*ClientEnd // ends, by name + enabled map[interface{}]bool // by end name + servers map[interface{}]*Server // servers, by name + connections map[interface{}]interface{} // endname -> servername + endCh chan reqMsg + done chan struct{} // closed when Network is cleaned up + count int32 // total RPC count, for statistics + bytes int64 // total bytes send, for statistics +} + +func MakeNetwork() *Network { + rn := &Network{} + rn.reliable = true + rn.ends = map[interface{}]*ClientEnd{} + rn.enabled = map[interface{}]bool{} + rn.servers = map[interface{}]*Server{} + rn.connections = map[interface{}](interface{}){} + rn.endCh = make(chan reqMsg) + rn.done = make(chan struct{}) + + // single goroutine to handle all ClientEnd.Call()s + go func() { + for { + select { + case xreq := <-rn.endCh: + atomic.AddInt32(&rn.count, 1) + atomic.AddInt64(&rn.bytes, int64(len(xreq.args))) + go rn.processReq(xreq) + case <-rn.done: + return + } + } + }() + + return rn +} + +func (rn *Network) Cleanup() { + close(rn.done) +} + +func (rn *Network) Reliable(yes bool) { + rn.mu.Lock() + defer rn.mu.Unlock() + + rn.reliable = yes +} + +func (rn *Network) LongReordering(yes bool) { + rn.mu.Lock() + defer rn.mu.Unlock() + + rn.longReordering = yes +} + +func (rn *Network) LongDelays(yes bool) { + rn.mu.Lock() + defer rn.mu.Unlock() + + rn.longDelays = yes +} + +func (rn *Network) readEndnameInfo(endname interface{}) (enabled bool, + servername interface{}, server *Server, reliable bool, longreordering bool, +) { + rn.mu.Lock() + defer rn.mu.Unlock() + + enabled = rn.enabled[endname] + servername = rn.connections[endname] + if servername != nil { + server = rn.servers[servername] + } + reliable = rn.reliable + longreordering = rn.longReordering + return +} + +func (rn *Network) isServerDead(endname interface{}, servername interface{}, server *Server) bool { + rn.mu.Lock() + defer rn.mu.Unlock() + + if rn.enabled[endname] == false || rn.servers[servername] != server { + return true + } + return false +} + +func (rn *Network) processReq(req reqMsg) { + enabled, servername, server, reliable, longreordering := rn.readEndnameInfo(req.endname) + + if enabled && servername != nil && server != nil { + if reliable == false { + // short delay + ms := (rand.Int() % 27) + time.Sleep(time.Duration(ms) * time.Millisecond) + } + + if reliable == false && (rand.Int()%1000) < 100 { + // drop the request, return as if timeout + req.replyCh <- replyMsg{false, nil} + return + } + + // execute the request (call the RPC handler). + // in a separate thread so that we can periodically check + // if the server has been killed and the RPC should get a + // failure reply. + ech := make(chan replyMsg) + go func() { + r := server.dispatch(req) + ech <- r + }() + + // wait for handler to return, + // but stop waiting if DeleteServer() has been called, + // and return an error. 
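+	// poll every 100ms: either the handler finishes first, or we notice
+	// that the server has been disabled or replaced and give up on this
+	// request.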
+ var reply replyMsg + replyOK := false + serverDead := false + for replyOK == false && serverDead == false { + select { + case reply = <-ech: + replyOK = true + case <-time.After(100 * time.Millisecond): + serverDead = rn.isServerDead(req.endname, servername, server) + if serverDead { + go func() { + <-ech // drain channel to let the goroutine created earlier terminate + }() + } + } + } + + // do not reply if DeleteServer() has been called, i.e. + // the server has been killed. this is needed to avoid + // situation in which a client gets a positive reply + // to an Append, but the server persisted the update + // into the old Persister. config.go is careful to call + // DeleteServer() before superseding the Persister. + serverDead = rn.isServerDead(req.endname, servername, server) + + if replyOK == false || serverDead == true { + // server was killed while we were waiting; return error. + req.replyCh <- replyMsg{false, nil} + } else if reliable == false && (rand.Int()%1000) < 100 { + // drop the reply, return as if timeout + req.replyCh <- replyMsg{false, nil} + } else if longreordering == true && rand.Intn(900) < 600 { + // delay the response for a while + ms := 200 + rand.Intn(1+rand.Intn(2000)) + // Russ points out that this timer arrangement will decrease + // the number of goroutines, so that the race + // detector is less likely to get upset. + time.AfterFunc(time.Duration(ms)*time.Millisecond, func() { + atomic.AddInt64(&rn.bytes, int64(len(reply.reply))) + req.replyCh <- reply + }) + } else { + atomic.AddInt64(&rn.bytes, int64(len(reply.reply))) + req.replyCh <- reply + } + } else { + // simulate no reply and eventual timeout. + ms := 0 + if rn.longDelays { + // let Raft tests check that leader doesn't send + // RPCs synchronously. + ms = (rand.Int() % 7000) + } else { + // many kv tests require the client to try each + // server in fairly rapid succession. + ms = (rand.Int() % 100) + } + time.AfterFunc(time.Duration(ms)*time.Millisecond, func() { + req.replyCh <- replyMsg{false, nil} + }) + } + +} + +// create a client end-point. +// start the thread that listens and delivers. +func (rn *Network) MakeEnd(endname interface{}) *ClientEnd { + rn.mu.Lock() + defer rn.mu.Unlock() + + if _, ok := rn.ends[endname]; ok { + log.Fatalf("MakeEnd: %v already exists\n", endname) + } + + e := &ClientEnd{} + e.endname = endname + e.ch = rn.endCh + e.done = rn.done + rn.ends[endname] = e + rn.enabled[endname] = false + rn.connections[endname] = nil + + return e +} + +func (rn *Network) AddServer(servername interface{}, rs *Server) { + rn.mu.Lock() + defer rn.mu.Unlock() + + rn.servers[servername] = rs +} + +func (rn *Network) DeleteServer(servername interface{}) { + rn.mu.Lock() + defer rn.mu.Unlock() + + rn.servers[servername] = nil +} + +// connect a ClientEnd to a server. +// a ClientEnd can only be connected once in its lifetime. +func (rn *Network) Connect(endname interface{}, servername interface{}) { + rn.mu.Lock() + defer rn.mu.Unlock() + + rn.connections[endname] = servername +} + +// enable/disable a ClientEnd. +func (rn *Network) Enable(endname interface{}, enabled bool) { + rn.mu.Lock() + defer rn.mu.Unlock() + + rn.enabled[endname] = enabled +} + +// get a server's count of incoming RPCs. 
+func (rn *Network) GetCount(servername interface{}) int { + rn.mu.Lock() + defer rn.mu.Unlock() + + svr := rn.servers[servername] + return svr.GetCount() +} + +func (rn *Network) GetTotalCount() int { + x := atomic.LoadInt32(&rn.count) + return int(x) +} + +func (rn *Network) GetTotalBytes() int64 { + x := atomic.LoadInt64(&rn.bytes) + return x +} + +// +// a server is a collection of services, all sharing +// the same rpc dispatcher. so that e.g. both a Raft +// and a k/v server can listen to the same rpc endpoint. +// +type Server struct { + mu sync.Mutex + services map[string]*Service + count int // incoming RPCs +} + +func MakeServer() *Server { + rs := &Server{} + rs.services = map[string]*Service{} + return rs +} + +func (rs *Server) AddService(svc *Service) { + rs.mu.Lock() + defer rs.mu.Unlock() + rs.services[svc.name] = svc +} + +func (rs *Server) dispatch(req reqMsg) replyMsg { + rs.mu.Lock() + + rs.count += 1 + + // split Raft.AppendEntries into service and method + dot := strings.LastIndex(req.svcMeth, ".") + serviceName := req.svcMeth[:dot] + methodName := req.svcMeth[dot+1:] + + service, ok := rs.services[serviceName] + + rs.mu.Unlock() + + if ok { + return service.dispatch(methodName, req) + } else { + choices := []string{} + for k, _ := range rs.services { + choices = append(choices, k) + } + log.Fatalf("labrpc.Server.dispatch(): unknown service %v in %v.%v; expecting one of %v\n", + serviceName, serviceName, methodName, choices) + return replyMsg{false, nil} + } +} + +func (rs *Server) GetCount() int { + rs.mu.Lock() + defer rs.mu.Unlock() + return rs.count +} + +// an object with methods that can be called via RPC. +// a single server may have more than one Service. +type Service struct { + name string + rcvr reflect.Value + typ reflect.Type + methods map[string]reflect.Method +} + +func MakeService(rcvr interface{}) *Service { + svc := &Service{} + svc.typ = reflect.TypeOf(rcvr) + svc.rcvr = reflect.ValueOf(rcvr) + svc.name = reflect.Indirect(svc.rcvr).Type().Name() + svc.methods = map[string]reflect.Method{} + + for m := 0; m < svc.typ.NumMethod(); m++ { + method := svc.typ.Method(m) + mtype := method.Type + mname := method.Name + + //fmt.Printf("%v pp %v ni %v 1k %v 2k %v no %v\n", + // mname, method.PkgPath, mtype.NumIn(), mtype.In(1).Kind(), mtype.In(2).Kind(), mtype.NumOut()) + + if method.PkgPath != "" || // capitalized? + mtype.NumIn() != 3 || + //mtype.In(1).Kind() != reflect.Ptr || + mtype.In(2).Kind() != reflect.Ptr || + mtype.NumOut() != 0 { + // the method is not suitable for a handler + //fmt.Printf("bad method: %v\n", mname) + } else { + // the method looks like a handler + svc.methods[mname] = method + } + } + + return svc +} + +func (svc *Service) dispatch(methname string, req reqMsg) replyMsg { + if method, ok := svc.methods[methname]; ok { + // prepare space into which to read the argument. + // the Value's type will be a pointer to req.argsType. + args := reflect.New(req.argsType) + + // decode the argument. + ab := bytes.NewBuffer(req.args) + ad := labgob.NewDecoder(ab) + ad.Decode(args.Interface()) + + // allocate space for the reply. + replyType := method.Type.In(2) + replyType = replyType.Elem() + replyv := reflect.New(replyType) + + // call the method. + function := method.Func + function.Call([]reflect.Value{svc.rcvr, args.Elem(), replyv}) + + // encode the reply. 
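+		// (encoding the reply value means the caller always receives a copy;
+		// no pointers into the server's memory cross the simulated network.)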
+ rb := new(bytes.Buffer) + re := labgob.NewEncoder(rb) + re.EncodeValue(replyv) + + return replyMsg{true, rb.Bytes()} + } else { + choices := []string{} + for k, _ := range svc.methods { + choices = append(choices, k) + } + log.Fatalf("labrpc.Service.dispatch(): unknown method %v in %v; expecting one of %v\n", + methname, req.svcMeth, choices) + return replyMsg{false, nil} + } +} diff --git a/labrpc/test_test.go b/labrpc/test_test.go new file mode 100644 index 0000000..1ec3e65 --- /dev/null +++ b/labrpc/test_test.go @@ -0,0 +1,597 @@ +package labrpc + +import "testing" +import "strconv" +import "sync" +import "runtime" +import "time" +import "fmt" + +type JunkArgs struct { + X int +} +type JunkReply struct { + X string +} + +type JunkServer struct { + mu sync.Mutex + log1 []string + log2 []int +} + +func (js *JunkServer) Handler1(args string, reply *int) { + js.mu.Lock() + defer js.mu.Unlock() + js.log1 = append(js.log1, args) + *reply, _ = strconv.Atoi(args) +} + +func (js *JunkServer) Handler2(args int, reply *string) { + js.mu.Lock() + defer js.mu.Unlock() + js.log2 = append(js.log2, args) + *reply = "handler2-" + strconv.Itoa(args) +} + +func (js *JunkServer) Handler3(args int, reply *int) { + js.mu.Lock() + defer js.mu.Unlock() + time.Sleep(20 * time.Second) + *reply = -args +} + +// args is a pointer +func (js *JunkServer) Handler4(args *JunkArgs, reply *JunkReply) { + reply.X = "pointer" +} + +// args is a not pointer +func (js *JunkServer) Handler5(args JunkArgs, reply *JunkReply) { + reply.X = "no pointer" +} + +func (js *JunkServer) Handler6(args string, reply *int) { + js.mu.Lock() + defer js.mu.Unlock() + *reply = len(args) +} + +func (js *JunkServer) Handler7(args int, reply *string) { + js.mu.Lock() + defer js.mu.Unlock() + *reply = "" + for i := 0; i < args; i++ { + *reply = *reply + "y" + } +} + +func TestBasic(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + + e := rn.MakeEnd("end1-99") + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer("server99", rs) + + rn.Connect("end1-99", "server99") + rn.Enable("end1-99", true) + + { + reply := "" + e.Call("JunkServer.Handler2", 111, &reply) + if reply != "handler2-111" { + t.Fatalf("wrong reply from Handler2") + } + } + + { + reply := 0 + e.Call("JunkServer.Handler1", "9099", &reply) + if reply != 9099 { + t.Fatalf("wrong reply from Handler1") + } + } +} + +func TestTypes(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + + e := rn.MakeEnd("end1-99") + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer("server99", rs) + + rn.Connect("end1-99", "server99") + rn.Enable("end1-99", true) + + { + var args JunkArgs + var reply JunkReply + // args must match type (pointer or not) of handler. + e.Call("JunkServer.Handler4", &args, &reply) + if reply.X != "pointer" { + t.Fatalf("wrong reply from Handler4") + } + } + + { + var args JunkArgs + var reply JunkReply + // args must match type (pointer or not) of handler. + e.Call("JunkServer.Handler5", args, &reply) + if reply.X != "no pointer" { + t.Fatalf("wrong reply from Handler5") + } + } +} + +// +// does net.Enable(endname, false) really disconnect a client? 
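+// a Call() through a disabled end should eventually return false and
+// leave the caller's reply value untouched.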
+// +func TestDisconnect(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + + e := rn.MakeEnd("end1-99") + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer("server99", rs) + + rn.Connect("end1-99", "server99") + + { + reply := "" + e.Call("JunkServer.Handler2", 111, &reply) + if reply != "" { + t.Fatalf("unexpected reply from Handler2") + } + } + + rn.Enable("end1-99", true) + + { + reply := 0 + e.Call("JunkServer.Handler1", "9099", &reply) + if reply != 9099 { + t.Fatalf("wrong reply from Handler1") + } + } +} + +// +// test net.GetCount() +// +func TestCounts(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + + e := rn.MakeEnd("end1-99") + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer(99, rs) + + rn.Connect("end1-99", 99) + rn.Enable("end1-99", true) + + for i := 0; i < 17; i++ { + reply := "" + e.Call("JunkServer.Handler2", i, &reply) + wanted := "handler2-" + strconv.Itoa(i) + if reply != wanted { + t.Fatalf("wrong reply %v from Handler1, expecting %v", reply, wanted) + } + } + + n := rn.GetCount(99) + if n != 17 { + t.Fatalf("wrong GetCount() %v, expected 17\n", n) + } +} + +// +// test net.GetTotalBytes() +// +func TestBytes(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + + e := rn.MakeEnd("end1-99") + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer(99, rs) + + rn.Connect("end1-99", 99) + rn.Enable("end1-99", true) + + for i := 0; i < 17; i++ { + args := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" + args = args + args + args = args + args + reply := 0 + e.Call("JunkServer.Handler6", args, &reply) + wanted := len(args) + if reply != wanted { + t.Fatalf("wrong reply %v from Handler6, expecting %v", reply, wanted) + } + } + + n := rn.GetTotalBytes() + if n < 4828 || n > 6000 { + t.Fatalf("wrong GetTotalBytes() %v, expected about 5000\n", n) + } + + for i := 0; i < 17; i++ { + args := 107 + reply := "" + e.Call("JunkServer.Handler7", args, &reply) + wanted := args + if len(reply) != wanted { + t.Fatalf("wrong reply len=%v from Handler6, expecting %v", len(reply), wanted) + } + } + + nn := rn.GetTotalBytes() - n + if nn < 1800 || nn > 2500 { + t.Fatalf("wrong GetTotalBytes() %v, expected about 2000\n", nn) + } +} + +// +// test RPCs from concurrent ClientEnds +// +func TestConcurrentMany(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer(1000, rs) + + ch := make(chan int) + + nclients := 20 + nrpcs := 10 + for ii := 0; ii < nclients; ii++ { + go func(i int) { + n := 0 + defer func() { ch <- n }() + + e := rn.MakeEnd(i) + rn.Connect(i, 1000) + rn.Enable(i, true) + + for j := 0; j < nrpcs; j++ { + arg := i*100 + j + reply := "" + e.Call("JunkServer.Handler2", arg, &reply) + wanted := "handler2-" + strconv.Itoa(arg) + if reply != wanted { + t.Fatalf("wrong reply %v from Handler1, expecting %v", reply, wanted) + } + n += 1 + } + }(ii) + } + + total := 0 + for ii := 0; ii < nclients; ii++ { + x := <-ch + total += x + } + + if total != nclients*nrpcs { + t.Fatalf("wrong number of RPCs completed, got %v, expected %v", total, nclients*nrpcs) + } + + n := rn.GetCount(1000) + if n != total { + t.Fatalf("wrong GetCount() %v, expected %v\n", n, total) 
+ } +} + +// +// test unreliable +// +func TestUnreliable(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + rn.Reliable(false) + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer(1000, rs) + + ch := make(chan int) + + nclients := 300 + for ii := 0; ii < nclients; ii++ { + go func(i int) { + n := 0 + defer func() { ch <- n }() + + e := rn.MakeEnd(i) + rn.Connect(i, 1000) + rn.Enable(i, true) + + arg := i * 100 + reply := "" + ok := e.Call("JunkServer.Handler2", arg, &reply) + if ok { + wanted := "handler2-" + strconv.Itoa(arg) + if reply != wanted { + t.Fatalf("wrong reply %v from Handler1, expecting %v", reply, wanted) + } + n += 1 + } + }(ii) + } + + total := 0 + for ii := 0; ii < nclients; ii++ { + x := <-ch + total += x + } + + if total == nclients || total == 0 { + t.Fatalf("all RPCs succeeded despite unreliable") + } +} + +// +// test concurrent RPCs from a single ClientEnd +// +func TestConcurrentOne(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer(1000, rs) + + e := rn.MakeEnd("c") + rn.Connect("c", 1000) + rn.Enable("c", true) + + ch := make(chan int) + + nrpcs := 20 + for ii := 0; ii < nrpcs; ii++ { + go func(i int) { + n := 0 + defer func() { ch <- n }() + + arg := 100 + i + reply := "" + e.Call("JunkServer.Handler2", arg, &reply) + wanted := "handler2-" + strconv.Itoa(arg) + if reply != wanted { + t.Fatalf("wrong reply %v from Handler2, expecting %v", reply, wanted) + } + n += 1 + }(ii) + } + + total := 0 + for ii := 0; ii < nrpcs; ii++ { + x := <-ch + total += x + } + + if total != nrpcs { + t.Fatalf("wrong number of RPCs completed, got %v, expected %v", total, nrpcs) + } + + js.mu.Lock() + defer js.mu.Unlock() + if len(js.log2) != nrpcs { + t.Fatalf("wrong number of RPCs delivered") + } + + n := rn.GetCount(1000) + if n != total { + t.Fatalf("wrong GetCount() %v, expected %v\n", n, total) + } +} + +// +// regression: an RPC that's delayed during Enabled=false +// should not delay subsequent RPCs (e.g. after Enabled=true). +// +func TestRegression1(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer(1000, rs) + + e := rn.MakeEnd("c") + rn.Connect("c", 1000) + + // start some RPCs while the ClientEnd is disabled. + // they'll be delayed. + rn.Enable("c", false) + ch := make(chan bool) + nrpcs := 20 + for ii := 0; ii < nrpcs; ii++ { + go func(i int) { + ok := false + defer func() { ch <- ok }() + + arg := 100 + i + reply := "" + // this call ought to return false. + e.Call("JunkServer.Handler2", arg, &reply) + ok = true + }(ii) + } + + time.Sleep(100 * time.Millisecond) + + // now enable the ClientEnd and check that an RPC completes quickly. 
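+	// the RPCs queued while disabled must not delay this fresh RPC behind
+	// their long simulated timeouts.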
+ t0 := time.Now() + rn.Enable("c", true) + { + arg := 99 + reply := "" + e.Call("JunkServer.Handler2", arg, &reply) + wanted := "handler2-" + strconv.Itoa(arg) + if reply != wanted { + t.Fatalf("wrong reply %v from Handler2, expecting %v", reply, wanted) + } + } + dur := time.Since(t0).Seconds() + + if dur > 0.03 { + t.Fatalf("RPC took too long (%v) after Enable", dur) + } + + for ii := 0; ii < nrpcs; ii++ { + <-ch + } + + js.mu.Lock() + defer js.mu.Unlock() + if len(js.log2) != 1 { + t.Fatalf("wrong number (%v) of RPCs delivered, expected 1", len(js.log2)) + } + + n := rn.GetCount(1000) + if n != 1 { + t.Fatalf("wrong GetCount() %v, expected %v\n", n, 1) + } +} + +// +// if an RPC is stuck in a server, and the server +// is killed with DeleteServer(), does the RPC +// get un-stuck? +// +func TestKilled(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + + e := rn.MakeEnd("end1-99") + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer("server99", rs) + + rn.Connect("end1-99", "server99") + rn.Enable("end1-99", true) + + doneCh := make(chan bool) + go func() { + reply := 0 + ok := e.Call("JunkServer.Handler3", 99, &reply) + doneCh <- ok + }() + + time.Sleep(1000 * time.Millisecond) + + select { + case <-doneCh: + t.Fatalf("Handler3 should not have returned yet") + case <-time.After(100 * time.Millisecond): + } + + rn.DeleteServer("server99") + + select { + case x := <-doneCh: + if x != false { + t.Fatalf("Handler3 returned successfully despite DeleteServer()") + } + case <-time.After(100 * time.Millisecond): + t.Fatalf("Handler3 should return after DeleteServer()") + } +} + +func TestBenchmark(t *testing.T) { + runtime.GOMAXPROCS(4) + + rn := MakeNetwork() + defer rn.Cleanup() + + e := rn.MakeEnd("end1-99") + + js := &JunkServer{} + svc := MakeService(js) + + rs := MakeServer() + rs.AddService(svc) + rn.AddServer("server99", rs) + + rn.Connect("end1-99", "server99") + rn.Enable("end1-99", true) + + t0 := time.Now() + n := 100000 + for iters := 0; iters < n; iters++ { + reply := "" + e.Call("JunkServer.Handler2", 111, &reply) + if reply != "handler2-111" { + t.Fatalf("wrong reply from Handler2") + } + } + fmt.Printf("%v for %v\n", time.Since(t0), n) + // march 2016, rtm laptop, 22 microseconds per RPC +} diff --git a/raft/config.go b/raft/config.go new file mode 100644 index 0000000..fbb24fa --- /dev/null +++ b/raft/config.go @@ -0,0 +1,461 @@ +package raft + +// +// support for Raft tester. +// +// we will use the original config.go to test your code for grading. +// so, while you can modify this code to help you debug, please +// test with the original before submitting. 
+// + +import "raft/labrpc" +import "log" +import "sync" +import "testing" +import "runtime" +import "math/rand" +import crand "crypto/rand" +import "math/big" +import "encoding/base64" +import "time" +import "fmt" + +func randstring(n int) string { + b := make([]byte, 2*n) + crand.Read(b) + s := base64.URLEncoding.EncodeToString(b) + return s[0:n] +} + +func makeSeed() int64 { + max := big.NewInt(int64(1) << 62) + bigx, _ := crand.Int(crand.Reader, max) + x := bigx.Int64() + return x +} + +type config struct { + mu sync.Mutex + t *testing.T + net *labrpc.Network + n int + rafts []*Raft + applyErr []string // from apply channel readers + connected []bool // whether each server is on the net + endnames [][]string // the port file names each sends to + logs []map[int]interface{} // copy of each server's committed entries + start time.Time // time at which make_config() was called + // begin()/end() statistics + t0 time.Time // time at which test_test.go called cfg.begin() + rpcs0 int // rpcTotal() at start of test + cmds0 int // number of agreements + bytes0 int64 + maxIndex int + maxIndex0 int +} + +var ncpu_once sync.Once + +func make_config(t *testing.T, n int, unreliable bool) *config { + ncpu_once.Do(func() { + if runtime.NumCPU() < 2 { + fmt.Printf("warning: only one CPU, which may conceal locking bugs\n") + } + rand.Seed(makeSeed()) + }) + runtime.GOMAXPROCS(4) + cfg := &config{} + cfg.t = t + cfg.net = labrpc.MakeNetwork() + cfg.n = n + cfg.applyErr = make([]string, cfg.n) + cfg.rafts = make([]*Raft, cfg.n) + cfg.connected = make([]bool, cfg.n) + cfg.endnames = make([][]string, cfg.n) + cfg.logs = make([]map[int]interface{}, cfg.n) + cfg.start = time.Now() + + cfg.setunreliable(unreliable) + + cfg.net.LongDelays(true) + + // create a full set of Rafts. + for i := 0; i < cfg.n; i++ { + cfg.logs[i] = map[int]interface{}{} + cfg.start1(i) + } + + // connect everyone + for i := 0; i < cfg.n; i++ { + cfg.connect(i) + } + + return cfg +} + +// +// start or re-start a Raft. +// if one already exists, "kill" it first. +// allocate new outgoing port file names, and a new +// state persister, to isolate previous instance of +// this server. since we cannot really kill it. +// +func (cfg *config) start1(i int) { + + // a fresh set of outgoing ClientEnd names. + // so that old crashed instance's ClientEnds can't send. + cfg.endnames[i] = make([]string, cfg.n) + for j := 0; j < cfg.n; j++ { + cfg.endnames[i][j] = randstring(20) + } + + // a fresh set of ClientEnds. + ends := make([]*labrpc.ClientEnd, cfg.n) + for j := 0; j < cfg.n; j++ { + ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j]) + cfg.net.Connect(cfg.endnames[i][j], j) + } + + // listen to messages from Raft indicating newly committed messages. + applyCh := make(chan ApplyMsg) + go func() { + for m := range applyCh { + err_msg := "" + if m.CommandValid == false { + // ignore other types of ApplyMsg + } else { + v := m.Command + cfg.mu.Lock() + for j := 0; j < len(cfg.logs); j++ { + if old, oldok := cfg.logs[j][m.CommandIndex]; oldok && old != v { + // some server has already committed a different value for this entry! 
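+						// (this would violate Raft's State Machine Safety property:
+						// no two servers may commit different commands at the same index.)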
+ err_msg = fmt.Sprintf("commit index=%v server=%v %v != server=%v %v", + m.CommandIndex, i, m.Command, j, old) + } + } + _, prevok := cfg.logs[i][m.CommandIndex-1] + cfg.logs[i][m.CommandIndex] = v + if m.CommandIndex > cfg.maxIndex { + cfg.maxIndex = m.CommandIndex + } + cfg.mu.Unlock() + + if m.CommandIndex > 1 && prevok == false { + err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex) + } + } + + if err_msg != "" { + log.Fatalf("apply error: %v\n", err_msg) + cfg.applyErr[i] = err_msg + // keep reading after error so that Raft doesn't block + // holding locks... + } + } + }() + + rf := Make(ends, i, applyCh) + + cfg.mu.Lock() + cfg.rafts[i] = rf + cfg.mu.Unlock() + + svc := labrpc.MakeService(rf) + srv := labrpc.MakeServer() + srv.AddService(svc) + cfg.net.AddServer(i, srv) +} + +func (cfg *config) checkTimeout() { + // enforce a two minute real-time limit on each test + if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second { + cfg.t.Fatal("test took longer than 120 seconds") + } +} + +func (cfg *config) cleanup() { + for i := 0; i < len(cfg.rafts); i++ { + if cfg.rafts[i] != nil { + cfg.rafts[i].Kill() + } + } + cfg.net.Cleanup() + cfg.checkTimeout() +} + +// attach server i to the net. +func (cfg *config) connect(i int) { + // fmt.Printf("connect(%d)\n", i) + + cfg.connected[i] = true + + // outgoing ClientEnds + for j := 0; j < cfg.n; j++ { + if cfg.connected[j] { + endname := cfg.endnames[i][j] + cfg.net.Enable(endname, true) + } + } + + // incoming ClientEnds + for j := 0; j < cfg.n; j++ { + if cfg.connected[j] { + endname := cfg.endnames[j][i] + cfg.net.Enable(endname, true) + } + } +} + +// detach server i from the net. +func (cfg *config) disconnect(i int) { + // fmt.Printf("disconnect(%d)\n", i) + + cfg.connected[i] = false + + // outgoing ClientEnds + for j := 0; j < cfg.n; j++ { + if cfg.endnames[i] != nil { + endname := cfg.endnames[i][j] + cfg.net.Enable(endname, false) + } + } + + // incoming ClientEnds + for j := 0; j < cfg.n; j++ { + if cfg.endnames[j] != nil { + endname := cfg.endnames[j][i] + cfg.net.Enable(endname, false) + } + } +} + +func (cfg *config) rpcCount(server int) int { + return cfg.net.GetCount(server) +} + +func (cfg *config) rpcTotal() int { + return cfg.net.GetTotalCount() +} + +func (cfg *config) setunreliable(unrel bool) { + cfg.net.Reliable(!unrel) +} + +func (cfg *config) bytesTotal() int64 { + return cfg.net.GetTotalBytes() +} + +func (cfg *config) setlongreordering(longrel bool) { + cfg.net.LongReordering(longrel) +} + +// check that there's exactly one leader. +// try a few times in case re-elections are needed. +func (cfg *config) checkOneLeader() int { + for iters := 0; iters < 10; iters++ { + ms := 450 + (rand.Int63() % 100) + time.Sleep(time.Duration(ms) * time.Millisecond) + + leaders := make(map[int][]int) + for i := 0; i < cfg.n; i++ { + if cfg.connected[i] { + if term, leader := cfg.rafts[i].GetState(); leader { + leaders[term] = append(leaders[term], i) + } + } + } + + lastTermWithLeader := -1 + for term, leaders := range leaders { + if len(leaders) > 1 { + cfg.t.Fatalf("term %d has %d (>1) leaders", term, len(leaders)) + } + if term > lastTermWithLeader { + lastTermWithLeader = term + } + } + + if len(leaders) != 0 { + return leaders[lastTermWithLeader][0] + } + } + cfg.t.Fatalf("expected one leader, got none") + return -1 +} + +// check that everyone agrees on the term. 
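+// returns that term, or -1 if no servers are connected.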
+func (cfg *config) checkTerms() int { + term := -1 + for i := 0; i < cfg.n; i++ { + if cfg.connected[i] { + xterm, _ := cfg.rafts[i].GetState() + if term == -1 { + term = xterm + } else if term != xterm { + cfg.t.Fatalf("servers disagree on term") + } + } + } + return term +} + +// check that there's no leader +func (cfg *config) checkNoLeader() { + for i := 0; i < cfg.n; i++ { + if cfg.connected[i] { + _, is_leader := cfg.rafts[i].GetState() + if is_leader { + cfg.t.Fatalf("expected no leader, but %v claims to be leader", i) + } + } + } +} + +// how many servers think a log entry is committed? +func (cfg *config) nCommitted(index int) (int, interface{}) { + count := 0 + var cmd interface{} = nil + for i := 0; i < len(cfg.rafts); i++ { + if cfg.applyErr[i] != "" { + cfg.t.Fatal(cfg.applyErr[i]) + } + + cfg.mu.Lock() + cmd1, ok := cfg.logs[i][index] + cfg.mu.Unlock() + + if ok { + if count > 0 && cmd != cmd1 { + cfg.t.Fatalf("committed values do not match: index %v, %v, %v\n", + index, cmd, cmd1) + } + count += 1 + cmd = cmd1 + } + } + return count, cmd +} + +// wait for at least n servers to commit. +// but don't wait forever. +func (cfg *config) wait(index int, n int, startTerm int) interface{} { + to := 10 * time.Millisecond + for iters := 0; iters < 30; iters++ { + nd, _ := cfg.nCommitted(index) + if nd >= n { + break + } + time.Sleep(to) + if to < time.Second { + to *= 2 + } + if startTerm > -1 { + for _, r := range cfg.rafts { + if t, _ := r.GetState(); t > startTerm { + // someone has moved on + // can no longer guarantee that we'll "win" + return -1 + } + } + } + } + nd, cmd := cfg.nCommitted(index) + if nd < n { + cfg.t.Fatalf("only %d decided for index %d; wanted %d\n", + nd, index, n) + } + return cmd +} + +// do a complete agreement. +// it might choose the wrong leader initially, +// and have to re-submit after giving up. +// entirely gives up after about 10 seconds. +// indirectly checks that the servers agree on the +// same value, since nCommitted() checks this, +// as do the threads that read from applyCh. +// returns index. +// if retry==true, may submit the command multiple +// times, in case a leader fails just after Start(). +// if retry==false, calls Start() only once, in order +// to simplify the early Lab 2B tests. +func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int { + t0 := time.Now() + starts := 0 + for time.Since(t0).Seconds() < 10 { + // try all the servers, maybe one is the leader. + index := -1 + for si := 0; si < cfg.n; si++ { + starts = (starts + 1) % cfg.n + var rf *Raft + cfg.mu.Lock() + if cfg.connected[starts] { + rf = cfg.rafts[starts] + } + cfg.mu.Unlock() + if rf != nil { + index1, _, ok := rf.Start(cmd) + if ok { + index = index1 + break + } + } + } + + if index != -1 { + // somebody claimed to be the leader and to have + // submitted our command; wait a while for agreement. + t1 := time.Now() + for time.Since(t1).Seconds() < 2 { + nd, cmd1 := cfg.nCommitted(index) + if nd > 0 && nd >= expectedServers { + // committed + if cmd1 == cmd { + // and it was the command we submitted. + return index + } + } + time.Sleep(20 * time.Millisecond) + } + if retry == false { + cfg.t.Fatalf("one(%v) failed to reach agreement", cmd) + } + } else { + time.Sleep(50 * time.Millisecond) + } + } + cfg.t.Fatalf("one(%v) failed to reach agreement", cmd) + return -1 +} + +// start a Test. +// print the Test message. +// e.g. 
cfg.begin("Test (2B): RPC counts aren't too high") +func (cfg *config) begin(description string) { + fmt.Printf("%s ...\n", description) + cfg.t0 = time.Now() + cfg.rpcs0 = cfg.rpcTotal() + cfg.bytes0 = cfg.bytesTotal() + cfg.cmds0 = 0 + cfg.maxIndex0 = cfg.maxIndex +} + +// end a Test -- the fact that we got here means there +// was no failure. +// print the Passed message, +// and some performance numbers. +func (cfg *config) end() { + cfg.checkTimeout() + if cfg.t.Failed() == false { + cfg.mu.Lock() + t := time.Since(cfg.t0).Seconds() // real time + npeers := cfg.n // number of Raft peers + nrpc := cfg.rpcTotal() - cfg.rpcs0 // number of RPC sends + nbytes := cfg.bytesTotal() - cfg.bytes0 // number of bytes + ncmds := cfg.maxIndex - cfg.maxIndex0 // number of Raft agreements reported + cfg.mu.Unlock() + + fmt.Printf(" ... Passed --") + fmt.Printf(" %4.1f %d %4d %7d %4d\n", t, npeers, nrpc, nbytes, ncmds) + } +} diff --git a/raft/raft.go b/raft/raft.go new file mode 100644 index 0000000..3839334 --- /dev/null +++ b/raft/raft.go @@ -0,0 +1,570 @@ +package raft + +// +// this is an outline of the API that raft must expose to +// the service (or tester). see comments below for +// each of these functions for more details. +// +// rf = Make(...) +// create a new Raft server. +// rf.Start(command interface{}) (index, term, isleader) +// start agreement on a new log entry +// rf.GetState() (term, isLeader) +// ask a Raft for its current term, and whether it thinks it is leader +// ApplyMsg +// each time a new entry is committed to the log, each Raft peer +// should send an ApplyMsg to the service (or tester) +// in the same server. +// + +import "sync" +import "sync/atomic" +import "raft/labrpc" + +import "time" +import "math/rand" + +// +// as each Raft peer becomes aware that successive log entries are +// committed, the peer should send an ApplyMsg to the service (or +// tester) on the same server, via the applyCh passed to Make(). set +// CommandValid to true to indicate that the ApplyMsg contains a newly +// committed log entry. +// +type ApplyMsg struct { + CommandValid bool + Command interface{} + CommandIndex int +} + +// LogEntry struct containing command and term +type LogEntry struct { + Command interface{} // command for state machine + Term int // term when entry was received by leader +} + +// +// A Go object implementing a single Raft peer. +// +type Raft struct { + mu sync.Mutex // Lock to protect shared access to this peer's state + peers []*labrpc.ClientEnd // RPC end points of all peers + me int // this peer's index into peers[] + dead int32 // set by Kill() + + // Your data here (2A, 2B). + // Look at the paper's Figure 2 for a description of what + // state a Raft server must maintain. + // You may also need to add other state, as per your implementation. 
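+	// the state below roughly follows Figure 2 of the extended Raft paper,
+	// plus a coarse CurrentState tag and the timers used for elections and
+	// heartbeats.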
+ + // persistent state on all servers + currentTerm int // latest term server has seen + votedFor int // candidateId that received vote in current term + log []LogEntry // (command, entry) of each log entry, first index is 1 + + CurrentState string // server state (F: follower, C: candidate, L: leader) + + // volatile state on all servers + commitIndex int // index of highest log entry known to be committed + lastApplied int // index of highest log entry applied to state machine + + // volatile state on leaders, reinitialized after election + nextIndex []int // index of next entry to send to each server + matchIndex []int // index of highest entry known to be replicated on server + + // channel to send ApplyMsg to service + applyCh chan ApplyMsg + + // election timers + electionTimeout *time.Timer // timeout for election + heartbeatTimeout *time.Timer // timeout for heartbeat +} + +// return currentTerm and whether this server +// believes it is the leader. +func (rf *Raft) GetState() (int, bool) { + + var term int + var isleader bool + // Your code here (2A). + + rf.mu.Lock() + defer rf.mu.Unlock() + + term = rf.currentTerm + if rf.CurrentState == "L" { + isleader = true + } else { + isleader = false + } + + return term, isleader +} + +// +// example RequestVote RPC arguments structure. +// field names must start with capital letters! +// +type RequestVoteArgs struct { + // Your data here (2A, 2B). + Term int // term of candidate + CandidateId int // candidate requesting vote + LastLogIndex int // index of candidate last log entry + LastLogTerm int // term of candidate last log entry +} + +// +// example RequestVote RPC reply structure. +// field names must start with capital letters! +// +type RequestVoteReply struct { + // Your data here (2A). + Term int // current term (for candidate to update itself) + VoteGranted bool // true if candidate received vote +} + +// check if a candidate's log is at least as up-to-date as receiver's +func (rf *Raft) checkCandidateUpToDate(args *RequestVoteArgs) bool { + lastLogIndex := len(rf.log) - 1 + if args.LastLogTerm > rf.log[lastLogIndex].Term { + return true + } else if args.LastLogTerm == rf.log[lastLogIndex].Term { + if args.LastLogIndex >= lastLogIndex { + return true + } + } + return false +} + +// +// example RequestVote RPC handler. +// +func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) { + // Your code here (2A, 2B). + // Read the fields in "args", + // and accordingly assign the values for fields in "reply". + + if rf.killed() { + return + } + + rf.mu.Lock() + defer rf.mu.Unlock() + + // candidate asking for vote has lower term + if args.Term < rf.currentTerm { + reply.VoteGranted = false + reply.Term = rf.currentTerm + return + } + + // candidate asking for vote has higher term + if args.Term > rf.currentTerm { + rf.currentTerm = args.Term + rf.votedFor = -1 + rf.CurrentState = "F" + rf.resetElectionTimer() + } + + candidateUpToDate := rf.checkCandidateUpToDate(args) + if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && candidateUpToDate { + rf.votedFor = args.CandidateId + reply.VoteGranted = true + rf.resetElectionTimer() + } else { + reply.VoteGranted = false + } + reply.Term = rf.currentTerm +} + +// +// example code to send a RequestVote RPC to a server. +// server is the index of the target server in rf.peers[]. +// expects RPC arguments in args. +// fills in *reply with RPC reply, so caller should +// pass &reply. 
+// the types of the args and reply passed to Call() must be
+// the same as the types of the arguments declared in the
+// handler function (including whether they are pointers).
+//
+// The labrpc package simulates a lossy network, in which servers
+// may be unreachable, and in which requests and replies may be lost.
+// Call() sends a request and waits for a reply. If a reply arrives
+// within a timeout interval, Call() returns true; otherwise
+// Call() returns false. Thus Call() may not return for a while.
+// A false return can be caused by a dead server, a live server that
+// can't be reached, a lost request, or a lost reply.
+//
+// Call() is guaranteed to return (perhaps after a delay) except if the
+// handler function on the server side does not return. Thus there
+// is no need to implement your own timeouts around Call().
+//
+// look at the comments in ../labrpc/labrpc.go for more details.
+//
+// if you're having trouble getting RPC to work, check that you've
+// capitalized all field names in structs passed over RPC, and
+// that the caller passes the address of the reply struct with &, not
+// the struct itself.
+//
+func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
+	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
+
+	rf.mu.Lock()
+	defer rf.mu.Unlock()
+
+	if ok {
+		// reply term greater than current term: step down to follower
+		if reply.Term > rf.currentTerm {
+			rf.currentTerm = reply.Term
+			rf.votedFor = -1
+			rf.CurrentState = "F"
+			rf.resetElectionTimer()
+		}
+	}
+
+	return ok
+}
+
+// AppendEntriesArgs struct containing args for AppendEntries RPC
+type AppendEntriesArgs struct {
+	Term         int        // term of leader
+	LeaderId     int        // follower can redirect clients
+	PrevLogIndex int        // index of log entry immediately preceding new ones
+	PrevLogTerm  int        // term of prevLogIndex entry
+	Entries      []LogEntry // log entries to store (empty for heartbeat)
+	LeaderCommit int        // commitIndex of leader
+}
+
+// AppendEntriesReply struct containing reply for AppendEntries RPC
+type AppendEntriesReply struct {
+	Term    int  // current term (for leader to update itself)
+	Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
+}
+
+// AppendEntries RPC handler
+func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
+	if rf.killed() {
+		return
+	}
+
+	rf.mu.Lock()
+	defer rf.mu.Unlock()
+
+	reply.Success = false
+
+	// leader's term is less than our current term: reject, and do not
+	// reset the election timer for a stale leader
+	if args.Term < rf.currentTerm {
+		reply.Term = rf.currentTerm
+		return
+	}
+
+	// this AppendEntries comes from a legitimate leader for an
+	// up-to-date term, so postpone our own election
+	rf.resetElectionTimer()
+
+	// leader's term is greater than our current term:
+	// adopt it and revert to follower state
+	if args.Term > rf.currentTerm {
+		rf.currentTerm = args.Term
+		rf.votedFor = -1
+		rf.CurrentState = "F"
+	}
+
+	// check if log contains an entry at prevLogIndex matching prevLogTerm
+	if args.PrevLogIndex >= len(rf.log) || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
+		reply.Term = rf.currentTerm
+		return
+	}
+
+	// if an existing entry conflicts with a new one (same index,
+	// different term), delete it and everything after it, then
+	// append any new entries not already in the log
+	for i, entry := range args.Entries {
+		idx := args.PrevLogIndex + 1 + i
+		if idx >= len(rf.log) || rf.log[idx].Term != entry.Term {
+			rf.log = append(rf.log[:idx], args.Entries[i:]...)
+			break
+		}
+ } + + // update commit index + if args.LeaderCommit > rf.commitIndex { + if args.LeaderCommit < len(rf.log) - 1 { + rf.commitIndex = args.LeaderCommit + } else { + rf.commitIndex = len(rf.log) - 1 + } + } + + reply.Term = rf.currentTerm + reply.Success = true +} + +func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool { + ok := rf.peers[server].Call("Raft.AppendEntries", args, reply) + + rf.mu.Lock() + defer rf.mu.Unlock() + + if ok { + // reply term greater than current term + if reply.Term > rf.currentTerm { + rf.currentTerm = reply.Term + rf.votedFor = -1 + rf.CurrentState = "F" + rf.resetElectionTimer() + return ok + } + } else { + return ok + } + + return ok +} + +func (rf *Raft) sendHeartbeats() { + for { + rf.mu.Lock() + + // must be a leader to send heartbeats + if rf.CurrentState != "L" { + rf.mu.Unlock() + // wait a little before checking again if leader + time.Sleep(10 * time.Millisecond) + continue + } + + lastLogIndex := len(rf.log) - 1 + // sendAppendEntriesArgs + args := AppendEntriesArgs{ + Term: rf.currentTerm, + LeaderId: rf.me, + PrevLogIndex: lastLogIndex, + PrevLogTerm: rf.log[lastLogIndex].Term, + Entries: []LogEntry{}, + LeaderCommit: rf.commitIndex, + } + + peersCopy := make([]int, 0) + for i := 0; i < len(rf.peers); i++ { + if i != rf.me { + peersCopy = append(peersCopy, i) + } + } + + // unlock to send AppendEntries RPC + rf.mu.Unlock() + + // send heartbeat to all peers + for _, server := range peersCopy { + // create server for each peer to send AppendEntries RPC back to + go func(server int, args AppendEntriesArgs) { + reply := AppendEntriesReply{} + ok := rf.sendAppendEntries(server, &args, &reply) + + if ok { + rf.mu.Lock() + // check if reply corresponds to current term + if rf.CurrentState == "L" && rf.currentTerm == args.Term { + if reply.Term > rf.currentTerm { + rf.currentTerm = reply.Term + rf.votedFor = -1 + rf.CurrentState = "F" + rf.resetElectionTimer() + } + } + rf.mu.Unlock() + } + }(server, args) + } + + // heartbeat duration + time.Sleep(100 * time.Millisecond) + } +} + +func (rf *Raft) resetElectionTimer() { + // reset election timer + if rf.electionTimeout != nil { + rf.electionTimeout.Stop() + } + + // set new election timeout (randomized between 300ms and 1000ms) + timeoutValue := time.Duration(300 + rand.Intn(700)) * time.Millisecond + rf.electionTimeout = time.AfterFunc(timeoutValue, func() { + rf.startElection() + }) +} + +func (rf *Raft) startElection() { + rf.mu.Lock() + + // start election only if server is not leader + if rf.CurrentState == "L" { + rf.mu.Unlock() + return + } + + // increment current term and change state to candidate + rf.currentTerm++ + rf.votedFor = rf.me + rf.CurrentState = "C" + rf.resetElectionTimer() + + // vote count + voteCount := 1 + votesRequired := len(rf.peers) / 2 + 1 + + currentTermCopy := rf.currentTerm + candidateIdCopy := rf.me + lastAppliedCopy := rf.lastApplied + lastLogTermCopy := rf.log[rf.lastApplied].Term + + rf.mu.Unlock() + + // send RequestVote RPC to all peers + for i := 0; i < len(rf.peers); i++ { + if i != rf.me { + // create server for each peer to send RequestVote RPC back to + go func(server int) { + args := RequestVoteArgs{ + Term: currentTermCopy, + CandidateId: candidateIdCopy, + LastLogIndex: lastAppliedCopy, + LastLogTerm: lastLogTermCopy, + } + + reply := RequestVoteReply{} + ok := rf.sendRequestVote(server, &args, &reply) + + if ok { + rf.mu.Lock() + + // check if reply corresponds to current term + if rf.CurrentState == "C" && 
rf.currentTerm == args.Term { + if reply.VoteGranted { + voteCount++ + if voteCount >= votesRequired { + // become leader + rf.CurrentState = "L" + + // reinitialize volatile state on leaders + rf.nextIndex = make([]int, len(rf.peers)) + rf.matchIndex = make([]int, len(rf.peers)) + for i := 0; i < len(rf.peers); i++ { + rf.nextIndex[i] = rf.lastApplied + 1 + rf.matchIndex[i] = 0 + } + + // reset election timer + rf.resetElectionTimer() + } + } + } + + rf.mu.Unlock() + } + }(i) + } + } +} + +// +// the service using Raft (e.g. a k/v server) wants to start +// agreement on the next command to be appended to Raft's log. if this +// server isn't the leader, returns false. otherwise start the +// agreement and return immediately. there is no guarantee that this +// command will ever be committed to the Raft log, since the leader +// may fail or lose an election. even if the Raft instance has been killed, +// this function should return gracefully. +// +// the first return value is the index that the command will appear at +// if it's ever committed. the second return value is the current +// term. the third return value is true if this server believes it is +// the leader. +// +func (rf *Raft) Start(command interface{}) (int, int, bool) { + index := -1 + term := -1 + isLeader := true + + // Your code here (2B). + + rf.mu.Lock() + if rf.CurrentState != "L" { + isLeader = false + } else { + logEntry := LogEntry{ + Command: command, + Term: rf.currentTerm, + } + rf.log = append(rf.log, logEntry) + index = len(rf.log) - 1 + rf.commitIndex = index + rf.lastApplied = index + } + + term = rf.currentTerm + rf.mu.Unlock() + + return index, term, isLeader +} + +// +// the tester doesn't halt goroutines created by Raft after each test, +// but it does call the Kill() method. your code can use killed() to +// check whether Kill() has been called. the use of atomic avoids the +// need for a lock. +// +// the issue is that long-running goroutines use memory and may chew +// up CPU time, perhaps causing later tests to fail and generating +// confusing debug output. any goroutine with a long-running loop +// should call killed() to check whether it should stop. +// +func (rf *Raft) Kill() { + atomic.StoreInt32(&rf.dead, 1) + // Your code here, if desired. +} + +func (rf *Raft) killed() bool { + z := atomic.LoadInt32(&rf.dead) + return z == 1 +} + +// +// the service or tester wants to create a Raft server. the ports +// of all the Raft servers (including this one) are in peers[]. this +// server's port is peers[me]. all the servers' peers[] arrays +// have the same order. applyCh is a channel on which the +// tester or service expects Raft to send ApplyMsg messages. +// Make() must return quickly, so it should start goroutines +// for any long-running work. +// +func Make(peers []*labrpc.ClientEnd, me int, + applyCh chan ApplyMsg) *Raft { + rf := &Raft{} + rf.peers = peers + rf.me = me + + // Your initialization code here (2A, 2B). 
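+	// a fresh peer boots as a follower in term 0; log index 0 holds a
+	// sentinel entry so that PrevLogIndex/LastLogIndex arithmetic never
+	// needs a special case for an empty log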
+ rf.currentTerm = 0 + rf.votedFor = -1 + rf.log = make([]LogEntry, 1) // index 0 is empty + rf.log[0] = LogEntry{Command: nil, Term: 0} + + rf.commitIndex = 0 + rf.lastApplied = 0 + + rf.CurrentState = "F" // initial state is follower + + rf.nextIndex = make([]int, len(peers)) + rf.matchIndex = make([]int, len(peers)) + + // start election timer + rf.resetElectionTimer() + + // start go routine to send heartbeats + go rf.sendHeartbeats() + + return rf +} \ No newline at end of file diff --git a/raft/test_test.go b/raft/test_test.go new file mode 100644 index 0000000..a53441b --- /dev/null +++ b/raft/test_test.go @@ -0,0 +1,551 @@ +package raft + +// +// Raft tests. +// +// we will use the original test_test.go to test your code for grading. +// so, while you can modify this code to help you debug, please +// test with the original before submitting. +// + +import "testing" +import "fmt" +import "time" +import "math/rand" +import "sync" + +// The tester generously allows solutions to complete elections in one second +// (much more than the paper's range of timeouts). +const RaftElectionTimeout = 1000 * time.Millisecond + +func TestInitialElection2A(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + cfg.begin("Test (2A): initial election") + + // is a leader elected? + cfg.checkOneLeader() + + // sleep a bit to avoid racing with followers learning of the + // election, then check that all peers agree on the term. + time.Sleep(50 * time.Millisecond) + term1 := cfg.checkTerms() + if term1 < 1 { + t.Fatalf("term is %v, but should be at least 1", term1) + } + + // does the leader+term stay the same if there is no network failure? + time.Sleep(2 * RaftElectionTimeout) + term2 := cfg.checkTerms() + if term1 != term2 { + fmt.Printf("warning: term changed even though there were no failures") + } + + // there should still be a leader. + cfg.checkOneLeader() + + cfg.end() +} + +func TestReElection2A(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + cfg.begin("Test (2A): election after network failure") + + leader1 := cfg.checkOneLeader() + + // if the leader disconnects, a new one should be elected. + cfg.disconnect(leader1) + cfg.checkOneLeader() + + // if the old leader rejoins, that shouldn't + // disturb the new leader. + cfg.connect(leader1) + leader2 := cfg.checkOneLeader() + + // if there's no quorum, no leader should + // be elected. + cfg.disconnect(leader2) + cfg.disconnect((leader2 + 1) % servers) + time.Sleep(2 * RaftElectionTimeout) + cfg.checkNoLeader() + + // if a quorum arises, it should elect a leader. + cfg.connect((leader2 + 1) % servers) + cfg.checkOneLeader() + + // re-join of last node shouldn't prevent leader from existing. + cfg.connect(leader2) + cfg.checkOneLeader() + + cfg.end() +} + +func TestBasicAgree2B(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + cfg.begin("Test (2B): basic agreement") + + iters := 3 + for index := 1; index < iters+1; index++ { + nd, _ := cfg.nCommitted(index) + if nd > 0 { + t.Fatalf("some have committed before Start()") + } + + xindex := cfg.one(index*100, servers, false) + if xindex != index { + t.Fatalf("got index %v but expected %v", xindex, index) + } + } + + cfg.end() +} + +// +// check, based on counting bytes of RPCs, that +// each command is sent to each peer just once. 
+// +func TestRPCBytes2B(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + cfg.begin("Test (2B): RPC byte count") + + cfg.one(99, servers, false) + bytes0 := cfg.bytesTotal() + + iters := 10 + var sent int64 = 0 + for index := 2; index < iters+2; index++ { + cmd := randstring(5000) + xindex := cfg.one(cmd, servers, false) + if xindex != index { + t.Fatalf("got index %v but expected %v", xindex, index) + } + sent += int64(len(cmd)) + } + + bytes1 := cfg.bytesTotal() + got := bytes1 - bytes0 + expected := int64(servers) * sent + if got > expected+50000 { + t.Fatalf("too many RPC bytes; got %v, expected %v", got, expected) + } + + cfg.end() +} + +func TestFailAgree2B(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + cfg.begin("Test (2B): agreement despite follower disconnection") + + cfg.one(101, servers, false) + + // disconnect one follower from the network. + leader := cfg.checkOneLeader() + cfg.disconnect((leader + 1) % servers) + + // the leader and remaining follower should be + // able to agree despite the disconnected follower. + cfg.one(102, servers-1, false) + cfg.one(103, servers-1, false) + time.Sleep(RaftElectionTimeout) + cfg.one(104, servers-1, false) + cfg.one(105, servers-1, false) + + // re-connect + cfg.connect((leader + 1) % servers) + + // the full set of servers should preserve + // previous agreements, and be able to agree + // on new commands. + cfg.one(106, servers, true) + time.Sleep(RaftElectionTimeout) + cfg.one(107, servers, true) + + cfg.end() +} + +func TestFailNoAgree2B(t *testing.T) { + servers := 5 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + cfg.begin("Test (2B): no agreement if too many followers disconnect") + + cfg.one(10, servers, false) + + // 3 of 5 followers disconnect + leader := cfg.checkOneLeader() + cfg.disconnect((leader + 1) % servers) + cfg.disconnect((leader + 2) % servers) + cfg.disconnect((leader + 3) % servers) + + index, _, ok := cfg.rafts[leader].Start(20) + if ok != true { + t.Fatalf("leader rejected Start()") + } + if index != 2 { + t.Fatalf("expected index 2, got %v", index) + } + + time.Sleep(2 * RaftElectionTimeout) + + n, _ := cfg.nCommitted(index) + if n > 0 { + t.Fatalf("%v committed but no majority", n) + } + + // repair + cfg.connect((leader + 1) % servers) + cfg.connect((leader + 2) % servers) + cfg.connect((leader + 3) % servers) + + // the disconnected majority may have chosen a leader from + // among their own ranks, forgetting index 2. 
+ leader2 := cfg.checkOneLeader() + index2, _, ok2 := cfg.rafts[leader2].Start(30) + if ok2 == false { + t.Fatalf("leader2 rejected Start()") + } + if index2 < 2 || index2 > 3 { + t.Fatalf("unexpected index %v", index2) + } + + cfg.one(1000, servers, true) + + cfg.end() +} + +func TestConcurrentStarts2B(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + cfg.begin("Test (2B): concurrent Start()s") + + var success bool +loop: + for try := 0; try < 5; try++ { + if try > 0 { + // give solution some time to settle + time.Sleep(3 * time.Second) + } + + leader := cfg.checkOneLeader() + _, term, ok := cfg.rafts[leader].Start(1) + if !ok { + // leader moved on really quickly + continue + } + + iters := 5 + var wg sync.WaitGroup + is := make(chan int, iters) + for ii := 0; ii < iters; ii++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + i, term1, ok := cfg.rafts[leader].Start(100 + i) + if term1 != term { + return + } + if ok != true { + return + } + is <- i + }(ii) + } + + wg.Wait() + close(is) + + for j := 0; j < servers; j++ { + if t, _ := cfg.rafts[j].GetState(); t != term { + // term changed -- can't expect low RPC counts + continue loop + } + } + + failed := false + cmds := []int{} + for index := range is { + cmd := cfg.wait(index, servers, term) + if ix, ok := cmd.(int); ok { + if ix == -1 { + // peers have moved on to later terms + // so we can't expect all Start()s to + // have succeeded + failed = true + break + } + cmds = append(cmds, ix) + } else { + t.Fatalf("value %v is not an int", cmd) + } + } + + if failed { + // avoid leaking goroutines + go func() { + for range is { + } + }() + continue + } + + for ii := 0; ii < iters; ii++ { + x := 100 + ii + ok := false + for j := 0; j < len(cmds); j++ { + if cmds[j] == x { + ok = true + } + } + if ok == false { + t.Fatalf("cmd %v missing in %v", x, cmds) + } + } + + success = true + break + } + + if !success { + t.Fatalf("term changed too often") + } + + cfg.end() +} + +func TestRejoin2B(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + cfg.begin("Test (2B): rejoin of partitioned leader") + + cfg.one(101, servers, true) + + // leader network failure + leader1 := cfg.checkOneLeader() + cfg.disconnect(leader1) + + // make old leader try to agree on some entries + cfg.rafts[leader1].Start(102) + cfg.rafts[leader1].Start(103) + cfg.rafts[leader1].Start(104) + + // new leader commits, also for index=2 + cfg.one(103, 2, true) + + // new leader network failure + leader2 := cfg.checkOneLeader() + cfg.disconnect(leader2) + + // old leader connected again + cfg.connect(leader1) + + cfg.one(104, 2, true) + + // all together now + cfg.connect(leader2) + + cfg.one(105, servers, true) + + cfg.end() +} + +func TestBackup2B(t *testing.T) { + servers := 5 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + cfg.begin("Test (2B): leader backs up quickly over incorrect follower logs") + + cfg.one(rand.Int(), servers, true) + + // put leader and one follower in a partition + leader1 := cfg.checkOneLeader() + cfg.disconnect((leader1 + 2) % servers) + cfg.disconnect((leader1 + 3) % servers) + cfg.disconnect((leader1 + 4) % servers) + + // submit lots of commands that won't commit + for i := 0; i < 50; i++ { + cfg.rafts[leader1].Start(rand.Int()) + } + + time.Sleep(RaftElectionTimeout / 2) + + cfg.disconnect((leader1 + 0) % servers) + cfg.disconnect((leader1 + 1) % servers) + + // allow other partition to recover + cfg.connect((leader1 + 2) % servers) + 
cfg.connect((leader1 + 3) % servers) + cfg.connect((leader1 + 4) % servers) + + // lots of successful commands to new group. + for i := 0; i < 50; i++ { + cfg.one(rand.Int(), 3, true) + } + + // now another partitioned leader and one follower + leader2 := cfg.checkOneLeader() + other := (leader1 + 2) % servers + if leader2 == other { + other = (leader2 + 1) % servers + } + cfg.disconnect(other) + + // lots more commands that won't commit + for i := 0; i < 50; i++ { + cfg.rafts[leader2].Start(rand.Int()) + } + + time.Sleep(RaftElectionTimeout / 2) + + // bring original leader back to life, + for i := 0; i < servers; i++ { + cfg.disconnect(i) + } + cfg.connect((leader1 + 0) % servers) + cfg.connect((leader1 + 1) % servers) + cfg.connect(other) + + // lots of successful commands to new group. + for i := 0; i < 50; i++ { + cfg.one(rand.Int(), 3, true) + } + + // now everyone + for i := 0; i < servers; i++ { + cfg.connect(i) + } + cfg.one(rand.Int(), servers, true) + + cfg.end() +} + +func TestCount2B(t *testing.T) { + servers := 3 + cfg := make_config(t, servers, false) + defer cfg.cleanup() + + cfg.begin("Test (2B): RPC counts aren't too high") + + rpcs := func() (n int) { + for j := 0; j < servers; j++ { + n += cfg.rpcCount(j) + } + return + } + + leader := cfg.checkOneLeader() + + total1 := rpcs() + + if total1 > 30 || total1 < 1 { + t.Fatalf("too many or few RPCs (%v) to elect initial leader\n", total1) + } + + var total2 int + var success bool +loop: + for try := 0; try < 5; try++ { + if try > 0 { + // give solution some time to settle + time.Sleep(3 * time.Second) + } + + leader = cfg.checkOneLeader() + total1 = rpcs() + + iters := 10 + starti, term, ok := cfg.rafts[leader].Start(1) + if !ok { + // leader moved on really quickly + continue + } + cmds := []int{} + for i := 1; i < iters+2; i++ { + x := int(rand.Int31()) + cmds = append(cmds, x) + index1, term1, ok := cfg.rafts[leader].Start(x) + if term1 != term { + // Term changed while starting + continue loop + } + if !ok { + // No longer the leader, so term has changed + continue loop + } + if starti+i != index1 { + t.Fatalf("Start() failed") + } + } + + for i := 1; i < iters+1; i++ { + cmd := cfg.wait(starti+i, servers, term) + if ix, ok := cmd.(int); ok == false || ix != cmds[i-1] { + if ix == -1 { + // term changed -- try again + continue loop + } + t.Fatalf("wrong value %v committed for index %v; expected %v\n", cmd, starti+i, cmds) + } + } + + failed := false + total2 = 0 + for j := 0; j < servers; j++ { + if t, _ := cfg.rafts[j].GetState(); t != term { + // term changed -- can't expect low RPC counts + // need to keep going to update total2 + failed = true + } + total2 += cfg.rpcCount(j) + } + + if failed { + continue loop + } + + if total2-total1 > (iters+1+3)*3 { + t.Fatalf("too many RPCs (%v) for %v entries\n", total2-total1, iters) + } + + success = true + break + } + + if !success { + t.Fatalf("term changed too often") + } + + time.Sleep(RaftElectionTimeout) + + total3 := 0 + for j := 0; j < servers; j++ { + total3 += cfg.rpcCount(j) + } + + if total3-total2 > 3*20 { + t.Fatalf("too many RPCs (%v) for 1 second of idleness\n", total3-total2) + } + + cfg.end() +} + diff --git a/raft/util.go b/raft/util.go new file mode 100644 index 0000000..d77e71f --- /dev/null +++ b/raft/util.go @@ -0,0 +1,13 @@ +package raft + +import "log" + +// Debugging +const Debug = 0 + +func DPrintf(format string, a ...interface{}) (n int, err error) { + if Debug > 0 { + log.Printf(format, a...) + } + return +} -- GitLab