From f3d2a5412cdcd19ff187242845529ac735bcc26b Mon Sep 17 00:00:00 2001 From: Sammy Libre Date: Sun, 7 Aug 2016 07:41:53 +0500 Subject: [PATCH] Temporarily drop redis and policy --- config.example.json | 28 ------- go-pool/pool/pool.go | 34 --------- go-pool/storage/redis.go | 135 ---------------------------------- go-pool/storage/redis_test.go | 74 ------------------- go-pool/stratum/handlers.go | 7 -- go-pool/stratum/miner.go | 3 - go-pool/stratum/stratum.go | 15 +--- main.go | 8 +- 8 files changed, 3 insertions(+), 301 deletions(-) delete mode 100644 go-pool/storage/redis.go delete mode 100644 go-pool/storage/redis_test.go diff --git a/config.example.json b/config.example.json index 732f6b6..ec7a808 100644 --- a/config.example.json +++ b/config.example.json @@ -37,34 +37,6 @@ "timeout": "1s" }, - "redis": { - "endpoint": "127.0.0.1:6379", - "poolSize": 8, - "database": 0 - }, - - "policy": { - "workers": 8, - "resetInterval": "60m", - "refreshInterval": "1m", - - "banning": { - "enabled": true, - "ipset": "blacklist", - "timeout": 1800, - "invalidPercent": 30, - "checkThreshold": 30, - "malformedLimit": 5 - }, - - "limits": { - "enabled": false, - "limit": 30, - "grace": "5m", - "limitJump": 10 - } - }, - "newrelicEnabled": false, "newrelicName": "MyStratum", "newrelicKey": "SECRET_KEY", diff --git a/go-pool/pool/pool.go b/go-pool/pool/pool.go index ac0db9a..80845e3 100644 --- a/go-pool/pool/pool.go +++ b/go-pool/pool/pool.go @@ -5,13 +5,10 @@ type Config struct { BypassAddressValidation bool `json:"bypassAddressValidation"` Stratum Stratum `json:"stratum"` Daemon Daemon `json:"daemon"` - Redis Redis `json:"redis"` Threads int `json:"threads"` Coin string `json:"coin"` - Policy Policy `json:"policy"` - NewrelicName string `json:"newrelicName"` NewrelicKey string `json:"newrelicKey"` NewrelicVerbose bool `json:"newrelicVerbose"` @@ -36,34 +33,3 @@ type Daemon struct { Port int `json:"port"` Timeout string `json:"timeout"` } - -type Redis struct { - Endpoint string `json:"endpoint"` - Password string `json:"password"` - Database int64 `json:"database"` - PoolSize int `json:"poolSize"` -} - -type Policy struct { - Workers int `json:"workers"` - Banning Banning `json:"banning"` - Limits Limits `json:"limits"` - ResetInterval string `json:"resetInterval"` - RefreshInterval string `json:"refreshInterval"` -} - -type Banning struct { - Enabled bool `json:"enabled"` - IPSet string `json:"ipset"` - Timeout int64 `json:"timeout"` - InvalidPercent float32 `json:"invalidPercent"` - CheckThreshold uint32 `json:"checkThreshold"` - MalformedLimit uint32 `json:"malformedLimit"` -} - -type Limits struct { - Enabled bool `json:"enabled"` - Limit int32 `json:"limit"` - Grace string `json:"grace"` - LimitJump int32 `json:"limitJump"` -} diff --git a/go-pool/storage/redis.go b/go-pool/storage/redis.go deleted file mode 100644 index 8592fce..0000000 --- a/go-pool/storage/redis.go +++ /dev/null @@ -1,135 +0,0 @@ -package storage - -import ( - "log" - "strconv" - "strings" - "time" - - "gopkg.in/redis.v3" - - "../pool" -) - -type RedisClient struct { - client *redis.Client - prefix string -} - -func NewRedisClient(cfg *pool.Redis, prefix string) *RedisClient { - client := redis.NewClient(&redis.Options{ - Addr: cfg.Endpoint, - Password: cfg.Password, - DB: cfg.Database, - PoolSize: cfg.PoolSize, - }) - return &RedisClient{client: client, prefix: prefix} -} - -func (r *RedisClient) Check() { - pong, err := r.client.Ping().Result() - if err != nil { - log.Fatalf("Can't establish Redis connection: %v", err) - } - log.Printf("Redis PING command reply: %v", pong) -} - -// Always returns list of addresses. If Redis fails it will return empty list. -func (r *RedisClient) GetBlacklist() []string { - cmd := r.client.SMembers(r.formatKey("blacklist")) - if cmd.Err() != nil { - log.Printf("Failed to get blacklist from Redis: %v", cmd.Err()) - return []string{} - } - return cmd.Val() -} - -// Always returns list of IPs. If Redis fails it will return empty list. -func (r *RedisClient) GetWhitelist() []string { - cmd := r.client.SMembers(r.formatKey("whitelist")) - if cmd.Err() != nil { - log.Printf("Failed to get blacklist from Redis: %v", cmd.Err()) - return []string{} - } - return cmd.Val() -} - -func (r *RedisClient) WriteShare(login string, diff int64) { - tx := r.client.Multi() - defer tx.Close() - - ms := time.Now().UnixNano() / 1000000 - ts := ms / 1000 - - _, err := tx.Exec(func() error { - r.writeShare(tx, ms, ts, login, diff) - return nil - }) - if err != nil { - log.Printf("Failed to insert share data into Redis: %v", err) - } -} - -func (r *RedisClient) WriteBlock(login string, diff, roundDiff, height int64, hashHex string) { - tx := r.client.Multi() - defer tx.Close() - - ms := time.Now().UnixNano() / 1000000 - ts := ms / 1000 - - cmds, err := tx.Exec(func() error { - r.writeShare(tx, ms, ts, login, diff) - tx.HSet(r.formatKey("stats"), "lastBlockFound", strconv.FormatInt(ms, 10)) - tx.ZIncrBy(r.formatKey("finders"), 1, login) - tx.Rename(r.formatKey("shares", "roundCurrent"), r.formatKey("shares", formatRound(height))) - tx.HGetAllMap(r.formatKey("shares", formatRound(height))) - return nil - }) - if err != nil { - log.Printf("Failed to insert block candidate into Redis: %v", err) - } else { - sharesMap, _ := cmds[7].(*redis.StringStringMapCmd).Result() - totalShares := int64(0) - for _, v := range sharesMap { - n, _ := strconv.ParseInt(v, 10, 64) - totalShares += n - } - s := join(hashHex, ts, roundDiff, totalShares) - cmd := r.client.ZAdd(r.formatKey("blocks", "candidates"), redis.Z{Score: float64(height), Member: s}) - if cmd.Err() != nil { - log.Printf("Failed to insert block candidate shares into Redis: %v", cmd.Err()) - } else { - log.Printf("Inserted block to Redis, height: %v, variance: %v/%v, %v", height, totalShares, roundDiff, cmd.Val()) - } - } -} - -func (r *RedisClient) writeShare(tx *redis.Multi, ms, ts int64, login string, diff int64) { - tx.HIncrBy(r.formatKey("shares", "roundCurrent"), login, diff) - tx.ZAdd(r.formatKey("hashrate"), redis.Z{Score: float64(ts), Member: join(diff, login, ms)}) - tx.HIncrBy(r.formatKey("workers", login), "hashes", diff) - tx.HSet(r.formatKey("workers", login), "lastShare", strconv.FormatInt(ts, 10)) -} - -func (r *RedisClient) formatKey(args ...interface{}) string { - return join(r.prefix, join(args...)) -} - -func formatRound(height int64) string { - return "round" + strconv.FormatInt(height, 10) -} - -func join(args ...interface{}) string { - s := make([]string, len(args)) - for i, v := range args { - switch v.(type) { - case string: - s[i] = v.(string) - case int64: - s[i] = strconv.FormatInt(v.(int64), 10) - default: - panic("Invalid type specified for conversion") - } - } - return strings.Join(s, ":") -} diff --git a/go-pool/storage/redis_test.go b/go-pool/storage/redis_test.go deleted file mode 100644 index 5d36162..0000000 --- a/go-pool/storage/redis_test.go +++ /dev/null @@ -1,74 +0,0 @@ -package storage - -import ( - "gopkg.in/redis.v3" - "os" - "reflect" - "strings" - "testing" -) - -var r *RedisClient - -func TestMain(m *testing.M) { - client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"}) - r = &RedisClient{client: client, prefix: "test"} - r.client.FlushAll() - - os.Exit(m.Run()) -} - -func TestWriteBlock(t *testing.T) { - r.client.FlushAll() - r.WriteBlock("addy", 1000, 999, 0, "abcdef") - - sharesRes := r.client.HGetAllMap("test:shares:round0").Val() - expectedRound := map[string]string{"addy": "1000"} - if !reflect.DeepEqual(sharesRes, expectedRound) { - t.Errorf("Invalid round data: %v", sharesRes) - } - - blockRes := r.client.ZRevRangeWithScores("test:blocks:candidates", 0, 99999).Val() - blockRes = stripTimestampsFromZs(blockRes) - expectedCandidates := []redis.Z{redis.Z{0, "abcdef:*:999:1000"}} - if !reflect.DeepEqual(blockRes, expectedCandidates) { - t.Errorf("Invalid candidates data: %v, expected: %v", blockRes, expectedCandidates) - } -} - -func TestWriteBlockAtSameHeight(t *testing.T) { - r.client.FlushAll() - r.WriteBlock("addy", 1000, 999, 1, "00000000") - r.WriteBlock("addy", 2000, 999, 1, "00000001") - r.WriteBlock("addy", 3000, 999, 1, "00000002") - - sharesRes := r.client.HGetAllMap("test:shares:round1").Val() - expectedRound := map[string]string{"addy": "3000"} - if !reflect.DeepEqual(sharesRes, expectedRound) { - t.Errorf("Invalid round data: %v", sharesRes) - } - - blockRes := r.client.ZRevRangeWithScores("test:blocks:candidates", 0, 99999).Val() - blockRes = stripTimestampsFromZs(blockRes) - expectedBlocks := []redis.Z{redis.Z{1, "00000002:*:999:3000"}, redis.Z{1, "00000001:*:999:2000"}, redis.Z{1, "00000000:*:999:1000"}} - if len(blockRes) != 3 { - t.Errorf("Invalid number of candidates: %v, expected: %v", len(blockRes), 3) - } - if !reflect.DeepEqual(blockRes, expectedBlocks) { - t.Errorf("Invalid candidates data: %v, expected: %v", blockRes, expectedBlocks) - } -} - -func stripTimestampFromZ(z redis.Z) redis.Z { - k := strings.Split(z.Member.(string), ":") - res := []string{k[0], "*", k[2], k[3]} - return redis.Z{Score: z.Score, Member: strings.Join(res, ":")} -} - -func stripTimestampsFromZs(zs []redis.Z) []redis.Z { - var res []redis.Z - for _, z := range zs { - res = append(res, stripTimestampFromZ(z)) - } - return res -} diff --git a/go-pool/stratum/handlers.go b/go-pool/stratum/handlers.go index 64adf62..377fa02 100644 --- a/go-pool/stratum/handlers.go +++ b/go-pool/stratum/handlers.go @@ -20,11 +20,6 @@ func (s *StratumServer) handleLoginRPC(cs *Session, params *LoginParams) (reply return } - if !s.policy.ApplyLoginPolicy(params.Login, cs.ip) { - errorReply = &ErrorReply{Code: -1, Message: "Your address blacklisted", Close: true} - return - } - miner := NewMiner(params.Login, params.Pass, s.port.Difficulty, cs.ip) miner.Session = cs s.registerMiner(miner) @@ -83,8 +78,6 @@ func (s *StratumServer) handleSubmitRPC(cs *Session, params *SubmitParams) (repl } validShare := miner.processShare(s, job, t, nonce, params.Result) - ok = s.policy.ApplySharePolicy(miner.IP, validShare) - if !validShare { errorReply = &ErrorReply{Code: -1, Message: "Low difficulty share", Close: !ok} return diff --git a/go-pool/stratum/miner.go b/go-pool/stratum/miner.go index 3c80c42..9a1218a 100644 --- a/go-pool/stratum/miner.go +++ b/go-pool/stratum/miner.go @@ -126,7 +126,6 @@ func (m *Miner) processShare(s *StratumServer, job *Job, t *BlockTemplate, nonce log.Printf("Block submission failure at height %v: %v", t.Height, err) } else { blockFastHash := hex.EncodeToString(hashing.FastHash(convertedBlob)) - s.storage.WriteBlock(m.Login, job.Difficulty, t.Difficulty, t.Height, blockFastHash) // Immediately refresh current BT and send new jobs s.refreshBlockTemplate(true) log.Printf("Block %v found at height %v by miner %v@%v", blockFastHash[0:6], t.Height, m.Login, m.IP) @@ -134,8 +133,6 @@ func (m *Miner) processShare(s *StratumServer, job *Job, t *BlockTemplate, nonce } else if hashDiff < job.Difficulty { log.Printf("Rejected low difficulty share of %v from %v@%v", hashDiff, m.Login, m.IP) return false - } else { - s.storage.WriteShare(m.Login, job.Difficulty) } log.Printf("Valid share at difficulty %v/%v", s.port.Difficulty, hashDiff) diff --git a/go-pool/stratum/stratum.go b/go-pool/stratum/stratum.go index ad80b4f..22a5818 100644 --- a/go-pool/stratum/stratum.go +++ b/go-pool/stratum/stratum.go @@ -15,8 +15,6 @@ import ( "../pool" "../rpc" - "../storage" - "./policy" ) type StratumServer struct { @@ -28,8 +26,6 @@ type StratumServer struct { rpc *rpc.RPCClient timeout time.Duration broadcastTimer *time.Timer - storage *storage.RedisClient - policy *policy.PolicyServer } type Session struct { @@ -43,17 +39,16 @@ const ( MaxReqSize = 10 * 1024 ) -func NewStratum(cfg *pool.Config, port pool.Port, storage *storage.RedisClient, policy *policy.PolicyServer) *StratumServer { +func NewStratum(cfg *pool.Config, port pool.Port) *StratumServer { b := make([]byte, 4) _, err := rand.Read(b) if err != nil { log.Fatalf("Can't seed with random bytes: %v", err) } - stratum := &StratumServer{config: cfg, port: port, policy: policy, instanceId: b} + stratum := &StratumServer{config: cfg, port: port, instanceId: b} stratum.rpc = rpc.NewRPCClient(cfg) stratum.miners = NewMinersMap() - stratum.storage = storage timeout, _ := time.ParseDuration(cfg.Stratum.Timeout) stratum.timeout = timeout @@ -97,11 +92,6 @@ func (s *StratumServer) Listen() { } conn.SetKeepAlive(true) ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) - ok := s.policy.ApplyLimitPolicy(ip) - if !ok { - conn.Close() - continue - } n += 1 accept <- n @@ -141,7 +131,6 @@ func (s *StratumServer) handleClient(conn *net.TCPConn, ip string) error { var req JSONRpcReq err = json.Unmarshal(data, &req) if err != nil { - s.policy.ApplyMalformedPolicy(ip) log.Printf("Malformed request: %v", err) return err } diff --git a/main.go b/main.go index 83491e8..1fd47ce 100644 --- a/main.go +++ b/main.go @@ -10,9 +10,7 @@ import ( "time" "./go-pool/pool" - "./go-pool/storage" "./go-pool/stratum" - "./go-pool/stratum/policy" "github.com/yvasiyarov/gorelic" ) @@ -29,13 +27,9 @@ func startStratum() { log.Printf("Running with default %v threads", n) } - storage := storage.NewRedisClient(&cfg.Redis, cfg.Coin) - storage.Check() - policy := policy.Start(&cfg, storage) - quit := make(chan bool) for _, port := range cfg.Stratum.Ports { - s := stratum.NewStratum(&cfg, port, storage, policy) + s := stratum.NewStratum(&cfg, port) go func() { s.Listen()