155 lines
3.8 KiB
Go
Raw Normal View History

2015-07-05 14:49:07 +05:00
package stratum
import (
"log"
"regexp"
"strings"
2016-12-12 02:12:16 +05:00
"sync"
2016-12-06 19:25:07 +05:00
"sync/atomic"
2016-12-12 02:12:16 +05:00
"time"
2015-07-05 14:49:07 +05:00
"../util"
)
// noncePattern matches a share nonce: exactly eight lowercase hexadecimal
// characters. It is compiled once at package initialization; MustCompile
// panics at start-up on an invalid pattern instead of leaving a nil regexp
// behind that would only fail on first use.
var noncePattern = regexp.MustCompile("^[0-9a-f]{8}$")

// defaultWorkerId is the worker id returned by extractWorkerId when the login
// string carries no ".worker" suffix.
const defaultWorkerId = "0"
2016-12-07 12:07:31 +05:00
func (s *StratumServer) handleLoginRPC(cs *Session, params *LoginParams) (*JobReply, *ErrorReply) {
2016-12-07 20:08:23 +05:00
address, id := extractWorkerId(params.Login)
if !s.config.BypassAddressValidation && !util.ValidateAddress(address, s.config.Address) {
log.Printf("Invalid address %s used for login by %s", address, cs.ip)
2016-12-07 02:29:01 +05:00
return nil, &ErrorReply{Code: -1, Message: "Invalid address used for login"}
2015-07-05 14:49:07 +05:00
}
2016-12-06 23:03:54 +05:00
t := s.currentBlockTemplate()
if t == nil {
2016-12-07 02:29:01 +05:00
return nil, &ErrorReply{Code: -1, Message: "Job not ready"}
2016-12-07 00:55:16 +05:00
}
miner, ok := s.miners.Get(id)
if !ok {
2016-12-07 01:29:59 +05:00
miner = NewMiner(id, cs.ip)
s.registerMiner(miner)
2016-12-06 23:03:54 +05:00
}
2016-12-07 00:55:16 +05:00
log.Printf("Miner connected %s@%s", id, cs.ip)
2016-12-07 01:29:59 +05:00
s.registerSession(cs)
2015-07-05 14:49:07 +05:00
miner.heartbeat()
2016-12-07 01:29:59 +05:00
return &JobReply{Id: id, Job: cs.getJob(t), Status: "OK"}, nil
2015-07-05 14:49:07 +05:00
}
2016-12-07 12:07:31 +05:00
func (s *StratumServer) handleGetJobRPC(cs *Session, params *GetJobParams) (*JobReplyData, *ErrorReply) {
2015-07-05 14:49:07 +05:00
miner, ok := s.miners.Get(params.Id)
if !ok {
2016-12-07 02:29:01 +05:00
return nil, &ErrorReply{Code: -1, Message: "Unauthenticated"}
2015-07-05 14:49:07 +05:00
}
2016-12-06 23:03:54 +05:00
t := s.currentBlockTemplate()
if t == nil {
2016-12-07 02:29:01 +05:00
return nil, &ErrorReply{Code: -1, Message: "Job not ready"}
2016-12-06 23:03:54 +05:00
}
2015-07-05 14:49:07 +05:00
miner.heartbeat()
2016-12-07 02:29:01 +05:00
return cs.getJob(t), nil
2015-07-05 14:49:07 +05:00
}
2016-12-07 12:07:31 +05:00
func (s *StratumServer) handleSubmitRPC(cs *Session, params *SubmitParams) (*SubmitReply, *ErrorReply) {
2015-07-05 14:49:07 +05:00
miner, ok := s.miners.Get(params.Id)
if !ok {
2016-12-07 02:29:01 +05:00
return nil, &ErrorReply{Code: -1, Message: "Unauthenticated"}
2015-07-05 14:49:07 +05:00
}
miner.heartbeat()
2016-12-07 01:29:59 +05:00
job := cs.findJob(params.JobId)
2015-07-05 14:49:07 +05:00
if job == nil {
2016-12-07 02:29:01 +05:00
return nil, &ErrorReply{Code: -1, Message: "Invalid job id"}
2015-07-05 14:49:07 +05:00
}
if !noncePattern.MatchString(params.Nonce) {
2016-12-07 02:29:01 +05:00
return nil, &ErrorReply{Code: -1, Message: "Malformed nonce"}
2015-07-05 14:49:07 +05:00
}
nonce := strings.ToLower(params.Nonce)
exist := job.submit(nonce)
if exist {
2016-12-06 19:25:07 +05:00
atomic.AddUint64(&miner.invalidShares, 1)
2016-12-07 02:29:01 +05:00
return nil, &ErrorReply{Code: -1, Message: "Duplicate share"}
2015-07-05 14:49:07 +05:00
}
t := s.currentBlockTemplate()
2016-12-07 12:00:18 +05:00
if job.height != t.height {
2016-12-10 18:10:49 +05:00
log.Printf("Stale share for height %d from %s@%s", job.height, miner.id, cs.ip)
2016-12-06 19:25:07 +05:00
atomic.AddUint64(&miner.staleShares, 1)
2016-12-07 02:29:01 +05:00
return nil, &ErrorReply{Code: -1, Message: "Block expired"}
2015-07-05 14:49:07 +05:00
}
2016-12-10 18:10:49 +05:00
validShare := miner.processShare(s, cs, job, t, nonce, params.Result)
2015-07-05 14:49:07 +05:00
if !validShare {
2016-12-07 02:29:01 +05:00
return nil, &ErrorReply{Code: -1, Message: "Low difficulty share"}
2015-07-05 14:49:07 +05:00
}
2016-12-07 02:29:01 +05:00
return &SubmitReply{Status: "OK"}, nil
2015-07-05 14:49:07 +05:00
}
2016-12-07 12:07:31 +05:00
func (s *StratumServer) handleUnknownRPC(req *JSONRpcReq) *ErrorReply {
2015-07-05 14:49:07 +05:00
log.Printf("Unknown RPC method: %v", req)
2016-12-07 02:29:01 +05:00
return &ErrorReply{Code: -1, Message: "Invalid method"}
2015-07-05 14:49:07 +05:00
}
func (s *StratumServer) broadcastNewJobs() {
2016-12-06 23:03:54 +05:00
t := s.currentBlockTemplate()
if t == nil {
return
}
2016-12-07 01:29:59 +05:00
s.sessionsMu.RLock()
defer s.sessionsMu.RUnlock()
count := len(s.sessions)
log.Printf("Broadcasting new jobs to %d miners", count)
2016-12-12 02:12:16 +05:00
start := time.Now()
slots := make(chan bool, 1024*16)
var ok, fails int64
var wg sync.WaitGroup
2015-07-05 14:49:07 +05:00
2016-12-07 01:29:59 +05:00
for m := range s.sessions {
2016-12-12 02:12:16 +05:00
wg.Add(1)
slots <- true
2016-12-07 01:29:59 +05:00
go func(cs *Session) {
reply := cs.getJob(t)
err := cs.pushMessage("job", &reply)
2016-12-12 02:12:16 +05:00
<-slots
2015-07-05 14:49:07 +05:00
if err != nil {
2016-12-07 01:29:59 +05:00
log.Printf("Job transmit error to %s: %v", cs.ip, err)
2016-12-12 02:12:16 +05:00
atomic.AddInt64(&fails, 1)
wg.Done()
2016-12-07 01:29:59 +05:00
s.removeSession(cs)
2016-08-07 08:11:10 +05:00
} else {
2016-12-12 02:12:16 +05:00
atomic.AddInt64(&ok, 1)
2016-12-07 01:29:59 +05:00
s.setDeadline(cs.conn)
2016-12-12 02:12:16 +05:00
wg.Done()
2015-07-05 14:49:07 +05:00
}
2016-12-07 01:29:59 +05:00
}(m)
2015-07-05 14:49:07 +05:00
}
2016-12-12 02:12:16 +05:00
wg.Wait()
log.Printf("Done jobs broadcast in %s for %d/%d/%d miners", time.Since(start), count, ok, fails)
2015-07-05 14:49:07 +05:00
}
// refreshBlockTemplate calls s.fetchBlockTemplate and, when that call reports
// true and bcast is set, broadcasts new jobs to all sessions.
func (s *StratumServer) refreshBlockTemplate(bcast bool) {
	// The fetch always runs; bcast only gates the broadcast that follows.
	if changed := s.fetchBlockTemplate(); !changed || !bcast {
		return
	}
	s.broadcastNewJobs()
}
2016-12-07 00:55:16 +05:00
2016-12-07 20:08:23 +05:00
func extractWorkerId(loginWorkerPair string) (string, string) {
2016-12-07 00:55:16 +05:00
parts := strings.SplitN(loginWorkerPair, ".", 2)
if len(parts) > 1 {
2016-12-07 20:08:23 +05:00
return parts[0], parts[1]
2016-12-07 00:55:16 +05:00
}
2016-12-07 20:08:23 +05:00
return loginWorkerPair, defaultWorkerId
2016-12-07 00:55:16 +05:00
}