2016-12-06 23:39:39 +05:00

341 lines
7.4 KiB
Go

package stratum
import (
"bufio"
"crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"sync"
"sync/atomic"
"time"
"../pool"
"../rpc"
)
// StratumServer holds the shared state of the stratum service: the pool
// configuration, the upstream RPC clients, the miner registry and the
// current block template.
type StratumServer struct {
// Pool configuration supplied to NewStratum.
config *pool.Config
// Registered miners, keyed by Miner.Id (see registerMiner/removeMiner).
miners MinersMap
// Current block template; currentBlockTemplate expects a *BlockTemplate here.
blockTemplate atomic.Value
// Index into upstreams of the active RPC client. Read by rpc() and
// stored by checkUpstreams through sync/atomic.
upstream int32
// One RPC client per entry of config.Upstream, in the same order.
upstreams []*rpc.RPCClient
// Amount by which setDeadline pushes a connection's deadline forward;
// parsed from config.Stratum.Timeout.
timeout time.Duration
// Parsed from config.EstimationWindow.
estimationWindow time.Duration
// NOTE(review): presumably blocksMu guards blockStats; neither is used in
// this part of the file — confirm against the code that touches them.
blocksMu sync.RWMutex
blockStats map[int64]float64
// config.LuckWindow converted to milliseconds.
luckWindow int64
// config.LargeLuckWindow converted to milliseconds.
luckLargeWindow int64
// NOTE(review): not referenced in this part of the file; presumably a share
// counter for the current round — confirm.
roundShares int64
}
// Endpoint is a single listening stratum port created by NewEndpoint.
type Endpoint struct {
// Port settings (Host, Port and MaxConn are read by Listen).
config *pool.Port
// Four random bytes drawn from crypto/rand in NewEndpoint.
instanceId []byte
// NOTE(review): not used in this part of the file; presumably a per-endpoint
// extra-nonce counter — confirm where it is incremented.
extraNonce uint32
}
// Session is the per-connection state of one stratum client.
type Session struct {
// Held by sendResult, pushMessage and sendError while they write to enc,
// so that messages from different goroutines are not interleaved.
sync.Mutex
// Underlying TCP connection of the client.
conn *net.TCPConn
// JSON encoder writing to conn.
enc *json.Encoder
// Host part of the client's remote address.
ip string
}
// MaxReqSize is the size in bytes of the per-connection read buffer; a
// request line that does not fit in it is treated as a flood by handleClient.
const MaxReqSize = 10 * 1024
// NewStratum builds a StratumServer from cfg.
//
// It creates one RPC client per configured upstream, parses the duration
// settings, loads an initial block template and starts two background
// goroutines that live for the rest of the process: one refreshes the block
// template every cfg.BlockRefreshInterval and the other re-checks the
// upstreams every cfg.UpstreamCheckInterval.
//
// The process exits through log.Fatal when an RPC client cannot be created
// or when no upstream is configured. A duration setting that fails to parse
// is logged and treated as zero.
func NewStratum(cfg *pool.Config) *StratumServer {
	stratum := &StratumServer{config: cfg}

	stratum.upstreams = make([]*rpc.RPCClient, len(cfg.Upstream))
	for i, v := range cfg.Upstream {
		// Give every client its own copy of the settings: before Go 1.22 the
		// range variable is a single shared variable, so &v would alias the
		// same memory for all upstreams.
		upstreamCfg := v
		client, err := rpc.NewRPCClient(&upstreamCfg)
		if err != nil {
			log.Fatal(err)
		}
		stratum.upstreams[i] = client
		log.Printf("Upstream: %s => %s", client.Name, client.Url)
	}
	// rpc() indexes into upstreams, so an empty list would panic below.
	if len(stratum.upstreams) == 0 {
		log.Fatal("No upstreams configured")
	}
	log.Printf("Default upstream: %s => %s", stratum.rpc().Name, stratum.rpc().Url)

	stratum.miners = NewMinersMap()

	// parseDuration reports invalid settings instead of silently turning
	// them into a zero duration.
	parseDuration := func(name, value string) time.Duration {
		d, err := time.ParseDuration(value)
		if err != nil {
			log.Printf("Invalid %s %q, using %v: %v", name, value, d, err)
		}
		return d
	}

	stratum.timeout = parseDuration("stratum timeout", cfg.Stratum.Timeout)
	stratum.estimationWindow = parseDuration("estimation window", cfg.EstimationWindow)
	stratum.luckWindow = int64(parseDuration("luck window", cfg.LuckWindow) / time.Millisecond)
	stratum.luckLargeWindow = int64(parseDuration("large luck window", cfg.LargeLuckWindow) / time.Millisecond)

	// Init block template
	stratum.refreshBlockTemplate(false)

	refreshIntv := parseDuration("block refresh interval", cfg.BlockRefreshInterval)
	refreshTimer := time.NewTimer(refreshIntv)
	log.Printf("Set block refresh every %v", refreshIntv)

	checkIntv := parseDuration("upstream check interval", cfg.UpstreamCheckInterval)
	checkTimer := time.NewTimer(checkIntv)

	// Each timer is re-armed only after its job has finished, so the interval
	// is measured from the end of one run to the start of the next.
	go func() {
		for {
			<-refreshTimer.C
			stratum.refreshBlockTemplate(true)
			refreshTimer.Reset(refreshIntv)
		}
	}()

	go func() {
		for {
			<-checkTimer.C
			stratum.checkUpstreams()
			checkTimer.Reset(checkIntv)
		}
	}()

	return stratum
}
// NewEndpoint creates an Endpoint for the given port settings and assigns it
// a 4-byte instance id read from crypto/rand. The process exits through
// log.Fatalf if the random source cannot be read.
func NewEndpoint(cfg *pool.Port) *Endpoint {
	id := make([]byte, 4)
	if _, err := rand.Read(id); err != nil {
		log.Fatalf("Can't seed with random bytes: %v", err)
	}
	return &Endpoint{config: cfg, instanceId: id}
}
// Listen starts one listener goroutine per configured stratum port and then
// blocks the calling goroutine forever.
func (s *StratumServer) Listen() {
	for _, port := range s.config.Stratum.Ports {
		// Each goroutine works on its own copy of the port settings.
		portCfg := port
		go func() {
			NewEndpoint(&portCfg).Listen(s)
		}()
	}

	// Nothing ever sends on this channel; the receive parks the caller.
	forever := make(chan bool)
	<-forever
}
// Listen binds the endpoint's host and port and serves stratum clients until
// the process exits. Failing to resolve or bind the address is fatal (see
// checkError).
//
// Every accepted connection is handled in its own goroutine. The buffered
// accept channel works as a semaphore: once e.config.MaxConn handlers are
// running, the accept loop blocks until one of them finishes.
func (e *Endpoint) Listen(s *StratumServer) {
	bindAddr := fmt.Sprintf("%s:%d", e.config.Host, e.config.Port)
	addr, err := net.ResolveTCPAddr("tcp", bindAddr)
	checkError(err)
	server, err := net.ListenTCP("tcp", addr)
	checkError(err)
	defer server.Close()

	log.Printf("Stratum listening on %s", bindAddr)
	accept := make(chan int, e.config.MaxConn)
	n := 0

	for {
		conn, err := server.AcceptTCP()
		if err != nil {
			// NOTE(review): accept errors are retried immediately and not
			// logged; a persistent failure will spin this loop.
			continue
		}
		conn.SetKeepAlive(true)
		ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String())

		n++
		accept <- n
		go func() {
			// Deferred calls run in reverse order: close the socket first,
			// then free the connection slot.
			defer func() { <-accept }()
			// Close on every exit path. handleClient returns nil on a clean
			// EOF, which previously left the socket open.
			defer conn.Close()
			// handleClient logs its own failures, so the result is not
			// needed here (and no longer written to the loop's err).
			_ = s.handleClient(conn, e, ip)
		}()
	}
}
// handleClient reads newline-delimited JSON-RPC requests from conn and
// dispatches each one to Session.handleMessage.
//
// It returns nil when the client disconnects (io.EOF) and a non-nil error
// when a line exceeds MaxReqSize, when reading fails or when a request is not
// valid JSON. The connection deadline is set on entry and pushed forward
// after every successfully decoded request.
func (s *StratumServer) handleClient(conn *net.TCPConn, e *Endpoint, ip string) error {
	session := &Session{conn: conn, ip: ip, enc: json.NewEncoder(conn)}
	reader := bufio.NewReaderSize(conn, MaxReqSize)
	s.setDeadline(conn)

	for {
		line, truncated, err := reader.ReadLine()
		switch {
		case truncated:
			// The line did not fit into the MaxReqSize buffer.
			log.Printf("Socket flood detected")
			// TODO: Ban client
			return errors.New("Socket flood")
		case err == io.EOF:
			log.Printf("Client disconnected")
			return nil
		case err != nil:
			log.Printf("Error reading: %v", err)
			return err
		}

		// NOTICE: cpuminer-multi sends junk newlines, so lines of one byte or
		// less are skipped rather than decoded.
		// NOTICE: Ns*CNMiner.exe will send malformed JSON on very low diff,
		// not sure we should handle this
		if len(line) <= 1 {
			continue
		}

		var req JSONRpcReq
		if err = json.Unmarshal(line, &req); err != nil {
			log.Printf("Malformed request: %v", err)
			return err
		}
		s.setDeadline(conn)
		session.handleMessage(s, e, &req)
	}
}
// handleMessage validates a decoded JSON-RPC request, dispatches it by method
// ("login", "getjob", "submit", anything else is treated as unknown) and
// writes the reply to the client.
//
// The connection is closed when the request has no id or no params, when the
// params cannot be decoded, when writing the reply fails, or when sendError
// reports that the error reply demands a close.
func (cs *Session) handleMessage(s *StratumServer, e *Endpoint, req *JSONRpcReq) {
	if req.Id == nil {
		log.Println("Missing RPC id")
		cs.conn.Close()
		return
	}
	if req.Params == nil {
		log.Println("Missing RPC params")
		cs.conn.Close()
		return
	}

	// err is shared by all cases; every case must assign with "=" so the
	// close check after the switch sees the outcome.
	var err error

	// Handle RPC methods
	switch req.Method {
	case "login":
		var params LoginParams
		err = json.Unmarshal(*req.Params, &params)
		if err != nil {
			log.Println("Unable to parse params")
			break
		}
		reply, errReply := s.handleLoginRPC(cs, e, &params)
		if errReply != nil {
			err = cs.sendError(req.Id, errReply)
			break
		}
		err = cs.sendResult(req.Id, &reply)
	case "getjob":
		var params GetJobParams
		err = json.Unmarshal(*req.Params, &params)
		if err != nil {
			log.Println("Unable to parse params")
			break
		}
		reply, errReply := s.handleGetJobRPC(cs, e, &params)
		if errReply != nil {
			err = cs.sendError(req.Id, errReply)
			break
		}
		err = cs.sendResult(req.Id, &reply)
	case "submit":
		var params SubmitParams
		// This used ":=" before, which shadowed err inside the case. Parse
		// failures, write failures and forced closes were silently dropped.
		err = json.Unmarshal(*req.Params, &params)
		if err != nil {
			log.Println("Unable to parse params")
			break
		}
		reply, errReply := s.handleSubmitRPC(cs, e, &params)
		if errReply != nil {
			err = cs.sendError(req.Id, errReply)
			break
		}
		err = cs.sendResult(req.Id, &reply)
	default:
		errReply := s.handleUnknownRPC(cs, req)
		err = cs.sendError(req.Id, errReply)
	}

	if err != nil {
		cs.conn.Close()
	}
}
// sendResult writes a successful JSON-RPC 2.0 response carrying result for
// the request identified by id. The session lock is held during the write.
// It returns the encoder's error, if any.
func (cs *Session) sendResult(id *json.RawMessage, result interface{}) error {
	resp := &JSONRpcResp{Id: id, Version: "2.0", Result: result}

	cs.Lock()
	defer cs.Unlock()
	return cs.enc.Encode(resp)
}
// pushMessage writes a server-initiated JSON-RPC 2.0 message with the given
// method and params. The session lock is held during the write. It returns
// the encoder's error, if any.
func (cs *Session) pushMessage(method string, params interface{}) error {
	push := &JSONPushMessage{Version: "2.0", Method: method, Params: params}

	cs.Lock()
	defer cs.Unlock()
	return cs.enc.Encode(push)
}
// sendError writes a JSON-RPC 2.0 error response for the request identified
// by id. The session lock is held during the write.
//
// When reply.Close is set it always returns a "Force close" error, whatever
// the outcome of the write, so that the caller drops the connection.
// Otherwise it returns the encoder's error, if any.
func (cs *Session) sendError(id *json.RawMessage, reply *ErrorReply) error {
	cs.Lock()
	defer cs.Unlock()

	writeErr := cs.enc.Encode(&JSONRpcResp{Id: id, Version: "2.0", Error: reply})
	if !reply.Close {
		return writeErr
	}
	return errors.New("Force close")
}
// setDeadline moves conn's read/write deadline to s.timeout from now.
func (s *StratumServer) setDeadline(conn *net.TCPConn) {
	deadline := time.Now().Add(s.timeout)
	conn.SetDeadline(deadline)
}
// registerMiner stores miner in the server's miner map under miner.Id.
func (s *StratumServer) registerMiner(miner *Miner) {
	key := miner.Id
	s.miners.Set(key, miner)
}
// removeMiner deletes the entry stored under id from the server's miner map.
func (s *StratumServer) removeMiner(id string) {
	registry := s.miners
	registry.Remove(id)
}
// currentBlockTemplate returns the block template currently stored in
// s.blockTemplate, or nil if none has been stored yet.
func (s *StratumServer) currentBlockTemplate() *BlockTemplate {
	stored := s.blockTemplate.Load()
	if stored == nil {
		return nil
	}
	return stored.(*BlockTemplate)
}
// checkUpstreams runs Check on every upstream and makes the first one that
// passes the active upstream. Every upstream is checked even after a
// candidate has been found. If none passes, index 0 is selected.
func (s *StratumServer) checkUpstreams() {
	candidate := int32(0)
	found := false

	for i, v := range s.upstreams {
		ok, err := v.Check(8, s.config.Address)
		if err != nil {
			log.Printf("Upstream %v didn't pass check: %v", v.Name, err)
		}
		if ok && !found {
			candidate = int32(i)
			found = true
		}
	}

	// s.upstream is read by rpc() from other goroutines via sync/atomic, so
	// access it atomically here as well instead of mixing plain and atomic
	// operations on the same word.
	if atomic.LoadInt32(&s.upstream) != candidate {
		log.Printf("Switching to %v upstream", s.upstreams[candidate].Name)
		atomic.StoreInt32(&s.upstream, candidate)
	}
}
// rpc returns the RPC client of the currently active upstream. The index is
// loaded atomically.
func (s *StratumServer) rpc() *rpc.RPCClient {
	return s.upstreams[atomic.LoadInt32(&s.upstream)]
}
// checkError terminates the process through log.Fatalf when err is non-nil
// and does nothing otherwise.
func checkError(err error) {
	if err == nil {
		return
	}
	log.Fatalf("Error: %v", err)
}