Browse Source

Start audit feature and fix race when adding new twistees

master
Lyndsay Roger 9 years ago
parent
commit
d26cb31732
  1. 73
      crawler.go
  2. 49
      dnsseeder.go
  3. 15
      main.go

73
crawler.go

@ -1,7 +1,6 @@ @@ -1,7 +1,6 @@
package main
import (
"errors"
"log"
"net"
"strconv"
@ -10,6 +9,16 @@ import ( @@ -10,6 +9,16 @@ import (
"github.com/btcsuite/btcd/wire"
)
type CrawlError struct {
errLoc string
Err error
}
// Error returns a formatted error about a crawl
func (e *CrawlError) Error() string {
return "err: " + e.errLoc + ": " + e.Err.Error()
}
// crawlTwistee runs in a goroutine, crawls the remote ip and updates the master
// list of currently active addresses
func crawlTwistee(tw *Twistee) {
@ -29,35 +38,41 @@ func crawlTwistee(tw *Twistee) { @@ -29,35 +38,41 @@ func crawlTwistee(tw *Twistee) {
}
// connect to the remote ip and ask them for their addr list
ras, err := crawlIP(tw)
if err != nil {
ras, e := crawlIP(tw)
if e != nil {
// update the fact that we have not connected to this twistee
tw.lastTry = time.Now()
tw.connectFails++
tw.statusStr = e.Error()
// update the status of this failed twistee
switch tw.status {
case statusRG:
if tw.rating += 25; tw.rating > 30 {
tw.status = statusWG
tw.statusTime = time.Now()
}
case statusCG:
if tw.rating += 25; tw.rating >= 50 {
tw.status = statusWG
tw.statusTime = time.Now()
}
case statusWG:
if tw.rating += 30; tw.rating >= 100 {
tw.status = statusNG // not able to connect to this twistee so ignore
tw.statusTime = time.Now()
}
}
// no more to do so return which will shutdown the goroutine & call
// the deffered cleanup
if config.verbose {
log.Printf("debug - failed crawl: twistee %s failcount: %v newstatus: %v:%v\n",
log.Printf("debug - failed crawl: twistee %s s:r:f: %v:%v:%v %s\n",
net.JoinHostPort(tw.na.IP.String(),
strconv.Itoa(int(tw.na.Port))),
tw.connectFails,
tw.status,
tw.rating)
tw.rating,
tw.connectFails,
tw.statusStr)
}
return
}
@ -65,11 +80,13 @@ func crawlTwistee(tw *Twistee) { @@ -65,11 +80,13 @@ func crawlTwistee(tw *Twistee) {
// succesful connection and addresses received so mark status
if tw.status != statusCG {
tw.status = statusCG
tw.statusTime = time.Now()
}
tw.rating = 0
tw.connectFails = 0
tw.lastConnect = time.Now()
tw.lastTry = time.Now()
tw.statusStr = "ok: received remote address list"
added := 0
@ -103,7 +120,7 @@ func crawlEnd(tw *Twistee) { @@ -103,7 +120,7 @@ func crawlEnd(tw *Twistee) {
}
// crawlIP retrievs a slice of ip addresses from a client
func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) {
func crawlIP(tw *Twistee) ([]*wire.NetAddress, *CrawlError) {
ip := tw.na.IP.String()
port := strconv.Itoa(int(tw.na.Port))
@ -115,7 +132,7 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) { @@ -115,7 +132,7 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) {
if config.debug {
log.Printf("error - Could not connect to %s - %v\n", ip, err)
}
return nil, err
return nil, &CrawlError{"", err}
}
defer conn.Close()
@ -127,23 +144,20 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) { @@ -127,23 +144,20 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) {
// last parameter is lastblock
msgver, err := wire.NewMsgVersionFromConn(conn, NOUNCE, 0)
if err != nil {
log.Printf("error - NewMsgVer from conn: %v\n", err)
return nil, err
return nil, &CrawlError{"Create NewMsgVersionFromConn", err}
}
err = wire.WriteMessage(conn, msgver, PVER, TWISTNET)
if err != nil {
// Log and handle the error
log.Printf("error - %s:%s Write Message: %v\n", ip, port, err)
return nil, err
return nil, &CrawlError{"Write Version Message", err}
}
// first message received should be version
msg, _, err := wire.ReadMessage(conn, PVER, TWISTNET)
if err != nil {
// Log and handle the error
log.Printf("error - %s:%s Read Message after sending version: %v\n", ip, port, err)
return nil, err
return nil, &CrawlError{"Read message after sending Version", err}
}
switch msg := msg.(type) {
@ -153,10 +167,7 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) { @@ -153,10 +167,7 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) {
log.Printf("%s - Remote version: %v\n", ip, msg.ProtocolVersion)
}
default:
if config.debug {
log.Printf("error: expected Version Message but received: %v\n", msg.Command())
}
return nil, errors.New("Error. Did not receive expected Version message from remote client")
return nil, &CrawlError{"Did not receive expected Version message from remote client", err}
}
// FIXME - update twistee client version with what they just said
@ -166,30 +177,22 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) { @@ -166,30 +177,22 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) {
err = wire.WriteMessage(conn, msgverack, PVER, TWISTNET)
if err != nil {
// Log and handle the error
log.Printf("error - %s:%s Writing Message Ver Ack failed: %v\n", ip, port, err)
return nil, err
return nil, &CrawlError{"writing message VerAck", err}
}
// second message received should be verack
msg, _, err = wire.ReadMessage(conn, PVER, TWISTNET)
if err != nil {
// Log and handle the error
log.Printf("error - %s:%s Reading Message expected Ver Ack: %v\n", ip, port, err)
return nil, err
return nil, &CrawlError{"reading expected Ver Ack from remote client", err}
}
switch msg := msg.(type) {
switch msg.(type) {
case *wire.MsgVerAck:
if config.debug {
// The message is a pointer to a MsgVersion struct.
log.Printf("%s - received Version Ack\n", ip)
}
default:
if config.debug {
log.Printf("error: expected Version Ack Message but received: %v\n", msg.Command())
}
return nil, errors.New("Error. Did not receive expected Version Ack message from remote client")
return nil, &CrawlError{"Did not receive expected Ver Ack message from remote client", err}
}
// send getaddr command
@ -197,9 +200,7 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) { @@ -197,9 +200,7 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) {
err = wire.WriteMessage(conn, msgGetAddr, PVER, TWISTNET)
if err != nil {
// Log and handle the error
log.Printf("error - %s:%s writing message Get Addr: %v\n", ip, port, err)
return nil, err
return nil, &CrawlError{"writing Addr message to remote client", err}
}
c := 0
@ -217,8 +218,8 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) { @@ -217,8 +218,8 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) {
if config.debug {
log.Printf("%s - received valid addr message\n", ip)
}
return msg.AddrList, nil
dowhile = false
return msg.AddrList, nil
default:
if config.debug {
log.Printf("%s - ignoring message - %v\n", ip, msg.Command())
@ -231,8 +232,8 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) { @@ -231,8 +232,8 @@ func crawlIP(tw *Twistee) ([]*wire.NetAddress, error) {
}
}
// should never get here but need a return command
return nil, errors.New("FIXME - something went wrong and did not get an Addr response")
// received too many messages before requested Addr
return nil, &CrawlError{"message loop - did not receive remote addresses in first 25 messages from remote client", err}
}
/*

49
dnsseeder.go

@ -23,9 +23,7 @@ const ( @@ -23,9 +23,7 @@ const (
TWSTDPORT = 28333 // standard port twister listens on
MAXGO = 10 // max number of goroutines to start in one run
MAXFAILS = 35 // max number of connect fails before we delete a twistee
MAXFAILS = 55 // max number of connect fails before we delete a twistee. Just over 24 hours(checked every 33 minutes)
// DNS Type. Is this twistee using v4/v6 and standard or non standard ports
DNSV4STD = 1
@ -42,6 +40,7 @@ const ( @@ -42,6 +40,7 @@ const (
)
type Seeder struct {
uptime time.Time
theList map[string]*Twistee
mtx sync.RWMutex
}
@ -52,10 +51,12 @@ type Twistee struct { @@ -52,10 +51,12 @@ type Twistee struct {
lastConnect time.Time
lastTry time.Time
crawlStart time.Time
statusTime time.Time
crawlActive bool
connectFails uint32
clientVersion int32
clientSubVersion string
statusStr string
status uint32 // rg,cg,wg,ng
rating uint32 // if it reaches 100 then we ban them
nonstdIP net.IP
@ -118,7 +119,7 @@ func (s *Seeder) startCrawlers() { @@ -118,7 +119,7 @@ func (s *Seeder) startCrawlers() {
{"statusRG", statusRG, 10, 0, 0, 184},
{"statusCG", statusCG, 10, 0, 0, 325},
{"statusWG", statusWG, 10, 0, 0, 237},
{"statusNG", statusNG, 20, 0, 0, 3654},
{"statusNG", statusNG, 20, 0, 0, 1876},
}
s.mtx.RLock()
@ -200,6 +201,7 @@ func (s *Seeder) addNa(nNa *wire.NetAddress) bool { @@ -200,6 +201,7 @@ func (s *Seeder) addNa(nNa *wire.NetAddress) bool {
lastConnect: time.Now(),
clientVersion: 0, // FIXME - need to get from the crawl somehow
status: statusRG,
statusTime: time.Now(),
dnsType: DNSV4STD,
}
@ -231,7 +233,11 @@ func (s *Seeder) addNa(nNa *wire.NetAddress) bool { @@ -231,7 +233,11 @@ func (s *Seeder) addNa(nNa *wire.NetAddress) bool {
// generate the key and add to theList
k := net.JoinHostPort(nNa.IP.String(), strconv.Itoa(int(nNa.Port)))
s.mtx.Lock()
s.theList[k] = &nt
// final check to make sure another twistee & goroutine has not already added this twistee
// FIXME migrate to use channels
if _, dup := s.theList[k]; dup == false {
s.theList[k] = &nt
}
s.mtx.Unlock()
return true
@ -270,18 +276,41 @@ func crc16(bs []byte) uint16 { @@ -270,18 +276,41 @@ func crc16(bs []byte) uint16 {
return crc
}
func (s *Seeder) purgeNG() {
func (s *Seeder) auditTwistees() {
c := 0
log.Printf("status - Audit start. System Uptime: %s\n", time.Since(s.uptime).String())
s.mtx.Lock()
defer s.mtx.Unlock()
for k, tw := range s.theList {
if tw.status != statusNG {
continue
if tw.crawlActive == true {
if time.Now().Unix()-tw.crawlStart.Unix() >= 300 {
log.Printf("warning - long running crawl > 5 minutes ====\n- %s status:rating:fails %v:%v:%v crawl start: %s last status: %s\n====\n",
k,
tw.status,
tw.rating,
tw.connectFails,
tw.crawlStart.String(),
tw.statusStr)
}
}
if tw.status == statusRG || tw.status == statusWG {
if time.Now().Unix()-tw.statusTime.Unix() >= 900 {
log.Printf("warning - unchanged status > 15 minutes ====\n- %s status:rating:fails %v:%v:%v last status change: %s last status: %s\n====\n",
k,
tw.status,
tw.rating,
tw.connectFails,
tw.statusTime.String(),
tw.statusStr)
}
}
if tw.connectFails > MAXFAILS {
// last audit task is to remove twistees that we can not connect to
if tw.status == statusNG && tw.connectFails > MAXFAILS {
if config.verbose {
log.Printf("status - purging twistee %s after %v failed connections\n", k, tw.connectFails)
}
@ -295,7 +324,7 @@ func (s *Seeder) purgeNG() { @@ -295,7 +324,7 @@ func (s *Seeder) purgeNG() {
}
if config.verbose {
log.Printf("status - purging complete. %v twistees purged\n", c)
log.Printf("status - Audit complete. %v twistees purged\n", c)
}
}

15
main.go

@ -49,9 +49,14 @@ func main() { @@ -49,9 +49,14 @@ func main() {
log.Printf("Starting dnsseeder system for host %s.\n", config.host)
if config.verbose == false {
log.Printf("status - Running in quiet mode with limited output produced\n")
}
// FIXME - setup/make the data structures in Seeder
config.seeder = &Seeder{}
config.seeder.theList = make(map[string]*Twistee)
config.seeder.uptime = time.Now()
// start dns server
dns.HandleFunc("nonstd."+config.host, handleDNSNon)
@ -72,18 +77,18 @@ func main() { @@ -72,18 +77,18 @@ func main() {
// used to start crawlers on a regular basis
crawlChan := time.NewTicker(time.Second * 22).C
// used to remove old statusNG twistees that have reached fail count
purgeChan := time.NewTicker(time.Hour * 2).C
auditChan := time.NewTicker(time.Hour * 1).C
dowhile := true
for dowhile == true {
select {
case <-sig:
dowhile = false
case <-purgeChan:
case <-auditChan:
if config.debug {
log.Printf("debug - purge old statusNG twistees timer triggered\n")
log.Printf("debug - Audit twistees timer triggered\n")
}
config.seeder.purgeNG()
config.seeder.auditTwistees()
case <-dnsChan:
if config.debug {
log.Printf("debug - DNS - Updating latest ip addresses timer triggered\n")
@ -91,7 +96,7 @@ func main() { @@ -91,7 +96,7 @@ func main() {
config.seeder.loadDNS()
case <-crawlChan:
if config.debug {
log.Printf("debug - start crawlers timer triggered\n")
log.Printf("debug - Start crawlers timer triggered\n")
}
config.seeder.startCrawlers()
}

Loading…
Cancel
Save