keva-stratum/stratum/stratum.go

401 lines
9.2 KiB
Go
Raw Normal View History

2015-07-05 14:49:07 +05:00
package stratum
import (
"bufio"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"log"
2018-02-10 05:08:43 +05:00
"math/big"
2015-07-05 14:49:07 +05:00
"net"
"sync"
"sync/atomic"
"time"
2019-02-26 15:27:48 -08:00
"github.com/kevacoin-project/keva-stratum/pool"
"github.com/kevacoin-project/keva-stratum/rpc"
"github.com/kevacoin-project/keva-stratum/util"
2015-07-05 14:49:07 +05:00
)
type StratumServer struct {
2016-12-06 19:25:07 +05:00
config *pool.Config
miners MinersMap
blockTemplate atomic.Value
2016-12-06 22:16:57 +05:00
upstream int32
upstreams []*rpc.RPCClient
2016-12-06 19:25:07 +05:00
timeout time.Duration
estimationWindow time.Duration
blocksMu sync.RWMutex
2017-03-27 03:54:04 +05:00
blockStats map[int64]blockEntry
2016-12-06 19:25:07 +05:00
luckWindow int64
luckLargeWindow int64
roundShares int64
2016-12-07 01:29:59 +05:00
sessionsMu sync.RWMutex
sessions map[*Session]struct{}
2016-12-06 17:07:45 +05:00
}
2017-03-27 03:54:04 +05:00
type blockEntry struct {
height int64
hash string
variance float64
}
2016-12-06 17:07:45 +05:00
type Endpoint struct {
2016-12-07 13:42:41 +05:00
config *pool.Port
2018-02-10 05:08:43 +05:00
difficulty *big.Int
2016-12-07 13:42:41 +05:00
instanceId []byte
extraNonce uint32
2016-12-07 13:57:02 +05:00
targetHex string
2016-12-07 13:42:41 +05:00
jobSequence uint64
2015-07-05 14:49:07 +05:00
}
type Session struct {
sync.Mutex
2016-12-07 01:29:59 +05:00
conn *net.TCPConn
enc *json.Encoder
ip string
endpoint *Endpoint
validJobs []*Job
lastBlockHeight int64
2015-07-05 14:49:07 +05:00
}
const (
MaxReqSize = 10 * 1024
)
2016-12-06 17:07:45 +05:00
func NewStratum(cfg *pool.Config) *StratumServer {
2017-03-27 03:54:04 +05:00
stratum := &StratumServer{config: cfg, blockStats: make(map[int64]blockEntry)}
2016-12-06 22:16:57 +05:00
stratum.upstreams = make([]*rpc.RPCClient, len(cfg.Upstream))
for i, v := range cfg.Upstream {
client, err := rpc.NewRPCClient(&v)
if err != nil {
log.Fatal(err)
} else {
stratum.upstreams[i] = client
log.Printf("Upstream: %s => %s", client.Name, client.Url)
}
}
log.Printf("Default upstream: %s => %s", stratum.rpc().Name, stratum.rpc().Url)
2015-07-05 14:49:07 +05:00
stratum.miners = NewMinersMap()
2016-12-07 01:29:59 +05:00
stratum.sessions = make(map[*Session]struct{})
2015-07-05 14:49:07 +05:00
timeout, _ := time.ParseDuration(cfg.Stratum.Timeout)
stratum.timeout = timeout
2016-12-06 19:21:06 +05:00
estimationWindow, _ := time.ParseDuration(cfg.EstimationWindow)
stratum.estimationWindow = estimationWindow
luckWindow, _ := time.ParseDuration(cfg.LuckWindow)
stratum.luckWindow = int64(luckWindow / time.Millisecond)
luckLargeWindow, _ := time.ParseDuration(cfg.LargeLuckWindow)
stratum.luckLargeWindow = int64(luckLargeWindow / time.Millisecond)
2016-12-06 23:39:39 +05:00
refreshIntv, _ := time.ParseDuration(cfg.BlockRefreshInterval)
2015-07-05 14:49:07 +05:00
refreshTimer := time.NewTimer(refreshIntv)
log.Printf("Set block refresh every %v", refreshIntv)
2016-12-06 22:16:57 +05:00
checkIntv, _ := time.ParseDuration(cfg.UpstreamCheckInterval)
checkTimer := time.NewTimer(checkIntv)
infoIntv, _ := time.ParseDuration(cfg.UpstreamCheckInterval)
infoTimer := time.NewTimer(infoIntv)
// Init block template
2016-12-07 12:32:37 +05:00
go stratum.refreshBlockTemplate(false)
2015-07-05 14:49:07 +05:00
go func() {
for {
select {
case <-refreshTimer.C:
stratum.refreshBlockTemplate(true)
refreshTimer.Reset(refreshIntv)
}
}
}()
2016-12-06 22:16:57 +05:00
go func() {
for {
select {
case <-checkTimer.C:
stratum.checkUpstreams()
checkTimer.Reset(checkIntv)
}
}
}()
go func() {
for {
select {
case <-infoTimer.C:
poll := func(v *rpc.RPCClient) {
_, err := v.UpdateInfo()
if err != nil {
log.Printf("Unable to update info on upstream %s: %v", v.Name, err)
}
}
current := stratum.rpc()
poll(current)
// Async rpc call to not block on rpc timeout, ignoring current
go func() {
for _, v := range stratum.upstreams {
if v != current {
poll(v)
}
}
}()
infoTimer.Reset(infoIntv)
}
}
}()
2015-07-05 14:49:07 +05:00
return stratum
}
2016-12-06 17:07:45 +05:00
func NewEndpoint(cfg *pool.Port) *Endpoint {
e := &Endpoint{config: cfg}
e.instanceId = make([]byte, 4)
_, err := rand.Read(e.instanceId)
if err != nil {
log.Fatalf("Can't seed with random bytes: %v", err)
}
2016-12-07 13:57:02 +05:00
e.targetHex = util.GetTargetHex(e.config.Difficulty)
2018-02-10 05:08:43 +05:00
e.difficulty = big.NewInt(e.config.Difficulty)
2016-12-06 17:07:45 +05:00
return e
}
2015-07-05 14:49:07 +05:00
func (s *StratumServer) Listen() {
2016-12-06 17:07:45 +05:00
quit := make(chan bool)
for _, port := range s.config.Stratum.Ports {
go func(cfg pool.Port) {
e := NewEndpoint(&cfg)
e.Listen(s)
}(port)
}
<-quit
}
func (e *Endpoint) Listen(s *StratumServer) {
bindAddr := fmt.Sprintf("%s:%d", e.config.Host, e.config.Port)
2015-07-05 14:49:07 +05:00
addr, err := net.ResolveTCPAddr("tcp", bindAddr)
2016-12-07 02:19:42 +05:00
if err != nil {
log.Fatalf("Error: %v", err)
}
2015-07-05 14:49:07 +05:00
server, err := net.ListenTCP("tcp", addr)
2016-12-07 02:19:42 +05:00
if err != nil {
log.Fatalf("Error: %v", err)
}
2015-07-05 14:49:07 +05:00
defer server.Close()
log.Printf("Stratum listening on %s", bindAddr)
2016-12-06 17:07:45 +05:00
accept := make(chan int, e.config.MaxConn)
2015-07-05 14:49:07 +05:00
n := 0
for {
conn, err := server.AcceptTCP()
2016-07-31 02:36:07 +05:00
if err != nil {
continue
}
2015-07-05 14:49:07 +05:00
conn.SetKeepAlive(true)
ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
2016-12-07 01:29:59 +05:00
cs := &Session{conn: conn, ip: ip, enc: json.NewEncoder(conn), endpoint: e}
2015-07-05 14:49:07 +05:00
n += 1
accept <- n
go func() {
2016-12-07 02:19:42 +05:00
s.handleClient(cs, e)
2015-07-05 14:49:07 +05:00
<-accept
}()
}
}
2016-12-07 02:19:42 +05:00
func (s *StratumServer) handleClient(cs *Session, e *Endpoint) {
2016-12-07 01:29:59 +05:00
connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize)
s.setDeadline(cs.conn)
2015-07-05 14:49:07 +05:00
for {
data, isPrefix, err := connbuff.ReadLine()
if isPrefix {
2016-12-07 02:19:42 +05:00
log.Println("Socket flood detected from", cs.ip)
break
2015-07-05 14:49:07 +05:00
} else if err == io.EOF {
2016-12-07 01:59:22 +05:00
log.Println("Client disconnected", cs.ip)
2015-07-05 14:49:07 +05:00
break
} else if err != nil {
2016-12-07 02:19:42 +05:00
log.Println("Error reading:", err)
break
2015-07-05 14:49:07 +05:00
}
// NOTICE: cpuminer-multi sends junk newlines, so we demand at least 1 byte for decode
// NOTICE: Ns*CNMiner.exe will send malformed JSON on very low diff, not sure we should handle this
if len(data) > 1 {
var req JSONRpcReq
err = json.Unmarshal(data, &req)
if err != nil {
2016-12-07 15:25:48 +05:00
log.Printf("Malformed request from %s: %v", cs.ip, err)
2016-12-07 02:19:42 +05:00
break
2015-07-05 14:49:07 +05:00
}
2016-12-07 01:29:59 +05:00
s.setDeadline(cs.conn)
2016-12-07 02:19:42 +05:00
err = cs.handleMessage(s, e, &req)
if err != nil {
break
}
2015-07-05 14:49:07 +05:00
}
}
2016-12-07 02:19:42 +05:00
s.removeSession(cs)
cs.conn.Close()
2015-07-05 14:49:07 +05:00
}
2016-12-07 02:19:42 +05:00
func (cs *Session) handleMessage(s *StratumServer, e *Endpoint, req *JSONRpcReq) error {
2015-07-05 14:49:07 +05:00
if req.Id == nil {
2016-12-07 02:19:42 +05:00
err := fmt.Errorf("Server disconnect request")
log.Println(err)
return err
2015-07-05 14:49:07 +05:00
} else if req.Params == nil {
2016-12-07 02:19:42 +05:00
err := fmt.Errorf("Server RPC request params")
log.Println(err)
return err
2015-07-05 14:49:07 +05:00
}
// Handle RPC methods
switch req.Method {
2016-12-07 02:19:42 +05:00
2015-07-05 14:49:07 +05:00
case "login":
var params LoginParams
2016-12-07 02:19:42 +05:00
err := json.Unmarshal(*req.Params, &params)
2015-07-05 14:49:07 +05:00
if err != nil {
log.Println("Unable to parse params")
2016-12-07 02:19:42 +05:00
return err
2015-07-05 14:49:07 +05:00
}
2016-12-07 12:07:31 +05:00
reply, errReply := s.handleLoginRPC(cs, &params)
2015-07-05 14:49:07 +05:00
if errReply != nil {
2016-12-07 02:19:42 +05:00
return cs.sendError(req.Id, errReply, true)
2015-07-05 14:49:07 +05:00
}
2016-12-07 02:19:42 +05:00
return cs.sendResult(req.Id, &reply)
2015-07-05 14:49:07 +05:00
case "getjob":
var params GetJobParams
2016-12-07 02:19:42 +05:00
err := json.Unmarshal(*req.Params, &params)
2015-07-05 14:49:07 +05:00
if err != nil {
log.Println("Unable to parse params")
2016-12-07 02:19:42 +05:00
return err
2015-07-05 14:49:07 +05:00
}
2016-12-07 12:07:31 +05:00
reply, errReply := s.handleGetJobRPC(cs, &params)
2015-07-05 14:49:07 +05:00
if errReply != nil {
2016-12-07 02:19:42 +05:00
return cs.sendError(req.Id, errReply, true)
2015-07-05 14:49:07 +05:00
}
2016-12-07 02:19:42 +05:00
return cs.sendResult(req.Id, &reply)
2015-07-05 14:49:07 +05:00
case "submit":
var params SubmitParams
err := json.Unmarshal(*req.Params, &params)
if err != nil {
log.Println("Unable to parse params")
2016-12-07 02:19:42 +05:00
return err
2015-07-05 14:49:07 +05:00
}
2016-12-07 12:07:31 +05:00
reply, errReply := s.handleSubmitRPC(cs, &params)
2015-07-05 14:49:07 +05:00
if errReply != nil {
2016-12-07 02:29:01 +05:00
return cs.sendError(req.Id, errReply, false)
2015-07-05 14:49:07 +05:00
}
2016-12-07 02:19:42 +05:00
return cs.sendResult(req.Id, &reply)
2017-09-24 00:43:26 +05:00
case "keepalived":
return cs.sendResult(req.Id, &StatusReply{Status: "KEEPALIVED"})
2015-07-05 14:49:07 +05:00
default:
2016-12-07 12:07:31 +05:00
errReply := s.handleUnknownRPC(req)
2016-12-07 02:19:42 +05:00
return cs.sendError(req.Id, errReply, true)
2015-07-05 14:49:07 +05:00
}
}
func (cs *Session) sendResult(id *json.RawMessage, result interface{}) error {
cs.Lock()
defer cs.Unlock()
message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result}
return cs.enc.Encode(&message)
}
func (cs *Session) pushMessage(method string, params interface{}) error {
cs.Lock()
defer cs.Unlock()
message := JSONPushMessage{Version: "2.0", Method: method, Params: params}
return cs.enc.Encode(&message)
}
2016-12-07 02:19:42 +05:00
func (cs *Session) sendError(id *json.RawMessage, reply *ErrorReply, drop bool) error {
2015-07-05 14:49:07 +05:00
cs.Lock()
defer cs.Unlock()
message := JSONRpcResp{Id: id, Version: "2.0", Error: reply}
err := cs.enc.Encode(&message)
2016-12-07 02:19:42 +05:00
if err != nil {
return err
}
if drop {
return fmt.Errorf("Server disconnect request")
2015-07-05 14:49:07 +05:00
}
2016-12-07 02:19:42 +05:00
return nil
2015-07-05 14:49:07 +05:00
}
func (s *StratumServer) setDeadline(conn *net.TCPConn) {
conn.SetDeadline(time.Now().Add(s.timeout))
}
2016-12-07 01:29:59 +05:00
func (s *StratumServer) registerSession(cs *Session) {
s.sessionsMu.Lock()
defer s.sessionsMu.Unlock()
s.sessions[cs] = struct{}{}
}
func (s *StratumServer) removeSession(cs *Session) {
s.sessionsMu.Lock()
defer s.sessionsMu.Unlock()
delete(s.sessions, cs)
}
func (s *StratumServer) isActive(cs *Session) bool {
s.sessionsMu.RLock()
defer s.sessionsMu.RUnlock()
_, exist := s.sessions[cs]
return exist
}
2015-07-05 14:49:07 +05:00
func (s *StratumServer) registerMiner(miner *Miner) {
2016-12-07 11:55:29 +05:00
s.miners.Set(miner.id, miner)
2015-07-05 14:49:07 +05:00
}
func (s *StratumServer) removeMiner(id string) {
s.miners.Remove(id)
}
func (s *StratumServer) currentBlockTemplate() *BlockTemplate {
2016-12-06 19:25:07 +05:00
if t := s.blockTemplate.Load(); t != nil {
return t.(*BlockTemplate)
}
return nil
2015-07-05 14:49:07 +05:00
}
2016-12-06 22:16:57 +05:00
func (s *StratumServer) checkUpstreams() {
candidate := int32(0)
backup := false
for i, v := range s.upstreams {
ok, err := v.Check(8, s.config.Address)
if err != nil {
log.Printf("Upstream %v didn't pass check: %v", v.Name, err)
}
if ok && !backup {
candidate = int32(i)
backup = true
}
}
if s.upstream != candidate {
log.Printf("Switching to %v upstream", s.upstreams[candidate].Name)
atomic.StoreInt32(&s.upstream, candidate)
}
}
func (s *StratumServer) rpc() *rpc.RPCClient {
i := atomic.LoadInt32(&s.upstream)
return s.upstreams[i]
}