From eed80cf00d3be3992bbc8426f51ed927be0501f4 Mon Sep 17 00:00:00 2001 From: Sammy Libre Date: Tue, 6 Dec 2016 22:16:57 +0500 Subject: [PATCH] Add upstream failovers --- config.example.json | 13 ++-- go-pool/pool/pool.go | 32 +++++----- go-pool/rpc/rpc.go | 126 ++++++++++++++++++++++++++----------- go-pool/stratum/api.go | 24 +++++++ go-pool/stratum/blocks.go | 5 +- go-pool/stratum/miner.go | 6 +- go-pool/stratum/stratum.go | 57 ++++++++++++++++- 7 files changed, 203 insertions(+), 60 deletions(-) diff --git a/config.example.json b/config.example.json index 51c40a7..d9fbf76 100644 --- a/config.example.json +++ b/config.example.json @@ -42,11 +42,14 @@ "hideIP": false }, - "daemon": { - "host": "127.0.0.1", - "port": 18081, - "timeout": "10s" - }, + "upstream": [ + { + "name": "Main", + "host": "127.0.0.1", + "port": 18081, + "timeout": "10s" + } + ], "newrelicEnabled": false, "newrelicName": "MyStratum", diff --git a/go-pool/pool/pool.go b/go-pool/pool/pool.go index 6e0b555..89d20d6 100644 --- a/go-pool/pool/pool.go +++ b/go-pool/pool/pool.go @@ -1,20 +1,21 @@ package pool type Config struct { - Address string `json:"address"` - BypassAddressValidation bool `json:"bypassAddressValidation"` - BypassShareValidation bool `json:"bypassShareValidation"` - Stratum Stratum `json:"stratum"` - Daemon Daemon `json:"daemon"` - EstimationWindow string `json:"estimationWindow"` - LuckWindow string `json:"luckWindow"` - LargeLuckWindow string `json:"largeLuckWindow"` - Threads int `json:"threads"` - Frontend Frontend `json:"frontend"` - NewrelicName string `json:"newrelicName"` - NewrelicKey string `json:"newrelicKey"` - NewrelicVerbose bool `json:"newrelicVerbose"` - NewrelicEnabled bool `json:"newrelicEnabled"` + Address string `json:"address"` + BypassAddressValidation bool `json:"bypassAddressValidation"` + BypassShareValidation bool `json:"bypassShareValidation"` + Stratum Stratum `json:"stratum"` + UpstreamCheckInterval string `json:"upstreamCheckInterval"` + Upstream []Upstream `json:"upstream"` + EstimationWindow string `json:"estimationWindow"` + LuckWindow string `json:"luckWindow"` + LargeLuckWindow string `json:"largeLuckWindow"` + Threads int `json:"threads"` + Frontend Frontend `json:"frontend"` + NewrelicName string `json:"newrelicName"` + NewrelicKey string `json:"newrelicKey"` + NewrelicVerbose bool `json:"newrelicVerbose"` + NewrelicEnabled bool `json:"newrelicEnabled"` } type Stratum struct { @@ -30,7 +31,8 @@ type Port struct { MaxConn int `json:"maxConn"` } -type Daemon struct { +type Upstream struct { + Name string `json:"name"` Host string `json:"host"` Port int `json:"port"` Timeout string `json:"timeout"` diff --git a/go-pool/rpc/rpc.go b/go-pool/rpc/rpc.go index a5820ca..40a3dc9 100644 --- a/go-pool/rpc/rpc.go +++ b/go-pool/rpc/rpc.go @@ -5,16 +5,29 @@ import ( "encoding/json" "errors" "fmt" - "io/ioutil" "net/http" + "net/url" + "sync" + "sync/atomic" "time" "../pool" ) type RPCClient struct { - url string - client *http.Client + sync.RWMutex + Url *url.URL + login string + password string + Name string + sick bool + sickRate int + successRate int + Accepts uint64 + Rejects uint64 + LastSubmissionAt int64 + client *http.Client + FailsCount uint64 } type GetBlockTemplateReply struct { @@ -31,61 +44,104 @@ type JSONRpcResp struct { Error map[string]interface{} `json:"error"` } -func NewRPCClient(cfg *pool.Config) *RPCClient { - url := fmt.Sprintf("http://%s:%v/json_rpc", cfg.Daemon.Host, cfg.Daemon.Port) - rpcClient := &RPCClient{url: url} - timeout, _ := time.ParseDuration(cfg.Daemon.Timeout) +func NewRPCClient(cfg *pool.Upstream) (*RPCClient, error) { + rawUrl := fmt.Sprintf("http://%s:%v/json_rpc", cfg.Host, cfg.Port) + url, err := url.Parse(rawUrl) + if err != nil { + return nil, err + } + rpcClient := &RPCClient{Name: cfg.Name, Url: url} + timeout, _ := time.ParseDuration(cfg.Timeout) rpcClient.client = &http.Client{ Timeout: timeout, } - return rpcClient + return rpcClient, nil } -func (r *RPCClient) GetBlockTemplate(reserveSize int, address string) (GetBlockTemplateReply, error) { +func (r *RPCClient) GetBlockTemplate(reserveSize int, address string) (*GetBlockTemplateReply, error) { params := map[string]interface{}{"reserve_size": reserveSize, "wallet_address": address} - - rpcResp, err := r.doPost(r.url, "getblocktemplate", params) - var reply GetBlockTemplateReply + rpcResp, err := r.doPost(r.Url.String(), "getblocktemplate", params) + var reply *GetBlockTemplateReply if err != nil { - return reply, err + return nil, err } - if rpcResp.Error != nil { - return reply, errors.New(string(rpcResp.Error["message"].(string))) + if rpcResp.Result != nil { + err = json.Unmarshal(*rpcResp.Result, &reply) } - - err = json.Unmarshal(*rpcResp.Result, &reply) return reply, err } -func (r *RPCClient) SubmitBlock(hash string) (JSONRpcResp, error) { - rpcResp, err := r.doPost(r.url, "submitblock", []string{hash}) - if err != nil { - return rpcResp, err - } - if rpcResp.Error != nil { - return rpcResp, errors.New(string(rpcResp.Error["message"].(string))) - } - return rpcResp, nil +func (r *RPCClient) SubmitBlock(hash string) (*JSONRpcResp, error) { + return r.doPost(r.Url.String(), "submitblock", []string{hash}) } -func (r *RPCClient) doPost(url string, method string, params interface{}) (JSONRpcResp, error) { - jsonReq := map[string]interface{}{"id": "0", "method": method, "params": params} +func (r *RPCClient) doPost(url, method string, params interface{}) (*JSONRpcResp, error) { + jsonReq := map[string]interface{}{"jsonrpc": "2.0", "id": 0, "method": method, "params": params} data, _ := json.Marshal(jsonReq) - req, err := http.NewRequest("POST", url, bytes.NewBuffer(data)) req.Header.Set("Content-Length", (string)(len(data))) req.Header.Set("Content-Type", "application/json") req.Header.Set("Accept", "application/json") - + req.SetBasicAuth(r.login, r.password) resp, err := r.client.Do(req) - var rpcResp JSONRpcResp - if err != nil { - return rpcResp, err + r.markSick() + return nil, err } defer resp.Body.Close() - body, _ := ioutil.ReadAll(resp.Body) - err = json.Unmarshal(body, &rpcResp) + if resp.StatusCode < 200 || resp.StatusCode >= 400 { + return nil, errors.New(resp.Status) + } + + var rpcResp *JSONRpcResp + err = json.NewDecoder(resp.Body).Decode(&rpcResp) + if err != nil { + r.markSick() + return nil, err + } + if rpcResp.Error != nil { + r.markSick() + return nil, errors.New(rpcResp.Error["message"].(string)) + } return rpcResp, err } + +func (r *RPCClient) Check(reserveSize int, address string) (bool, error) { + _, err := r.GetBlockTemplate(reserveSize, address) + if err != nil { + return false, err + } + r.markAlive() + return !r.Sick(), nil +} + +func (r *RPCClient) Sick() bool { + r.RLock() + defer r.RUnlock() + return r.sick +} + +func (r *RPCClient) markSick() { + r.Lock() + if !r.sick { + atomic.AddUint64(&r.FailsCount, 1) + } + r.sickRate++ + r.successRate = 0 + if r.sickRate >= 5 { + r.sick = true + } + r.Unlock() +} + +func (r *RPCClient) markAlive() { + r.Lock() + r.successRate++ + if r.successRate >= 5 { + r.sick = false + r.sickRate = 0 + r.successRate = 0 + } + r.Unlock() +} diff --git a/go-pool/stratum/api.go b/go-pool/stratum/api.go index 21dd958..5eef9c8 100644 --- a/go-pool/stratum/api.go +++ b/go-pool/stratum/api.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "time" + "../rpc" "../util" ) @@ -24,6 +25,16 @@ func (s *StratumServer) StatsIndex(w http.ResponseWriter, r *http.Request) { "now": util.MakeTimestamp(), } + var upstreams []interface{} + current := atomic.LoadInt32(&s.upstream) + + for i, u := range s.upstreams { + upstream := convertUpstream(u) + upstream["current"] = current == int32(i) + upstreams = append(upstreams, upstream) + } + stats["upstreams"] = upstreams + stats["current"] = convertUpstream(s.rpc()) stats["luck"] = s.getLuckStats() if t := s.currentBlockTemplate(); t != nil { @@ -36,6 +47,19 @@ func (s *StratumServer) StatsIndex(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(stats) } +func convertUpstream(u *rpc.RPCClient) map[string]interface{} { + upstream := map[string]interface{}{ + "name": u.Name, + "url": u.Url.String(), + "sick": u.Sick(), + "accepts": atomic.LoadUint64(&u.Accepts), + "rejects": atomic.LoadUint64(&u.Rejects), + "lastSubmissionAt": atomic.LoadInt64(&u.LastSubmissionAt), + "failsCount": atomic.LoadUint64(&u.FailsCount), + } + return upstream +} + func (s *StratumServer) collectMinersStats() (float64, float64, int, []interface{}) { now := util.MakeTimestamp() var result []interface{} diff --git a/go-pool/stratum/blocks.go b/go-pool/stratum/blocks.go index 015c048..bc648f3 100644 --- a/go-pool/stratum/blocks.go +++ b/go-pool/stratum/blocks.go @@ -30,7 +30,8 @@ func (b *BlockTemplate) nextBlob(extraNonce uint32, instanceId []byte) string { } func (s *StratumServer) fetchBlockTemplate() bool { - reply, err := s.rpc.GetBlockTemplate(8, s.config.Address) + r := s.rpc() + reply, err := r.GetBlockTemplate(8, s.config.Address) if err != nil { log.Printf("Error while refreshing block template: %s", err) return false @@ -45,7 +46,7 @@ func (s *StratumServer) fetchBlockTemplate() bool { return false } } else { - log.Printf("New block to mine at height %v, diff: %v, prev_hash: %s", reply.Height, reply.Difficulty, reply.PrevHash) + log.Printf("New block to mine on %s at height %v, diff: %v, prev_hash: %s", r.Name, reply.Height, reply.Difficulty, reply.PrevHash) } newTemplate := BlockTemplate{ Difficulty: reply.Difficulty, diff --git a/go-pool/stratum/miner.go b/go-pool/stratum/miner.go index a5675df..621190b 100644 --- a/go-pool/stratum/miner.go +++ b/go-pool/stratum/miner.go @@ -145,6 +145,8 @@ func (m *Miner) findJob(id string) *Job { } func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTemplate, nonce string, result string) bool { + r := s.rpc() + shareBuff := make([]byte, len(t.Buffer)) copy(shareBuff, t.Buffer) copy(shareBuff[t.ReservedOffset+4:t.ReservedOffset+7], e.instanceId) @@ -179,9 +181,10 @@ func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTe block := hashDiff >= t.Difficulty if block { - _, err := s.rpc.SubmitBlock(hex.EncodeToString(shareBuff)) + _, err := r.SubmitBlock(hex.EncodeToString(shareBuff)) if err != nil { atomic.AddUint64(&m.rejects, 1) + atomic.AddUint64(&r.Rejects, 1) log.Printf("Block submission failure at height %v: %v", t.Height, err) } else { if len(convertedBlob) == 0 { @@ -191,6 +194,7 @@ func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTe // Immediately refresh current BT and send new jobs s.refreshBlockTemplate(true) atomic.AddUint64(&m.accepts, 1) + atomic.AddUint64(&r.Accepts, 1) log.Printf("Block %v found at height %v by miner %v@%v", blockFastHash[0:6], t.Height, m.Login, m.IP) } } else if hashDiff < job.Difficulty { diff --git a/go-pool/stratum/stratum.go b/go-pool/stratum/stratum.go index 524bb6a..0151e43 100644 --- a/go-pool/stratum/stratum.go +++ b/go-pool/stratum/stratum.go @@ -21,7 +21,8 @@ type StratumServer struct { config *pool.Config miners MinersMap blockTemplate atomic.Value - rpc *rpc.RPCClient + upstream int32 + upstreams []*rpc.RPCClient timeout time.Duration estimationWindow time.Duration blocksMu sync.RWMutex @@ -50,7 +51,19 @@ const ( func NewStratum(cfg *pool.Config) *StratumServer { stratum := &StratumServer{config: cfg} - stratum.rpc = rpc.NewRPCClient(cfg) + + stratum.upstreams = make([]*rpc.RPCClient, len(cfg.Upstream)) + for i, v := range cfg.Upstream { + client, err := rpc.NewRPCClient(&v) + if err != nil { + log.Fatal(err) + } else { + stratum.upstreams[i] = client + log.Printf("Upstream: %s => %s", client.Name, client.Url) + } + } + log.Printf("Default upstream: %s => %s", stratum.rpc().Name, stratum.rpc().Url) + stratum.miners = NewMinersMap() timeout, _ := time.ParseDuration(cfg.Stratum.Timeout) @@ -71,6 +84,9 @@ func NewStratum(cfg *pool.Config) *StratumServer { refreshTimer := time.NewTimer(refreshIntv) log.Printf("Set block refresh every %v", refreshIntv) + checkIntv, _ := time.ParseDuration(cfg.UpstreamCheckInterval) + checkTimer := time.NewTimer(checkIntv) + go func() { for { select { @@ -80,6 +96,17 @@ func NewStratum(cfg *pool.Config) *StratumServer { } } }() + + go func() { + for { + select { + case <-checkTimer.C: + stratum.checkUpstreams() + checkTimer.Reset(checkIntv) + } + } + }() + return stratum } @@ -280,6 +307,32 @@ func (s *StratumServer) currentBlockTemplate() *BlockTemplate { return nil } +func (s *StratumServer) checkUpstreams() { + candidate := int32(0) + backup := false + + for i, v := range s.upstreams { + ok, err := v.Check(8, s.config.Address) + if err != nil { + log.Printf("Upstream %v didn't pass check: %v", v.Name, err) + } + if ok && !backup { + candidate = int32(i) + backup = true + } + } + + if s.upstream != candidate { + log.Printf("Switching to %v upstream", s.upstreams[candidate].Name) + atomic.StoreInt32(&s.upstream, candidate) + } +} + +func (s *StratumServer) rpc() *rpc.RPCClient { + i := atomic.LoadInt32(&s.upstream) + return s.upstreams[i] +} + func checkError(err error) { if err != nil { log.Fatalf("Error: %v", err)