Browse Source

Collect stats for miners by worker id

pool
Sammy Libre 8 years ago
parent
commit
135531a372
  1. 43
      go-pool/stratum/handlers.go
  2. 17
      go-pool/stratum/miner.go

43
go-pool/stratum/handlers.go

@ -11,35 +11,36 @@ import ( @@ -11,35 +11,36 @@ import (
var noncePattern *regexp.Regexp
const defaultWorkerId = "0"
func init() {
noncePattern, _ = regexp.Compile("^[0-9a-f]{8}$")
}
func (s *StratumServer) handleLoginRPC(cs *Session, e *Endpoint, params *LoginParams) (reply *JobReply, errorReply *ErrorReply) {
func (s *StratumServer) handleLoginRPC(cs *Session, e *Endpoint, params *LoginParams) (*JobReply, *ErrorReply) {
if !s.config.BypassAddressValidation && !util.ValidateAddress(params.Login, s.config.Address) {
errorReply = &ErrorReply{Code: -1, Message: "Invalid address used for login", Close: true}
return
return nil, &ErrorReply{Code: -1, Message: "Invalid address used for login", Close: true}
}
t := s.currentBlockTemplate()
if t == nil {
errorReply = &ErrorReply{Code: -1, Message: "Job not ready", Close: true}
return
return nil, &ErrorReply{Code: -1, Message: "Job not ready", Close: true}
}
id := extractWorkerId(params.Login)
miner, ok := s.miners.Get(id)
if !ok {
miner = NewMiner(id, e.config.Difficulty, cs.ip)
}
miner := NewMiner(params.Login, params.Pass, e.config.Difficulty, cs.ip)
log.Printf("Miner connected %s@%s", id, cs.ip)
miner.Session = cs
miner.Endpoint = e
s.registerMiner(miner)
miner.heartbeat()
log.Printf("Miner connected %s@%s", params.Login, miner.IP)
reply = &JobReply{}
reply.Id = miner.Id
reply.Job = miner.getJob(t)
reply.Status = "OK"
return
return &JobReply{Id: id, Job: miner.getJob(t, true), Status: "OK"}, nil
}
func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJobParams) (reply *JobReplyData, errorReply *ErrorReply) {
@ -54,7 +55,7 @@ func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJob @@ -54,7 +55,7 @@ func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJob
return
}
miner.heartbeat()
reply = miner.getJob(t)
reply = miner.getJob(t, false)
return
}
@ -88,7 +89,7 @@ func (s *StratumServer) handleSubmitRPC(cs *Session, e *Endpoint, params *Submit @@ -88,7 +89,7 @@ func (s *StratumServer) handleSubmitRPC(cs *Session, e *Endpoint, params *Submit
t := s.currentBlockTemplate()
if job.Height != t.Height {
log.Printf("Block expired for height %v %s@%s", job.Height, miner.Login, miner.IP)
log.Printf("Block expired for height %v %s@%s", job.Height, miner.Id, miner.IP)
errorReply = &ErrorReply{Code: -1, Message: "Block expired", Close: false}
atomic.AddUint64(&miner.staleShares, 1)
return
@ -122,11 +123,11 @@ func (s *StratumServer) broadcastNewJobs() { @@ -122,11 +123,11 @@ func (s *StratumServer) broadcastNewJobs() {
n++
bcast <- n
go func(miner *Miner) {
reply := miner.getJob(t)
reply := miner.getJob(t, true)
err := miner.Session.pushMessage("job", &reply)
<-bcast
if err != nil {
log.Printf("Job transmit error to %v@%v: %v", miner.Login, miner.IP, err)
log.Printf("Job transmit error to %v@%v: %v", miner.Id, miner.IP, err)
s.removeMiner(miner.Id)
} else {
s.setDeadline(miner.Session.conn)
@ -141,3 +142,11 @@ func (s *StratumServer) refreshBlockTemplate(bcast bool) { @@ -141,3 +142,11 @@ func (s *StratumServer) refreshBlockTemplate(bcast bool) {
s.broadcastNewJobs()
}
}
func extractWorkerId(loginWorkerPair string) string {
parts := strings.SplitN(loginWorkerPair, ".", 2)
if len(parts) > 1 {
return parts[1]
}
return defaultWorkerId
}

17
go-pool/stratum/miner.go

@ -26,8 +26,6 @@ type Job struct { @@ -26,8 +26,6 @@ type Job struct {
type Miner struct {
sync.RWMutex
Id string
Login string
Pass string
IP string
Difficulty int64
ValidJobs []*Job
@ -58,10 +56,9 @@ func (job *Job) submit(nonce string) bool { @@ -58,10 +56,9 @@ func (job *Job) submit(nonce string) bool {
return false
}
func NewMiner(login, pass string, diff int64, ip string) *Miner {
id := util.Random()
func NewMiner(id string, diff int64, ip string) *Miner {
shares := make(map[int64]int64)
miner := &Miner{Id: id, Login: login, Pass: pass, Difficulty: diff, IP: ip, shares: shares}
miner := &Miner{Id: id, Difficulty: diff, IP: ip, shares: shares}
target, targetHex := util.GetTargetHex(diff)
miner.Target = target
miner.TargetHex = targetHex
@ -78,10 +75,10 @@ func (m *Miner) pushJob(job *Job) { @@ -78,10 +75,10 @@ func (m *Miner) pushJob(job *Job) {
}
}
func (m *Miner) getJob(t *BlockTemplate) *JobReplyData {
func (m *Miner) getJob(t *BlockTemplate, force bool) *JobReplyData {
height := atomic.SwapInt64(&m.LastBlockHeight, t.Height)
if height == t.Height {
if !force && height == t.Height {
return &JobReplyData{}
}
@ -167,7 +164,7 @@ func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTe @@ -167,7 +164,7 @@ func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTe
}
if !s.config.BypassShareValidation && hex.EncodeToString(hashBytes) != result {
log.Printf("Bad hash from miner %v@%v", m.Login, m.IP)
log.Printf("Bad hash from miner %v@%v", m.Id, m.IP)
atomic.AddUint64(&m.invalidShares, 1)
return false
}
@ -195,10 +192,10 @@ func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTe @@ -195,10 +192,10 @@ func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTe
atomic.AddUint64(&m.accepts, 1)
atomic.AddUint64(&r.Accepts, 1)
atomic.StoreInt64(&r.LastSubmissionAt, util.MakeTimestamp())
log.Printf("Block %v found at height %v by miner %v@%v", blockFastHash[0:6], t.Height, m.Login, m.IP)
log.Printf("Block %v found at height %v by miner %v@%v", blockFastHash[0:6], t.Height, m.Id, m.IP)
}
} else if hashDiff < job.Difficulty {
log.Printf("Rejected low difficulty share of %v from %v@%v", hashDiff, m.Login, m.IP)
log.Printf("Rejected low difficulty share of %v from %v@%v", hashDiff, m.Id, m.IP)
atomic.AddUint64(&m.invalidShares, 1)
return false
}

Loading…
Cancel
Save