Commit 6a875604 authored by AhaanKanaujia
initial commit leader election working

parent 823c1563
go.mod 0 → 100644
module raft
go 1.22.0
package labgob
//
// trying to send non-capitalized fields over RPC produces a range of
// misbehavior, including both mysterious incorrect computation and
// outright crashes. so this wrapper around Go's encoding/gob warns
// about non-capitalized field names.
//
import "encoding/gob"
import "io"
import "reflect"
import "fmt"
import "sync"
import "unicode"
import "unicode/utf8"
var mu sync.Mutex
var errorCount int // for TestCapital
var checked map[reflect.Type]bool
type LabEncoder struct {
gob *gob.Encoder
}
func NewEncoder(w io.Writer) *LabEncoder {
enc := &LabEncoder{}
enc.gob = gob.NewEncoder(w)
return enc
}
func (enc *LabEncoder) Encode(e interface{}) error {
checkValue(e)
return enc.gob.Encode(e)
}
func (enc *LabEncoder) EncodeValue(value reflect.Value) error {
checkValue(value.Interface())
return enc.gob.EncodeValue(value)
}
type LabDecoder struct {
gob *gob.Decoder
}
func NewDecoder(r io.Reader) *LabDecoder {
dec := &LabDecoder{}
dec.gob = gob.NewDecoder(r)
return dec
}
func (dec *LabDecoder) Decode(e interface{}) error {
checkValue(e)
checkDefault(e)
return dec.gob.Decode(e)
}
func Register(value interface{}) {
checkValue(value)
gob.Register(value)
}
func RegisterName(name string, value interface{}) {
checkValue(value)
gob.RegisterName(name, value)
}
func checkValue(value interface{}) {
checkType(reflect.TypeOf(value))
}
func checkType(t reflect.Type) {
k := t.Kind()
mu.Lock()
// only complain once, and avoid recursion.
if checked == nil {
checked = map[reflect.Type]bool{}
}
if checked[t] {
mu.Unlock()
return
}
checked[t] = true
mu.Unlock()
switch k {
case reflect.Struct:
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
rune, _ := utf8.DecodeRuneInString(f.Name)
if unicode.IsUpper(rune) == false {
// ta da
fmt.Printf("labgob error: lower-case field %v of %v in RPC or persist/snapshot will break your Raft\n",
f.Name, t.Name())
mu.Lock()
errorCount += 1
mu.Unlock()
}
checkType(f.Type)
}
return
case reflect.Slice, reflect.Array, reflect.Ptr:
checkType(t.Elem())
return
case reflect.Map:
checkType(t.Elem())
checkType(t.Key())
return
default:
return
}
}
//
// warn if the value contains non-default values,
// as it would if one sent an RPC but the reply
// struct was already modified. if the RPC reply
// contains default values, GOB won't overwrite
// the non-default value.
//
func checkDefault(value interface{}) {
if value == nil {
return
}
checkDefault1(reflect.ValueOf(value), 1, "")
}
func checkDefault1(value reflect.Value, depth int, name string) {
if depth > 3 {
return
}
t := value.Type()
k := t.Kind()
switch k {
case reflect.Struct:
for i := 0; i < t.NumField(); i++ {
vv := value.Field(i)
name1 := t.Field(i).Name
if name != "" {
name1 = name + "." + name1
}
checkDefault1(vv, depth+1, name1)
}
return
case reflect.Ptr:
if value.IsNil() {
return
}
checkDefault1(value.Elem(), depth+1, name)
return
case reflect.Bool,
reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
reflect.Uintptr, reflect.Float32, reflect.Float64,
reflect.String:
if reflect.DeepEqual(reflect.Zero(t).Interface(), value.Interface()) == false {
mu.Lock()
if errorCount < 1 {
what := name
if what == "" {
what = t.Name()
}
// this warning typically arises if code re-uses the same RPC reply
// variable for multiple RPC calls, or if code restores persisted
// state into a variable that already has non-default values.
fmt.Printf("labgob warning: Decoding into a non-default variable/field %v may not work\n",
what)
}
errorCount += 1
mu.Unlock()
}
return
}
}
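// Illustrative sketch (not part of the lab code) of the pitfall that
// checkDefault warns about: gob does not transmit zero-valued fields, so
// decoding into a variable that already holds non-default data silently
// keeps the stale values. Assumes a connected io.Writer/io.Reader pair
// (e.g. both ends of a bytes.Buffer); the names here are hypothetical.
func exampleStaleReply(w io.Writer, r io.Reader) {
    type reply struct{ X int }
    NewEncoder(w).Encode(reply{}) // sender encodes X == 0
    stale := reply{X: 99}         // receiver re-uses an old reply variable
    NewDecoder(r).Decode(&stale)  // labgob warns; stale.X remains 99
}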
package labgob
import "testing"
import "bytes"
type T1 struct {
T1int0 int
T1int1 int
T1string0 string
T1string1 string
}
type T2 struct {
T2slice []T1
T2map map[int]*T1
T2t3 interface{}
}
type T3 struct {
T3int999 int
}
//
// test that we didn't break GOB.
//
func TestGOB(t *testing.T) {
e0 := errorCount
w := new(bytes.Buffer)
Register(T3{})
{
x0 := 0
x1 := 1
t1 := T1{}
t1.T1int1 = 1
t1.T1string1 = "6.824"
t2 := T2{}
t2.T2slice = []T1{T1{}, t1}
t2.T2map = map[int]*T1{}
t2.T2map[99] = &T1{1, 2, "x", "y"}
t2.T2t3 = T3{999}
e := NewEncoder(w)
e.Encode(x0)
e.Encode(x1)
e.Encode(t1)
e.Encode(t2)
}
data := w.Bytes()
{
var x0 int
var x1 int
var t1 T1
var t2 T2
r := bytes.NewBuffer(data)
d := NewDecoder(r)
if d.Decode(&x0) != nil ||
d.Decode(&x1) != nil ||
d.Decode(&t1) != nil ||
d.Decode(&t2) != nil {
t.Fatalf("Decode failed")
}
if x0 != 0 {
t.Fatalf("wrong x0 %v\n", x0)
}
if x1 != 1 {
t.Fatalf("wrong x1 %v\n", x1)
}
if t1.T1int0 != 0 {
t.Fatalf("wrong t1.T1int0 %v\n", t1.T1int0)
}
if t1.T1int1 != 1 {
t.Fatalf("wrong t1.T1int1 %v\n", t1.T1int1)
}
if t1.T1string0 != "" {
t.Fatalf("wrong t1.T1string0 %v\n", t1.T1string0)
}
if t1.T1string1 != "6.824" {
t.Fatalf("wrong t1.T1string1 %v\n", t1.T1string1)
}
if len(t2.T2slice) != 2 {
t.Fatalf("wrong t2.T2slice len %v\n", len(t2.T2slice))
}
if t2.T2slice[1].T1int1 != 1 {
t.Fatalf("wrong slice value\n")
}
if len(t2.T2map) != 1 {
t.Fatalf("wrong t2.T2map len %v\n", len(t2.T2map))
}
if t2.T2map[99].T1string1 != "y" {
t.Fatalf("wrong map value\n")
}
t3 := (t2.T2t3).(T3)
if t3.T3int999 != 999 {
t.Fatalf("wrong t2.T2t3.T3int999\n")
}
}
if errorCount != e0 {
t.Fatalf("there were errors, but should not have been")
}
}
type T4 struct {
Yes int
no int
}
//
// make sure we check capitalization
// labgob prints one warning during this test.
//
func TestCapital(t *testing.T) {
e0 := errorCount
v := []map[*T4]int{}
w := new(bytes.Buffer)
e := NewEncoder(w)
e.Encode(v)
data := w.Bytes()
var v1 []map[T4]int
r := bytes.NewBuffer(data)
d := NewDecoder(r)
d.Decode(&v1)
if errorCount != e0+1 {
t.Fatalf("failed to warn about lower-case field")
}
}
//
// check that we warn when someone sends a default value over
// RPC but the target into which we're decoding holds a non-default
// value, which GOB seems not to overwrite as you'd expect.
//
// labgob does not print a warning.
//
func TestDefault(t *testing.T) {
e0 := errorCount
type DD struct {
X int
}
// send a default value...
dd1 := DD{}
w := new(bytes.Buffer)
e := NewEncoder(w)
e.Encode(dd1)
data := w.Bytes()
// and receive it into memory that already
// holds non-default values.
reply := DD{99}
r := bytes.NewBuffer(data)
d := NewDecoder(r)
d.Decode(&reply)
if errorCount != e0+1 {
t.Fatalf("failed to warn about decoding into non-default value")
}
}
package labrpc
//
// channel-based RPC, for 824 labs.
//
// simulates a network that can lose requests, lose replies,
// delay messages, and entirely disconnect particular hosts.
//
// we will use the original labrpc.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test against the original before submitting.
//
// adapted from Go net/rpc/server.go.
//
// sends labgob-encoded values to ensure that RPCs
// don't include references to program objects.
//
// net := MakeNetwork() -- holds network, clients, servers.
// end := net.MakeEnd(endname) -- create a client end-point, to talk to one server.
// net.AddServer(servername, server) -- adds a named server to network.
// net.DeleteServer(servername) -- eliminate the named server.
// net.Connect(endname, servername) -- connect a client to a server.
// net.Enable(endname, enabled) -- enable/disable a client.
// net.Reliable(bool) -- false means drop/delay messages
//
// end.Call("Raft.AppendEntries", &args, &reply) -- send an RPC, wait for reply.
// the "Raft" is the name of the server struct to be called.
// the "AppendEntries" is the name of the method to be called.
// Call() returns true to indicate that the server executed the request
// and the reply is valid.
// Call() returns false if the network lost the request or reply
// or the server is down.
// It is OK to have multiple Call()s in progress at the same time on the
// same ClientEnd.
// Concurrent calls to Call() may be delivered to the server out of order,
// since the network may re-order messages.
// Call() is guaranteed to return (perhaps after a delay) *except* if the
// handler function on the server side does not return.
// the server RPC handler function must declare its args and reply arguments
// as pointers, so that their types exactly match the types of the arguments
// to Call().
//
// srv := MakeServer()
// srv.AddService(svc) -- a server can have multiple services, e.g. Raft and k/v
// pass srv to net.AddServer()
//
// svc := MakeService(receiverObject) -- obj's methods will handle RPCs
// much like Go's rpc.Register()
// pass svc to srv.AddService()
//
import "raft/labgob"
import "bytes"
import "reflect"
import "sync"
import "log"
import "strings"
import "math/rand"
import "time"
import "sync/atomic"
type reqMsg struct {
endname interface{} // name of sending ClientEnd
svcMeth string // e.g. "Raft.AppendEntries"
argsType reflect.Type
args []byte
replyCh chan replyMsg
}
type replyMsg struct {
ok bool
reply []byte
}
type ClientEnd struct {
endname interface{} // this end-point's name
ch chan reqMsg // copy of Network.endCh
done chan struct{} // closed when Network is cleaned up
}
// send an RPC, wait for the reply.
// the return value indicates success; false means that
// no reply was received from the server.
func (e *ClientEnd) Call(svcMeth string, args interface{}, reply interface{}) bool {
req := reqMsg{}
req.endname = e.endname
req.svcMeth = svcMeth
req.argsType = reflect.TypeOf(args)
req.replyCh = make(chan replyMsg)
qb := new(bytes.Buffer)
qe := labgob.NewEncoder(qb)
qe.Encode(args)
req.args = qb.Bytes()
//
// send the request.
//
select {
case e.ch <- req:
// the request has been sent.
case <-e.done:
// entire Network has been destroyed.
return false
}
//
// wait for the reply.
//
rep := <-req.replyCh
if rep.ok {
rb := bytes.NewBuffer(rep.reply)
rd := labgob.NewDecoder(rb)
if err := rd.Decode(reply); err != nil {
log.Fatalf("ClientEnd.Call(): decode reply: %v\n", err)
}
return true
} else {
return false
}
}
type Network struct {
mu sync.Mutex
reliable bool
longDelays bool // pause a long time on sends to a disabled connection
longReordering bool // sometimes delay replies a long time
ends map[interface{}]*ClientEnd // ends, by name
enabled map[interface{}]bool // by end name
servers map[interface{}]*Server // servers, by name
connections map[interface{}]interface{} // endname -> servername
endCh chan reqMsg
done chan struct{} // closed when Network is cleaned up
count int32 // total RPC count, for statistics
bytes int64 // total bytes sent, for statistics
}
func MakeNetwork() *Network {
rn := &Network{}
rn.reliable = true
rn.ends = map[interface{}]*ClientEnd{}
rn.enabled = map[interface{}]bool{}
rn.servers = map[interface{}]*Server{}
rn.connections = map[interface{}](interface{}){}
rn.endCh = make(chan reqMsg)
rn.done = make(chan struct{})
// single goroutine to handle all ClientEnd.Call()s
go func() {
for {
select {
case xreq := <-rn.endCh:
atomic.AddInt32(&rn.count, 1)
atomic.AddInt64(&rn.bytes, int64(len(xreq.args)))
go rn.processReq(xreq)
case <-rn.done:
return
}
}
}()
return rn
}
func (rn *Network) Cleanup() {
close(rn.done)
}
func (rn *Network) Reliable(yes bool) {
rn.mu.Lock()
defer rn.mu.Unlock()
rn.reliable = yes
}
func (rn *Network) LongReordering(yes bool) {
rn.mu.Lock()
defer rn.mu.Unlock()
rn.longReordering = yes
}
func (rn *Network) LongDelays(yes bool) {
rn.mu.Lock()
defer rn.mu.Unlock()
rn.longDelays = yes
}
func (rn *Network) readEndnameInfo(endname interface{}) (enabled bool,
servername interface{}, server *Server, reliable bool, longreordering bool,
) {
rn.mu.Lock()
defer rn.mu.Unlock()
enabled = rn.enabled[endname]
servername = rn.connections[endname]
if servername != nil {
server = rn.servers[servername]
}
reliable = rn.reliable
longreordering = rn.longReordering
return
}
func (rn *Network) isServerDead(endname interface{}, servername interface{}, server *Server) bool {
rn.mu.Lock()
defer rn.mu.Unlock()
if rn.enabled[endname] == false || rn.servers[servername] != server {
return true
}
return false
}
func (rn *Network) processReq(req reqMsg) {
enabled, servername, server, reliable, longreordering := rn.readEndnameInfo(req.endname)
if enabled && servername != nil && server != nil {
if reliable == false {
// short delay
ms := (rand.Int() % 27)
time.Sleep(time.Duration(ms) * time.Millisecond)
}
if reliable == false && (rand.Int()%1000) < 100 {
// drop the request, return as if timeout
req.replyCh <- replyMsg{false, nil}
return
}
// execute the request (call the RPC handler).
// in a separate thread so that we can periodically check
// if the server has been killed and the RPC should get a
// failure reply.
ech := make(chan replyMsg)
go func() {
r := server.dispatch(req)
ech <- r
}()
// wait for handler to return,
// but stop waiting if DeleteServer() has been called,
// and return an error.
var reply replyMsg
replyOK := false
serverDead := false
for replyOK == false && serverDead == false {
select {
case reply = <-ech:
replyOK = true
case <-time.After(100 * time.Millisecond):
serverDead = rn.isServerDead(req.endname, servername, server)
if serverDead {
go func() {
<-ech // drain channel to let the goroutine created earlier terminate
}()
}
}
}
// do not reply if DeleteServer() has been called, i.e.
// the server has been killed. this is needed to avoid
// situation in which a client gets a positive reply
// to an Append, but the server persisted the update
// into the old Persister. config.go is careful to call
// DeleteServer() before superseding the Persister.
serverDead = rn.isServerDead(req.endname, servername, server)
if replyOK == false || serverDead == true {
// server was killed while we were waiting; return error.
req.replyCh <- replyMsg{false, nil}
} else if reliable == false && (rand.Int()%1000) < 100 {
// drop the reply, return as if timeout
req.replyCh <- replyMsg{false, nil}
} else if longreordering == true && rand.Intn(900) < 600 {
// delay the response for a while
ms := 200 + rand.Intn(1+rand.Intn(2000))
// Russ points out that this timer arrangement will decrease
// the number of goroutines, so that the race
// detector is less likely to get upset.
time.AfterFunc(time.Duration(ms)*time.Millisecond, func() {
atomic.AddInt64(&rn.bytes, int64(len(reply.reply)))
req.replyCh <- reply
})
} else {
atomic.AddInt64(&rn.bytes, int64(len(reply.reply)))
req.replyCh <- reply
}
} else {
// simulate no reply and eventual timeout.
ms := 0
if rn.longDelays {
// let Raft tests check that leader doesn't send
// RPCs synchronously.
ms = (rand.Int() % 7000)
} else {
// many kv tests require the client to try each
// server in fairly rapid succession.
ms = (rand.Int() % 100)
}
time.AfterFunc(time.Duration(ms)*time.Millisecond, func() {
req.replyCh <- replyMsg{false, nil}
})
}
}
// create a client end-point.
// start the thread that listens and delivers.
func (rn *Network) MakeEnd(endname interface{}) *ClientEnd {
rn.mu.Lock()
defer rn.mu.Unlock()
if _, ok := rn.ends[endname]; ok {
log.Fatalf("MakeEnd: %v already exists\n", endname)
}
e := &ClientEnd{}
e.endname = endname
e.ch = rn.endCh
e.done = rn.done
rn.ends[endname] = e
rn.enabled[endname] = false
rn.connections[endname] = nil
return e
}
func (rn *Network) AddServer(servername interface{}, rs *Server) {
rn.mu.Lock()
defer rn.mu.Unlock()
rn.servers[servername] = rs
}
func (rn *Network) DeleteServer(servername interface{}) {
rn.mu.Lock()
defer rn.mu.Unlock()
rn.servers[servername] = nil
}
// connect a ClientEnd to a server.
// a ClientEnd can only be connected once in its lifetime.
func (rn *Network) Connect(endname interface{}, servername interface{}) {
rn.mu.Lock()
defer rn.mu.Unlock()
rn.connections[endname] = servername
}
// enable/disable a ClientEnd.
func (rn *Network) Enable(endname interface{}, enabled bool) {
rn.mu.Lock()
defer rn.mu.Unlock()
rn.enabled[endname] = enabled
}
// get a server's count of incoming RPCs.
func (rn *Network) GetCount(servername interface{}) int {
rn.mu.Lock()
defer rn.mu.Unlock()
svr := rn.servers[servername]
return svr.GetCount()
}
func (rn *Network) GetTotalCount() int {
x := atomic.LoadInt32(&rn.count)
return int(x)
}
func (rn *Network) GetTotalBytes() int64 {
x := atomic.LoadInt64(&rn.bytes)
return x
}
//
// a server is a collection of services, all sharing
// the same rpc dispatcher. so that e.g. both a Raft
// and a k/v server can listen to the same rpc endpoint.
//
type Server struct {
mu sync.Mutex
services map[string]*Service
count int // incoming RPCs
}
func MakeServer() *Server {
rs := &Server{}
rs.services = map[string]*Service{}
return rs
}
func (rs *Server) AddService(svc *Service) {
rs.mu.Lock()
defer rs.mu.Unlock()
rs.services[svc.name] = svc
}
func (rs *Server) dispatch(req reqMsg) replyMsg {
rs.mu.Lock()
rs.count += 1
// split Raft.AppendEntries into service and method
dot := strings.LastIndex(req.svcMeth, ".")
serviceName := req.svcMeth[:dot]
methodName := req.svcMeth[dot+1:]
service, ok := rs.services[serviceName]
rs.mu.Unlock()
if ok {
return service.dispatch(methodName, req)
} else {
choices := []string{}
for k, _ := range rs.services {
choices = append(choices, k)
}
log.Fatalf("labrpc.Server.dispatch(): unknown service %v in %v.%v; expecting one of %v\n",
serviceName, serviceName, methodName, choices)
return replyMsg{false, nil}
}
}
func (rs *Server) GetCount() int {
rs.mu.Lock()
defer rs.mu.Unlock()
return rs.count
}
// an object with methods that can be called via RPC.
// a single server may have more than one Service.
type Service struct {
name string
rcvr reflect.Value
typ reflect.Type
methods map[string]reflect.Method
}
func MakeService(rcvr interface{}) *Service {
svc := &Service{}
svc.typ = reflect.TypeOf(rcvr)
svc.rcvr = reflect.ValueOf(rcvr)
svc.name = reflect.Indirect(svc.rcvr).Type().Name()
svc.methods = map[string]reflect.Method{}
for m := 0; m < svc.typ.NumMethod(); m++ {
method := svc.typ.Method(m)
mtype := method.Type
mname := method.Name
//fmt.Printf("%v pp %v ni %v 1k %v 2k %v no %v\n",
// mname, method.PkgPath, mtype.NumIn(), mtype.In(1).Kind(), mtype.In(2).Kind(), mtype.NumOut())
if method.PkgPath != "" || // capitalized?
mtype.NumIn() != 3 ||
//mtype.In(1).Kind() != reflect.Ptr ||
mtype.In(2).Kind() != reflect.Ptr ||
mtype.NumOut() != 0 {
// the method is not suitable for a handler
//fmt.Printf("bad method: %v\n", mname)
} else {
// the method looks like a handler
svc.methods[mname] = method
}
}
return svc
}
func (svc *Service) dispatch(methname string, req reqMsg) replyMsg {
if method, ok := svc.methods[methname]; ok {
// prepare space into which to read the argument.
// the Value's type will be a pointer to req.argsType.
args := reflect.New(req.argsType)
// decode the argument.
ab := bytes.NewBuffer(req.args)
ad := labgob.NewDecoder(ab)
ad.Decode(args.Interface())
// allocate space for the reply.
replyType := method.Type.In(2)
replyType = replyType.Elem()
replyv := reflect.New(replyType)
// call the method.
function := method.Func
function.Call([]reflect.Value{svc.rcvr, args.Elem(), replyv})
// encode the reply.
rb := new(bytes.Buffer)
re := labgob.NewEncoder(rb)
re.EncodeValue(replyv)
return replyMsg{true, rb.Bytes()}
} else {
choices := []string{}
for k, _ := range svc.methods {
choices = append(choices, k)
}
log.Fatalf("labrpc.Service.dispatch(): unknown method %v in %v; expecting one of %v\n",
methname, req.svcMeth, choices)
return replyMsg{false, nil}
}
}
package labrpc
import "testing"
import "strconv"
import "sync"
import "runtime"
import "time"
import "fmt"
type JunkArgs struct {
X int
}
type JunkReply struct {
X string
}
type JunkServer struct {
mu sync.Mutex
log1 []string
log2 []int
}
func (js *JunkServer) Handler1(args string, reply *int) {
js.mu.Lock()
defer js.mu.Unlock()
js.log1 = append(js.log1, args)
*reply, _ = strconv.Atoi(args)
}
func (js *JunkServer) Handler2(args int, reply *string) {
js.mu.Lock()
defer js.mu.Unlock()
js.log2 = append(js.log2, args)
*reply = "handler2-" + strconv.Itoa(args)
}
func (js *JunkServer) Handler3(args int, reply *int) {
js.mu.Lock()
defer js.mu.Unlock()
time.Sleep(20 * time.Second)
*reply = -args
}
// args is a pointer
func (js *JunkServer) Handler4(args *JunkArgs, reply *JunkReply) {
reply.X = "pointer"
}
// args is not a pointer
func (js *JunkServer) Handler5(args JunkArgs, reply *JunkReply) {
reply.X = "no pointer"
}
func (js *JunkServer) Handler6(args string, reply *int) {
js.mu.Lock()
defer js.mu.Unlock()
*reply = len(args)
}
func (js *JunkServer) Handler7(args int, reply *string) {
js.mu.Lock()
defer js.mu.Unlock()
*reply = ""
for i := 0; i < args; i++ {
*reply = *reply + "y"
}
}
func TestBasic(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
e := rn.MakeEnd("end1-99")
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer("server99", rs)
rn.Connect("end1-99", "server99")
rn.Enable("end1-99", true)
{
reply := ""
e.Call("JunkServer.Handler2", 111, &reply)
if reply != "handler2-111" {
t.Fatalf("wrong reply from Handler2")
}
}
{
reply := 0
e.Call("JunkServer.Handler1", "9099", &reply)
if reply != 9099 {
t.Fatalf("wrong reply from Handler1")
}
}
}
func TestTypes(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
e := rn.MakeEnd("end1-99")
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer("server99", rs)
rn.Connect("end1-99", "server99")
rn.Enable("end1-99", true)
{
var args JunkArgs
var reply JunkReply
// args must match type (pointer or not) of handler.
e.Call("JunkServer.Handler4", &args, &reply)
if reply.X != "pointer" {
t.Fatalf("wrong reply from Handler4")
}
}
{
var args JunkArgs
var reply JunkReply
// args must match type (pointer or not) of handler.
e.Call("JunkServer.Handler5", args, &reply)
if reply.X != "no pointer" {
t.Fatalf("wrong reply from Handler5")
}
}
}
//
// does net.Enable(endname, false) really disconnect a client?
//
func TestDisconnect(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
e := rn.MakeEnd("end1-99")
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer("server99", rs)
rn.Connect("end1-99", "server99")
{
reply := ""
e.Call("JunkServer.Handler2", 111, &reply)
if reply != "" {
t.Fatalf("unexpected reply from Handler2")
}
}
rn.Enable("end1-99", true)
{
reply := 0
e.Call("JunkServer.Handler1", "9099", &reply)
if reply != 9099 {
t.Fatalf("wrong reply from Handler1")
}
}
}
//
// test net.GetCount()
//
func TestCounts(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
e := rn.MakeEnd("end1-99")
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer(99, rs)
rn.Connect("end1-99", 99)
rn.Enable("end1-99", true)
for i := 0; i < 17; i++ {
reply := ""
e.Call("JunkServer.Handler2", i, &reply)
wanted := "handler2-" + strconv.Itoa(i)
if reply != wanted {
t.Fatalf("wrong reply %v from Handler1, expecting %v", reply, wanted)
}
}
n := rn.GetCount(99)
if n != 17 {
t.Fatalf("wrong GetCount() %v, expected 17\n", n)
}
}
//
// test net.GetTotalBytes()
//
func TestBytes(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
e := rn.MakeEnd("end1-99")
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer(99, rs)
rn.Connect("end1-99", 99)
rn.Enable("end1-99", true)
for i := 0; i < 17; i++ {
args := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
args = args + args
args = args + args
reply := 0
e.Call("JunkServer.Handler6", args, &reply)
wanted := len(args)
if reply != wanted {
t.Fatalf("wrong reply %v from Handler6, expecting %v", reply, wanted)
}
}
n := rn.GetTotalBytes()
if n < 4828 || n > 6000 {
t.Fatalf("wrong GetTotalBytes() %v, expected about 5000\n", n)
}
for i := 0; i < 17; i++ {
args := 107
reply := ""
e.Call("JunkServer.Handler7", args, &reply)
wanted := args
if len(reply) != wanted {
t.Fatalf("wrong reply len=%v from Handler6, expecting %v", len(reply), wanted)
}
}
nn := rn.GetTotalBytes() - n
if nn < 1800 || nn > 2500 {
t.Fatalf("wrong GetTotalBytes() %v, expected about 2000\n", nn)
}
}
//
// test RPCs from concurrent ClientEnds
//
func TestConcurrentMany(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer(1000, rs)
ch := make(chan int)
nclients := 20
nrpcs := 10
for ii := 0; ii < nclients; ii++ {
go func(i int) {
n := 0
defer func() { ch <- n }()
e := rn.MakeEnd(i)
rn.Connect(i, 1000)
rn.Enable(i, true)
for j := 0; j < nrpcs; j++ {
arg := i*100 + j
reply := ""
e.Call("JunkServer.Handler2", arg, &reply)
wanted := "handler2-" + strconv.Itoa(arg)
if reply != wanted {
t.Fatalf("wrong reply %v from Handler1, expecting %v", reply, wanted)
}
n += 1
}
}(ii)
}
total := 0
for ii := 0; ii < nclients; ii++ {
x := <-ch
total += x
}
if total != nclients*nrpcs {
t.Fatalf("wrong number of RPCs completed, got %v, expected %v", total, nclients*nrpcs)
}
n := rn.GetCount(1000)
if n != total {
t.Fatalf("wrong GetCount() %v, expected %v\n", n, total)
}
}
//
// test unreliable
//
func TestUnreliable(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
rn.Reliable(false)
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer(1000, rs)
ch := make(chan int)
nclients := 300
for ii := 0; ii < nclients; ii++ {
go func(i int) {
n := 0
defer func() { ch <- n }()
e := rn.MakeEnd(i)
rn.Connect(i, 1000)
rn.Enable(i, true)
arg := i * 100
reply := ""
ok := e.Call("JunkServer.Handler2", arg, &reply)
if ok {
wanted := "handler2-" + strconv.Itoa(arg)
if reply != wanted {
t.Fatalf("wrong reply %v from Handler1, expecting %v", reply, wanted)
}
n += 1
}
}(ii)
}
total := 0
for ii := 0; ii < nclients; ii++ {
x := <-ch
total += x
}
if total == nclients || total == 0 {
t.Fatalf("all RPCs succeeded despite unreliable")
}
}
//
// test concurrent RPCs from a single ClientEnd
//
func TestConcurrentOne(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer(1000, rs)
e := rn.MakeEnd("c")
rn.Connect("c", 1000)
rn.Enable("c", true)
ch := make(chan int)
nrpcs := 20
for ii := 0; ii < nrpcs; ii++ {
go func(i int) {
n := 0
defer func() { ch <- n }()
arg := 100 + i
reply := ""
e.Call("JunkServer.Handler2", arg, &reply)
wanted := "handler2-" + strconv.Itoa(arg)
if reply != wanted {
t.Fatalf("wrong reply %v from Handler2, expecting %v", reply, wanted)
}
n += 1
}(ii)
}
total := 0
for ii := 0; ii < nrpcs; ii++ {
x := <-ch
total += x
}
if total != nrpcs {
t.Fatalf("wrong number of RPCs completed, got %v, expected %v", total, nrpcs)
}
js.mu.Lock()
defer js.mu.Unlock()
if len(js.log2) != nrpcs {
t.Fatalf("wrong number of RPCs delivered")
}
n := rn.GetCount(1000)
if n != total {
t.Fatalf("wrong GetCount() %v, expected %v\n", n, total)
}
}
//
// regression: an RPC that's delayed during Enabled=false
// should not delay subsequent RPCs (e.g. after Enabled=true).
//
func TestRegression1(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer(1000, rs)
e := rn.MakeEnd("c")
rn.Connect("c", 1000)
// start some RPCs while the ClientEnd is disabled.
// they'll be delayed.
rn.Enable("c", false)
ch := make(chan bool)
nrpcs := 20
for ii := 0; ii < nrpcs; ii++ {
go func(i int) {
ok := false
defer func() { ch <- ok }()
arg := 100 + i
reply := ""
// this call ought to return false.
e.Call("JunkServer.Handler2", arg, &reply)
ok = true
}(ii)
}
time.Sleep(100 * time.Millisecond)
// now enable the ClientEnd and check that an RPC completes quickly.
t0 := time.Now()
rn.Enable("c", true)
{
arg := 99
reply := ""
e.Call("JunkServer.Handler2", arg, &reply)
wanted := "handler2-" + strconv.Itoa(arg)
if reply != wanted {
t.Fatalf("wrong reply %v from Handler2, expecting %v", reply, wanted)
}
}
dur := time.Since(t0).Seconds()
if dur > 0.03 {
t.Fatalf("RPC took too long (%v) after Enable", dur)
}
for ii := 0; ii < nrpcs; ii++ {
<-ch
}
js.mu.Lock()
defer js.mu.Unlock()
if len(js.log2) != 1 {
t.Fatalf("wrong number (%v) of RPCs delivered, expected 1", len(js.log2))
}
n := rn.GetCount(1000)
if n != 1 {
t.Fatalf("wrong GetCount() %v, expected %v\n", n, 1)
}
}
//
// if an RPC is stuck in a server, and the server
// is killed with DeleteServer(), does the RPC
// get un-stuck?
//
func TestKilled(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
e := rn.MakeEnd("end1-99")
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer("server99", rs)
rn.Connect("end1-99", "server99")
rn.Enable("end1-99", true)
doneCh := make(chan bool)
go func() {
reply := 0
ok := e.Call("JunkServer.Handler3", 99, &reply)
doneCh <- ok
}()
time.Sleep(1000 * time.Millisecond)
select {
case <-doneCh:
t.Fatalf("Handler3 should not have returned yet")
case <-time.After(100 * time.Millisecond):
}
rn.DeleteServer("server99")
select {
case x := <-doneCh:
if x != false {
t.Fatalf("Handler3 returned successfully despite DeleteServer()")
}
case <-time.After(100 * time.Millisecond):
t.Fatalf("Handler3 should return after DeleteServer()")
}
}
func TestBenchmark(t *testing.T) {
runtime.GOMAXPROCS(4)
rn := MakeNetwork()
defer rn.Cleanup()
e := rn.MakeEnd("end1-99")
js := &JunkServer{}
svc := MakeService(js)
rs := MakeServer()
rs.AddService(svc)
rn.AddServer("server99", rs)
rn.Connect("end1-99", "server99")
rn.Enable("end1-99", true)
t0 := time.Now()
n := 100000
for iters := 0; iters < n; iters++ {
reply := ""
e.Call("JunkServer.Handler2", 111, &reply)
if reply != "handler2-111" {
t.Fatalf("wrong reply from Handler2")
}
}
fmt.Printf("%v for %v\n", time.Since(t0), n)
// march 2016, rtm laptop, 22 microseconds per RPC
}
package raft
//
// support for Raft tester.
//
// we will use the original config.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test with the original before submitting.
//
import "raft/labrpc"
import "log"
import "sync"
import "testing"
import "runtime"
import "math/rand"
import crand "crypto/rand"
import "math/big"
import "encoding/base64"
import "time"
import "fmt"
func randstring(n int) string {
b := make([]byte, 2*n)
crand.Read(b)
s := base64.URLEncoding.EncodeToString(b)
return s[0:n]
}
func makeSeed() int64 {
max := big.NewInt(int64(1) << 62)
bigx, _ := crand.Int(crand.Reader, max)
x := bigx.Int64()
return x
}
type config struct {
mu sync.Mutex
t *testing.T
net *labrpc.Network
n int
rafts []*Raft
applyErr []string // from apply channel readers
connected []bool // whether each server is on the net
endnames [][]string // the port file names each sends to
logs []map[int]interface{} // copy of each server's committed entries
start time.Time // time at which make_config() was called
// begin()/end() statistics
t0 time.Time // time at which test_test.go called cfg.begin()
rpcs0 int // rpcTotal() at start of test
cmds0 int // number of agreements
bytes0 int64
maxIndex int
maxIndex0 int
}
var ncpu_once sync.Once
func make_config(t *testing.T, n int, unreliable bool) *config {
ncpu_once.Do(func() {
if runtime.NumCPU() < 2 {
fmt.Printf("warning: only one CPU, which may conceal locking bugs\n")
}
rand.Seed(makeSeed())
})
runtime.GOMAXPROCS(4)
cfg := &config{}
cfg.t = t
cfg.net = labrpc.MakeNetwork()
cfg.n = n
cfg.applyErr = make([]string, cfg.n)
cfg.rafts = make([]*Raft, cfg.n)
cfg.connected = make([]bool, cfg.n)
cfg.endnames = make([][]string, cfg.n)
cfg.logs = make([]map[int]interface{}, cfg.n)
cfg.start = time.Now()
cfg.setunreliable(unreliable)
cfg.net.LongDelays(true)
// create a full set of Rafts.
for i := 0; i < cfg.n; i++ {
cfg.logs[i] = map[int]interface{}{}
cfg.start1(i)
}
// connect everyone
for i := 0; i < cfg.n; i++ {
cfg.connect(i)
}
return cfg
}
//
// start or re-start a Raft.
// if one already exists, "kill" it first.
// allocate new outgoing port file names, and a new
// state persister, to isolate previous instance of
// this server. since we cannot really kill it.
//
func (cfg *config) start1(i int) {
// a fresh set of outgoing ClientEnd names.
// so that old crashed instance's ClientEnds can't send.
cfg.endnames[i] = make([]string, cfg.n)
for j := 0; j < cfg.n; j++ {
cfg.endnames[i][j] = randstring(20)
}
// a fresh set of ClientEnds.
ends := make([]*labrpc.ClientEnd, cfg.n)
for j := 0; j < cfg.n; j++ {
ends[j] = cfg.net.MakeEnd(cfg.endnames[i][j])
cfg.net.Connect(cfg.endnames[i][j], j)
}
// listen to messages from Raft reporting newly committed log entries.
applyCh := make(chan ApplyMsg)
go func() {
for m := range applyCh {
err_msg := ""
if m.CommandValid == false {
// ignore other types of ApplyMsg
} else {
v := m.Command
cfg.mu.Lock()
for j := 0; j < len(cfg.logs); j++ {
if old, oldok := cfg.logs[j][m.CommandIndex]; oldok && old != v {
// some server has already committed a different value for this entry!
err_msg = fmt.Sprintf("commit index=%v server=%v %v != server=%v %v",
m.CommandIndex, i, m.Command, j, old)
}
}
_, prevok := cfg.logs[i][m.CommandIndex-1]
cfg.logs[i][m.CommandIndex] = v
if m.CommandIndex > cfg.maxIndex {
cfg.maxIndex = m.CommandIndex
}
cfg.mu.Unlock()
if m.CommandIndex > 1 && prevok == false {
err_msg = fmt.Sprintf("server %v apply out of order %v", i, m.CommandIndex)
}
}
if err_msg != "" {
log.Fatalf("apply error: %v\n", err_msg)
cfg.applyErr[i] = err_msg
// keep reading after error so that Raft doesn't block
// holding locks...
}
}
}()
rf := Make(ends, i, applyCh)
cfg.mu.Lock()
cfg.rafts[i] = rf
cfg.mu.Unlock()
svc := labrpc.MakeService(rf)
srv := labrpc.MakeServer()
srv.AddService(svc)
cfg.net.AddServer(i, srv)
}
func (cfg *config) checkTimeout() {
// enforce a two minute real-time limit on each test
if !cfg.t.Failed() && time.Since(cfg.start) > 120*time.Second {
cfg.t.Fatal("test took longer than 120 seconds")
}
}
func (cfg *config) cleanup() {
for i := 0; i < len(cfg.rafts); i++ {
if cfg.rafts[i] != nil {
cfg.rafts[i].Kill()
}
}
cfg.net.Cleanup()
cfg.checkTimeout()
}
// attach server i to the net.
func (cfg *config) connect(i int) {
// fmt.Printf("connect(%d)\n", i)
cfg.connected[i] = true
// outgoing ClientEnds
for j := 0; j < cfg.n; j++ {
if cfg.connected[j] {
endname := cfg.endnames[i][j]
cfg.net.Enable(endname, true)
}
}
// incoming ClientEnds
for j := 0; j < cfg.n; j++ {
if cfg.connected[j] {
endname := cfg.endnames[j][i]
cfg.net.Enable(endname, true)
}
}
}
// detach server i from the net.
func (cfg *config) disconnect(i int) {
// fmt.Printf("disconnect(%d)\n", i)
cfg.connected[i] = false
// outgoing ClientEnds
for j := 0; j < cfg.n; j++ {
if cfg.endnames[i] != nil {
endname := cfg.endnames[i][j]
cfg.net.Enable(endname, false)
}
}
// incoming ClientEnds
for j := 0; j < cfg.n; j++ {
if cfg.endnames[j] != nil {
endname := cfg.endnames[j][i]
cfg.net.Enable(endname, false)
}
}
}
func (cfg *config) rpcCount(server int) int {
return cfg.net.GetCount(server)
}
func (cfg *config) rpcTotal() int {
return cfg.net.GetTotalCount()
}
func (cfg *config) setunreliable(unrel bool) {
cfg.net.Reliable(!unrel)
}
func (cfg *config) bytesTotal() int64 {
return cfg.net.GetTotalBytes()
}
func (cfg *config) setlongreordering(longrel bool) {
cfg.net.LongReordering(longrel)
}
// check that there's exactly one leader.
// try a few times in case re-elections are needed.
func (cfg *config) checkOneLeader() int {
for iters := 0; iters < 10; iters++ {
ms := 450 + (rand.Int63() % 100)
time.Sleep(time.Duration(ms) * time.Millisecond)
leaders := make(map[int][]int)
for i := 0; i < cfg.n; i++ {
if cfg.connected[i] {
if term, leader := cfg.rafts[i].GetState(); leader {
leaders[term] = append(leaders[term], i)
}
}
}
lastTermWithLeader := -1
for term, leaders := range leaders {
if len(leaders) > 1 {
cfg.t.Fatalf("term %d has %d (>1) leaders", term, len(leaders))
}
if term > lastTermWithLeader {
lastTermWithLeader = term
}
}
if len(leaders) != 0 {
return leaders[lastTermWithLeader][0]
}
}
cfg.t.Fatalf("expected one leader, got none")
return -1
}
// check that everyone agrees on the term.
func (cfg *config) checkTerms() int {
term := -1
for i := 0; i < cfg.n; i++ {
if cfg.connected[i] {
xterm, _ := cfg.rafts[i].GetState()
if term == -1 {
term = xterm
} else if term != xterm {
cfg.t.Fatalf("servers disagree on term")
}
}
}
return term
}
// check that there's no leader
func (cfg *config) checkNoLeader() {
for i := 0; i < cfg.n; i++ {
if cfg.connected[i] {
_, is_leader := cfg.rafts[i].GetState()
if is_leader {
cfg.t.Fatalf("expected no leader, but %v claims to be leader", i)
}
}
}
}
// how many servers think a log entry is committed?
func (cfg *config) nCommitted(index int) (int, interface{}) {
count := 0
var cmd interface{} = nil
for i := 0; i < len(cfg.rafts); i++ {
if cfg.applyErr[i] != "" {
cfg.t.Fatal(cfg.applyErr[i])
}
cfg.mu.Lock()
cmd1, ok := cfg.logs[i][index]
cfg.mu.Unlock()
if ok {
if count > 0 && cmd != cmd1 {
cfg.t.Fatalf("committed values do not match: index %v, %v, %v\n",
index, cmd, cmd1)
}
count += 1
cmd = cmd1
}
}
return count, cmd
}
// wait for at least n servers to commit.
// but don't wait forever.
func (cfg *config) wait(index int, n int, startTerm int) interface{} {
to := 10 * time.Millisecond
for iters := 0; iters < 30; iters++ {
nd, _ := cfg.nCommitted(index)
if nd >= n {
break
}
time.Sleep(to)
if to < time.Second {
to *= 2
}
if startTerm > -1 {
for _, r := range cfg.rafts {
if t, _ := r.GetState(); t > startTerm {
// someone has moved on
// can no longer guarantee that we'll "win"
return -1
}
}
}
}
nd, cmd := cfg.nCommitted(index)
if nd < n {
cfg.t.Fatalf("only %d decided for index %d; wanted %d\n",
nd, index, n)
}
return cmd
}
// do a complete agreement.
// it might choose the wrong leader initially,
// and have to re-submit after giving up.
// entirely gives up after about 10 seconds.
// indirectly checks that the servers agree on the
// same value, since nCommitted() checks this,
// as do the threads that read from applyCh.
// returns index.
// if retry==true, may submit the command multiple
// times, in case a leader fails just after Start().
// if retry==false, calls Start() only once, in order
// to simplify the early Lab 2B tests.
func (cfg *config) one(cmd interface{}, expectedServers int, retry bool) int {
t0 := time.Now()
starts := 0
for time.Since(t0).Seconds() < 10 {
// try all the servers, maybe one is the leader.
index := -1
for si := 0; si < cfg.n; si++ {
starts = (starts + 1) % cfg.n
var rf *Raft
cfg.mu.Lock()
if cfg.connected[starts] {
rf = cfg.rafts[starts]
}
cfg.mu.Unlock()
if rf != nil {
index1, _, ok := rf.Start(cmd)
if ok {
index = index1
break
}
}
}
if index != -1 {
// somebody claimed to be the leader and to have
// submitted our command; wait a while for agreement.
t1 := time.Now()
for time.Since(t1).Seconds() < 2 {
nd, cmd1 := cfg.nCommitted(index)
if nd > 0 && nd >= expectedServers {
// committed
if cmd1 == cmd {
// and it was the command we submitted.
return index
}
}
time.Sleep(20 * time.Millisecond)
}
if retry == false {
cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
}
} else {
time.Sleep(50 * time.Millisecond)
}
}
cfg.t.Fatalf("one(%v) failed to reach agreement", cmd)
return -1
}
// start a Test.
// print the Test message.
// e.g. cfg.begin("Test (2B): RPC counts aren't too high")
func (cfg *config) begin(description string) {
fmt.Printf("%s ...\n", description)
cfg.t0 = time.Now()
cfg.rpcs0 = cfg.rpcTotal()
cfg.bytes0 = cfg.bytesTotal()
cfg.cmds0 = 0
cfg.maxIndex0 = cfg.maxIndex
}
// end a Test -- the fact that we got here means there
// was no failure.
// print the Passed message,
// and some performance numbers.
func (cfg *config) end() {
cfg.checkTimeout()
if cfg.t.Failed() == false {
cfg.mu.Lock()
t := time.Since(cfg.t0).Seconds() // real time
npeers := cfg.n // number of Raft peers
nrpc := cfg.rpcTotal() - cfg.rpcs0 // number of RPC sends
nbytes := cfg.bytesTotal() - cfg.bytes0 // number of bytes
ncmds := cfg.maxIndex - cfg.maxIndex0 // number of Raft agreements reported
cfg.mu.Unlock()
fmt.Printf(" ... Passed --")
fmt.Printf(" %4.1f %d %4d %7d %4d\n", t, npeers, nrpc, nbytes, ncmds)
}
}
package raft
//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
// create a new Raft server.
// rf.Start(command interface{}) (index, term, isleader)
// start agreement on a new log entry
// rf.GetState() (term, isLeader)
// ask a Raft for its current term, and whether it thinks it is leader
// ApplyMsg
// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester)
// in the same server.
//
import "sync"
import "sync/atomic"
import "raft/labrpc"
import "time"
import "math/rand"
//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
}
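// Illustrative sketch (not part of the lab code): the service (or tester)
// above Raft typically drains applyCh in a loop like this, handing each
// committed command to the state machine in index order. The function name
// and the apply callback are hypothetical.
func exampleApplyLoop(applyCh chan ApplyMsg, apply func(index int, cmd interface{})) {
    for msg := range applyCh {
        if msg.CommandValid {
            apply(msg.CommandIndex, msg.Command)
        }
    }
}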
// LogEntry struct containing command and term
type LogEntry struct {
Command interface{} // command for state machine
Term int // term when entry was received by leader
}
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (2A, 2B).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
// You may also need to add other state, as per your implementation.
// persistent state on all servers
currentTerm int // latest term server has seen
votedFor int // candidateId that received vote in current term
log []LogEntry // log entries; each holds (command, term), first index is 1
CurrentState string // server state (F: follower, C: candidate, L: leader)
// volatile state on all servers
commitIndex int // index of highest log entry known to be committed
lastApplied int // index of highest log entry applied to state machine
// volatile state on leaders, reinitialized after election
nextIndex []int // index of next entry to send to each server
matchIndex []int // index of highest entry known to be replicated on server
// channel to send ApplyMsg to service
applyCh chan ApplyMsg
// election timers
electionTimeout *time.Timer // timeout for election
heartbeatTimeout *time.Timer // timeout for heartbeat
}
// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
var term int
var isleader bool
// Your code here (2A).
rf.mu.Lock()
defer rf.mu.Unlock()
term = rf.currentTerm
if rf.CurrentState == "L" {
isleader = true
} else {
isleader = false
}
return term, isleader
}
//
// example RequestVote RPC arguments structure.
// field names must start with capital letters!
//
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int // term of candidate
CandidateId int // candidate requesting vote
LastLogIndex int // index of candidate last log entry
LastLogTerm int // term of candidate last log entry
}
//
// example RequestVote RPC reply structure.
// field names must start with capital letters!
//
type RequestVoteReply struct {
// Your data here (2A).
Term int // current term (for candidate to update itself)
VoteGranted bool // true if candidate received vote
}
// check if a candidate's log is at least as up-to-date as receiver's
func (rf *Raft) checkCandidateUpToDate(args *RequestVoteArgs) bool {
lastLogIndex := len(rf.log) - 1
if args.LastLogTerm > rf.log[lastLogIndex].Term {
return true
} else if args.LastLogTerm == rf.log[lastLogIndex].Term {
if args.LastLogIndex >= lastLogIndex {
return true
}
}
return false
}
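// A concrete reading of the rule above (added note, not in the original): if
// this server's last entry is at index 7 with term 5, then a candidate with
// LastLogTerm 6 is up-to-date regardless of its length; one with LastLogTerm 5
// is up-to-date only if LastLogIndex >= 7; one with LastLogTerm 4 never is.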
//
// example RequestVote RPC handler.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
// Read the fields in "args",
// and accordingly assign the values for fields in "reply".
if rf.killed() {
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
// candidate asking for vote has lower term
if args.Term < rf.currentTerm {
reply.VoteGranted = false
reply.Term = rf.currentTerm
return
}
// candidate asking for vote has higher term
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.CurrentState = "F"
rf.resetElectionTimer()
}
candidateUpToDate := rf.checkCandidateUpToDate(args)
if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && candidateUpToDate {
rf.votedFor = args.CandidateId
reply.VoteGranted = true
rf.resetElectionTimer()
} else {
reply.VoteGranted = false
}
reply.Term = rf.currentTerm
}
//
// example code to send a RequestVote RPC to a server.
// server is the index of the target server in rf.peers[].
// expects RPC arguments in args.
// fills in *reply with RPC reply, so caller should
// pass &reply.
// the types of the args and reply passed to Call() must be
// the same as the types of the arguments declared in the
// handler function (including whether they are pointers).
//
// The labrpc package simulates a lossy network, in which servers
// may be unreachable, and in which requests and replies may be lost.
// Call() sends a request and waits for a reply. If a reply arrives
// within a timeout interval, Call() returns true; otherwise
// Call() returns false. Thus Call() may not return for a while.
// A false return can be caused by a dead server, a live server that
// can't be reached, a lost request, or a lost reply.
//
// Call() is guaranteed to return (perhaps after a delay) except if the
// handler function on the server side does not return. Thus there
// is no need to implement your own timeouts around Call().
//
// look at the comments in ../labrpc/labrpc.go for more details.
//
// if you're having trouble getting RPC to work, check that you've
// capitalized all field names in structs passed over RPC, and
// that the caller passes the address of the reply struct with &, not
// the struct itself.
//
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
rf.mu.Lock()
defer rf.mu.Unlock()
if ok {
// reply term greater than current term
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.CurrentState = "F"
rf.resetElectionTimer()
}
}
return ok
}
// AppendEntriesArgs struct containing args for AppendEntries RPC
type AppendEntriesArgs struct {
Term int // term of leader
LeaderId int // follower can redirect clients
PrevLogIndex int // index of log entry immediately preceding new ones
PrevLogTerm int // term of prevLogIndex entry
Entries []LogEntry // log entries to store (empty for heartbeat)
LeaderCommit int // commitIndex of leader
}
// AppendEntriesReply struct containing reply for AppendEntries RPC
type AppendEntriesReply struct {
Term int // current term (for leader to update itself)
Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
}
// AppendEntries RPC handler
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
if rf.killed() {
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Success = false
// leader's term is less than current term: reject, and do not let a
// stale leader's AppendEntries reset the election timer
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
return
}
// AppendEntries from the current (or a newer) leader: reset the election timer
rf.resetElectionTimer()
// leader's term is greater than current term
// revert to follower state
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = -1
rf.CurrentState = "F"
}
// check if log contains entry at prevLogIndex matching prevLogTerm
if args.PrevLogIndex >= len(rf.log) || rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.Term = rf.currentTerm
return
}
// if an existing entry conflicts with a new one (same index, different
// term), delete the existing entry and all that follow it, then append
// any new entries not already in the log
for i, entry := range args.Entries {
idx := args.PrevLogIndex + 1 + i
if idx < len(rf.log) && rf.log[idx].Term != entry.Term {
rf.log = rf.log[:idx]
}
if idx >= len(rf.log) {
rf.log = append(rf.log, entry)
}
}
// update commit index
if args.LeaderCommit > rf.commitIndex {
if args.LeaderCommit < len(rf.log) - 1 {
rf.commitIndex = args.LeaderCommit
} else {
rf.commitIndex = len(rf.log) - 1
}
}
reply.Term = rf.currentTerm
reply.Success = true
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
rf.mu.Lock()
defer rf.mu.Unlock()
if ok && reply.Term > rf.currentTerm {
// reply term greater than current term: revert to follower
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.CurrentState = "F"
rf.resetElectionTimer()
}
return ok
}
func (rf *Raft) sendHeartbeats() {
for !rf.killed() {
rf.mu.Lock()
// must be a leader to send heartbeats
if rf.CurrentState != "L" {
rf.mu.Unlock()
// wait a little before checking again if leader
time.Sleep(10 * time.Millisecond)
continue
}
lastLogIndex := len(rf.log) - 1
// heartbeat AppendEntries args (no new entries)
args := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: lastLogIndex,
PrevLogTerm: rf.log[lastLogIndex].Term,
Entries: []LogEntry{},
LeaderCommit: rf.commitIndex,
}
peersCopy := make([]int, 0)
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
peersCopy = append(peersCopy, i)
}
}
// unlock to send AppendEntries RPC
rf.mu.Unlock()
// send heartbeat to all peers
for _, server := range peersCopy {
// spawn a goroutine per peer to send the AppendEntries RPC
go func(server int, args AppendEntriesArgs) {
reply := AppendEntriesReply{}
ok := rf.sendAppendEntries(server, &args, &reply)
if ok {
rf.mu.Lock()
// check if reply corresponds to current term
if rf.CurrentState == "L" && rf.currentTerm == args.Term {
if reply.Term > rf.currentTerm {
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.CurrentState = "F"
rf.resetElectionTimer()
}
}
rf.mu.Unlock()
}
}(server, args)
}
// heartbeat interval
time.Sleep(100 * time.Millisecond)
}
}
func (rf *Raft) resetElectionTimer() {
// reset election timer
if rf.electionTimeout != nil {
rf.electionTimeout.Stop()
}
// set new election timeout (randomized between 300ms and 1000ms)
timeoutValue := time.Duration(300 + rand.Intn(700)) * time.Millisecond
rf.electionTimeout = time.AfterFunc(timeoutValue, func() {
rf.startElection()
})
}
func (rf *Raft) startElection() {
// stop holding elections once the instance has been killed
if rf.killed() {
return
}
rf.mu.Lock()
// start election only if server is not leader
if rf.CurrentState == "L" {
rf.mu.Unlock()
return
}
// increment current term and change state to candidate
rf.currentTerm++
rf.votedFor = rf.me
rf.CurrentState = "C"
rf.resetElectionTimer()
// vote count
voteCount := 1
votesRequired := len(rf.peers) / 2 + 1
currentTermCopy := rf.currentTerm
candidateIdCopy := rf.me
lastLogIndexCopy := len(rf.log) - 1 // index of candidate's last log entry
lastLogTermCopy := rf.log[lastLogIndexCopy].Term
rf.mu.Unlock()
// send RequestVote RPC to all peers
for i := 0; i < len(rf.peers); i++ {
if i != rf.me {
// spawn a goroutine per peer to send the RequestVote RPC
go func(server int) {
args := RequestVoteArgs{
Term: currentTermCopy,
CandidateId: candidateIdCopy,
LastLogIndex: lastLogIndexCopy,
LastLogTerm: lastLogTermCopy,
}
reply := RequestVoteReply{}
ok := rf.sendRequestVote(server, &args, &reply)
if ok {
rf.mu.Lock()
// check if reply corresponds to current term
if rf.CurrentState == "C" && rf.currentTerm == args.Term {
if reply.VoteGranted {
voteCount++
if voteCount >= votesRequired {
// become leader
rf.CurrentState = "L"
// reinitialize volatile state on leaders
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))
for i := 0; i < len(rf.peers); i++ {
rf.nextIndex[i] = len(rf.log) // last log index + 1
rf.matchIndex[i] = 0
}
// reset election timer
rf.resetElectionTimer()
}
}
}
rf.mu.Unlock()
}
}(i)
}
}
}
//
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
//
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true
// Your code here (2B).
rf.mu.Lock()
if rf.CurrentState != "L" {
isLeader = false
} else {
logEntry := LogEntry{
Command: command,
Term: rf.currentTerm,
}
rf.log = append(rf.log, logEntry)
index = len(rf.log) - 1
rf.commitIndex = index
rf.lastApplied = index
}
term = rf.currentTerm
rf.mu.Unlock()
return index, term, isLeader
}
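// Illustrative sketch (not part of the lab code): a service above Raft calls
// Start() and then learns the real outcome from applyCh rather than from the
// return values alone, as the comment above explains. Names are hypothetical.
func exampleSubmit(rf *Raft, cmd interface{}) (int, bool) {
    index, _, isLeader := rf.Start(cmd)
    if !isLeader {
        return -1, false // not the leader; the caller should try another peer
    }
    // a true return only means a server that believes it is leader accepted
    // the command; watch applyCh for an ApplyMsg with CommandIndex == index
    // to learn whether the command actually committed.
    return index, true
}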
//
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
//
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
//
func (rf *Raft) Kill() {
atomic.StoreInt32(&rf.dead, 1)
// Your code here, if desired.
}
func (rf *Raft) killed() bool {
z := atomic.LoadInt32(&rf.dead)
return z == 1
}
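// Illustrative sketch (not part of the lab code) of the pattern described
// above: any long-running goroutine should poll killed() so that it exits
// soon after the tester calls Kill(). The sleep interval is arbitrary.
func (rf *Raft) exampleBackgroundLoop() {
    for !rf.killed() {
        // ... periodic work, e.g. checking timers or applying entries ...
        time.Sleep(10 * time.Millisecond)
    }
}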
//
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
func Make(peers []*labrpc.ClientEnd, me int,
applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.me = me
// Your initialization code here (2A, 2B).
rf.currentTerm = 0
rf.votedFor = -1
rf.log = make([]LogEntry, 1) // index 0 is empty
rf.log[0] = LogEntry{Command: nil, Term: 0}
rf.commitIndex = 0
rf.lastApplied = 0
rf.CurrentState = "F" // initial state is follower
rf.nextIndex = make([]int, len(peers))
rf.matchIndex = make([]int, len(peers))
// start election timer
rf.resetElectionTimer()
// start go routine to send heartbeats
go rf.sendHeartbeats()
return rf
}
package raft
//
// Raft tests.
//
// we will use the original test_test.go to test your code for grading.
// so, while you can modify this code to help you debug, please
// test with the original before submitting.
//
import "testing"
import "fmt"
import "time"
import "math/rand"
import "sync"
// The tester generously allows solutions to complete elections in one second
// (much more than the paper's range of timeouts).
const RaftElectionTimeout = 1000 * time.Millisecond
func TestInitialElection2A(t *testing.T) {
servers := 3
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2A): initial election")
// is a leader elected?
cfg.checkOneLeader()
// sleep a bit to avoid racing with followers learning of the
// election, then check that all peers agree on the term.
time.Sleep(50 * time.Millisecond)
term1 := cfg.checkTerms()
if term1 < 1 {
t.Fatalf("term is %v, but should be at least 1", term1)
}
// does the leader+term stay the same if there is no network failure?
time.Sleep(2 * RaftElectionTimeout)
term2 := cfg.checkTerms()
if term1 != term2 {
fmt.Printf("warning: term changed even though there were no failures")
}
// there should still be a leader.
cfg.checkOneLeader()
cfg.end()
}
func TestReElection2A(t *testing.T) {
servers := 3
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2A): election after network failure")
leader1 := cfg.checkOneLeader()
// if the leader disconnects, a new one should be elected.
cfg.disconnect(leader1)
cfg.checkOneLeader()
// if the old leader rejoins, that shouldn't
// disturb the new leader.
cfg.connect(leader1)
leader2 := cfg.checkOneLeader()
// if there's no quorum, no leader should
// be elected.
cfg.disconnect(leader2)
cfg.disconnect((leader2 + 1) % servers)
time.Sleep(2 * RaftElectionTimeout)
cfg.checkNoLeader()
// if a quorum arises, it should elect a leader.
cfg.connect((leader2 + 1) % servers)
cfg.checkOneLeader()
// re-join of last node shouldn't prevent leader from existing.
cfg.connect(leader2)
cfg.checkOneLeader()
cfg.end()
}
func TestBasicAgree2B(t *testing.T) {
servers := 3
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2B): basic agreement")
iters := 3
for index := 1; index < iters+1; index++ {
nd, _ := cfg.nCommitted(index)
if nd > 0 {
t.Fatalf("some have committed before Start()")
}
xindex := cfg.one(index*100, servers, false)
if xindex != index {
t.Fatalf("got index %v but expected %v", xindex, index)
}
}
cfg.end()
}
//
// check, based on counting bytes of RPCs, that
// each command is sent to each peer just once.
//
func TestRPCBytes2B(t *testing.T) {
servers := 3
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2B): RPC byte count")
cfg.one(99, servers, false)
bytes0 := cfg.bytesTotal()
iters := 10
var sent int64 = 0
for index := 2; index < iters+2; index++ {
cmd := randstring(5000)
xindex := cfg.one(cmd, servers, false)
if xindex != index {
t.Fatalf("got index %v but expected %v", xindex, index)
}
sent += int64(len(cmd))
}
bytes1 := cfg.bytesTotal()
got := bytes1 - bytes0
expected := int64(servers) * sent
if got > expected+50000 {
t.Fatalf("too many RPC bytes; got %v, expected %v", got, expected)
}
cfg.end()
}
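// The byte budget in TestRPCBytes2B above works out as follows: sent
// accumulates 10 commands x 5000 bytes = 50,000 bytes, and the test tolerates
// at most servers*sent + 50,000 = 200,000 RPC bytes beyond the baseline, so a
// solution that re-sends entries to peers that already have them will fail.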
func TestFailAgree2B(t *testing.T) {
servers := 3
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2B): agreement despite follower disconnection")
cfg.one(101, servers, false)
// disconnect one follower from the network.
leader := cfg.checkOneLeader()
cfg.disconnect((leader + 1) % servers)
// the leader and remaining follower should be
// able to agree despite the disconnected follower.
cfg.one(102, servers-1, false)
cfg.one(103, servers-1, false)
time.Sleep(RaftElectionTimeout)
cfg.one(104, servers-1, false)
cfg.one(105, servers-1, false)
// re-connect
cfg.connect((leader + 1) % servers)
// the full set of servers should preserve
// previous agreements, and be able to agree
// on new commands.
cfg.one(106, servers, true)
time.Sleep(RaftElectionTimeout)
cfg.one(107, servers, true)
cfg.end()
}
func TestFailNoAgree2B(t *testing.T) {
servers := 5
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2B): no agreement if too many followers disconnect")
cfg.one(10, servers, false)
// 3 of 5 followers disconnect
leader := cfg.checkOneLeader()
cfg.disconnect((leader + 1) % servers)
cfg.disconnect((leader + 2) % servers)
cfg.disconnect((leader + 3) % servers)
index, _, ok := cfg.rafts[leader].Start(20)
if ok != true {
t.Fatalf("leader rejected Start()")
}
if index != 2 {
t.Fatalf("expected index 2, got %v", index)
}
time.Sleep(2 * RaftElectionTimeout)
n, _ := cfg.nCommitted(index)
if n > 0 {
t.Fatalf("%v committed but no majority", n)
}
// repair
cfg.connect((leader + 1) % servers)
cfg.connect((leader + 2) % servers)
cfg.connect((leader + 3) % servers)
// the disconnected majority may have chosen a leader from
// among their own ranks, forgetting index 2.
leader2 := cfg.checkOneLeader()
index2, _, ok2 := cfg.rafts[leader2].Start(30)
if ok2 == false {
t.Fatalf("leader2 rejected Start()")
}
if index2 < 2 || index2 > 3 {
t.Fatalf("unexpected index %v", index2)
}
cfg.one(1000, servers, true)
cfg.end()
}
func TestConcurrentStarts2B(t *testing.T) {
servers := 3
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2B): concurrent Start()s")
var success bool
loop:
for try := 0; try < 5; try++ {
if try > 0 {
// give solution some time to settle
time.Sleep(3 * time.Second)
}
leader := cfg.checkOneLeader()
_, term, ok := cfg.rafts[leader].Start(1)
if !ok {
// leader moved on really quickly
continue
}
iters := 5
var wg sync.WaitGroup
is := make(chan int, iters)
for ii := 0; ii < iters; ii++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
i, term1, ok := cfg.rafts[leader].Start(100 + i)
if term1 != term {
return
}
if ok != true {
return
}
is <- i
}(ii)
}
wg.Wait()
close(is)
for j := 0; j < servers; j++ {
if t, _ := cfg.rafts[j].GetState(); t != term {
// term changed -- can't expect low RPC counts
continue loop
}
}
failed := false
cmds := []int{}
for index := range is {
cmd := cfg.wait(index, servers, term)
if ix, ok := cmd.(int); ok {
if ix == -1 {
// peers have moved on to later terms
// so we can't expect all Start()s to
// have succeeded
failed = true
break
}
cmds = append(cmds, ix)
} else {
t.Fatalf("value %v is not an int", cmd)
}
}
if failed {
// avoid leaking goroutines
go func() {
for range is {
}
}()
continue
}
for ii := 0; ii < iters; ii++ {
x := 100 + ii
ok := false
for j := 0; j < len(cmds); j++ {
if cmds[j] == x {
ok = true
}
}
if ok == false {
t.Fatalf("cmd %v missing in %v", x, cmds)
}
}
success = true
break
}
if !success {
t.Fatalf("term changed too often")
}
cfg.end()
}
func TestRejoin2B(t *testing.T) {
servers := 3
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2B): rejoin of partitioned leader")
cfg.one(101, servers, true)
// leader network failure
leader1 := cfg.checkOneLeader()
cfg.disconnect(leader1)
// make old leader try to agree on some entries
cfg.rafts[leader1].Start(102)
cfg.rafts[leader1].Start(103)
cfg.rafts[leader1].Start(104)
// new leader commits, also for index=2
cfg.one(103, 2, true)
// new leader network failure
leader2 := cfg.checkOneLeader()
cfg.disconnect(leader2)
// old leader connected again
cfg.connect(leader1)
cfg.one(104, 2, true)
// all together now
cfg.connect(leader2)
cfg.one(105, servers, true)
cfg.end()
}
func TestBackup2B(t *testing.T) {
servers := 5
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2B): leader backs up quickly over incorrect follower logs")
cfg.one(rand.Int(), servers, true)
// put leader and one follower in a partition
leader1 := cfg.checkOneLeader()
cfg.disconnect((leader1 + 2) % servers)
cfg.disconnect((leader1 + 3) % servers)
cfg.disconnect((leader1 + 4) % servers)
// submit lots of commands that won't commit
for i := 0; i < 50; i++ {
cfg.rafts[leader1].Start(rand.Int())
}
time.Sleep(RaftElectionTimeout / 2)
cfg.disconnect((leader1 + 0) % servers)
cfg.disconnect((leader1 + 1) % servers)
// allow other partition to recover
cfg.connect((leader1 + 2) % servers)
cfg.connect((leader1 + 3) % servers)
cfg.connect((leader1 + 4) % servers)
// lots of successful commands to new group.
for i := 0; i < 50; i++ {
cfg.one(rand.Int(), 3, true)
}
// now another partitioned leader and one follower
leader2 := cfg.checkOneLeader()
other := (leader1 + 2) % servers
if leader2 == other {
other = (leader2 + 1) % servers
}
cfg.disconnect(other)
// lots more commands that won't commit
for i := 0; i < 50; i++ {
cfg.rafts[leader2].Start(rand.Int())
}
time.Sleep(RaftElectionTimeout / 2)
// bring original leader back to life,
for i := 0; i < servers; i++ {
cfg.disconnect(i)
}
cfg.connect((leader1 + 0) % servers)
cfg.connect((leader1 + 1) % servers)
cfg.connect(other)
// lots of successful commands to new group.
for i := 0; i < 50; i++ {
cfg.one(rand.Int(), 3, true)
}
// now everyone
for i := 0; i < servers; i++ {
cfg.connect(i)
}
cfg.one(rand.Int(), servers, true)
cfg.end()
}
func TestCount2B(t *testing.T) {
servers := 3
cfg := make_config(t, servers, false)
defer cfg.cleanup()
cfg.begin("Test (2B): RPC counts aren't too high")
rpcs := func() (n int) {
for j := 0; j < servers; j++ {
n += cfg.rpcCount(j)
}
return
}
leader := cfg.checkOneLeader()
total1 := rpcs()
if total1 > 30 || total1 < 1 {
t.Fatalf("too many or few RPCs (%v) to elect initial leader\n", total1)
}
var total2 int
var success bool
loop:
for try := 0; try < 5; try++ {
if try > 0 {
// give solution some time to settle
time.Sleep(3 * time.Second)
}
leader = cfg.checkOneLeader()
total1 = rpcs()
iters := 10
starti, term, ok := cfg.rafts[leader].Start(1)
if !ok {
// leader moved on really quickly
continue
}
cmds := []int{}
for i := 1; i < iters+2; i++ {
x := int(rand.Int31())
cmds = append(cmds, x)
index1, term1, ok := cfg.rafts[leader].Start(x)
if term1 != term {
// Term changed while starting
continue loop
}
if !ok {
// No longer the leader, so term has changed
continue loop
}
if starti+i != index1 {
t.Fatalf("Start() failed")
}
}
for i := 1; i < iters+1; i++ {
cmd := cfg.wait(starti+i, servers, term)
if ix, ok := cmd.(int); ok == false || ix != cmds[i-1] {
if ix == -1 {
// term changed -- try again
continue loop
}
t.Fatalf("wrong value %v committed for index %v; expected %v\n", cmd, starti+i, cmds)
}
}
failed := false
total2 = 0
for j := 0; j < servers; j++ {
if t, _ := cfg.rafts[j].GetState(); t != term {
// term changed -- can't expect low RPC counts
// need to keep going to update total2
failed = true
}
total2 += cfg.rpcCount(j)
}
if failed {
continue loop
}
if total2-total1 > (iters+1+3)*3 {
t.Fatalf("too many RPCs (%v) for %v entries\n", total2-total1, iters)
}
success = true
break
}
if !success {
t.Fatalf("term changed too often")
}
time.Sleep(RaftElectionTimeout)
total3 := 0
for j := 0; j < servers; j++ {
total3 += cfg.rpcCount(j)
}
if total3-total2 > 3*20 {
t.Fatalf("too many RPCs (%v) for 1 second of idleness\n", total3-total2)
}
cfg.end()
}
package raft
import "log"
// Debugging
const Debug = 0
func DPrintf(format string, a ...interface{}) (n int, err error) {
if Debug > 0 {
log.Printf(format, a...)
}
return
}
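// Example usage (set Debug above to 1 to enable the output); the format
// string and arguments are illustrative:
//
//	DPrintf("server %d: became leader in term %d", rf.me, rf.currentTerm)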