
Redesign seeder logic to remove races

master
Lyndsay Roger, 9 years ago
commit 132bd7ce88
  1. crawler.go (130 changed lines)
  2. main.go (62 changed lines)
  3. node.go (8 changed lines)
  4. seeder.go (165 changed lines)
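The heart of the redesign: previously each crawl goroutine wrote its findings straight into the shared node records while the timer loop in main() walked the same data, so node fields were mutated from several goroutines at once. After this commit a crawler only does the network work and hands a result struct back over a channel; the seeder goroutine in runSeeder becomes effectively the single writer for theList. A minimal sketch of that single-writer pattern, with simplified names rather than the commit's actual types:

package main

import "fmt"

// result carries everything a worker learned back to the owning goroutine.
type result struct {
	key string
	err error
}

// worker does the slow work and reports back; it never touches shared state.
func worker(rc chan<- *result, key string) {
	// ... network I/O would happen here ...
	rc <- &result{key: key} // blocks until the owner reads it
}

func main() {
	state := map[string]int{"a": 0} // owned exclusively by this goroutine
	rc := make(chan *result)
	go worker(rc, "a")
	r := <-rc      // the owner is the only goroutine that
	state[r.key]++ // ever mutates state, so no locks are needed
	fmt.Println(state)
}

Because only one goroutine ever mutates the list, the races disappear without sprinkling locks over every node field.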

crawler.go (130 changed lines)

@@ -22,130 +22,46 @@ func (e *crawlError) Error() string {
// crawlNode runs in a goroutine, crawls the remote ip and updates the master
// list of currently active addresses
func crawlNode(s *dnsseeder, nd *node) {
nd.crawlActive = true
nd.crawlStart = time.Now()
defer crawlEnd(nd)
if config.debug {
log.Printf("%s - debug - start crawl: node %s status: %v:%v lastcrawl: %s\n",
s.name,
net.JoinHostPort(nd.na.IP.String(),
strconv.Itoa(int(nd.na.Port))),
nd.status,
nd.rating,
time.Since(nd.crawlStart).String())
}
func crawlNode(rc chan *result, s *dnsseeder, nd *node) {
// connect to the remote ip and ask them for their addr list
rna, e := s.crawlIP(nd)
rna, e := crawlIP(s, nd)
if e != nil {
// update the fact that we have not connected to this node
nd.lastTry = time.Now()
nd.connectFails++
nd.statusStr = e.Error()
// update the status of this failed node
switch nd.status {
case statusRG:
// if we are full then any RG failures will skip directly to NG
if s.isFull() {
nd.status = statusNG // not able to connect to this node so ignore
nd.statusTime = time.Now()
} else {
if nd.rating += 25; nd.rating > 30 {
nd.status = statusWG
nd.statusTime = time.Now()
}
}
case statusCG:
if nd.rating += 25; nd.rating >= 50 {
nd.status = statusWG
nd.statusTime = time.Now()
}
case statusWG:
if nd.rating += 15; nd.rating >= 100 {
nd.status = statusNG // not able to connect to this node so ignore
nd.statusTime = time.Now()
}
}
// no more to do so return, which will shut down the goroutine & call
// the deferred cleanup
if config.verbose {
log.Printf("%s: failed crawl node: %s s:r:f: %v:%v:%v %s\n",
s.name,
net.JoinHostPort(nd.na.IP.String(),
strconv.Itoa(int(nd.na.Port))),
nd.status,
nd.rating,
nd.connectFails,
nd.statusStr)
}
return
res := &result{
nas: rna,
msg: e,
node: net.JoinHostPort(nd.na.IP.String(), strconv.Itoa(int(nd.na.Port))),
}
// successful connection and addresses received so mark status
if nd.status != statusCG {
nd.status = statusCG
nd.statusTime = time.Now()
}
cs := nd.lastConnect
nd.rating = 0
nd.connectFails = 0
nd.lastConnect = time.Now()
nd.lastTry = time.Now()
nd.statusStr = "ok: received remote address list"
added := 0
// do not accept more than one third of maxSize addresses from one node
oneThird := int(float64(s.maxSize / 3))
// if we are full then skip adding more possible clients
if s.isFull() == false {
// loop through all the received network addresses and add to theList if not present
for _, na := range rna {
// a new network address so add to the system
if x := s.addNa(na); x == true {
if added++; added > oneThird {
break
}
}
}
}
if config.verbose {
log.Printf("%s: crawl done: node: %s s:r:f: %v:%v:%v addr: %v:%v CrawlTime: %s Last connect: %v ago\n",
s.name,
net.JoinHostPort(nd.na.IP.String(),
strconv.Itoa(int(nd.na.Port))),
nd.status,
nd.rating,
nd.connectFails,
len(rna),
added,
time.Since(nd.crawlStart).String(),
time.Since(cs).String())
if e == nil {
res.success = true
}
// goroutine ends. deferred cleanup runs
}
// all done so push the result back to the seeder.
// This will block until the seeder reads the result
rc <- res
// crawlEnd is a deferred func to update theList after a crawl is all done
func crawlEnd(nd *node) {
nd.crawlActive = false
// goroutine will end and be cleaned up
}
// crawlIP retrieves a slice of ip addresses from a client
func (s *dnsseeder) crawlIP(nd *node) ([]*wire.NetAddress, *crawlError) {
func crawlIP(s *dnsseeder, nd *node) ([]*wire.NetAddress, *crawlError) {
ip := nd.na.IP.String()
port := strconv.Itoa(int(nd.na.Port))
// get correct formatting for ipv6 addresses
dialString := net.JoinHostPort(ip, port)
if config.debug {
log.Printf("%s - debug - start crawl: node %s status: %v:%v lastcrawl: %s\n",
s.name,
dialString,
nd.status,
nd.rating,
time.Since(nd.crawlStart).String())
}
conn, err := net.DialTimeout("tcp", dialString, time.Second*10)
if err != nil {
if config.debug {

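Note the final rc <- res in the new crawlNode: resultsChan is created unbuffered in runSeeder, so the send parks each crawl goroutine until the seeder's select loop receives the result. That blocking hand-off is what serializes every status update. A tiny illustration of the behaviour, with made-up names:

package main

import (
	"fmt"
	"time"
)

func main() {
	rc := make(chan string) // unbuffered, like resultsChan in runSeeder
	go func() {
		rc <- "crawl result" // the sender parks here until someone receives
		fmt.Println("hand-off complete")
	}()
	time.Sleep(100 * time.Millisecond) // pretend the receiver is busy elsewhere
	fmt.Println(<-rc)                  // the send above completes only now
	time.Sleep(10 * time.Millisecond)  // let the goroutine finish printing
}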
main.go (62 changed lines)

@@ -28,19 +28,19 @@ type NodeCounts struct {
// configData holds information on the application
type configData struct {
dnsUnknown uint64 // the number of dns requests we are not configured to handle
uptime time.Time // application start time
port string // port for the dns server to listen on
http string // port for the web server to listen on
version string // application version
verbose bool // verbose output cmdline option
debug bool // debug cmdline option
stats bool // stats cmdline option
seeders map[string]*dnsseeder // holds a pointer to all the current seeders
smtx sync.RWMutex // protect the seeders map
order []string // the order of loading the netfiles so we can display in this order
dns map[string][]dns.RR // holds details of all the currently served dns records
dnsmtx sync.RWMutex // protect the dns map
dnsUnknown uint64 // the number of dns requests we are not configured to handle
verbose bool // verbose output cmdline option
debug bool // debug cmdline option
stats bool // stats cmdline option
}
var config configData
@@ -50,7 +50,6 @@ func main() {
var j bool
// FIXME - update with git hash during build
config.version = "0.8.0"
config.uptime = time.Now()
@@ -118,51 +117,28 @@ func main() {
go serve("udp", config.port)
//go serve("tcp", config.port)
// seed the seeder with some ip addresses
var wg sync.WaitGroup
done := make(chan struct{})
// start a goroutine for each seeder
for _, s := range config.seeders {
s.initCrawlers()
s.startCrawlers()
wg.Add(1)
go s.runSeeder(done, &wg)
}
sig := make(chan os.Signal)
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
// extract good dns records from all nodes on regular basis
dnsChan := time.NewTicker(time.Second * dnsDelay).C
// used to start crawlers on a regular basis
crawlChan := time.NewTicker(time.Second * crawlDelay).C
// used to remove old statusNG nodes that have reached fail count
auditChan := time.NewTicker(time.Minute * auditDelay).C
dowhile := true
for dowhile == true {
select {
case <-sig:
dowhile = false
case <-auditChan:
if config.debug {
log.Printf("debug - Audit nodes timer triggered\n")
}
for _, s := range config.seeders {
s.auditNodes()
}
case <-dnsChan:
if config.debug {
log.Printf("debug - DNS - Updating latest ip addresses timer triggered\n")
}
for _, s := range config.seeders {
s.loadDNS()
}
case <-crawlChan:
if config.debug {
log.Printf("debug - Start crawlers timer triggered\n")
}
for _, s := range config.seeders {
s.startCrawlers()
}
}
}
// block until a signal is received
fmt.Println("\nShutting down on signal:", <-sig)
// FIXME - call dns server.Shutdown()
// close the done channel to signal to all seeders to shutdown
// and wait for them to exit
close(done)
wg.Wait()
fmt.Printf("\nProgram exiting. Bye\n")
}
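The replacement shutdown path is the classic close-to-broadcast idiom: a receive from a closed channel always succeeds, so close(done) unblocks the <-done case in every runSeeder simultaneously, and the WaitGroup lets main block until each seeder finishes its cleanup. A self-contained sketch of the same idiom, assuming a runSeeder-like ticker loop:

package main

import (
	"fmt"
	"sync"
	"time"
)

func runLoop(id int, done <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	tick := time.NewTicker(10 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			// periodic work (the crawl/dns/audit tickers in the real seeder)
		case <-done: // a closed channel is always ready to receive from
			fmt.Println("seeder", id, "exiting")
			return
		}
	}
}

func main() {
	var wg sync.WaitGroup
	done := make(chan struct{})
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go runLoop(i, done, &wg)
	}
	close(done) // broadcast shutdown to every loop at once
	wg.Wait()   // wait for each goroutine's deferred wg.Done()
}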

node.go (8 changed lines)

@@ -14,17 +14,17 @@ type node struct {
lastTry time.Time // last time we tried to connect to this client
crawlStart time.Time // time when we started the last crawl
statusTime time.Time // time the status was last updated
crawlActive bool // are we currently crawling this client
connectFails uint32 // number of times we have failed to connect to this client
nonstdIP net.IP // if not using the default port then this is the encoded ip containing the actual port
statusStr string // string with last error or OK details
version int32 // remote client protocol version
strVersion string // remote client user agent
services wire.ServiceFlag // remote client supported services
connectFails uint32 // number of times we have failed to connect to this client
version int32 // remote client protocol version
lastBlock int32 // remote client last block
status uint32 // rg,cg,wg,ng
rating uint32 // if it reaches 100 then we mark them statusNG
nonstdIP net.IP // if not using the default port then this is the encoded ip containing the actual port
dnsType uint32 // what dns type this client is
crawlActive bool // are we currently crawling this client
}
// status2str will return the string description of the status

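The node struct itself is only reshuffled here, but crawlActive and crawlStart are the fields to watch: the old code set them inside the crawl goroutine while the seeder read them, which is exactly the kind of race the commit title refers to. Now startCrawlers sets them and processResult clears them via crawlEnd, all on the seeder goroutine. A minimal reproduction of the old pattern that go run -race would flag (illustrative only, not this repo's code):

package main

import (
	"fmt"
	"time"
)

type node struct {
	crawlActive bool
}

func main() {
	nd := &node{}
	// old design: the crawl goroutine wrote the flag...
	go func() { nd.crawlActive = true }()
	// ...while the seeder read it with no synchronization between the two
	fmt.Println("crawl active:", nd.crawlActive)
	time.Sleep(10 * time.Millisecond) // give the write time to happen
}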
seeder.go (165 changed lines)

@@ -62,6 +62,14 @@ type dnsseeder struct {
maxStart []uint32 // max number of goroutines to start each run for each status type
delay []int64 // number of seconds to wait before we connect to a known client for each status
counts NodeCounts // structure to hold stats for this seeder
shutdown bool // seeder is shutting down
}
type result struct {
nas []*wire.NetAddress // slice of node addresses returned from a node
msg *crawlError // error string or nil if no problems
node string // theList key to the node that was crawled
success bool // was the crawl successful
}
// initCrawlers needs to be run before the startCrawlers so it can get
@@ -113,9 +121,51 @@ func (s *dnsseeder) initCrawlers() {
}
}
// runSeeder runs a seeder in an endless goroutine
func (s *dnsseeder) runSeeder(done <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
// receive the results from the crawl goroutines
resultsChan := make(chan *result)
// start initial scan now
s.startCrawlers(resultsChan)
// used to clean out and cycle records in theList
auditChan := time.NewTicker(time.Minute * auditDelay).C
// used to start crawlers on a regular basis
crawlChan := time.NewTicker(time.Second * crawlDelay).C
// extract good dns records from all nodes on regular basis
dnsChan := time.NewTicker(time.Second * dnsDelay).C
dowhile := true
for dowhile == true {
select {
case r := <-resultsChan:
// process a results structure from a crawl
s.processResult(r)
case <-dnsChan:
// update the system with the latest selection of dns records
s.loadDNS()
case <-auditChan:
// keep theList clean and tidy
s.auditNodes()
case <-crawlChan:
// start a scan to crawl nodes
s.startCrawlers(resultsChan)
case <-done:
// done channel closed so exit the select and shutdown the seeder
dowhile = false
}
}
fmt.Printf(".")
// end the goroutine & defer will call wg.Done()
}
// startCrawlers is called on a time basis to start maxcrawlers new
// goroutines if there are spare goroutine slots available
func (s *dnsseeder) startCrawlers() {
func (s *dnsseeder) startCrawlers(resultsChan chan *result) {
tcount := len(s.theList)
if tcount == 0 {
@@ -169,8 +219,10 @@ func (s *dnsseeder) startCrawlers() {
continue
}
nd.crawlActive = true
nd.crawlStart = time.Now()
// all looks good so start a go routine to crawl the remote node
go crawlNode(s, nd)
go crawlNode(resultsChan, s, nd)
c.started++
}
@@ -190,6 +242,115 @@ func (s *dnsseeder) startCrawlers() {
// returns and read lock released
}
// processResult will add new nodes to the list and update the status of the crawled node
func (s *dnsseeder) processResult(r *result) {
var nd *node
if _, ok := s.theList[r.node]; ok {
nd = s.theList[r.node]
} else {
log.Printf("%s: warning - ignoring results from unknown node: %s\n", s.name, r.node)
return
}
defer crawlEnd(nd)
//if r.success != true {
if r.msg != nil {
// update the fact that we have not connected to this node
nd.lastTry = time.Now()
nd.connectFails++
nd.statusStr = r.msg.Error()
// update the status of this failed node
switch nd.status {
case statusRG:
// if we are full then any RG failures will skip directly to NG
if s.isFull() {
nd.status = statusNG // not able to connect to this node so ignore
nd.statusTime = time.Now()
} else {
if nd.rating += 25; nd.rating > 30 {
nd.status = statusWG
nd.statusTime = time.Now()
}
}
case statusCG:
if nd.rating += 25; nd.rating >= 50 {
nd.status = statusWG
nd.statusTime = time.Now()
}
case statusWG:
if nd.rating += 15; nd.rating >= 100 {
nd.status = statusNG // not able to connect to this node so ignore
nd.statusTime = time.Now()
}
}
// no more to do so return, which will run
// the deferred cleanup
if config.verbose {
log.Printf("%s: failed crawl node: %s s:r:f: %v:%v:%v %s\n",
s.name,
net.JoinHostPort(nd.na.IP.String(),
strconv.Itoa(int(nd.na.Port))),
nd.status,
nd.rating,
nd.connectFails,
nd.statusStr)
}
return
}
// successful connection and addresses received so mark status
if nd.status != statusCG {
nd.status = statusCG
nd.statusTime = time.Now()
}
cs := nd.lastConnect
nd.rating = 0
nd.connectFails = 0
nd.lastConnect = time.Now()
nd.lastTry = time.Now()
nd.statusStr = "ok: received remote address list"
added := 0
// do not accept more than one third of maxSize addresses from one node
oneThird := int(float64(s.maxSize / 3))
// if we are full then skip adding more possible clients
if s.isFull() == false {
// loop through all the received network addresses and add to theList if not present
for _, na := range r.nas {
// a new network address so add to the system
if x := s.addNa(na); x == true {
if added++; added > oneThird {
break
}
}
}
}
if config.verbose {
log.Printf("%s: crawl done: node: %s s:r:f: %v:%v:%v addr: %v:%v CrawlTime: %s Last connect: %v ago\n",
s.name,
net.JoinHostPort(nd.na.IP.String(),
strconv.Itoa(int(nd.na.Port))),
nd.status,
nd.rating,
nd.connectFails,
len(r.nas),
added,
time.Since(nd.crawlStart).String(),
time.Since(cs).String())
}
}
func crawlEnd(nd *node) {
nd.crawlActive = false
}
// isDup will return true or false depending if the ip exists in theList
func (s *dnsseeder) isDup(ipport string) bool {
s.mtx.RLock()

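The failure branch of processResult is a small state machine over (status, rating): a full seeder discards failing RG nodes outright, otherwise each failure bumps the rating until the node is demoted, with rating > 30 taking RG to WG, rating >= 50 taking CG to WG, and rating >= 100 taking WG to NG, where auditNodes will eventually remove it. A condensed restatement of those transitions as a standalone helper; the thresholds are copied from the diff, the helper itself is illustrative:

package main

import "fmt"

// The four node states from node.go: rg, cg, wg, ng.
const (
	statusRG uint32 = iota
	statusCG
	statusWG
	statusNG
)

// fail applies one failed-crawl transition, mirroring processResult.
func fail(status, rating uint32, full bool) (uint32, uint32) {
	switch status {
	case statusRG:
		if full {
			return statusNG, rating // full seeder: RG failures skip straight to NG
		}
		if rating += 25; rating > 30 {
			return statusWG, rating
		}
	case statusCG:
		if rating += 25; rating >= 50 {
			return statusWG, rating
		}
	case statusWG:
		if rating += 15; rating >= 100 {
			return statusNG, rating
		}
	}
	return status, rating
}

func main() {
	// a currently-good (CG) node fails twice: rating 0 -> 25 (still CG), then 25 -> 50 (demoted to WG)
	s, r := fail(statusCG, 0, false)
	fmt.Println(s == statusCG, r) // true 25
	s, r = fail(s, r, false)
	fmt.Println(s == statusWG, r) // true 50
}

A successful crawl resets the counters (rating = 0, connectFails = 0), so only consecutive failures walk a node down this ladder.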