草庐IT

go - goraft中所有节点的状态

coder 2023-07-01 原文

我有 4 个节点的集群 2001、2002、2003 和 2004。 他们使用 goraft 绑定(bind)。 假设 2001 是主服务器。 现在当它失败时,另一个节点成为服务器。 现在我想要的是,成为当前服务器的节点应该发送消息说我是新的领导者。 那么如何实现呢? 我正在使用带有 GORAFD 实现的 GORAFT。 我在这里附上源代码。

main.go - 客户端

package main

import (
    "flag"
    "fmt"
    "github.com/goraft/raft"
    "github.com/goraft/raftd/command"
    "github.com/goraft/raftd/server"
    "log"
    "math/rand"
    "os"
    "time"
    "strconv"
)

var verbose bool
var trace bool
var debug bool
var host string
var port int
var join string

func init() {
    flag.Parse()
    flag.BoolVar(&verbose, "v", false, "verbose logging")
    flag.BoolVar(&trace, "trace", false, "Raft trace debugging")
    flag.BoolVar(&debug, "debug", false, "Raft debugging")
    flag.StringVar(&host, "h", "localhost", "hostname")
    p,_:=strconv.Atoi(flag.Arg(1))
    flag.IntVar(&port, "p", p, "port")
    flag.StringVar(&join, "join", "", "host:port of leader to join")
    flag.Usage = func() {
        fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
        flag.PrintDefaults()
    }
}

func main() {
    log.SetFlags(0)
    flag.Parse()
    if verbose {
        log.Print("Verbose logging enabled.")
    }
    if trace {
        raft.SetLogLevel(raft.Trace)
        log.Print("Raft trace debugging enabled.")
    } else if debug {
        raft.SetLogLevel(raft.Debug)
        log.Print("Raft debugging enabled.")
    }

    rand.Seed(time.Now().UnixNano())

    // Setup commands.
    raft.RegisterCommand(&command.WriteCommand{})

    // Set the data directory.
    if flag.NArg() == 0 {
        flag.Usage()
        log.Fatal("Data path argument required")
    }
    path := flag.Arg(0)
    if err := os.MkdirAll(path, 0744); err != nil {
        log.Fatalf("Unable to create path: %v", err)
    }

    log.SetFlags(log.LstdFlags)
    s := server.New(path, host, port)
    log.Fatal(s.ListenAndServe("localhost:2001"))
    fmt.Println("I am changing my status");
}

Main.go - 用于服务器,即 2001

package main

import (
    "flag"
    "fmt"
    "github.com/goraft/raft"
    "github.com/goraft/raftd/command"
    "github.com/goraft/raftd/server"
    "log"
    "math/rand"
    "os"
    "time"
    "strconv"
)

var verbose bool
var trace bool
var debug bool
var host string
var port int
var join string

func init() {
    flag.Parse()
    flag.BoolVar(&verbose, "v", false, "verbose logging")
    flag.BoolVar(&trace, "trace", false, "Raft trace debugging")
    flag.BoolVar(&debug, "debug", false, "Raft debugging")
    flag.StringVar(&host, "h", "localhost", "hostname")
    p,_:=strconv.Atoi(flag.Arg(1))
    flag.IntVar(&port, "p", p, "port")
    flag.StringVar(&join, "join", "", "host:port of leader to join")
    flag.Usage = func() {
        fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
        flag.PrintDefaults()
    }
}

func main() {
    log.SetFlags(0)
    flag.Parse()
    if verbose {
        log.Print("Verbose logging enabled.")
    }
    if trace {
        raft.SetLogLevel(raft.Trace)
        log.Print("Raft trace debugging enabled.")
    } else if debug {
        raft.SetLogLevel(raft.Debug)
        log.Print("Raft debugging enabled.")
    }

    rand.Seed(time.Now().UnixNano())

    // Setup commands.
    raft.RegisterCommand(&command.WriteCommand{})

    // Set the data directory.
    if flag.NArg() == 0 {
        flag.Usage()
        log.Fatal("Data path argument required")
    }
    path := flag.Arg(0)
    if err := os.MkdirAll(path, 0744); err != nil {
        log.Fatalf("Unable to create path: %v", err)
    }

    log.SetFlags(log.LstdFlags)
    s := server.New(path, host, port)
    log.Fatal(s.ListenAndServe(join))
}

普通Server.go代码

package server

import (
    "bytes"
    "encoding/json"
    "fmt"
    "github.com/goraft/raft"
    "github.com/goraft/raftd/command"
    "github.com/goraft/raftd/db"
    "github.com/gorilla/mux"
    "io/ioutil"
    "log"
    "math/rand"
    "net/http"
    "path/filepath"
    "sync"
    "time"
)

// The raftd server is a combination of the Raft server and an HTTP
// server which acts as the transport.
type Server struct {
    name       string
    host       string
    port       int
    path       string
    router     *mux.Router
    raftServer raft.Server
    httpServer *http.Server
    db         *db.DB
    mutex      sync.RWMutex
}

// Creates a new server.
func New(path string, host string, port int) *Server {
    s := &Server{
        host:   host,
        port:   port,
        path:   path,
        db:     db.New(),
        router: mux.NewRouter(),
    }

    // Read existing name or generate a new one.
    if b, err := ioutil.ReadFile(filepath.Join(path, "name")); err == nil {
        s.name = string(b)
    } else {
        s.name = fmt.Sprintf("%07x", rand.Int())[0:7]
        if err = ioutil.WriteFile(filepath.Join(path, "name"), []byte(s.name), 0644); err != nil {
            panic(err)
        }
    }

    return s
}

// Returns the connection string.
func (s *Server) connectionString() string {
    return fmt.Sprintf("http://%s:%d", s.host, s.port)
}

// Starts the server.
func (s *Server) ListenAndServe(leader string) error {
    var err error

    log.Printf("Initializing Raft Server: %s", s.path)

    // Initialize and start Raft server.
    transporter := raft.NewHTTPTransporter("/raft", 200*time.Millisecond)
    s.raftServer, err = raft.NewServer(s.name, s.path, transporter, nil, s.db, "")
    if err != nil {
        log.Fatal(err)
    }
    transporter.Install(s.raftServer, s)
    s.raftServer.Start()

    if leader != "" {
        // Join to leader if specified.

        log.Println("Attempting to join leader:", leader)

        if !s.raftServer.IsLogEmpty() {
            log.Fatal("Cannot join with an existing log")
        }
        if err := s.Join(leader); err != nil {
            log.Fatal(err)
        }

    } else if s.raftServer.IsLogEmpty() {
        // Initialize the server by joining itself.

        log.Println("Initializing new cluster")

        _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
            Name:             s.raftServer.Name(),
            ConnectionString: s.connectionString(),
        })
        if err != nil {
            log.Fatal(err)
        }

    } else {
        log.Println("Recovered from log")
    }

    log.Println("Initializing HTTP server")

    // Initialize and start HTTP server.
    s.httpServer = &http.Server{
        Addr:    fmt.Sprintf(":%d", s.port),
        Handler: s.router,
    }

    s.router.HandleFunc("/db/{key}", s.readHandler).Methods("GET")
    s.router.HandleFunc("/db/{key}", s.writeHandler).Methods("POST")
    s.router.HandleFunc("/join", s.joinHandler).Methods("POST")

    log.Println("Listening at:", s.connectionString())

    return s.httpServer.ListenAndServe()
}

// This is a hack around Gorilla mux not providing the correct net/http
// HandleFunc() interface.
func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
    s.router.HandleFunc(pattern, handler)
}

// Joins to the leader of an existing cluster.
func (s *Server) Join(leader string) error {
    command := &raft.DefaultJoinCommand{
        Name:     s.raftServer.Name(),
        ConnectionString: s.connectionString(),
    }

    var b bytes.Buffer
    json.NewEncoder(&b).Encode(command)
    resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b)
    if err != nil {
        return err
    }
    resp.Body.Close()

    return nil
}

func (s *Server) joinHandler(w http.ResponseWriter, req *http.Request) {
    command := &raft.DefaultJoinCommand{}

    if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    if _, err := s.raftServer.Do(command); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
}

func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
    vars := mux.Vars(req)
    value := s.db.Get(vars["key"])
    w.Write([]byte(value))
}

func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
    vars := mux.Vars(req)

    // Read the value from the POST body.
    b, err := ioutil.ReadAll(req.Body)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        return
    }
    value := string(b)

    // Execute the command against the Raft server.
    _, err = s.raftServer.Do(command.NewWriteCommand(vars["key"], value))
    if err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
    }
}

请给出一些解决方案。

最佳答案

我做到了。

我刚刚在 goraft-library 代码中插入了新行,其中领导者选择发生。

所以要做到这一点就去 goraft的server.go文件,做如下修改。

原始 Server.go - 行 [287-309]

// Sets the state of the server.
func (s *server) setState(state string) {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    // Temporarily store previous values.
    prevState := s.state
    prevLeader := s.leader

    // Update state and leader.
    s.state = state
    if state == Leader {
        s.leader = s.Name()
        s.syncedPeer = make(map[string]bool)
    }

    // Dispatch state and leader change events.
    s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))

    if prevLeader != s.leader {
        s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
    }
}

编辑的Server.go

// Sets the state of the server.
func (s *server) setState(state string) {
    s.mutex.Lock()
    defer s.mutex.Unlock()

    // Temporarily store previous values.
    prevState := s.state
    prevLeader := s.leader

    // Update state and leader.
    s.state = state
    if state == Leader {
        s.leader = s.Name()
        s.syncedPeer = make(map[string]bool)
    }

    // Dispatch state and leader change events.
    s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))

    if prevLeader != s.leader {
        fmt.Println("I am the Leader..!!  ",s.connectionString,"   ",s.path)
        s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
    }
}

所以它会在事件主服务器的控制台上打印连接 Stirng 以及当前服务器的存储路径。

关于go - goraft中所有节点的状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35452424/

有关go - goraft中所有节点的状态的更多相关文章

  1. ruby - 在 Ruby 程序执行时阻止 Windows 7 PC 进入休眠状态 - 2

    我需要在客户计算机上运行Ruby应用程序。通常需要几天才能完成(复制大备份文件)。问题是如果启用sleep,它会中断应用程序。否则,计算机将持续运行数周,直到我下次访问为止。有什么方法可以防止执行期间休眠并让Windows在执行后休眠吗?欢迎任何疯狂的想法;-) 最佳答案 Here建议使用SetThreadExecutionStateWinAPI函数,使应用程序能够通知系统它正在使用中,从而防止系统在应用程序运行时进入休眠状态或关闭显示。像这样的东西:require'Win32API'ES_AWAYMODE_REQUIRED=0x0

  2. ruby-on-rails - 跳过状态机方法的所有验证 - 2

    当我的预订模型通过rake任务在状态机上转换时,我试图找出如何跳过对ActiveRecord对象的特定实例的验证。我想在reservation.close时跳过所有验证!叫做。希望调用reservation.close!(:validate=>false)之类的东西。仅供引用,我们正在使用https://github.com/pluginaweek/state_machine用于状态机。这是我的预订模型的示例。classReservation["requested","negotiating","approved"])}state_machine:initial=>'requested

  3. ruby - 字符串文字中的转义状态作为 `String#tr` 的参数 - 2

    对于作为String#tr参数的单引号字符串文字中反斜杠的转义状态,我觉得有些神秘。你能解释一下下面三个例子之间的对比吗?我特别不明白第二个。为了避免复杂化,我在这里使用了'd',在双引号中转义时不会改变含义("\d"="d")。'\\'.tr('\\','x')#=>"x"'\\'.tr('\\d','x')#=>"\\"'\\'.tr('\\\d','x')#=>"x" 最佳答案 在tr中转义tr的第一个参数非常类似于正则表达式中的括号字符分组。您可以在表达式的开头使用^来否定匹配(替换任何不匹配的内容)并使用例如a-f来匹配一

  4. ruby - Net::HTTP 获取源代码和状态 - 2

    我目前正在使用以下方法获取页面的源代码:Net::HTTP.get(URI.parse(page.url))我还想获取HTTP状态,而无需发出第二个请求。有没有办法用另一种方法做到这一点?我一直在查看文档,但似乎找不到我要找的东西。 最佳答案 在我看来,除非您需要一些真正的低级访问或控制,否则最好使用Ruby的内置Open::URI模块:require'open-uri'io=open('http://www.example.org/')#=>#body=io.read[0,50]#=>"["200","OK"]io.base_ur

  5. ruby-on-rails - 为模型创建状态属性 - 2

    我想为我的Task模型创建一个status属性,该属性将按以下顺序指示它在三部分进度中的位置:打开=>进行中=>完成。它的工作方式类似于亚马逊包裹的交付方式:已订购=>已发货=>已交付。我想知道设置此属性的最佳方法是什么。我可能是错的,但创建三个独立的bool属性似乎有点多余。实现此目标的最佳方法是什么? 最佳答案 Rails4有一个内置的enummacro.它使用单个整数列并映射到键列表。classOrderenumstatus:[:ordered,:shipped,:delivered]end状态映射如下:{ordered:0,

  6. ruby - 是否可以在不实际发送或读取数据的情况下查明 ruby​​ 套接字是否处于 ESTABLISHED 或 CLOSE_WAIT 状态? - 2

    s=Socket.new(Socket::AF_INET,Socket::SOCK_STREAM,0)s.connect(Socket.pack_sockaddr_in('port','hostname'))ssl=OpenSSL::SSL::SSLSocket.new(s,sslcert)ssl.connect从这里开始,如果ssl连接和底层套接字仍然是ESTABLISHED,或者它是否在默认值7200之后进入CLOSE_WAIT,我想检查一个线程几秒钟甚至更糟的是在实际上不需要.write()或.read()的情况下关闭。是用select()、IO.select()还是其他方法完成

  7. ruby - 在 ruby​​ 中生成一个进程,捕获 stdout,stderr,获取退出状态 - 2

    我想从ruby​​rake脚本运行一个可执行文件,比如foo.exe我希望将foo.exe的STDOUT和STDERR输出直接写入我正在运行rake任务的控制台.当进程完成时,我想将退出代码捕获到一个变量中。我如何实现这一目标?我一直在玩backticks、process.spawn、system但我无法获得我想要的所有行为,只有部分更新:我在Windows上,在标准命令提示符下,而不是cygwin 最佳答案 system获取您想要的STDOUT行为。它还返回true作为零退出代码,这可能很有用。$?填充了有关最后一次system调

  8. ruby-on-rails - 状态机、模型验证和 RSpec - 2

    这是我当前的类定义和规范:classEvent:not_starteddoevent:game_starteddotransition:not_started=>:in_progressendevent:game_endeddotransition:in_progress=>:finalendevent:game_postponeddotransition[:not_started,:in_progress]=>:postponedendstate:not_started,:in_progress,:postponeddovalidate:end_time_before_finalen

  9. ruby - 如何使用 cucumber 在场景之间共享状态 - 2

    我有一个功能“从外部网站导入文章”。在我的第一个场景中,我测试从外部网站导入链接列表。Feature:ImportingarticlesfromexternalwebsiteScenario:Searchingarticlesonexample.comandreturnthelinksGiventhereisanImporterAnditsURLis"http://example.com"Whenwesearchfor"demo"ThentheImportershouldreturn25linksAndoneofthelinksshouldbe"http://example.com/d

  10. ruby - 从 ruby​​ 调用时返回 shell 脚本的状态值? - 2

    我希望这些值匹配。当shell脚本由于某些错误条件而退出时(因此返回非零值),它们不匹配。壳$?返回1,ruby$?返回256。>>%x[lskkr]ls:kkr:Nosuchfileordirectory=>"">>puts$?256=>nil>>exitHadoop:~Madcap$lskkrls:kkr:NosuchfileordirectoryHadoop:~Madcap$echo$?1 最佳答案 在Ruby中$?是一个Process::Status实例。打印$?等同于调用$?.to_s,这等同于$?.to_i.to_s(来

随机推荐