diff --git a/go-pool/stratum/blocks.go b/go-pool/stratum/blocks.go index c10d5b4..3b6a0a0 100644 --- a/go-pool/stratum/blocks.go +++ b/go-pool/stratum/blocks.go @@ -5,32 +5,28 @@ import ( "encoding/binary" "encoding/hex" "log" - "sync/atomic" "../../cnutil" ) type BlockTemplate struct { - Blob string Difficulty int64 Height int64 ReservedOffset int PrevHash string Buffer []byte - ExtraNonce uint32 } -func (b *BlockTemplate) nextBlob() (string, uint32) { - // Preventing race by using atomic op here - // No need for using locks, because this is only one write to BT and it's atomic - extraNonce := atomic.AddUint32(&b.ExtraNonce, 1) +func (b *BlockTemplate) nextBlob(extraNonce uint32, instanceId []byte) string { extraBuff := new(bytes.Buffer) binary.Write(extraBuff, binary.BigEndian, extraNonce) + blobBuff := make([]byte, len(b.Buffer)) - copy(blobBuff, b.Buffer) // We never write to this buffer to prevent race + copy(blobBuff, b.Buffer) + copy(blobBuff[b.ReservedOffset+4:b.ReservedOffset+7], instanceId) copy(blobBuff[b.ReservedOffset:], extraBuff.Bytes()) blob := cnutil.ConvertBlob(blobBuff) - return hex.EncodeToString(blob), extraNonce + return hex.EncodeToString(blob) } func (s *StratumServer) fetchBlockTemplate() bool { @@ -52,15 +48,12 @@ func (s *StratumServer) fetchBlockTemplate() bool { log.Printf("New block to mine at height %v, diff: %v, prev_hash: %s", reply.Height, reply.Difficulty, reply.PrevHash) } newTemplate := BlockTemplate{ - Blob: reply.Blob, Difficulty: reply.Difficulty, Height: reply.Height, PrevHash: reply.PrevHash, ReservedOffset: reply.ReservedOffset, } newTemplate.Buffer, _ = hex.DecodeString(reply.Blob) - copy(newTemplate.Buffer[reply.ReservedOffset+4:reply.ReservedOffset+7], s.instanceId) - newTemplate.ExtraNonce = 0 s.blockTemplate.Store(&newTemplate) return true } diff --git a/go-pool/stratum/handlers.go b/go-pool/stratum/handlers.go index 573eb7d..d477526 100644 --- a/go-pool/stratum/handlers.go +++ b/go-pool/stratum/handlers.go @@ -14,14 +14,15 @@ func init() { noncePattern, _ = regexp.Compile("^[0-9a-f]{8}$") } -func (s *StratumServer) handleLoginRPC(cs *Session, params *LoginParams) (reply *JobReply, errorReply *ErrorReply) { +func (s *StratumServer) handleLoginRPC(cs *Session, e *Endpoint, params *LoginParams) (reply *JobReply, errorReply *ErrorReply) { if !s.config.BypassAddressValidation && !util.ValidateAddress(params.Login, s.config.Address) { errorReply = &ErrorReply{Code: -1, Message: "Invalid address used for login", Close: true} return } - miner := NewMiner(params.Login, params.Pass, s.port.Difficulty, cs.ip) + miner := NewMiner(params.Login, params.Pass, e.config.Difficulty, cs.ip) miner.Session = cs + miner.Endpoint = e s.registerMiner(miner) miner.heartbeat() @@ -34,7 +35,7 @@ func (s *StratumServer) handleLoginRPC(cs *Session, params *LoginParams) (reply return } -func (s *StratumServer) handleGetJobRPC(cs *Session, params *GetJobParams) (reply *JobReplyData, errorReply *ErrorReply) { +func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJobParams) (reply *JobReplyData, errorReply *ErrorReply) { miner, ok := s.miners.Get(params.Id) if !ok { errorReply = &ErrorReply{Code: -1, Message: "Unauthenticated", Close: true} @@ -45,7 +46,7 @@ func (s *StratumServer) handleGetJobRPC(cs *Session, params *GetJobParams) (repl return } -func (s *StratumServer) handleSubmitRPC(cs *Session, params *SubmitParams) (reply *SubmitReply, errorReply *ErrorReply) { +func (s *StratumServer) handleSubmitRPC(cs *Session, e *Endpoint, params *SubmitParams) (reply *SubmitReply, errorReply *ErrorReply) { miner, ok := s.miners.Get(params.Id) if !ok { errorReply = &ErrorReply{Code: -1, Message: "Unauthenticated", Close: true} @@ -77,7 +78,7 @@ func (s *StratumServer) handleSubmitRPC(cs *Session, params *SubmitParams) (repl return } - validShare := miner.processShare(s, job, t, nonce, params.Result) + validShare := miner.processShare(s, e, job, t, nonce, params.Result) if !validShare { errorReply = &ErrorReply{Code: -1, Message: "Low difficulty share", Close: !ok} return diff --git a/go-pool/stratum/miner.go b/go-pool/stratum/miner.go index 27984ee..5f5372f 100644 --- a/go-pool/stratum/miner.go +++ b/go-pool/stratum/miner.go @@ -35,6 +35,7 @@ type Miner struct { TargetHex string LastBeat int64 Session *Session + Endpoint *Endpoint } func (job *Job) submit(nonce string) bool { @@ -75,7 +76,8 @@ func (m *Miner) getJob(s *StratumServer) *JobReplyData { return &JobReplyData{} } - blob, extraNonce := t.nextBlob() + extraNonce := atomic.AddUint32(&m.Endpoint.extraNonce, 1) + blob := t.nextBlob(extraNonce, m.Endpoint.instanceId) job := &Job{Id: util.Random(), ExtraNonce: extraNonce, Height: t.Height, Difficulty: m.Difficulty} job.Submissions = make(map[string]bool) m.pushJob(job) @@ -99,9 +101,10 @@ func (m *Miner) findJob(id string) *Job { return nil } -func (m *Miner) processShare(s *StratumServer, job *Job, t *BlockTemplate, nonce string, result string) bool { +func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTemplate, nonce string, result string) bool { shareBuff := make([]byte, len(t.Buffer)) copy(shareBuff, t.Buffer) + copy(shareBuff[t.ReservedOffset+4:t.ReservedOffset+7], e.instanceId) extraBuff := new(bytes.Buffer) binary.Write(extraBuff, binary.BigEndian, job.ExtraNonce) @@ -144,6 +147,6 @@ func (m *Miner) processShare(s *StratumServer, job *Job, t *BlockTemplate, nonce return false } - log.Printf("Valid share at difficulty %v/%v", s.port.Difficulty, hashDiff) + log.Printf("Valid share at difficulty %v/%v", e.config.Difficulty, hashDiff) return true } diff --git a/go-pool/stratum/stratum.go b/go-pool/stratum/stratum.go index 22a5818..9506dbd 100644 --- a/go-pool/stratum/stratum.go +++ b/go-pool/stratum/stratum.go @@ -18,14 +18,17 @@ import ( ) type StratumServer struct { - config *pool.Config - port pool.Port - miners MinersMap - blockTemplate atomic.Value - instanceId []byte - rpc *rpc.RPCClient - timeout time.Duration - broadcastTimer *time.Timer + config *pool.Config + miners MinersMap + blockTemplate atomic.Value + rpc *rpc.RPCClient + timeout time.Duration +} + +type Endpoint struct { + config *pool.Port + instanceId []byte + extraNonce uint32 } type Session struct { @@ -39,14 +42,8 @@ const ( MaxReqSize = 10 * 1024 ) -func NewStratum(cfg *pool.Config, port pool.Port) *StratumServer { - b := make([]byte, 4) - _, err := rand.Read(b) - if err != nil { - log.Fatalf("Can't seed with random bytes: %v", err) - } - - stratum := &StratumServer{config: cfg, port: port, instanceId: b} +func NewStratum(cfg *pool.Config) *StratumServer { + stratum := &StratumServer{config: cfg} stratum.rpc = rpc.NewRPCClient(cfg) stratum.miners = NewMinersMap() @@ -73,8 +70,29 @@ func NewStratum(cfg *pool.Config, port pool.Port) *StratumServer { return stratum } +func NewEndpoint(cfg *pool.Port) *Endpoint { + e := &Endpoint{config: cfg} + e.instanceId = make([]byte, 4) + _, err := rand.Read(e.instanceId) + if err != nil { + log.Fatalf("Can't seed with random bytes: %v", err) + } + return e +} + func (s *StratumServer) Listen() { - bindAddr := fmt.Sprintf("%s:%d", s.port.Host, s.port.Port) + quit := make(chan bool) + for _, port := range s.config.Stratum.Ports { + go func(cfg pool.Port) { + e := NewEndpoint(&cfg) + e.Listen(s) + }(port) + } + <-quit +} + +func (e *Endpoint) Listen(s *StratumServer) { + bindAddr := fmt.Sprintf("%s:%d", e.config.Host, e.config.Port) addr, err := net.ResolveTCPAddr("tcp", bindAddr) checkError(err) server, err := net.ListenTCP("tcp", addr) @@ -82,7 +100,7 @@ func (s *StratumServer) Listen() { defer server.Close() log.Printf("Stratum listening on %s", bindAddr) - var accept = make(chan int, s.port.MaxConn) + accept := make(chan int, e.config.MaxConn) n := 0 for { @@ -96,7 +114,7 @@ func (s *StratumServer) Listen() { accept <- n go func() { - err = s.handleClient(conn, ip) + err = s.handleClient(conn, e, ip) if err != nil { conn.Close() } @@ -105,7 +123,7 @@ func (s *StratumServer) Listen() { } } -func (s *StratumServer) handleClient(conn *net.TCPConn, ip string) error { +func (s *StratumServer) handleClient(conn *net.TCPConn, e *Endpoint, ip string) error { cs := &Session{conn: conn, ip: ip} cs.enc = json.NewEncoder(conn) connbuff := bufio.NewReaderSize(conn, MaxReqSize) @@ -135,13 +153,13 @@ func (s *StratumServer) handleClient(conn *net.TCPConn, ip string) error { return err } s.setDeadline(conn) - cs.handleMessage(s, &req) + cs.handleMessage(s, e, &req) } } return nil } -func (cs *Session) handleMessage(s *StratumServer, req *JSONRpcReq) { +func (cs *Session) handleMessage(s *StratumServer, e *Endpoint, req *JSONRpcReq) { if req.Id == nil { log.Println("Missing RPC id") cs.conn.Close() @@ -163,7 +181,7 @@ func (cs *Session) handleMessage(s *StratumServer, req *JSONRpcReq) { log.Println("Unable to parse params") break } - reply, errReply := s.handleLoginRPC(cs, ¶ms) + reply, errReply := s.handleLoginRPC(cs, e, ¶ms) if errReply != nil { err = cs.sendError(req.Id, errReply) break @@ -176,7 +194,7 @@ func (cs *Session) handleMessage(s *StratumServer, req *JSONRpcReq) { log.Println("Unable to parse params") break } - reply, errReply := s.handleGetJobRPC(cs, ¶ms) + reply, errReply := s.handleGetJobRPC(cs, e, ¶ms) if errReply != nil { err = cs.sendError(req.Id, errReply) break @@ -189,7 +207,7 @@ func (cs *Session) handleMessage(s *StratumServer, req *JSONRpcReq) { log.Println("Unable to parse params") break } - reply, errReply := s.handleSubmitRPC(cs, ¶ms) + reply, errReply := s.handleSubmitRPC(cs, e, ¶ms) if errReply != nil { err = cs.sendError(req.Id, errReply) break diff --git a/main.go b/main.go index 1fd47ce..6e87ab5 100644 --- a/main.go +++ b/main.go @@ -27,16 +27,8 @@ func startStratum() { log.Printf("Running with default %v threads", n) } - quit := make(chan bool) - for _, port := range cfg.Stratum.Ports { - s := stratum.NewStratum(&cfg, port) - - go func() { - s.Listen() - quit <- true - }() - } - <-quit + s := stratum.NewStratum(&cfg) + s.Listen() } func startNewrelic() {