Browse Source

Further connection handling updates

pool
Sammy Libre 8 years ago
parent
commit
a4249cdb0d
  1. 45
      go-pool/stratum/handlers.go
  2. 1
      go-pool/stratum/proto.go
  3. 2
      go-pool/stratum/stratum.go

45
go-pool/stratum/handlers.go

@ -19,12 +19,12 @@ func init() {
func (s *StratumServer) handleLoginRPC(cs *Session, e *Endpoint, params *LoginParams) (*JobReply, *ErrorReply) { func (s *StratumServer) handleLoginRPC(cs *Session, e *Endpoint, params *LoginParams) (*JobReply, *ErrorReply) {
if !s.config.BypassAddressValidation && !util.ValidateAddress(params.Login, s.config.Address) { if !s.config.BypassAddressValidation && !util.ValidateAddress(params.Login, s.config.Address) {
return nil, &ErrorReply{Code: -1, Message: "Invalid address used for login", Close: true} return nil, &ErrorReply{Code: -1, Message: "Invalid address used for login"}
} }
t := s.currentBlockTemplate() t := s.currentBlockTemplate()
if t == nil { if t == nil {
return nil, &ErrorReply{Code: -1, Message: "Job not ready", Close: true} return nil, &ErrorReply{Code: -1, Message: "Job not ready"}
} }
id := extractWorkerId(params.Login) id := extractWorkerId(params.Login)
@ -42,71 +42,58 @@ func (s *StratumServer) handleLoginRPC(cs *Session, e *Endpoint, params *LoginPa
return &JobReply{Id: id, Job: cs.getJob(t), Status: "OK"}, nil return &JobReply{Id: id, Job: cs.getJob(t), Status: "OK"}, nil
} }
func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJobParams) (reply *JobReplyData, errorReply *ErrorReply) { func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJobParams) (*JobReplyData, *ErrorReply) {
miner, ok := s.miners.Get(params.Id) miner, ok := s.miners.Get(params.Id)
if !ok { if !ok {
errorReply = &ErrorReply{Code: -1, Message: "Unauthenticated", Close: true} return nil, &ErrorReply{Code: -1, Message: "Unauthenticated"}
return
} }
t := s.currentBlockTemplate() t := s.currentBlockTemplate()
if t == nil { if t == nil {
errorReply = &ErrorReply{Code: -1, Message: "Job not ready", Close: true} return nil, &ErrorReply{Code: -1, Message: "Job not ready"}
return
} }
miner.heartbeat() miner.heartbeat()
reply = cs.getJob(t) return cs.getJob(t), nil
return
} }
func (s *StratumServer) handleSubmitRPC(cs *Session, e *Endpoint, params *SubmitParams) (reply *SubmitReply, errorReply *ErrorReply) { func (s *StratumServer) handleSubmitRPC(cs *Session, e *Endpoint, params *SubmitParams) (*SubmitReply, *ErrorReply) {
miner, ok := s.miners.Get(params.Id) miner, ok := s.miners.Get(params.Id)
if !ok { if !ok {
errorReply = &ErrorReply{Code: -1, Message: "Unauthenticated", Close: true} return nil, &ErrorReply{Code: -1, Message: "Unauthenticated"}
return
} }
miner.heartbeat() miner.heartbeat()
job := cs.findJob(params.JobId) job := cs.findJob(params.JobId)
if job == nil { if job == nil {
errorReply = &ErrorReply{Code: -1, Message: "Invalid job id", Close: true} return nil, &ErrorReply{Code: -1, Message: "Invalid job id"}
atomic.AddUint64(&miner.invalidShares, 1)
return
} }
if !noncePattern.MatchString(params.Nonce) { if !noncePattern.MatchString(params.Nonce) {
errorReply = &ErrorReply{Code: -1, Message: "Malformed nonce", Close: true} return nil, &ErrorReply{Code: -1, Message: "Malformed nonce"}
atomic.AddUint64(&miner.invalidShares, 1)
return
} }
nonce := strings.ToLower(params.Nonce) nonce := strings.ToLower(params.Nonce)
exist := job.submit(nonce) exist := job.submit(nonce)
if exist { if exist {
errorReply = &ErrorReply{Code: -1, Message: "Duplicate share", Close: true}
atomic.AddUint64(&miner.invalidShares, 1) atomic.AddUint64(&miner.invalidShares, 1)
return return nil, &ErrorReply{Code: -1, Message: "Duplicate share"}
} }
t := s.currentBlockTemplate() t := s.currentBlockTemplate()
if job.height != t.Height { if job.height != t.Height {
log.Printf("Block expired for height %v %s@%s", job.height, miner.Id, miner.IP) log.Printf("Stale share for height %d from %s@%s", job.height, miner.Id, miner.IP)
errorReply = &ErrorReply{Code: -1, Message: "Block expired", Close: false}
atomic.AddUint64(&miner.staleShares, 1) atomic.AddUint64(&miner.staleShares, 1)
return return nil, &ErrorReply{Code: -1, Message: "Block expired"}
} }
validShare := miner.processShare(s, e, job, t, nonce, params.Result) validShare := miner.processShare(s, e, job, t, nonce, params.Result)
if !validShare { if !validShare {
errorReply = &ErrorReply{Code: -1, Message: "Low difficulty share", Close: !ok} return nil, &ErrorReply{Code: -1, Message: "Low difficulty share"}
return
} }
return &SubmitReply{Status: "OK"}, nil
reply = &SubmitReply{Status: "OK"}
return
} }
func (s *StratumServer) handleUnknownRPC(cs *Session, req *JSONRpcReq) *ErrorReply { func (s *StratumServer) handleUnknownRPC(cs *Session, req *JSONRpcReq) *ErrorReply {
log.Printf("Unknown RPC method: %v", req) log.Printf("Unknown RPC method: %v", req)
return &ErrorReply{Code: -1, Message: "Invalid method", Close: true} return &ErrorReply{Code: -1, Message: "Invalid method"}
} }
func (s *StratumServer) broadcastNewJobs() { func (s *StratumServer) broadcastNewJobs() {

1
go-pool/stratum/proto.go

@ -57,5 +57,4 @@ type SubmitReply struct {
type ErrorReply struct { type ErrorReply struct {
Code int `json:"code"` Code int `json:"code"`
Message string `json:"message"` Message string `json:"message"`
Close bool `json:"-"`
} }

2
go-pool/stratum/stratum.go

@ -261,7 +261,7 @@ func (cs *Session) handleMessage(s *StratumServer, e *Endpoint, req *JSONRpcReq)
} }
reply, errReply := s.handleSubmitRPC(cs, e, &params) reply, errReply := s.handleSubmitRPC(cs, e, &params)
if errReply != nil { if errReply != nil {
return cs.sendError(req.Id, errReply, true) return cs.sendError(req.Id, errReply, false)
} }
return cs.sendResult(req.Id, &reply) return cs.sendResult(req.Id, &reply)
default: default:

Loading…
Cancel
Save