Browse Source

Add upstream failovers

pool
Sammy Libre 8 years ago
parent
commit
eed80cf00d
  1. 7
      config.example.json
  2. 6
      go-pool/pool/pool.go
  3. 124
      go-pool/rpc/rpc.go
  4. 24
      go-pool/stratum/api.go
  5. 5
      go-pool/stratum/blocks.go
  6. 6
      go-pool/stratum/miner.go
  7. 57
      go-pool/stratum/stratum.go

7
config.example.json

@ -42,11 +42,14 @@ @@ -42,11 +42,14 @@
"hideIP": false
},
"daemon": {
"upstream": [
{
"name": "Main",
"host": "127.0.0.1",
"port": 18081,
"timeout": "10s"
},
}
],
"newrelicEnabled": false,
"newrelicName": "MyStratum",

6
go-pool/pool/pool.go

@ -5,7 +5,8 @@ type Config struct { @@ -5,7 +5,8 @@ type Config struct {
BypassAddressValidation bool `json:"bypassAddressValidation"`
BypassShareValidation bool `json:"bypassShareValidation"`
Stratum Stratum `json:"stratum"`
Daemon Daemon `json:"daemon"`
UpstreamCheckInterval string `json:"upstreamCheckInterval"`
Upstream []Upstream `json:"upstream"`
EstimationWindow string `json:"estimationWindow"`
LuckWindow string `json:"luckWindow"`
LargeLuckWindow string `json:"largeLuckWindow"`
@ -30,7 +31,8 @@ type Port struct { @@ -30,7 +31,8 @@ type Port struct {
MaxConn int `json:"maxConn"`
}
type Daemon struct {
type Upstream struct {
Name string `json:"name"`
Host string `json:"host"`
Port int `json:"port"`
Timeout string `json:"timeout"`

124
go-pool/rpc/rpc.go

@ -5,16 +5,29 @@ import ( @@ -5,16 +5,29 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"sync"
"sync/atomic"
"time"
"../pool"
)
type RPCClient struct {
url string
sync.RWMutex
Url *url.URL
login string
password string
Name string
sick bool
sickRate int
successRate int
Accepts uint64
Rejects uint64
LastSubmissionAt int64
client *http.Client
FailsCount uint64
}
type GetBlockTemplateReply struct {
@ -31,61 +44,104 @@ type JSONRpcResp struct { @@ -31,61 +44,104 @@ type JSONRpcResp struct {
Error map[string]interface{} `json:"error"`
}
func NewRPCClient(cfg *pool.Config) *RPCClient {
url := fmt.Sprintf("http://%s:%v/json_rpc", cfg.Daemon.Host, cfg.Daemon.Port)
rpcClient := &RPCClient{url: url}
timeout, _ := time.ParseDuration(cfg.Daemon.Timeout)
func NewRPCClient(cfg *pool.Upstream) (*RPCClient, error) {
rawUrl := fmt.Sprintf("http://%s:%v/json_rpc", cfg.Host, cfg.Port)
url, err := url.Parse(rawUrl)
if err != nil {
return nil, err
}
rpcClient := &RPCClient{Name: cfg.Name, Url: url}
timeout, _ := time.ParseDuration(cfg.Timeout)
rpcClient.client = &http.Client{
Timeout: timeout,
}
return rpcClient
return rpcClient, nil
}
func (r *RPCClient) GetBlockTemplate(reserveSize int, address string) (GetBlockTemplateReply, error) {
func (r *RPCClient) GetBlockTemplate(reserveSize int, address string) (*GetBlockTemplateReply, error) {
params := map[string]interface{}{"reserve_size": reserveSize, "wallet_address": address}
rpcResp, err := r.doPost(r.url, "getblocktemplate", params)
var reply GetBlockTemplateReply
rpcResp, err := r.doPost(r.Url.String(), "getblocktemplate", params)
var reply *GetBlockTemplateReply
if err != nil {
return reply, err
return nil, err
}
if rpcResp.Error != nil {
return reply, errors.New(string(rpcResp.Error["message"].(string)))
}
if rpcResp.Result != nil {
err = json.Unmarshal(*rpcResp.Result, &reply)
}
return reply, err
}
func (r *RPCClient) SubmitBlock(hash string) (JSONRpcResp, error) {
rpcResp, err := r.doPost(r.url, "submitblock", []string{hash})
if err != nil {
return rpcResp, err
}
if rpcResp.Error != nil {
return rpcResp, errors.New(string(rpcResp.Error["message"].(string)))
}
return rpcResp, nil
func (r *RPCClient) SubmitBlock(hash string) (*JSONRpcResp, error) {
return r.doPost(r.Url.String(), "submitblock", []string{hash})
}
func (r *RPCClient) doPost(url string, method string, params interface{}) (JSONRpcResp, error) {
jsonReq := map[string]interface{}{"id": "0", "method": method, "params": params}
func (r *RPCClient) doPost(url, method string, params interface{}) (*JSONRpcResp, error) {
jsonReq := map[string]interface{}{"jsonrpc": "2.0", "id": 0, "method": method, "params": params}
data, _ := json.Marshal(jsonReq)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(data))
req.Header.Set("Content-Length", (string)(len(data)))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
req.SetBasicAuth(r.login, r.password)
resp, err := r.client.Do(req)
var rpcResp JSONRpcResp
if err != nil {
return rpcResp, err
r.markSick()
return nil, err
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
err = json.Unmarshal(body, &rpcResp)
if resp.StatusCode < 200 || resp.StatusCode >= 400 {
return nil, errors.New(resp.Status)
}
var rpcResp *JSONRpcResp
err = json.NewDecoder(resp.Body).Decode(&rpcResp)
if err != nil {
r.markSick()
return nil, err
}
if rpcResp.Error != nil {
r.markSick()
return nil, errors.New(rpcResp.Error["message"].(string))
}
return rpcResp, err
}
func (r *RPCClient) Check(reserveSize int, address string) (bool, error) {
_, err := r.GetBlockTemplate(reserveSize, address)
if err != nil {
return false, err
}
r.markAlive()
return !r.Sick(), nil
}
func (r *RPCClient) Sick() bool {
r.RLock()
defer r.RUnlock()
return r.sick
}
func (r *RPCClient) markSick() {
r.Lock()
if !r.sick {
atomic.AddUint64(&r.FailsCount, 1)
}
r.sickRate++
r.successRate = 0
if r.sickRate >= 5 {
r.sick = true
}
r.Unlock()
}
func (r *RPCClient) markAlive() {
r.Lock()
r.successRate++
if r.successRate >= 5 {
r.sick = false
r.sickRate = 0
r.successRate = 0
}
r.Unlock()
}

24
go-pool/stratum/api.go

@ -6,6 +6,7 @@ import ( @@ -6,6 +6,7 @@ import (
"sync/atomic"
"time"
"../rpc"
"../util"
)
@ -24,6 +25,16 @@ func (s *StratumServer) StatsIndex(w http.ResponseWriter, r *http.Request) { @@ -24,6 +25,16 @@ func (s *StratumServer) StatsIndex(w http.ResponseWriter, r *http.Request) {
"now": util.MakeTimestamp(),
}
var upstreams []interface{}
current := atomic.LoadInt32(&s.upstream)
for i, u := range s.upstreams {
upstream := convertUpstream(u)
upstream["current"] = current == int32(i)
upstreams = append(upstreams, upstream)
}
stats["upstreams"] = upstreams
stats["current"] = convertUpstream(s.rpc())
stats["luck"] = s.getLuckStats()
if t := s.currentBlockTemplate(); t != nil {
@ -36,6 +47,19 @@ func (s *StratumServer) StatsIndex(w http.ResponseWriter, r *http.Request) { @@ -36,6 +47,19 @@ func (s *StratumServer) StatsIndex(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(stats)
}
func convertUpstream(u *rpc.RPCClient) map[string]interface{} {
upstream := map[string]interface{}{
"name": u.Name,
"url": u.Url.String(),
"sick": u.Sick(),
"accepts": atomic.LoadUint64(&u.Accepts),
"rejects": atomic.LoadUint64(&u.Rejects),
"lastSubmissionAt": atomic.LoadInt64(&u.LastSubmissionAt),
"failsCount": atomic.LoadUint64(&u.FailsCount),
}
return upstream
}
func (s *StratumServer) collectMinersStats() (float64, float64, int, []interface{}) {
now := util.MakeTimestamp()
var result []interface{}

5
go-pool/stratum/blocks.go

@ -30,7 +30,8 @@ func (b *BlockTemplate) nextBlob(extraNonce uint32, instanceId []byte) string { @@ -30,7 +30,8 @@ func (b *BlockTemplate) nextBlob(extraNonce uint32, instanceId []byte) string {
}
func (s *StratumServer) fetchBlockTemplate() bool {
reply, err := s.rpc.GetBlockTemplate(8, s.config.Address)
r := s.rpc()
reply, err := r.GetBlockTemplate(8, s.config.Address)
if err != nil {
log.Printf("Error while refreshing block template: %s", err)
return false
@ -45,7 +46,7 @@ func (s *StratumServer) fetchBlockTemplate() bool { @@ -45,7 +46,7 @@ func (s *StratumServer) fetchBlockTemplate() bool {
return false
}
} else {
log.Printf("New block to mine at height %v, diff: %v, prev_hash: %s", reply.Height, reply.Difficulty, reply.PrevHash)
log.Printf("New block to mine on %s at height %v, diff: %v, prev_hash: %s", r.Name, reply.Height, reply.Difficulty, reply.PrevHash)
}
newTemplate := BlockTemplate{
Difficulty: reply.Difficulty,

6
go-pool/stratum/miner.go

@ -145,6 +145,8 @@ func (m *Miner) findJob(id string) *Job { @@ -145,6 +145,8 @@ func (m *Miner) findJob(id string) *Job {
}
func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTemplate, nonce string, result string) bool {
r := s.rpc()
shareBuff := make([]byte, len(t.Buffer))
copy(shareBuff, t.Buffer)
copy(shareBuff[t.ReservedOffset+4:t.ReservedOffset+7], e.instanceId)
@ -179,9 +181,10 @@ func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTe @@ -179,9 +181,10 @@ func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTe
block := hashDiff >= t.Difficulty
if block {
_, err := s.rpc.SubmitBlock(hex.EncodeToString(shareBuff))
_, err := r.SubmitBlock(hex.EncodeToString(shareBuff))
if err != nil {
atomic.AddUint64(&m.rejects, 1)
atomic.AddUint64(&r.Rejects, 1)
log.Printf("Block submission failure at height %v: %v", t.Height, err)
} else {
if len(convertedBlob) == 0 {
@ -191,6 +194,7 @@ func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTe @@ -191,6 +194,7 @@ func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTe
// Immediately refresh current BT and send new jobs
s.refreshBlockTemplate(true)
atomic.AddUint64(&m.accepts, 1)
atomic.AddUint64(&r.Accepts, 1)
log.Printf("Block %v found at height %v by miner %v@%v", blockFastHash[0:6], t.Height, m.Login, m.IP)
}
} else if hashDiff < job.Difficulty {

57
go-pool/stratum/stratum.go

@ -21,7 +21,8 @@ type StratumServer struct { @@ -21,7 +21,8 @@ type StratumServer struct {
config *pool.Config
miners MinersMap
blockTemplate atomic.Value
rpc *rpc.RPCClient
upstream int32
upstreams []*rpc.RPCClient
timeout time.Duration
estimationWindow time.Duration
blocksMu sync.RWMutex
@ -50,7 +51,19 @@ const ( @@ -50,7 +51,19 @@ const (
func NewStratum(cfg *pool.Config) *StratumServer {
stratum := &StratumServer{config: cfg}
stratum.rpc = rpc.NewRPCClient(cfg)
stratum.upstreams = make([]*rpc.RPCClient, len(cfg.Upstream))
for i, v := range cfg.Upstream {
client, err := rpc.NewRPCClient(&v)
if err != nil {
log.Fatal(err)
} else {
stratum.upstreams[i] = client
log.Printf("Upstream: %s => %s", client.Name, client.Url)
}
}
log.Printf("Default upstream: %s => %s", stratum.rpc().Name, stratum.rpc().Url)
stratum.miners = NewMinersMap()
timeout, _ := time.ParseDuration(cfg.Stratum.Timeout)
@ -71,6 +84,9 @@ func NewStratum(cfg *pool.Config) *StratumServer { @@ -71,6 +84,9 @@ func NewStratum(cfg *pool.Config) *StratumServer {
refreshTimer := time.NewTimer(refreshIntv)
log.Printf("Set block refresh every %v", refreshIntv)
checkIntv, _ := time.ParseDuration(cfg.UpstreamCheckInterval)
checkTimer := time.NewTimer(checkIntv)
go func() {
for {
select {
@ -80,6 +96,17 @@ func NewStratum(cfg *pool.Config) *StratumServer { @@ -80,6 +96,17 @@ func NewStratum(cfg *pool.Config) *StratumServer {
}
}
}()
go func() {
for {
select {
case <-checkTimer.C:
stratum.checkUpstreams()
checkTimer.Reset(checkIntv)
}
}
}()
return stratum
}
@ -280,6 +307,32 @@ func (s *StratumServer) currentBlockTemplate() *BlockTemplate { @@ -280,6 +307,32 @@ func (s *StratumServer) currentBlockTemplate() *BlockTemplate {
return nil
}
func (s *StratumServer) checkUpstreams() {
candidate := int32(0)
backup := false
for i, v := range s.upstreams {
ok, err := v.Check(8, s.config.Address)
if err != nil {
log.Printf("Upstream %v didn't pass check: %v", v.Name, err)
}
if ok && !backup {
candidate = int32(i)
backup = true
}
}
if s.upstream != candidate {
log.Printf("Switching to %v upstream", s.upstreams[candidate].Name)
atomic.StoreInt32(&s.upstream, candidate)
}
}
func (s *StratumServer) rpc() *rpc.RPCClient {
i := atomic.LoadInt32(&s.upstream)
return s.upstreams[i]
}
func checkError(err error) {
if err != nil {
log.Fatalf("Error: %v", err)

Loading…
Cancel
Save