From 132bd7ce884334c26e6bf82eafbb76d91aac2f77 Mon Sep 17 00:00:00 2001
From: Lyndsay Roger
Date: Thu, 1 Oct 2015 09:01:10 +1300
Subject: [PATCH] Redesign seeder logic to remove races

---
 crawler.go | 130 ++++++++--------------------------
 main.go    |  62 ++++++--------------
 node.go    |   8 +--
 seeder.go  | 165 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 209 insertions(+), 156 deletions(-)

diff --git a/crawler.go b/crawler.go
index beecb08..581e7db 100644
--- a/crawler.go
+++ b/crawler.go
@@ -22,130 +22,46 @@ func (e *crawlError) Error() string {
 
 // crawlNode runs in a goroutine, crawls the remote ip and updates the master
 // list of currently active addresses
-func crawlNode(s *dnsseeder, nd *node) {
-
-    nd.crawlActive = true
-    nd.crawlStart = time.Now()
-
-    defer crawlEnd(nd)
-
-    if config.debug {
-        log.Printf("%s - debug - start crawl: node %s status: %v:%v lastcrawl: %s\n",
-            s.name,
-            net.JoinHostPort(nd.na.IP.String(),
-                strconv.Itoa(int(nd.na.Port))),
-            nd.status,
-            nd.rating,
-            time.Since(nd.crawlStart).String())
-    }
+func crawlNode(rc chan *result, s *dnsseeder, nd *node) {
 
     // connect to the remote ip and ask them for their addr list
-    rna, e := s.crawlIP(nd)
+    rna, e := crawlIP(s, nd)
 
-    if e != nil {
-        // update the fact that we have not connected to this node
-        nd.lastTry = time.Now()
-        nd.connectFails++
-        nd.statusStr = e.Error()
-
-        // update the status of this failed node
-        switch nd.status {
-        case statusRG:
-            // if we are full then any RG failures will skip directly to NG
-            if s.isFull() {
-                nd.status = statusNG // not able to connect to this node so ignore
-                nd.statusTime = time.Now()
-            } else {
-                if nd.rating += 25; nd.rating > 30 {
-                    nd.status = statusWG
-                    nd.statusTime = time.Now()
-                }
-            }
-        case statusCG:
-            if nd.rating += 25; nd.rating >= 50 {
-                nd.status = statusWG
-                nd.statusTime = time.Now()
-            }
-        case statusWG:
-            if nd.rating += 15; nd.rating >= 100 {
-                nd.status = statusNG // not able to connect to this node so ignore
-                nd.statusTime = time.Now()
-            }
-        }
-        // no more to do so return which will shutdown the goroutine & call
-        // the deffered cleanup
-        if config.verbose {
-            log.Printf("%s: failed crawl node: %s s:r:f: %v:%v:%v %s\n",
-                s.name,
-                net.JoinHostPort(nd.na.IP.String(),
-                    strconv.Itoa(int(nd.na.Port))),
-                nd.status,
-                nd.rating,
-                nd.connectFails,
-                nd.statusStr)
-        }
-        return
+    res := &result{
+        nas:  rna,
+        msg:  e,
+        node: net.JoinHostPort(nd.na.IP.String(), strconv.Itoa(int(nd.na.Port))),
     }
 
-    // succesful connection and addresses received so mark status
-    if nd.status != statusCG {
-        nd.status = statusCG
-        nd.statusTime = time.Now()
-    }
-    cs := nd.lastConnect
-    nd.rating = 0
-    nd.connectFails = 0
-    nd.lastConnect = time.Now()
-    nd.lastTry = time.Now()
-    nd.statusStr = "ok: received remote address list"
-
-    added := 0
-    // do not accept more than one third of maxSize addresses from one node
-    oneThird := int(float64(s.maxSize / 3))
-
-    // if we are full then skip adding more possible clients
-    if s.isFull() == false {
-        // loop through all the received network addresses and add to thelist if not present
-        for _, na := range rna {
-            // a new network address so add to the system
-            if x := s.addNa(na); x == true {
-                if added++; added > oneThird {
-                    break
-                }
-            }
-        }
-    }
-
-    if config.verbose {
-        log.Printf("%s: crawl done: node: %s s:r:f: %v:%v:%v addr: %v:%v CrawlTime: %s Last connect: %v ago\n",
-            s.name,
-            net.JoinHostPort(nd.na.IP.String(),
-                strconv.Itoa(int(nd.na.Port))),
-            nd.status,
-            nd.rating,
-            nd.connectFails,
-            len(rna),
-            added,
-            time.Since(nd.crawlStart).String(),
-            time.Since(cs).String())
+    if e == nil {
+        res.success = true
     }
 
-    // goroutine ends. deffered cleanup runs
-}
+    // all done so push the result back to the seeder.
+    // This will block until the seeder reads the result
+    rc <- res
 
-// crawlEnd is a deffered func to update theList after a crawl is all done
-func crawlEnd(nd *node) {
-    nd.crawlActive = false
+    // goroutine will end and be cleaned up
 }
 
 // crawlIP retrievs a slice of ip addresses from a client
-func (s *dnsseeder) crawlIP(nd *node) ([]*wire.NetAddress, *crawlError) {
+func crawlIP(s *dnsseeder, nd *node) ([]*wire.NetAddress, *crawlError) {
 
     ip := nd.na.IP.String()
     port := strconv.Itoa(int(nd.na.Port))
 
+    // get correct formatting for ipv6 addresses
     dialString := net.JoinHostPort(ip, port)
 
+    if config.debug {
+        log.Printf("%s - debug - start crawl: node %s status: %v:%v lastcrawl: %s\n",
+            s.name,
+            dialString,
+            nd.status,
+            nd.rating,
+            time.Since(nd.crawlStart).String())
+    }
+
     conn, err := net.DialTimeout("tcp", dialString, time.Second*10)
     if err != nil {
         if config.debug {
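Note on the change above: the rewritten crawlNode no longer mutates the shared node record from the crawl goroutine; it only reports what happened over a channel, and a single owner goroutine (the seeder loop, further down) applies the update. A minimal, self-contained sketch of that pattern, with illustrative names rather than the project's types:

    package main

    import "fmt"

    // result carries everything the worker learned back to the owner;
    // the worker never touches shared state itself.
    type result struct {
        key string
        err error
    }

    func worker(rc chan<- *result, key string) {
        // ... network work would happen here ...
        rc <- &result{key: key} // blocks until the owner reads it
    }

    func main() {
        rc := make(chan *result)
        state := map[string]int{"a": 0, "b": 0} // owned exclusively by main

        go worker(rc, "a")
        go worker(rc, "b")

        for i := 0; i < 2; i++ {
            r := <-rc // single reader: only this goroutine updates state
            if r.err == nil {
                state[r.key]++
            }
        }
        fmt.Println(state)
    }

Because the channel is unbuffered, each crawler blocks on its send until the seeder reads the result, which also gives natural backpressure on how fast results are produced.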
diff --git a/main.go b/main.go
index b99ff53..f7ec5f2 100644
--- a/main.go
+++ b/main.go
@@ -28,19 +28,19 @@ type NodeCounts struct {
 
 // configData holds information on the application
 type configData struct {
+    dnsUnknown uint64                // the number of dns requests we are not configured to handle
     uptime     time.Time             // application start time
     port       string                // port for the dns server to listen on
     http       string                // port for the web server to listen on
     version    string                // application version
-    verbose    bool                  // verbose output cmdline option
-    debug      bool                  // debug cmdline option
-    stats      bool                  // stats cmdline option
     seeders    map[string]*dnsseeder // holds a pointer to all the current seeders
     smtx       sync.RWMutex          // protect the seeders map
     order      []string              // the order of loading the netfiles so we can display in this order
     dns        map[string][]dns.RR   // holds details of all the currently served dns records
     dnsmtx     sync.RWMutex          // protect the dns map
-    dnsUnknown uint64                // the number of dns requests for we are not configured to handle
+    verbose    bool                  // verbose output cmdline option
+    debug      bool                  // debug cmdline option
+    stats      bool                  // stats cmdline option
 }
 
 var config configData
@@ -50,7 +50,6 @@ func main() {
 
     var j bool
 
-    // FIXME - update with git hash during build
     config.version = "0.8.0"
     config.uptime = time.Now()
 
@@ -118,51 +117,28 @@ func main() {
     go serve("udp", config.port)
     //go serve("tcp", config.port)
 
-    // seed the seeder with some ip addresses
+    var wg sync.WaitGroup
+
+    done := make(chan struct{})
+    // start a goroutine for each seeder
     for _, s := range config.seeders {
         s.initCrawlers()
-        s.startCrawlers()
+        wg.Add(1)
+        go s.runSeeder(done, &wg)
     }
 
-    sig := make(chan os.Signal)
+    sig := make(chan os.Signal, 1)
     signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
 
-    // extract good dns records from all nodes on regular basis
-    dnsChan := time.NewTicker(time.Second * dnsDelay).C
-    // used to start crawlers on a regular basis
-    crawlChan := time.NewTicker(time.Second * crawlDelay).C
-    // used to remove old statusNG nodes that have reached fail count
-    auditChan := time.NewTicker(time.Minute * auditDelay).C
-
-    dowhile := true
-    for dowhile == true {
-        select {
-        case <-sig:
-            dowhile = false
-        case <-auditChan:
-            if config.debug {
-                log.Printf("debug - Audit nodes timer triggered\n")
-            }
-            for _, s := range config.seeders {
-                s.auditNodes()
-            }
-        case <-dnsChan:
-            if config.debug {
-                log.Printf("debug - DNS - Updating latest ip addresses timer triggered\n")
-            }
-            for _, s := range config.seeders {
-                s.loadDNS()
-            }
-        case <-crawlChan:
-            if config.debug {
-                log.Printf("debug - Start crawlers timer triggered\n")
-            }
-            for _, s := range config.seeders {
-                s.startCrawlers()
-            }
-        }
-    }
+    // block until a signal is received
+    fmt.Println("\nShutting down on signal:", <-sig)
+
+    // FIXME - call dns server.Shutdown()
+
+    // close the done channel to signal all seeders to shut down
+    // and wait for them to exit
+    close(done)
+    wg.Wait()
 
     fmt.Printf("\nProgram exiting. Bye\n")
 }
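main now delegates the timer loop to each seeder and handles only startup and shutdown. The shutdown idiom used here — a buffered signal channel, a close(done) broadcast, and a WaitGroup — can be exercised on its own; this sketch assumes three no-op seeders:

    package main

    import (
        "fmt"
        "os"
        "os/signal"
        "sync"
        "syscall"
    )

    func runSeeder(done <-chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
        <-done // real code would select on tickers and results here
    }

    func main() {
        var wg sync.WaitGroup
        done := make(chan struct{})

        for i := 0; i < 3; i++ {
            wg.Add(1)
            go runSeeder(done, &wg)
        }

        // buffered so a signal delivered before we receive is not lost
        sig := make(chan os.Signal, 1)
        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

        fmt.Println("\nShutting down on signal:", <-sig)

        close(done) // a closed channel is readable by every listener at once
        wg.Wait()   // block until all seeders have exited
    }

The buffer on the signal channel matters: signal.Notify never blocks when delivering, so an unbuffered channel can drop a signal that arrives while the receiver is busy; the change from make(chan os.Signal) to make(chan os.Signal, 1) closes that gap.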
diff --git a/node.go b/node.go
index 95fba6c..e54507d 100644
--- a/node.go
+++ b/node.go
@@ -14,17 +14,17 @@ type node struct {
     lastTry      time.Time        // last time we tried to connect to this client
     crawlStart   time.Time        // time when we started the last crawl
     statusTime   time.Time        // time the status was last updated
-    crawlActive  bool             // are we currently crawling this client
-    connectFails uint32           // number of times we have failed to connect to this client
+    nonstdIP     net.IP           // if not using the default port then this is the encoded ip containing the actual port
     statusStr    string           // string with last error or OK details
-    version      int32            // remote client protocol version
     strVersion   string           // remote client user agent
     services     wire.ServiceFlag // remote client supported services
+    connectFails uint32           // number of times we have failed to connect to this client
+    version      int32            // remote client protocol version
     lastBlock    int32            // remote client last block
     status       uint32           // rg,cg,wg,ng
     rating       uint32           // if it reaches 100 then we mark them statusNG
-    nonstdIP     net.IP           // if not using the default port then this is the encoded ip containing the actual port
     dnsType      uint32           // what dns type this client is
+    crawlActive  bool             // are we currently crawling this client
 }
 
 // status2str will return the string description of the status
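The field reordering in node.go (and in configData earlier) appears to group fields roughly from largest to smallest; assuming that is the intent, it reduces the padding the compiler inserts to keep each field aligned. An illustrative comparison, with types chosen for the example rather than taken from node:

    package main

    import (
        "fmt"
        "unsafe"
    )

    // loose interleaves bools with 8-byte fields, forcing padding
    type loose struct {
        a bool
        b int64
        c bool
        d int64
        e bool
    }

    // packed holds the same fields, grouped largest to smallest
    type packed struct {
        b int64
        d int64
        a bool
        c bool
        e bool
    }

    func main() {
        fmt.Println(unsafe.Sizeof(loose{}))  // 40 on 64-bit: each bool pads out to 8 bytes
        fmt.Println(unsafe.Sizeof(packed{})) // 24: the three bools share one word
    }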
diff --git a/seeder.go b/seeder.go
index a4938da..9c56d73 100644
--- a/seeder.go
+++ b/seeder.go
@@ -62,6 +62,14 @@ type dnsseeder struct {
     maxStart []uint32   // max number of goroutines to start each run for each status type
     delay    []int64    // number of seconds to wait before we connect to a known client for each status
     counts   NodeCounts // structure to hold stats for this seeder
+    shutdown bool       // seeder is shutting down
+}
+
+type result struct {
+    nas     []*wire.NetAddress // slice of node addresses returned from a node
+    msg     *crawlError        // crawl error or nil if no problems
+    node    string             // theList key to the node that was crawled
+    success bool               // was the crawl successful
 }
 
 // initCrawlers needs to be run before the startCrawlers so it can get
@@ -113,9 +121,51 @@ func (s *dnsseeder) initCrawlers() {
     }
 }
 
+// runSeeder runs a seeder in its own goroutine until the done channel is closed
+func (s *dnsseeder) runSeeder(done <-chan struct{}, wg *sync.WaitGroup) {
+
+    defer wg.Done()
+
+    // receive the results from the crawl goroutines
+    resultsChan := make(chan *result)
+
+    // start initial scan now
+    s.startCrawlers(resultsChan)
+
+    // used to clean out and cycle records in theList
+    auditChan := time.NewTicker(time.Minute * auditDelay).C
+    // used to start crawlers on a regular basis
+    crawlChan := time.NewTicker(time.Second * crawlDelay).C
+    // extract good dns records from all nodes on regular basis
+    dnsChan := time.NewTicker(time.Second * dnsDelay).C
+
+    dowhile := true
+    for dowhile {
+        select {
+        case r := <-resultsChan:
+            // process a results structure from a crawl
+            s.processResult(r)
+        case <-dnsChan:
+            // update the system with the latest selection of dns records
+            s.loadDNS()
+        case <-auditChan:
+            // keep theList clean and tidy
+            s.auditNodes()
+        case <-crawlChan:
+            // start a scan to crawl nodes
+            s.startCrawlers(resultsChan)
+        case <-done:
+            // done channel closed so exit the select and shut down the seeder
+            dowhile = false
+        }
+    }
+    fmt.Printf(".")
+    // end the goroutine & defer will call wg.Done()
+}
+
 // startCrawlers is called on a time basis to start maxcrawlers new
 // goroutines if there are spare goroutine slots available
-func (s *dnsseeder) startCrawlers() {
+func (s *dnsseeder) startCrawlers(resultsChan chan *result) {
 
     tcount := len(s.theList)
     if tcount == 0 {
@@ -169,8 +219,10 @@ func (s *dnsseeder) startCrawlers() {
             continue
         }
 
+        nd.crawlActive = true
+        nd.crawlStart = time.Now()
         // all looks good so start a go routine to crawl the remote node
-        go crawlNode(s, nd)
+        go crawlNode(resultsChan, s, nd)
         c.started++
     }
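Note that startCrawlers now sets crawlActive and crawlStart before launching the goroutine, so those fields are only ever written by the seeder goroutine that owns theList; the crawler itself only reads its arguments and sends back a result. A compact sketch of that ownership split, with hypothetical names:

    package main

    import "fmt"

    type node struct {
        name   string
        active bool // written only by the owner goroutine
    }

    type result struct{ name string }

    func crawl(rc chan<- *result, name string) {
        // the worker reads its arguments only; it never writes node fields
        rc <- &result{name: name}
    }

    func main() {
        rc := make(chan *result)
        nodes := map[string]*node{"x": {name: "x"}}

        // the owner marks the node busy *before* the goroutine starts,
        // so a later scheduling pass cannot launch a duplicate crawl
        for _, nd := range nodes {
            if !nd.active {
                nd.active = true
                go crawl(rc, nd.name)
            }
        }

        r := <-rc
        nodes[r.name].active = false // cleared by the owner on completion
        fmt.Println("crawled", r.name)
    }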
@@ -190,6 +242,115 @@ func (s *dnsseeder) startCrawlers() {
     // returns and read lock released
 }
 
+// processResult will add new nodes to the list and update the status of the crawled node
+func (s *dnsseeder) processResult(r *result) {
+
+    var nd *node
+
+    if n, ok := s.theList[r.node]; ok {
+        nd = n
+    } else {
+        log.Printf("%s: warning - ignoring results from unknown node: %s\n", s.name, r.node)
+        return
+    }
+
+    defer crawlEnd(nd)
+
+    //if r.success != true {
+    if r.msg != nil {
+        // update the fact that we have not connected to this node
+        nd.lastTry = time.Now()
+        nd.connectFails++
+        nd.statusStr = r.msg.Error()
+
+        // update the status of this failed node
+        switch nd.status {
+        case statusRG:
+            // if we are full then any RG failures will skip directly to NG
+            if s.isFull() {
+                nd.status = statusNG // not able to connect to this node so ignore
+                nd.statusTime = time.Now()
+            } else {
+                if nd.rating += 25; nd.rating > 30 {
+                    nd.status = statusWG
+                    nd.statusTime = time.Now()
+                }
+            }
+        case statusCG:
+            if nd.rating += 25; nd.rating >= 50 {
+                nd.status = statusWG
+                nd.statusTime = time.Now()
+            }
+        case statusWG:
+            if nd.rating += 15; nd.rating >= 100 {
+                nd.status = statusNG // not able to connect to this node so ignore
+                nd.statusTime = time.Now()
+            }
+        }
+        // no more to do so return, which will trigger
+        // the deferred crawlEnd cleanup
+        if config.verbose {
+            log.Printf("%s: failed crawl node: %s s:r:f: %v:%v:%v %s\n",
+                s.name,
+                net.JoinHostPort(nd.na.IP.String(),
+                    strconv.Itoa(int(nd.na.Port))),
+                nd.status,
+                nd.rating,
+                nd.connectFails,
+                nd.statusStr)
+        }
+        return
+    }
+
+    // successful connection and addresses received so mark status
+    if nd.status != statusCG {
+        nd.status = statusCG
+        nd.statusTime = time.Now()
+    }
+    cs := nd.lastConnect
+    nd.rating = 0
+    nd.connectFails = 0
+    nd.lastConnect = time.Now()
+    nd.lastTry = time.Now()
+    nd.statusStr = "ok: received remote address list"
+
+    added := 0
+    // do not accept more than one third of maxSize addresses from one node
+    oneThird := int(s.maxSize / 3)
+
+    // if we are full then skip adding more possible clients
+    if !s.isFull() {
+        // loop through all the received network addresses and add to theList if not present
+        for _, na := range r.nas {
+            // a new network address so add to the system
+            if s.addNa(na) {
+                if added++; added > oneThird {
+                    break
+                }
+            }
+        }
+    }
+
+    if config.verbose {
+        log.Printf("%s: crawl done: node: %s s:r:f: %v:%v:%v addr: %v:%v CrawlTime: %s Last connect: %v ago\n",
+            s.name,
+            net.JoinHostPort(nd.na.IP.String(),
+                strconv.Itoa(int(nd.na.Port))),
+            nd.status,
+            nd.rating,
+            nd.connectFails,
+            len(r.nas),
+            added,
+            time.Since(nd.crawlStart).String(),
+            time.Since(cs).String())
+    }
+
+}
+
+func crawlEnd(nd *node) {
+    nd.crawlActive = false
+}
+
 // isDup will return true or false depending if the ip exists in theList
 func (s *dnsseeder) isDup(ipport string) bool {
     s.mtx.RLock()