You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
264 lines
5.8 KiB
264 lines
5.8 KiB
package stratum |
|
|
|
import ( |
|
"bufio" |
|
"crypto/rand" |
|
"encoding/json" |
|
"errors" |
|
"fmt" |
|
"io" |
|
"log" |
|
"net" |
|
"sync" |
|
"sync/atomic" |
|
"time" |
|
|
|
"../pool" |
|
"../rpc" |
|
"../storage" |
|
"./policy" |
|
) |
|
|
|
type StratumServer struct { |
|
config *pool.Config |
|
port pool.Port |
|
miners MinersMap |
|
blockTemplate atomic.Value |
|
instanceId []byte |
|
rpc *rpc.RPCClient |
|
timeout time.Duration |
|
broadcastTimer *time.Timer |
|
storage *storage.RedisClient |
|
policy *policy.PolicyServer |
|
} |
|
|
|
type Session struct { |
|
sync.Mutex |
|
conn *net.TCPConn |
|
enc *json.Encoder |
|
ip string |
|
} |
|
|
|
const ( |
|
MaxReqSize = 10 * 1024 |
|
) |
|
|
|
func NewStratum(cfg *pool.Config, port pool.Port, storage *storage.RedisClient, policy *policy.PolicyServer) *StratumServer { |
|
b := make([]byte, 4) |
|
_, err := rand.Read(b) |
|
if err != nil { |
|
log.Fatalf("Can't seed with random bytes: %v", err) |
|
} |
|
|
|
stratum := &StratumServer{config: cfg, port: port, policy: policy, instanceId: b} |
|
stratum.rpc = rpc.NewRPCClient(cfg) |
|
stratum.miners = NewMinersMap() |
|
stratum.storage = storage |
|
|
|
timeout, _ := time.ParseDuration(cfg.Stratum.Timeout) |
|
stratum.timeout = timeout |
|
|
|
// Init block template |
|
stratum.blockTemplate.Store(&BlockTemplate{}) |
|
stratum.refreshBlockTemplate(false) |
|
|
|
refreshIntv, _ := time.ParseDuration(cfg.Stratum.BlockRefreshInterval) |
|
refreshTimer := time.NewTimer(refreshIntv) |
|
log.Printf("Set block refresh every %v", refreshIntv) |
|
|
|
go func() { |
|
for { |
|
select { |
|
case <-refreshTimer.C: |
|
stratum.refreshBlockTemplate(true) |
|
refreshTimer.Reset(refreshIntv) |
|
} |
|
} |
|
}() |
|
return stratum |
|
} |
|
|
|
func (s *StratumServer) Listen() { |
|
bindAddr := fmt.Sprintf("%s:%d", s.port.Host, s.port.Port) |
|
addr, err := net.ResolveTCPAddr("tcp", bindAddr) |
|
checkError(err) |
|
server, err := net.ListenTCP("tcp", addr) |
|
checkError(err) |
|
defer server.Close() |
|
|
|
log.Printf("Stratum listening on %s", bindAddr) |
|
var accept = make(chan int, s.port.MaxConn) |
|
n := 0 |
|
|
|
for { |
|
conn, err := server.AcceptTCP() |
|
if err != nil { |
|
continue |
|
} |
|
conn.SetKeepAlive(true) |
|
ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) |
|
ok := s.policy.ApplyLimitPolicy(ip) |
|
if !ok { |
|
conn.Close() |
|
continue |
|
} |
|
n += 1 |
|
|
|
accept <- n |
|
go func() { |
|
err = s.handleClient(conn, ip) |
|
if err != nil { |
|
conn.Close() |
|
} |
|
<-accept |
|
}() |
|
} |
|
} |
|
|
|
func (s *StratumServer) handleClient(conn *net.TCPConn, ip string) error { |
|
cs := &Session{conn: conn, ip: ip} |
|
cs.enc = json.NewEncoder(conn) |
|
connbuff := bufio.NewReaderSize(conn, MaxReqSize) |
|
s.setDeadline(conn) |
|
|
|
for { |
|
data, isPrefix, err := connbuff.ReadLine() |
|
if isPrefix { |
|
log.Printf("Socket flood detected") |
|
// TODO: Ban client |
|
return errors.New("Socket flood") |
|
} else if err == io.EOF { |
|
log.Printf("Client disconnected") |
|
break |
|
} else if err != nil { |
|
log.Printf("Error reading: %v", err) |
|
return err |
|
} |
|
|
|
// NOTICE: cpuminer-multi sends junk newlines, so we demand at least 1 byte for decode |
|
// NOTICE: Ns*CNMiner.exe will send malformed JSON on very low diff, not sure we should handle this |
|
if len(data) > 1 { |
|
var req JSONRpcReq |
|
err = json.Unmarshal(data, &req) |
|
if err != nil { |
|
s.policy.ApplyMalformedPolicy(ip) |
|
log.Printf("Malformed request: %v", err) |
|
return err |
|
} |
|
s.setDeadline(conn) |
|
cs.handleMessage(s, &req) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (cs *Session) handleMessage(s *StratumServer, req *JSONRpcReq) { |
|
if req.Id == nil { |
|
log.Println("Missing RPC id") |
|
cs.conn.Close() |
|
return |
|
} else if req.Params == nil { |
|
log.Println("Missing RPC params") |
|
cs.conn.Close() |
|
return |
|
} |
|
|
|
var err error |
|
|
|
// Handle RPC methods |
|
switch req.Method { |
|
case "login": |
|
var params LoginParams |
|
err = json.Unmarshal(*req.Params, ¶ms) |
|
if err != nil { |
|
log.Println("Unable to parse params") |
|
break |
|
} |
|
reply, errReply := s.handleLoginRPC(cs, ¶ms) |
|
if errReply != nil { |
|
err = cs.sendError(req.Id, errReply) |
|
break |
|
} |
|
err = cs.sendResult(req.Id, &reply) |
|
case "getjob": |
|
var params GetJobParams |
|
err = json.Unmarshal(*req.Params, ¶ms) |
|
if err != nil { |
|
log.Println("Unable to parse params") |
|
break |
|
} |
|
reply, errReply := s.handleGetJobRPC(cs, ¶ms) |
|
if errReply != nil { |
|
err = cs.sendError(req.Id, errReply) |
|
break |
|
} |
|
err = cs.sendResult(req.Id, &reply) |
|
case "submit": |
|
var params SubmitParams |
|
err := json.Unmarshal(*req.Params, ¶ms) |
|
if err != nil { |
|
log.Println("Unable to parse params") |
|
break |
|
} |
|
reply, errReply := s.handleSubmitRPC(cs, ¶ms) |
|
if errReply != nil { |
|
err = cs.sendError(req.Id, errReply) |
|
break |
|
} |
|
err = cs.sendResult(req.Id, &reply) |
|
default: |
|
errReply := s.handleUnknownRPC(cs, req) |
|
err = cs.sendError(req.Id, errReply) |
|
} |
|
|
|
if err != nil { |
|
cs.conn.Close() |
|
} |
|
} |
|
|
|
func (cs *Session) sendResult(id *json.RawMessage, result interface{}) error { |
|
cs.Lock() |
|
defer cs.Unlock() |
|
message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result} |
|
return cs.enc.Encode(&message) |
|
} |
|
|
|
func (cs *Session) pushMessage(method string, params interface{}) error { |
|
cs.Lock() |
|
defer cs.Unlock() |
|
message := JSONPushMessage{Version: "2.0", Method: method, Params: params} |
|
return cs.enc.Encode(&message) |
|
} |
|
|
|
func (cs *Session) sendError(id *json.RawMessage, reply *ErrorReply) error { |
|
cs.Lock() |
|
defer cs.Unlock() |
|
message := JSONRpcResp{Id: id, Version: "2.0", Error: reply} |
|
err := cs.enc.Encode(&message) |
|
if reply.Close { |
|
return errors.New("Force close") |
|
} |
|
return err |
|
} |
|
|
|
func (s *StratumServer) setDeadline(conn *net.TCPConn) { |
|
conn.SetDeadline(time.Now().Add(s.timeout)) |
|
} |
|
|
|
func (s *StratumServer) registerMiner(miner *Miner) { |
|
s.miners.Set(miner.Id, miner) |
|
} |
|
|
|
func (s *StratumServer) removeMiner(id string) { |
|
s.miners.Remove(id) |
|
} |
|
|
|
func (s *StratumServer) currentBlockTemplate() *BlockTemplate { |
|
return s.blockTemplate.Load().(*BlockTemplate) |
|
} |
|
|
|
func checkError(err error) { |
|
if err != nil { |
|
log.Fatalf("Error: %v", err) |
|
} |
|
}
|
|
|