Browse Source

Rework sessions

pool
Sammy Libre 8 years ago
parent
commit
4332be5f1c
  1. 34
      go-pool/stratum/handlers.go
  2. 78
      go-pool/stratum/miner.go
  3. 52
      go-pool/stratum/stratum.go

34
go-pool/stratum/handlers.go

@ -30,17 +30,16 @@ func (s *StratumServer) handleLoginRPC(cs *Session, e *Endpoint, params *LoginPa
id := extractWorkerId(params.Login) id := extractWorkerId(params.Login)
miner, ok := s.miners.Get(id) miner, ok := s.miners.Get(id)
if !ok { if !ok {
miner = NewMiner(id, e.config.Difficulty, cs.ip) miner = NewMiner(id, cs.ip)
s.registerMiner(miner)
} }
log.Printf("Miner connected %s@%s", id, cs.ip) log.Printf("Miner connected %s@%s", id, cs.ip)
miner.Session = cs s.registerSession(cs)
miner.Endpoint = e
s.registerMiner(miner)
miner.heartbeat() miner.heartbeat()
return &JobReply{Id: id, Job: miner.getJob(t, true), Status: "OK"}, nil return &JobReply{Id: id, Job: cs.getJob(t), Status: "OK"}, nil
} }
func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJobParams) (reply *JobReplyData, errorReply *ErrorReply) { func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJobParams) (reply *JobReplyData, errorReply *ErrorReply) {
@ -55,7 +54,7 @@ func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJob
return return
} }
miner.heartbeat() miner.heartbeat()
reply = miner.getJob(t, false) reply = cs.getJob(t)
return return
} }
@ -67,7 +66,7 @@ func (s *StratumServer) handleSubmitRPC(cs *Session, e *Endpoint, params *Submit
} }
miner.heartbeat() miner.heartbeat()
job := miner.findJob(params.JobId) job := cs.findJob(params.JobId)
if job == nil { if job == nil {
errorReply = &ErrorReply{Code: -1, Message: "Invalid job id", Close: true} errorReply = &ErrorReply{Code: -1, Message: "Invalid job id", Close: true}
atomic.AddUint64(&miner.invalidShares, 1) atomic.AddUint64(&miner.invalidShares, 1)
@ -115,24 +114,27 @@ func (s *StratumServer) broadcastNewJobs() {
if t == nil { if t == nil {
return return
} }
log.Printf("Broadcasting new jobs to %v miners", s.miners.Count()) s.sessionsMu.RLock()
defer s.sessionsMu.RUnlock()
count := len(s.sessions)
log.Printf("Broadcasting new jobs to %d miners", count)
bcast := make(chan int, 1024*16) bcast := make(chan int, 1024*16)
n := 0 n := 0
for m := range s.miners.IterBuffered() { for m := range s.sessions {
n++ n++
bcast <- n bcast <- n
go func(miner *Miner) { go func(cs *Session) {
reply := miner.getJob(t, true) reply := cs.getJob(t)
err := miner.Session.pushMessage("job", &reply) err := cs.pushMessage("job", &reply)
<-bcast <-bcast
if err != nil { if err != nil {
log.Printf("Job transmit error to %v@%v: %v", miner.Id, miner.IP, err) log.Printf("Job transmit error to %s: %v", cs.ip, err)
s.removeMiner(miner.Id) s.removeSession(cs)
} else { } else {
s.setDeadline(miner.Session.conn) s.setDeadline(cs.conn)
} }
}(m.Val) }(m)
} }
} }

78
go-pool/stratum/miner.go

@ -25,16 +25,10 @@ type Job struct {
type Miner struct { type Miner struct {
sync.RWMutex sync.RWMutex
Id string Id string
IP string IP string
Difficulty int64 LastBeat int64
ValidJobs []*Job Endpoint *Endpoint
LastBlockHeight int64
Target uint32
TargetHex string
LastBeat int64
Session *Session
Endpoint *Endpoint
startedAt int64 startedAt int64
validShares uint64 validShares uint64
@ -56,39 +50,46 @@ func (job *Job) submit(nonce string) bool {
return false return false
} }
func NewMiner(id string, diff int64, ip string) *Miner { func NewMiner(id string, ip string) *Miner {
shares := make(map[int64]int64) shares := make(map[int64]int64)
miner := &Miner{Id: id, Difficulty: diff, IP: ip, shares: shares} return &Miner{Id: id, IP: ip, shares: shares}
target, targetHex := util.GetTargetHex(diff)
miner.Target = target
miner.TargetHex = targetHex
return miner
} }
func (m *Miner) pushJob(job *Job) { func (cs *Session) getJob(t *BlockTemplate) *JobReplyData {
m.Lock() height := atomic.SwapInt64(&cs.lastBlockHeight, t.Height)
defer m.Unlock()
m.ValidJobs = append(m.ValidJobs, job)
if len(m.ValidJobs) > 4 { if height == t.Height {
m.ValidJobs = m.ValidJobs[1:] return &JobReplyData{}
} }
extraNonce := atomic.AddUint32(&cs.endpoint.extraNonce, 1)
blob := t.nextBlob(extraNonce, cs.endpoint.instanceId)
job := &Job{Id: util.Random(), ExtraNonce: extraNonce, Height: t.Height, Difficulty: cs.difficulty}
job.Submissions = make(map[string]bool)
cs.pushJob(job)
reply := &JobReplyData{JobId: job.Id, Blob: blob, Target: cs.targetHex}
return reply
} }
func (m *Miner) getJob(t *BlockTemplate, force bool) *JobReplyData { func (cs *Session) pushJob(job *Job) {
height := atomic.SwapInt64(&m.LastBlockHeight, t.Height) cs.Lock()
defer cs.Unlock()
cs.validJobs = append(cs.validJobs, job)
if !force && height == t.Height { if len(cs.validJobs) > 4 {
return &JobReplyData{} cs.validJobs = cs.validJobs[1:]
} }
}
extraNonce := atomic.AddUint32(&m.Endpoint.extraNonce, 1) func (cs *Session) findJob(id string) *Job {
blob := t.nextBlob(extraNonce, m.Endpoint.instanceId) cs.Lock()
job := &Job{Id: util.Random(), ExtraNonce: extraNonce, Height: t.Height, Difficulty: m.Difficulty} defer cs.Unlock()
job.Submissions = make(map[string]bool) for _, job := range cs.validJobs {
m.pushJob(job) if job.Id == id {
reply := &JobReplyData{JobId: job.Id, Blob: blob, Target: m.TargetHex} return job
return reply }
}
return nil
} }
func (m *Miner) heartbeat() { func (m *Miner) heartbeat() {
@ -129,17 +130,6 @@ func (m *Miner) hashrate(estimationWindow time.Duration) float64 {
return float64(totalShares) / float64(boundary) return float64(totalShares) / float64(boundary)
} }
func (m *Miner) findJob(id string) *Job {
m.RLock()
defer m.RUnlock()
for _, job := range m.ValidJobs {
if job.Id == id {
return job
}
}
return nil
}
func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTemplate, nonce string, result string) bool { func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTemplate, nonce string, result string) bool {
r := s.rpc() r := s.rpc()

52
go-pool/stratum/stratum.go

@ -15,6 +15,7 @@ import (
"../pool" "../pool"
"../rpc" "../rpc"
"../util"
) )
type StratumServer struct { type StratumServer struct {
@ -30,6 +31,8 @@ type StratumServer struct {
luckWindow int64 luckWindow int64
luckLargeWindow int64 luckLargeWindow int64
roundShares int64 roundShares int64
sessionsMu sync.RWMutex
sessions map[*Session]struct{}
} }
type Endpoint struct { type Endpoint struct {
@ -40,9 +43,15 @@ type Endpoint struct {
type Session struct { type Session struct {
sync.Mutex sync.Mutex
conn *net.TCPConn conn *net.TCPConn
enc *json.Encoder enc *json.Encoder
ip string ip string
endpoint *Endpoint
difficulty int64
validJobs []*Job
lastBlockHeight int64
target uint32
targetHex string
} }
const ( const (
@ -65,6 +74,7 @@ func NewStratum(cfg *pool.Config) *StratumServer {
log.Printf("Default upstream: %s => %s", stratum.rpc().Name, stratum.rpc().Url) log.Printf("Default upstream: %s => %s", stratum.rpc().Name, stratum.rpc().Url)
stratum.miners = NewMinersMap() stratum.miners = NewMinersMap()
stratum.sessions = make(map[*Session]struct{})
timeout, _ := time.ParseDuration(cfg.Stratum.Timeout) timeout, _ := time.ParseDuration(cfg.Stratum.Timeout)
stratum.timeout = timeout stratum.timeout = timeout
@ -150,12 +160,14 @@ func (e *Endpoint) Listen(s *StratumServer) {
} }
conn.SetKeepAlive(true) conn.SetKeepAlive(true)
ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
cs := &Session{conn: conn, ip: ip, enc: json.NewEncoder(conn), endpoint: e}
n += 1 n += 1
accept <- n accept <- n
go func() { go func() {
err = s.handleClient(conn, e, ip) err = s.handleClient(cs, e)
if err != nil { if err != nil {
s.removeSession(cs)
conn.Close() conn.Close()
} }
<-accept <-accept
@ -163,11 +175,12 @@ func (e *Endpoint) Listen(s *StratumServer) {
} }
} }
func (s *StratumServer) handleClient(conn *net.TCPConn, e *Endpoint, ip string) error { func (s *StratumServer) handleClient(cs *Session, e *Endpoint) error {
cs := &Session{conn: conn, ip: ip} _, targetHex := util.GetTargetHex(e.config.Difficulty)
cs.enc = json.NewEncoder(conn) cs.targetHex = targetHex
connbuff := bufio.NewReaderSize(conn, MaxReqSize)
s.setDeadline(conn) connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize)
s.setDeadline(cs.conn)
for { for {
data, isPrefix, err := connbuff.ReadLine() data, isPrefix, err := connbuff.ReadLine()
@ -192,7 +205,7 @@ func (s *StratumServer) handleClient(conn *net.TCPConn, e *Endpoint, ip string)
log.Printf("Malformed request: %v", err) log.Printf("Malformed request: %v", err)
return err return err
} }
s.setDeadline(conn) s.setDeadline(cs.conn)
cs.handleMessage(s, e, &req) cs.handleMessage(s, e, &req)
} }
} }
@ -292,6 +305,25 @@ func (s *StratumServer) setDeadline(conn *net.TCPConn) {
conn.SetDeadline(time.Now().Add(s.timeout)) conn.SetDeadline(time.Now().Add(s.timeout))
} }
func (s *StratumServer) registerSession(cs *Session) {
s.sessionsMu.Lock()
defer s.sessionsMu.Unlock()
s.sessions[cs] = struct{}{}
}
func (s *StratumServer) removeSession(cs *Session) {
s.sessionsMu.Lock()
defer s.sessionsMu.Unlock()
delete(s.sessions, cs)
}
func (s *StratumServer) isActive(cs *Session) bool {
s.sessionsMu.RLock()
defer s.sessionsMu.RUnlock()
_, exist := s.sessions[cs]
return exist
}
func (s *StratumServer) registerMiner(miner *Miner) { func (s *StratumServer) registerMiner(miner *Miner) {
s.miners.Set(miner.Id, miner) s.miners.Set(miner.Id, miner)
} }

Loading…
Cancel
Save