Rewrite multi-endpoint code

This commit is contained in:
Sammy Libre 2016-12-06 17:07:45 +05:00
parent 0b06175f18
commit 317dcad002
5 changed files with 62 additions and 55 deletions

View File

@ -5,32 +5,28 @@ import (
"encoding/binary" "encoding/binary"
"encoding/hex" "encoding/hex"
"log" "log"
"sync/atomic"
"../../cnutil" "../../cnutil"
) )
type BlockTemplate struct { type BlockTemplate struct {
Blob string
Difficulty int64 Difficulty int64
Height int64 Height int64
ReservedOffset int ReservedOffset int
PrevHash string PrevHash string
Buffer []byte Buffer []byte
ExtraNonce uint32
} }
func (b *BlockTemplate) nextBlob() (string, uint32) { func (b *BlockTemplate) nextBlob(extraNonce uint32, instanceId []byte) string {
// Preventing race by using atomic op here
// No need for using locks, because this is only one write to BT and it's atomic
extraNonce := atomic.AddUint32(&b.ExtraNonce, 1)
extraBuff := new(bytes.Buffer) extraBuff := new(bytes.Buffer)
binary.Write(extraBuff, binary.BigEndian, extraNonce) binary.Write(extraBuff, binary.BigEndian, extraNonce)
blobBuff := make([]byte, len(b.Buffer)) blobBuff := make([]byte, len(b.Buffer))
copy(blobBuff, b.Buffer) // We never write to this buffer to prevent race copy(blobBuff, b.Buffer)
copy(blobBuff[b.ReservedOffset+4:b.ReservedOffset+7], instanceId)
copy(blobBuff[b.ReservedOffset:], extraBuff.Bytes()) copy(blobBuff[b.ReservedOffset:], extraBuff.Bytes())
blob := cnutil.ConvertBlob(blobBuff) blob := cnutil.ConvertBlob(blobBuff)
return hex.EncodeToString(blob), extraNonce return hex.EncodeToString(blob)
} }
func (s *StratumServer) fetchBlockTemplate() bool { func (s *StratumServer) fetchBlockTemplate() bool {
@ -52,15 +48,12 @@ func (s *StratumServer) fetchBlockTemplate() bool {
log.Printf("New block to mine at height %v, diff: %v, prev_hash: %s", reply.Height, reply.Difficulty, reply.PrevHash) log.Printf("New block to mine at height %v, diff: %v, prev_hash: %s", reply.Height, reply.Difficulty, reply.PrevHash)
} }
newTemplate := BlockTemplate{ newTemplate := BlockTemplate{
Blob: reply.Blob,
Difficulty: reply.Difficulty, Difficulty: reply.Difficulty,
Height: reply.Height, Height: reply.Height,
PrevHash: reply.PrevHash, PrevHash: reply.PrevHash,
ReservedOffset: reply.ReservedOffset, ReservedOffset: reply.ReservedOffset,
} }
newTemplate.Buffer, _ = hex.DecodeString(reply.Blob) newTemplate.Buffer, _ = hex.DecodeString(reply.Blob)
copy(newTemplate.Buffer[reply.ReservedOffset+4:reply.ReservedOffset+7], s.instanceId)
newTemplate.ExtraNonce = 0
s.blockTemplate.Store(&newTemplate) s.blockTemplate.Store(&newTemplate)
return true return true
} }

View File

@ -14,14 +14,15 @@ func init() {
noncePattern, _ = regexp.Compile("^[0-9a-f]{8}$") noncePattern, _ = regexp.Compile("^[0-9a-f]{8}$")
} }
func (s *StratumServer) handleLoginRPC(cs *Session, params *LoginParams) (reply *JobReply, errorReply *ErrorReply) { func (s *StratumServer) handleLoginRPC(cs *Session, e *Endpoint, params *LoginParams) (reply *JobReply, errorReply *ErrorReply) {
if !s.config.BypassAddressValidation && !util.ValidateAddress(params.Login, s.config.Address) { if !s.config.BypassAddressValidation && !util.ValidateAddress(params.Login, s.config.Address) {
errorReply = &ErrorReply{Code: -1, Message: "Invalid address used for login", Close: true} errorReply = &ErrorReply{Code: -1, Message: "Invalid address used for login", Close: true}
return return
} }
miner := NewMiner(params.Login, params.Pass, s.port.Difficulty, cs.ip) miner := NewMiner(params.Login, params.Pass, e.config.Difficulty, cs.ip)
miner.Session = cs miner.Session = cs
miner.Endpoint = e
s.registerMiner(miner) s.registerMiner(miner)
miner.heartbeat() miner.heartbeat()
@ -34,7 +35,7 @@ func (s *StratumServer) handleLoginRPC(cs *Session, params *LoginParams) (reply
return return
} }
func (s *StratumServer) handleGetJobRPC(cs *Session, params *GetJobParams) (reply *JobReplyData, errorReply *ErrorReply) { func (s *StratumServer) handleGetJobRPC(cs *Session, e *Endpoint, params *GetJobParams) (reply *JobReplyData, errorReply *ErrorReply) {
miner, ok := s.miners.Get(params.Id) miner, ok := s.miners.Get(params.Id)
if !ok { if !ok {
errorReply = &ErrorReply{Code: -1, Message: "Unauthenticated", Close: true} errorReply = &ErrorReply{Code: -1, Message: "Unauthenticated", Close: true}
@ -45,7 +46,7 @@ func (s *StratumServer) handleGetJobRPC(cs *Session, params *GetJobParams) (repl
return return
} }
func (s *StratumServer) handleSubmitRPC(cs *Session, params *SubmitParams) (reply *SubmitReply, errorReply *ErrorReply) { func (s *StratumServer) handleSubmitRPC(cs *Session, e *Endpoint, params *SubmitParams) (reply *SubmitReply, errorReply *ErrorReply) {
miner, ok := s.miners.Get(params.Id) miner, ok := s.miners.Get(params.Id)
if !ok { if !ok {
errorReply = &ErrorReply{Code: -1, Message: "Unauthenticated", Close: true} errorReply = &ErrorReply{Code: -1, Message: "Unauthenticated", Close: true}
@ -77,7 +78,7 @@ func (s *StratumServer) handleSubmitRPC(cs *Session, params *SubmitParams) (repl
return return
} }
validShare := miner.processShare(s, job, t, nonce, params.Result) validShare := miner.processShare(s, e, job, t, nonce, params.Result)
if !validShare { if !validShare {
errorReply = &ErrorReply{Code: -1, Message: "Low difficulty share", Close: !ok} errorReply = &ErrorReply{Code: -1, Message: "Low difficulty share", Close: !ok}
return return

View File

@ -35,6 +35,7 @@ type Miner struct {
TargetHex string TargetHex string
LastBeat int64 LastBeat int64
Session *Session Session *Session
Endpoint *Endpoint
} }
func (job *Job) submit(nonce string) bool { func (job *Job) submit(nonce string) bool {
@ -75,7 +76,8 @@ func (m *Miner) getJob(s *StratumServer) *JobReplyData {
return &JobReplyData{} return &JobReplyData{}
} }
blob, extraNonce := t.nextBlob() extraNonce := atomic.AddUint32(&m.Endpoint.extraNonce, 1)
blob := t.nextBlob(extraNonce, m.Endpoint.instanceId)
job := &Job{Id: util.Random(), ExtraNonce: extraNonce, Height: t.Height, Difficulty: m.Difficulty} job := &Job{Id: util.Random(), ExtraNonce: extraNonce, Height: t.Height, Difficulty: m.Difficulty}
job.Submissions = make(map[string]bool) job.Submissions = make(map[string]bool)
m.pushJob(job) m.pushJob(job)
@ -99,9 +101,10 @@ func (m *Miner) findJob(id string) *Job {
return nil return nil
} }
func (m *Miner) processShare(s *StratumServer, job *Job, t *BlockTemplate, nonce string, result string) bool { func (m *Miner) processShare(s *StratumServer, e *Endpoint, job *Job, t *BlockTemplate, nonce string, result string) bool {
shareBuff := make([]byte, len(t.Buffer)) shareBuff := make([]byte, len(t.Buffer))
copy(shareBuff, t.Buffer) copy(shareBuff, t.Buffer)
copy(shareBuff[t.ReservedOffset+4:t.ReservedOffset+7], e.instanceId)
extraBuff := new(bytes.Buffer) extraBuff := new(bytes.Buffer)
binary.Write(extraBuff, binary.BigEndian, job.ExtraNonce) binary.Write(extraBuff, binary.BigEndian, job.ExtraNonce)
@ -144,6 +147,6 @@ func (m *Miner) processShare(s *StratumServer, job *Job, t *BlockTemplate, nonce
return false return false
} }
log.Printf("Valid share at difficulty %v/%v", s.port.Difficulty, hashDiff) log.Printf("Valid share at difficulty %v/%v", e.config.Difficulty, hashDiff)
return true return true
} }

View File

@ -18,14 +18,17 @@ import (
) )
type StratumServer struct { type StratumServer struct {
config *pool.Config config *pool.Config
port pool.Port miners MinersMap
miners MinersMap blockTemplate atomic.Value
blockTemplate atomic.Value rpc *rpc.RPCClient
instanceId []byte timeout time.Duration
rpc *rpc.RPCClient }
timeout time.Duration
broadcastTimer *time.Timer type Endpoint struct {
config *pool.Port
instanceId []byte
extraNonce uint32
} }
type Session struct { type Session struct {
@ -39,14 +42,8 @@ const (
MaxReqSize = 10 * 1024 MaxReqSize = 10 * 1024
) )
func NewStratum(cfg *pool.Config, port pool.Port) *StratumServer { func NewStratum(cfg *pool.Config) *StratumServer {
b := make([]byte, 4) stratum := &StratumServer{config: cfg}
_, err := rand.Read(b)
if err != nil {
log.Fatalf("Can't seed with random bytes: %v", err)
}
stratum := &StratumServer{config: cfg, port: port, instanceId: b}
stratum.rpc = rpc.NewRPCClient(cfg) stratum.rpc = rpc.NewRPCClient(cfg)
stratum.miners = NewMinersMap() stratum.miners = NewMinersMap()
@ -73,8 +70,29 @@ func NewStratum(cfg *pool.Config, port pool.Port) *StratumServer {
return stratum return stratum
} }
func NewEndpoint(cfg *pool.Port) *Endpoint {
e := &Endpoint{config: cfg}
e.instanceId = make([]byte, 4)
_, err := rand.Read(e.instanceId)
if err != nil {
log.Fatalf("Can't seed with random bytes: %v", err)
}
return e
}
func (s *StratumServer) Listen() { func (s *StratumServer) Listen() {
bindAddr := fmt.Sprintf("%s:%d", s.port.Host, s.port.Port) quit := make(chan bool)
for _, port := range s.config.Stratum.Ports {
go func(cfg pool.Port) {
e := NewEndpoint(&cfg)
e.Listen(s)
}(port)
}
<-quit
}
func (e *Endpoint) Listen(s *StratumServer) {
bindAddr := fmt.Sprintf("%s:%d", e.config.Host, e.config.Port)
addr, err := net.ResolveTCPAddr("tcp", bindAddr) addr, err := net.ResolveTCPAddr("tcp", bindAddr)
checkError(err) checkError(err)
server, err := net.ListenTCP("tcp", addr) server, err := net.ListenTCP("tcp", addr)
@ -82,7 +100,7 @@ func (s *StratumServer) Listen() {
defer server.Close() defer server.Close()
log.Printf("Stratum listening on %s", bindAddr) log.Printf("Stratum listening on %s", bindAddr)
var accept = make(chan int, s.port.MaxConn) accept := make(chan int, e.config.MaxConn)
n := 0 n := 0
for { for {
@ -96,7 +114,7 @@ func (s *StratumServer) Listen() {
accept <- n accept <- n
go func() { go func() {
err = s.handleClient(conn, ip) err = s.handleClient(conn, e, ip)
if err != nil { if err != nil {
conn.Close() conn.Close()
} }
@ -105,7 +123,7 @@ func (s *StratumServer) Listen() {
} }
} }
func (s *StratumServer) handleClient(conn *net.TCPConn, ip string) error { func (s *StratumServer) handleClient(conn *net.TCPConn, e *Endpoint, ip string) error {
cs := &Session{conn: conn, ip: ip} cs := &Session{conn: conn, ip: ip}
cs.enc = json.NewEncoder(conn) cs.enc = json.NewEncoder(conn)
connbuff := bufio.NewReaderSize(conn, MaxReqSize) connbuff := bufio.NewReaderSize(conn, MaxReqSize)
@ -135,13 +153,13 @@ func (s *StratumServer) handleClient(conn *net.TCPConn, ip string) error {
return err return err
} }
s.setDeadline(conn) s.setDeadline(conn)
cs.handleMessage(s, &req) cs.handleMessage(s, e, &req)
} }
} }
return nil return nil
} }
func (cs *Session) handleMessage(s *StratumServer, req *JSONRpcReq) { func (cs *Session) handleMessage(s *StratumServer, e *Endpoint, req *JSONRpcReq) {
if req.Id == nil { if req.Id == nil {
log.Println("Missing RPC id") log.Println("Missing RPC id")
cs.conn.Close() cs.conn.Close()
@ -163,7 +181,7 @@ func (cs *Session) handleMessage(s *StratumServer, req *JSONRpcReq) {
log.Println("Unable to parse params") log.Println("Unable to parse params")
break break
} }
reply, errReply := s.handleLoginRPC(cs, &params) reply, errReply := s.handleLoginRPC(cs, e, &params)
if errReply != nil { if errReply != nil {
err = cs.sendError(req.Id, errReply) err = cs.sendError(req.Id, errReply)
break break
@ -176,7 +194,7 @@ func (cs *Session) handleMessage(s *StratumServer, req *JSONRpcReq) {
log.Println("Unable to parse params") log.Println("Unable to parse params")
break break
} }
reply, errReply := s.handleGetJobRPC(cs, &params) reply, errReply := s.handleGetJobRPC(cs, e, &params)
if errReply != nil { if errReply != nil {
err = cs.sendError(req.Id, errReply) err = cs.sendError(req.Id, errReply)
break break
@ -189,7 +207,7 @@ func (cs *Session) handleMessage(s *StratumServer, req *JSONRpcReq) {
log.Println("Unable to parse params") log.Println("Unable to parse params")
break break
} }
reply, errReply := s.handleSubmitRPC(cs, &params) reply, errReply := s.handleSubmitRPC(cs, e, &params)
if errReply != nil { if errReply != nil {
err = cs.sendError(req.Id, errReply) err = cs.sendError(req.Id, errReply)
break break

12
main.go
View File

@ -27,16 +27,8 @@ func startStratum() {
log.Printf("Running with default %v threads", n) log.Printf("Running with default %v threads", n)
} }
quit := make(chan bool) s := stratum.NewStratum(&cfg)
for _, port := range cfg.Stratum.Ports { s.Listen()
s := stratum.NewStratum(&cfg, port)
go func() {
s.Listen()
quit <- true
}()
}
<-quit
} }
func startNewrelic() { func startNewrelic() {