diff --git a/go-pool/stratum/handlers.go b/go-pool/stratum/handlers.go index 7f80520..3b7d9a7 100644 --- a/go-pool/stratum/handlers.go +++ b/go-pool/stratum/handlers.go @@ -30,17 +30,16 @@ func (s *StratumServer) handleLoginRPC(cs *Session, e *Endpoint, params *LoginPa id := extractWorkerId(params.Login) miner, ok := s.miners.Get(id) if !ok { - miner = NewMiner(id, e.config.Difficulty, cs.ip) + miner = NewMiner(id, cs.ip) + s.registerMiner(miner) } log.Printf("Miner connected %s@%s", id, cs.ip) - miner.Session = cs - miner.Endpoint = e - s.registerMiner(miner) + s.registerSession(cs) miner.heartbeat() - return &JobReply{Id: id, Job: miner.getJob(t, true), Status: "OK"}, nil + return &JobReply{Id: id, Job: cs.getJob(t), Status: "OK"}, nil } func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJobParams) (reply *JobReplyData, errorReply *ErrorReply) { @@ -55,7 +54,7 @@ func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJob return } miner.heartbeat() - reply = miner.getJob(t, false) + reply = cs.getJob(t) return } @@ -67,7 +66,7 @@ func (s *StratumServer) handleSubmitRPC(cs *Session, e *Endpoint, params *Submit } miner.heartbeat() - job := miner.findJob(params.JobId) + job := cs.findJob(params.JobId) if job == nil { errorReply = &ErrorReply{Code: -1, Message: "Invalid job id", Close: true} atomic.AddUint64(&miner.invalidShares, 1) @@ -115,24 +114,27 @@ func (s *StratumServer) broadcastNewJobs() { if t == nil { return } - log.Printf("Broadcasting new jobs to %v miners", s.miners.Count()) + s.sessionsMu.RLock() + defer s.sessionsMu.RUnlock() + count := len(s.sessions) + log.Printf("Broadcasting new jobs to %d miners", count) bcast := make(chan int, 1024*16) n := 0 - for m := range s.miners.IterBuffered() { + for m := range s.sessions { n++ bcast <- n - go func(miner *Miner) { - reply := miner.getJob(t, true) - err := miner.Session.pushMessage("job", &reply) + go func(cs *Session) { + reply := cs.getJob(t) + err := cs.pushMessage("job", &reply) <-bcast if err != nil { - log.Printf("Job transmit error to %v@%v: %v", miner.Id, miner.IP, err) - s.removeMiner(miner.Id) + log.Printf("Job transmit error to %s: %v", cs.ip, err) + s.removeSession(cs) } else { - s.setDeadline(miner.Session.conn) + s.setDeadline(cs.conn) } - }(m.Val) + }(m) } } diff --git a/go-pool/stratum/miner.go b/go-pool/stratum/miner.go index 5d884f2..2151247 100644 --- a/go-pool/stratum/miner.go +++ b/go-pool/stratum/miner.go @@ -25,16 +25,10 @@ type Job struct { type Miner struct { sync.RWMutex - Id string - IP string - Difficulty int64 - ValidJobs []*Job - LastBlockHeight int64 - Target uint32 - TargetHex string - LastBeat int64 - Session *Session - Endpoint *Endpoint + Id string + IP string + LastBeat int64 + Endpoint *Endpoint startedAt int64 validShares uint64 @@ -56,39 +50,46 @@ func (job *Job) submit(nonce string) bool { return false } -func NewMiner(id string, diff int64, ip string) *Miner { +func NewMiner(id string, ip string) *Miner { shares := make(map[int64]int64) - miner := &Miner{Id: id, Difficulty: diff, IP: ip, shares: shares} - target, targetHex := util.GetTargetHex(diff) - miner.Target = target - miner.TargetHex = targetHex - return miner + return &Miner{Id: id, IP: ip, shares: shares} } -func (m *Miner) pushJob(job *Job) { - m.Lock() - defer m.Unlock() - m.ValidJobs = append(m.ValidJobs, job) +func (cs *Session) getJob(t *BlockTemplate) *JobReplyData { + height := atomic.SwapInt64(&cs.lastBlockHeight, t.Height) - if len(m.ValidJobs) > 4 { - m.ValidJobs = m.ValidJobs[1:] + if height == t.Height { + return &JobReplyData{} } + + extraNonce := atomic.AddUint32(&cs.endpoint.extraNonce, 1) + blob := t.nextBlob(extraNonce, cs.endpoint.instanceId) + job := &Job{Id: util.Random(), ExtraNonce: extraNonce, Height: t.Height, Difficulty: cs.difficulty} + job.Submissions = make(map[string]bool) + cs.pushJob(job) + reply := &JobReplyData{JobId: job.Id, Blob: blob, Target: cs.targetHex} + return reply } -func (m *Miner) getJob(t *BlockTemplate, force bool) *JobReplyData { - height := atomic.SwapInt64(&m.LastBlockHeight, t.Height) +func (cs *Session) pushJob(job *Job) { + cs.Lock() + defer cs.Unlock() + cs.validJobs = append(cs.validJobs, job) - if !force && height == t.Height { - return &JobReplyData{} + if len(cs.validJobs) > 4 { + cs.validJobs = cs.validJobs[1:] } +} - extraNonce := atomic.AddUint32(&m.Endpoint.extraNonce, 1) - blob := t.nextBlob(extraNonce, m.Endpoint.instanceId) - job := &Job{Id: util.Random(), ExtraNonce: extraNonce, Height: t.Height, Difficulty: m.Difficulty} - job.Submissions = make(map[string]bool) - m.pushJob(job) - reply := &JobReplyData{JobId: job.Id, Blob: blob, Target: m.TargetHex} - return reply +func (cs *Session) findJob(id string) *Job { + cs.Lock() + defer cs.Unlock() + for _, job := range cs.validJobs { + if job.Id == id { + return job + } + } + return nil } func (m *Miner) heartbeat() { @@ -129,17 +130,6 @@ func (m *Miner) hashrate(estimationWindow time.Duration) float64 { return float64(totalShares) / float64(boundary) } -func (m *Miner) findJob(id string) *Job { - m.RLock() - defer m.RUnlock() - for _, job := range m.ValidJobs { - if job.Id == id { - return job - } - } - return nil -} - func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTemplate, nonce string, result string) bool { r := s.rpc() diff --git a/go-pool/stratum/stratum.go b/go-pool/stratum/stratum.go index 91167fc..bc8a54c 100644 --- a/go-pool/stratum/stratum.go +++ b/go-pool/stratum/stratum.go @@ -15,6 +15,7 @@ import ( "../pool" "../rpc" + "../util" ) type StratumServer struct { @@ -30,6 +31,8 @@ type StratumServer struct { luckWindow int64 luckLargeWindow int64 roundShares int64 + sessionsMu sync.RWMutex + sessions map[*Session]struct{} } type Endpoint struct { @@ -40,9 +43,15 @@ type Endpoint struct { type Session struct { sync.Mutex - conn *net.TCPConn - enc *json.Encoder - ip string + conn *net.TCPConn + enc *json.Encoder + ip string + endpoint *Endpoint + difficulty int64 + validJobs []*Job + lastBlockHeight int64 + target uint32 + targetHex string } const ( @@ -65,6 +74,7 @@ func NewStratum(cfg *pool.Config) *StratumServer { log.Printf("Default upstream: %s => %s", stratum.rpc().Name, stratum.rpc().Url) stratum.miners = NewMinersMap() + stratum.sessions = make(map[*Session]struct{}) timeout, _ := time.ParseDuration(cfg.Stratum.Timeout) stratum.timeout = timeout @@ -150,12 +160,14 @@ func (e *Endpoint) Listen(s *StratumServer) { } conn.SetKeepAlive(true) ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) + cs := &Session{conn: conn, ip: ip, enc: json.NewEncoder(conn), endpoint: e} n += 1 accept <- n go func() { - err = s.handleClient(conn, e, ip) + err = s.handleClient(cs, e) if err != nil { + s.removeSession(cs) conn.Close() } <-accept @@ -163,11 +175,12 @@ func (e *Endpoint) Listen(s *StratumServer) { } } -func (s *StratumServer) handleClient(conn *net.TCPConn, e *Endpoint, ip string) error { - cs := &Session{conn: conn, ip: ip} - cs.enc = json.NewEncoder(conn) - connbuff := bufio.NewReaderSize(conn, MaxReqSize) - s.setDeadline(conn) +func (s *StratumServer) handleClient(cs *Session, e *Endpoint) error { + _, targetHex := util.GetTargetHex(e.config.Difficulty) + cs.targetHex = targetHex + + connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize) + s.setDeadline(cs.conn) for { data, isPrefix, err := connbuff.ReadLine() @@ -192,7 +205,7 @@ func (s *StratumServer) handleClient(conn *net.TCPConn, e *Endpoint, ip string) log.Printf("Malformed request: %v", err) return err } - s.setDeadline(conn) + s.setDeadline(cs.conn) cs.handleMessage(s, e, &req) } } @@ -292,6 +305,25 @@ func (s *StratumServer) setDeadline(conn *net.TCPConn) { conn.SetDeadline(time.Now().Add(s.timeout)) } +func (s *StratumServer) registerSession(cs *Session) { + s.sessionsMu.Lock() + defer s.sessionsMu.Unlock() + s.sessions[cs] = struct{}{} +} + +func (s *StratumServer) removeSession(cs *Session) { + s.sessionsMu.Lock() + defer s.sessionsMu.Unlock() + delete(s.sessions, cs) +} + +func (s *StratumServer) isActive(cs *Session) bool { + s.sessionsMu.RLock() + defer s.sessionsMu.RUnlock() + _, exist := s.sessions[cs] + return exist +} + func (s *StratumServer) registerMiner(miner *Miner) { s.miners.Set(miner.Id, miner) }