You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
2114 lines
65 KiB
2114 lines
65 KiB
// Copyright (c) 2013-2015 The btcsuite developers |
|
// Use of this source code is governed by an ISC |
|
// license that can be found in the LICENSE file. |
|
|
|
package main |
|
|
|
import ( |
|
"bytes" |
|
"container/list" |
|
"crypto/subtle" |
|
"encoding/base64" |
|
"encoding/hex" |
|
"encoding/json" |
|
"errors" |
|
"fmt" |
|
"io" |
|
"sync" |
|
"time" |
|
|
|
"github.com/btcsuite/btcd/btcjson" |
|
"github.com/btcsuite/btcd/database" |
|
"github.com/btcsuite/btcd/txscript" |
|
"github.com/btcsuite/btcd/wire" |
|
"github.com/btcsuite/btcutil" |
|
"github.com/btcsuite/fastsha256" |
|
"github.com/btcsuite/golangcrypto/ripemd160" |
|
"github.com/btcsuite/websocket" |
|
) |
|
|
|
// websocketSendBufferSize is how many elements the send channel is able to
// queue before a sender blocks.  It only affects requests handled directly in
// the websocket client input handler or the async handler, because
// notifications go through their own queueing mechanism which is independent
// of the send channel buffer.
const websocketSendBufferSize = 50
|
|
|
// timeZeroVal holds the zero value of time.Time.  It is declared once so the
// same instance can be reused instead of creating a new one at every use.
var timeZeroVal = time.Time{}
|
|
|
// wsCommandHandler describes a callback function used to handle a specific |
|
// command. |
|
type wsCommandHandler func(*wsClient, interface{}) (interface{}, error) |
|
|
|
// wsHandlers maps RPC command strings to appropriate websocket handler |
|
// functions. This is set by init because help references wsHandlers and thus |
|
// causes a dependency loop. |
|
var wsHandlers map[string]wsCommandHandler |
|
var wsHandlersBeforeInit = map[string]wsCommandHandler{ |
|
"help": handleWebsocketHelp, |
|
"notifyblocks": handleNotifyBlocks, |
|
"notifynewtransactions": handleNotifyNewTransactions, |
|
"notifyreceived": handleNotifyReceived, |
|
"notifyspent": handleNotifySpent, |
|
"stopnotifyblocks": handleStopNotifyBlocks, |
|
"stopnotifynewtransactions": handleStopNotifyNewTransactions, |
|
"stopnotifyspent": handleStopNotifySpent, |
|
"stopnotifyreceived": handleStopNotifyReceived, |
|
"rescan": handleRescan, |
|
} |
|
|
|
// wsAsyncHandlers is the set of websocket commands that are executed
// asynchronously to the main input handler goroutine.  Long-running operations
// can thereby run concurrently (and one at a time) while the majority of
// normal requests, which can be answered quickly, are still serviced.
var wsAsyncHandlers = map[string]struct{}{
	"rescan": {},
}
|
|
|
// WebsocketHandler handles a new websocket client by creating a new wsClient, |
|
// starting it, and blocking until the connection closes. Since it blocks, it |
|
// must be run in a separate goroutine. It should be invoked from the websocket |
|
// server handler which runs each new connection in a new goroutine thereby |
|
// satisfying the requirement. |
|
func (s *rpcServer) WebsocketHandler(conn *websocket.Conn, remoteAddr string, |
|
authenticated bool, isAdmin bool) { |
|
|
|
// Clear the read deadline that was set before the websocket hijacked |
|
// the connection. |
|
conn.SetReadDeadline(timeZeroVal) |
|
|
|
// Limit max number of websocket clients. |
|
rpcsLog.Infof("New websocket client %s", remoteAddr) |
|
if s.ntfnMgr.NumClients()+1 > cfg.RPCMaxWebsockets { |
|
rpcsLog.Infof("Max websocket clients exceeded [%d] - "+ |
|
"disconnecting client %s", cfg.RPCMaxWebsockets, |
|
remoteAddr) |
|
conn.Close() |
|
return |
|
} |
|
|
|
// Create a new websocket client to handle the new websocket connection |
|
// and wait for it to shutdown. Once it has shutdown (and hence |
|
// disconnected), remove it and any notifications it registered for. |
|
client := newWebsocketClient(s, conn, remoteAddr, authenticated, isAdmin) |
|
s.ntfnMgr.AddClient(client) |
|
client.Start() |
|
client.WaitForShutdown() |
|
s.ntfnMgr.RemoveClient(client) |
|
rpcsLog.Infof("Disconnected websocket client %s", remoteAddr) |
|
} |
|
|
|
// wsNotificationManager is a connection and notification manager used for |
|
// websockets. It allows websocket clients to register for notifications they |
|
// are interested in. When an event happens elsewhere in the code such as |
|
// transactions being added to the memory pool or block connects/disconnects, |
|
// the notification manager is provided with the relevant details needed to |
|
// figure out which websocket clients need to be notified based on what they |
|
// have registered for and notifies them accordingly. It is also used to keep |
|
// track of all connected websocket clients. |
|
type wsNotificationManager struct { |
|
// server is the RPC server the notification manager is associated with. |
|
server *rpcServer |
|
|
|
// queueNotification queues a notification for handling. |
|
queueNotification chan interface{} |
|
|
|
// notificationMsgs feeds notificationHandler with notifications |
|
// and client (un)registeration requests from a queue as well as |
|
// registeration and unregisteration requests from clients. |
|
notificationMsgs chan interface{} |
|
|
|
// Access channel for current number of connected clients. |
|
numClients chan int |
|
|
|
// Shutdown handling |
|
wg sync.WaitGroup |
|
quit chan struct{} |
|
} |
|
|
|
// queueHandler manages an unbounded FIFO queue of empty interfaces.  Values
// are read from in and the oldest unsent value is sent to out.  When the queue
// is empty and a reader is already waiting on out, a new value is handed over
// directly without being queued.
//
// The handler stops when either in or quit is closed.  It closes out before
// returning and does not wait to send values still remaining in the queue.
func queueHandler(in <-chan interface{}, out chan<- interface{}, quit <-chan struct{}) {
	var (
		// q holds the values that have been received but not yet sent.
		q []interface{}

		// dequeue is out while q is non-empty and nil otherwise.  A
		// send on a nil channel never proceeds, which disables the
		// dequeue case of the select below while nothing is queued.
		dequeue chan<- interface{}

		// next is the head of the queue, i.e. the value the dequeue
		// case tries to send.
		next interface{}
	)

	// skipQueue is out while q is empty (direct hand-off allowed) and nil
	// otherwise (everything must go through the queue to keep ordering).
	skipQueue := out

loop:
	for {
		select {
		case n, ok := <-in:
			if !ok {
				// Sender closed the input channel.
				break loop
			}

			// Hand the value over immediately when the queue is
			// empty and a reader is ready, otherwise append it to
			// the queue to be sent later.
			select {
			case skipQueue <- n:
			default:
				q = append(q, n)
				dequeue = out
				skipQueue = nil
				next = q[0]
			}

		case dequeue <- next:
			// Pop the head in constant time.  The slot is cleared
			// first so the backing array does not keep the value
			// that was just sent alive.
			q[0] = nil
			q = q[1:]
			if len(q) == 0 {
				// Drop the backing array and the stale head so
				// an idle handler retains nothing.
				q = nil
				next = nil
				dequeue = nil
				skipQueue = out
			} else {
				next = q[0]
			}

		case <-quit:
			break loop
		}
	}
	close(out)
}
|
|
|
// queueHandler maintains a queue of notifications and notification handler |
|
// control messages. |
|
func (m *wsNotificationManager) queueHandler() { |
|
queueHandler(m.queueNotification, m.notificationMsgs, m.quit) |
|
m.wg.Done() |
|
} |
|
|
|
// NotifyBlockConnected passes a block newly-connected to the best chain |
|
// to the notification manager for block and transaction notification |
|
// processing. |
|
func (m *wsNotificationManager) NotifyBlockConnected(block *btcutil.Block) { |
|
// As NotifyBlockConnected will be called by the block manager |
|
// and the RPC server may no longer be running, use a select |
|
// statement to unblock enqueueing the notification once the RPC |
|
// server has begun shutting down. |
|
select { |
|
case m.queueNotification <- (*notificationBlockConnected)(block): |
|
case <-m.quit: |
|
} |
|
} |
|
|
|
// NotifyBlockDisconnected passes a block disconnected from the best chain |
|
// to the notification manager for block notification processing. |
|
func (m *wsNotificationManager) NotifyBlockDisconnected(block *btcutil.Block) { |
|
// As NotifyBlockDisconnected will be called by the block manager |
|
// and the RPC server may no longer be running, use a select |
|
// statement to unblock enqueueing the notification once the RPC |
|
// server has begun shutting down. |
|
select { |
|
case m.queueNotification <- (*notificationBlockDisconnected)(block): |
|
case <-m.quit: |
|
} |
|
} |
|
|
|
// NotifyMempoolTx passes a transaction accepted by mempool to the |
|
// notification manager for transaction notification processing. If |
|
// isNew is true, the tx is is a new transaction, rather than one |
|
// added to the mempool during a reorg. |
|
func (m *wsNotificationManager) NotifyMempoolTx(tx *btcutil.Tx, isNew bool) { |
|
n := ¬ificationTxAcceptedByMempool{ |
|
isNew: isNew, |
|
tx: tx, |
|
} |
|
|
|
// As NotifyMempoolTx will be called by mempool and the RPC server |
|
// may no longer be running, use a select statement to unblock |
|
// enqueueing the notification once the RPC server has begun |
|
// shutting down. |
|
select { |
|
case m.queueNotification <- n: |
|
case <-m.quit: |
|
} |
|
} |
|
|
|
// Notification types |
|
type notificationBlockConnected btcutil.Block |
|
type notificationBlockDisconnected btcutil.Block |
|
type notificationTxAcceptedByMempool struct { |
|
isNew bool |
|
tx *btcutil.Tx |
|
} |
|
|
|
// Notification control requests |
|
type notificationRegisterClient wsClient |
|
type notificationUnregisterClient wsClient |
|
type notificationRegisterBlocks wsClient |
|
type notificationUnregisterBlocks wsClient |
|
type notificationRegisterNewMempoolTxs wsClient |
|
type notificationUnregisterNewMempoolTxs wsClient |
|
type notificationRegisterSpent struct { |
|
wsc *wsClient |
|
ops []*wire.OutPoint |
|
} |
|
type notificationUnregisterSpent struct { |
|
wsc *wsClient |
|
op *wire.OutPoint |
|
} |
|
type notificationRegisterAddr struct { |
|
wsc *wsClient |
|
addrs []string |
|
} |
|
type notificationUnregisterAddr struct { |
|
wsc *wsClient |
|
addr string |
|
} |
|
|
|
// notificationHandler reads notifications and control messages from the queue |
|
// handler and processes one at a time. |
|
func (m *wsNotificationManager) notificationHandler() { |
|
// clients is a map of all currently connected websocket clients. |
|
clients := make(map[chan struct{}]*wsClient) |
|
|
|
// Maps used to hold lists of websocket clients to be notified on |
|
// certain events. Each websocket client also keeps maps for the events |
|
// which have multiple triggers to make removal from these lists on |
|
// connection close less horrendously expensive. |
|
// |
|
// Where possible, the quit channel is used as the unique id for a client |
|
// since it is quite a bit more efficient than using the entire struct. |
|
blockNotifications := make(map[chan struct{}]*wsClient) |
|
txNotifications := make(map[chan struct{}]*wsClient) |
|
watchedOutPoints := make(map[wire.OutPoint]map[chan struct{}]*wsClient) |
|
watchedAddrs := make(map[string]map[chan struct{}]*wsClient) |
|
|
|
out: |
|
for { |
|
select { |
|
case n, ok := <-m.notificationMsgs: |
|
if !ok { |
|
// queueHandler quit. |
|
break out |
|
} |
|
switch n := n.(type) { |
|
case *notificationBlockConnected: |
|
block := (*btcutil.Block)(n) |
|
|
|
// Skip iterating through all txs if no |
|
// tx notification requests exist. |
|
if len(watchedOutPoints) != 0 || len(watchedAddrs) != 0 { |
|
for _, tx := range block.Transactions() { |
|
m.notifyForTx(watchedOutPoints, |
|
watchedAddrs, tx, block) |
|
} |
|
} |
|
|
|
if len(blockNotifications) != 0 { |
|
m.notifyBlockConnected(blockNotifications, |
|
block) |
|
} |
|
|
|
case *notificationBlockDisconnected: |
|
m.notifyBlockDisconnected(blockNotifications, |
|
(*btcutil.Block)(n)) |
|
|
|
case *notificationTxAcceptedByMempool: |
|
if n.isNew && len(txNotifications) != 0 { |
|
m.notifyForNewTx(txNotifications, n.tx) |
|
} |
|
m.notifyForTx(watchedOutPoints, watchedAddrs, n.tx, nil) |
|
|
|
case *notificationRegisterBlocks: |
|
wsc := (*wsClient)(n) |
|
blockNotifications[wsc.quit] = wsc |
|
|
|
case *notificationUnregisterBlocks: |
|
wsc := (*wsClient)(n) |
|
delete(blockNotifications, wsc.quit) |
|
|
|
case *notificationRegisterClient: |
|
wsc := (*wsClient)(n) |
|
clients[wsc.quit] = wsc |
|
|
|
case *notificationUnregisterClient: |
|
wsc := (*wsClient)(n) |
|
// Remove any requests made by the client as well as |
|
// the client itself. |
|
delete(blockNotifications, wsc.quit) |
|
delete(txNotifications, wsc.quit) |
|
for k := range wsc.spentRequests { |
|
op := k |
|
m.removeSpentRequest(watchedOutPoints, wsc, &op) |
|
} |
|
for addr := range wsc.addrRequests { |
|
m.removeAddrRequest(watchedAddrs, wsc, addr) |
|
} |
|
delete(clients, wsc.quit) |
|
|
|
case *notificationRegisterSpent: |
|
m.addSpentRequests(watchedOutPoints, n.wsc, n.ops) |
|
|
|
case *notificationUnregisterSpent: |
|
m.removeSpentRequest(watchedOutPoints, n.wsc, n.op) |
|
|
|
case *notificationRegisterAddr: |
|
m.addAddrRequests(watchedAddrs, n.wsc, n.addrs) |
|
|
|
case *notificationUnregisterAddr: |
|
m.removeAddrRequest(watchedAddrs, n.wsc, n.addr) |
|
|
|
case *notificationRegisterNewMempoolTxs: |
|
wsc := (*wsClient)(n) |
|
txNotifications[wsc.quit] = wsc |
|
|
|
case *notificationUnregisterNewMempoolTxs: |
|
wsc := (*wsClient)(n) |
|
delete(txNotifications, wsc.quit) |
|
|
|
default: |
|
rpcsLog.Warn("Unhandled notification type") |
|
} |
|
|
|
case m.numClients <- len(clients): |
|
|
|
case <-m.quit: |
|
// RPC server shutting down. |
|
break out |
|
} |
|
} |
|
|
|
for _, c := range clients { |
|
c.Disconnect() |
|
} |
|
m.wg.Done() |
|
} |
|
|
|
// NumClients returns the number of clients actively being served. |
|
func (m *wsNotificationManager) NumClients() (n int) { |
|
select { |
|
case n = <-m.numClients: |
|
case <-m.quit: // Use default n (0) if server has shut down. |
|
} |
|
return |
|
} |
|
|
|
// RegisterBlockUpdates requests block update notifications to the passed |
|
// websocket client. |
|
func (m *wsNotificationManager) RegisterBlockUpdates(wsc *wsClient) { |
|
m.queueNotification <- (*notificationRegisterBlocks)(wsc) |
|
} |
|
|
|
// UnregisterBlockUpdates removes block update notifications for the passed |
|
// websocket client. |
|
func (m *wsNotificationManager) UnregisterBlockUpdates(wsc *wsClient) { |
|
m.queueNotification <- (*notificationUnregisterBlocks)(wsc) |
|
} |
|
|
|
// notifyBlockConnected notifies websocket clients that have registered for |
|
// block updates when a block is connected to the main chain. |
|
func (*wsNotificationManager) notifyBlockConnected(clients map[chan struct{}]*wsClient, |
|
block *btcutil.Block) { |
|
|
|
// Notify interested websocket clients about the connected block. |
|
ntfn := btcjson.NewBlockConnectedNtfn(block.Sha().String(), |
|
int32(block.Height()), block.MsgBlock().Header.Timestamp.Unix()) |
|
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) |
|
if err != nil { |
|
rpcsLog.Error("Failed to marshal block connected notification: "+ |
|
"%v", err) |
|
return |
|
} |
|
for _, wsc := range clients { |
|
wsc.QueueNotification(marshalledJSON) |
|
} |
|
} |
|
|
|
// notifyBlockDisconnected notifies websocket clients that have registered for |
|
// block updates when a block is disconnected from the main chain (due to a |
|
// reorganize). |
|
func (*wsNotificationManager) notifyBlockDisconnected(clients map[chan struct{}]*wsClient, block *btcutil.Block) { |
|
// Skip notification creation if no clients have requested block |
|
// connected/disconnected notifications. |
|
if len(clients) == 0 { |
|
return |
|
} |
|
|
|
// Notify interested websocket clients about the disconnected block. |
|
ntfn := btcjson.NewBlockDisconnectedNtfn(block.Sha().String(), |
|
int32(block.Height()), block.MsgBlock().Header.Timestamp.Unix()) |
|
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) |
|
if err != nil { |
|
rpcsLog.Error("Failed to marshal block disconnected "+ |
|
"notification: %v", err) |
|
return |
|
} |
|
for _, wsc := range clients { |
|
wsc.QueueNotification(marshalledJSON) |
|
} |
|
} |
|
|
|
// RegisterNewMempoolTxsUpdates requests notifications to the passed websocket |
|
// client when new transactions are added to the memory pool. |
|
func (m *wsNotificationManager) RegisterNewMempoolTxsUpdates(wsc *wsClient) { |
|
m.queueNotification <- (*notificationRegisterNewMempoolTxs)(wsc) |
|
} |
|
|
|
// UnregisterNewMempoolTxsUpdates removes notifications to the passed websocket |
|
// client when new transaction are added to the memory pool. |
|
func (m *wsNotificationManager) UnregisterNewMempoolTxsUpdates(wsc *wsClient) { |
|
m.queueNotification <- (*notificationUnregisterNewMempoolTxs)(wsc) |
|
} |
|
|
|
// notifyForNewTx notifies websocket clients that have registered for updates |
|
// when a new transaction is added to the memory pool. |
|
func (m *wsNotificationManager) notifyForNewTx(clients map[chan struct{}]*wsClient, tx *btcutil.Tx) { |
|
txShaStr := tx.Sha().String() |
|
mtx := tx.MsgTx() |
|
|
|
var amount int64 |
|
for _, txOut := range mtx.TxOut { |
|
amount += txOut.Value |
|
} |
|
|
|
ntfn := btcjson.NewTxAcceptedNtfn(txShaStr, btcutil.Amount(amount).ToBTC()) |
|
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal tx notification: %s", err.Error()) |
|
return |
|
} |
|
|
|
var verboseNtfn *btcjson.TxAcceptedVerboseNtfn |
|
var marshalledJSONVerbose []byte |
|
for _, wsc := range clients { |
|
if wsc.verboseTxUpdates { |
|
if marshalledJSONVerbose != nil { |
|
wsc.QueueNotification(marshalledJSONVerbose) |
|
continue |
|
} |
|
|
|
net := m.server.server.chainParams |
|
rawTx, err := createTxRawResult(net, mtx, txShaStr, nil, |
|
"", 0, 0) |
|
if err != nil { |
|
return |
|
} |
|
|
|
verboseNtfn = btcjson.NewTxAcceptedVerboseNtfn(*rawTx) |
|
marshalledJSONVerbose, err = btcjson.MarshalCmd(nil, |
|
verboseNtfn) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal verbose tx "+ |
|
"notification: %s", err.Error()) |
|
return |
|
} |
|
wsc.QueueNotification(marshalledJSONVerbose) |
|
} else { |
|
wsc.QueueNotification(marshalledJSON) |
|
} |
|
} |
|
} |
|
|
|
// RegisterSpentRequests requests a notification when each of the passed |
|
// outpoints is confirmed spent (contained in a block connected to the main |
|
// chain) for the passed websocket client. The request is automatically |
|
// removed once the notification has been sent. |
|
func (m *wsNotificationManager) RegisterSpentRequests(wsc *wsClient, ops []*wire.OutPoint) { |
|
m.queueNotification <- ¬ificationRegisterSpent{ |
|
wsc: wsc, |
|
ops: ops, |
|
} |
|
} |
|
|
|
// addSpentRequests modifies a map of watched outpoints to sets of websocket |
|
// clients to add a new request watch all of the outpoints in ops and create |
|
// and send a notification when spent to the websocket client wsc. |
|
func (*wsNotificationManager) addSpentRequests(opMap map[wire.OutPoint]map[chan struct{}]*wsClient, |
|
wsc *wsClient, ops []*wire.OutPoint) { |
|
|
|
for _, op := range ops { |
|
// Track the request in the client as well so it can be quickly |
|
// be removed on disconnect. |
|
wsc.spentRequests[*op] = struct{}{} |
|
|
|
// Add the client to the list to notify when the outpoint is seen. |
|
// Create the list as needed. |
|
cmap, ok := opMap[*op] |
|
if !ok { |
|
cmap = make(map[chan struct{}]*wsClient) |
|
opMap[*op] = cmap |
|
} |
|
cmap[wsc.quit] = wsc |
|
} |
|
} |
|
|
|
// UnregisterSpentRequest removes a request from the passed websocket client |
|
// to be notified when the passed outpoint is confirmed spent (contained in a |
|
// block connected to the main chain). |
|
func (m *wsNotificationManager) UnregisterSpentRequest(wsc *wsClient, op *wire.OutPoint) { |
|
m.queueNotification <- ¬ificationUnregisterSpent{ |
|
wsc: wsc, |
|
op: op, |
|
} |
|
} |
|
|
|
// removeSpentRequest modifies a map of watched outpoints to remove the |
|
// websocket client wsc from the set of clients to be notified when a |
|
// watched outpoint is spent. If wsc is the last client, the outpoint |
|
// key is removed from the map. |
|
func (*wsNotificationManager) removeSpentRequest(ops map[wire.OutPoint]map[chan struct{}]*wsClient, |
|
wsc *wsClient, op *wire.OutPoint) { |
|
|
|
// Remove the request tracking from the client. |
|
delete(wsc.spentRequests, *op) |
|
|
|
// Remove the client from the list to notify. |
|
notifyMap, ok := ops[*op] |
|
if !ok { |
|
rpcsLog.Warnf("Attempt to remove nonexistent spent request "+ |
|
"for websocket client %s", wsc.addr) |
|
return |
|
} |
|
delete(notifyMap, wsc.quit) |
|
|
|
// Remove the map entry altogether if there are |
|
// no more clients interested in it. |
|
if len(notifyMap) == 0 { |
|
delete(ops, *op) |
|
} |
|
} |
|
|
|
// txHexString returns the serialized transaction encoded in hexadecimal. |
|
func txHexString(tx *btcutil.Tx) string { |
|
buf := bytes.NewBuffer(make([]byte, 0, tx.MsgTx().SerializeSize())) |
|
// Ignore Serialize's error, as writing to a bytes.buffer cannot fail. |
|
tx.MsgTx().Serialize(buf) |
|
return hex.EncodeToString(buf.Bytes()) |
|
} |
|
|
|
// blockDetails creates a BlockDetails struct to include in btcws notifications |
|
// from a block and a transaction's block index. |
|
func blockDetails(block *btcutil.Block, txIndex int) *btcjson.BlockDetails { |
|
if block == nil { |
|
return nil |
|
} |
|
return &btcjson.BlockDetails{ |
|
Height: int32(block.Height()), |
|
Hash: block.Sha().String(), |
|
Index: txIndex, |
|
Time: block.MsgBlock().Header.Timestamp.Unix(), |
|
} |
|
} |
|
|
|
// newRedeemingTxNotification returns a new marshalled redeemingtx notification |
|
// with the passed parameters. |
|
func newRedeemingTxNotification(txHex string, index int, block *btcutil.Block) ([]byte, error) { |
|
// Create and marshal the notification. |
|
ntfn := btcjson.NewRedeemingTxNtfn(txHex, blockDetails(block, index)) |
|
return btcjson.MarshalCmd(nil, ntfn) |
|
} |
|
|
|
// notifyForTxOuts examines each transaction output, notifying interested |
|
// websocket clients of the transaction if an output spends to a watched |
|
// address. A spent notification request is automatically registered for |
|
// the client for each matching output. |
|
func (m *wsNotificationManager) notifyForTxOuts(ops map[wire.OutPoint]map[chan struct{}]*wsClient, |
|
addrs map[string]map[chan struct{}]*wsClient, tx *btcutil.Tx, block *btcutil.Block) { |
|
|
|
// Nothing to do if nobody is listening for address notifications. |
|
if len(addrs) == 0 { |
|
return |
|
} |
|
|
|
txHex := "" |
|
wscNotified := make(map[chan struct{}]struct{}) |
|
for i, txOut := range tx.MsgTx().TxOut { |
|
_, txAddrs, _, err := txscript.ExtractPkScriptAddrs( |
|
txOut.PkScript, m.server.server.chainParams) |
|
if err != nil { |
|
continue |
|
} |
|
|
|
for _, txAddr := range txAddrs { |
|
cmap, ok := addrs[txAddr.EncodeAddress()] |
|
if !ok { |
|
continue |
|
} |
|
|
|
if txHex == "" { |
|
txHex = txHexString(tx) |
|
} |
|
ntfn := btcjson.NewRecvTxNtfn(txHex, blockDetails(block, |
|
tx.Index())) |
|
|
|
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal processedtx notification: %v", err) |
|
continue |
|
} |
|
|
|
op := []*wire.OutPoint{wire.NewOutPoint(tx.Sha(), uint32(i))} |
|
for wscQuit, wsc := range cmap { |
|
m.addSpentRequests(ops, wsc, op) |
|
|
|
if _, ok := wscNotified[wscQuit]; !ok { |
|
wscNotified[wscQuit] = struct{}{} |
|
wsc.QueueNotification(marshalledJSON) |
|
} |
|
} |
|
} |
|
} |
|
} |
|
|
|
// notifyForTx examines the inputs and outputs of the passed transaction, |
|
// notifying websocket clients of outputs spending to a watched address |
|
// and inputs spending a watched outpoint. |
|
func (m *wsNotificationManager) notifyForTx(ops map[wire.OutPoint]map[chan struct{}]*wsClient, |
|
addrs map[string]map[chan struct{}]*wsClient, tx *btcutil.Tx, block *btcutil.Block) { |
|
|
|
if len(ops) != 0 { |
|
m.notifyForTxIns(ops, tx, block) |
|
} |
|
if len(addrs) != 0 { |
|
m.notifyForTxOuts(ops, addrs, tx, block) |
|
} |
|
} |
|
|
|
// notifyForTxIns examines the inputs of the passed transaction and sends |
|
// interested websocket clients a redeemingtx notification if any inputs |
|
// spend a watched output. If block is non-nil, any matching spent |
|
// requests are removed. |
|
func (m *wsNotificationManager) notifyForTxIns(ops map[wire.OutPoint]map[chan struct{}]*wsClient, |
|
tx *btcutil.Tx, block *btcutil.Block) { |
|
|
|
// Nothing to do if nobody is watching outpoints. |
|
if len(ops) == 0 { |
|
return |
|
} |
|
|
|
txHex := "" |
|
wscNotified := make(map[chan struct{}]struct{}) |
|
for _, txIn := range tx.MsgTx().TxIn { |
|
prevOut := &txIn.PreviousOutPoint |
|
if cmap, ok := ops[*prevOut]; ok { |
|
if txHex == "" { |
|
txHex = txHexString(tx) |
|
} |
|
marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), block) |
|
if err != nil { |
|
rpcsLog.Warnf("Failed to marshal redeemingtx notification: %v", err) |
|
continue |
|
} |
|
for wscQuit, wsc := range cmap { |
|
if block != nil { |
|
m.removeSpentRequest(ops, wsc, prevOut) |
|
} |
|
|
|
if _, ok := wscNotified[wscQuit]; !ok { |
|
wscNotified[wscQuit] = struct{}{} |
|
wsc.QueueNotification(marshalledJSON) |
|
} |
|
} |
|
} |
|
} |
|
} |
|
|
|
// RegisterTxOutAddressRequests requests notifications to the passed websocket |
|
// client when a transaction output spends to the passed address. |
|
func (m *wsNotificationManager) RegisterTxOutAddressRequests(wsc *wsClient, addrs []string) { |
|
m.queueNotification <- ¬ificationRegisterAddr{ |
|
wsc: wsc, |
|
addrs: addrs, |
|
} |
|
} |
|
|
|
// addAddrRequests adds the websocket client wsc to the address to client set |
|
// addrMap so wsc will be notified for any mempool or block transaction outputs |
|
// spending to any of the addresses in addrs. |
|
func (*wsNotificationManager) addAddrRequests(addrMap map[string]map[chan struct{}]*wsClient, |
|
wsc *wsClient, addrs []string) { |
|
|
|
for _, addr := range addrs { |
|
// Track the request in the client as well so it can be quickly be |
|
// removed on disconnect. |
|
wsc.addrRequests[addr] = struct{}{} |
|
|
|
// Add the client to the set of clients to notify when the |
|
// outpoint is seen. Create map as needed. |
|
cmap, ok := addrMap[addr] |
|
if !ok { |
|
cmap = make(map[chan struct{}]*wsClient) |
|
addrMap[addr] = cmap |
|
} |
|
cmap[wsc.quit] = wsc |
|
} |
|
} |
|
|
|
// UnregisterTxOutAddressRequest removes a request from the passed websocket |
|
// client to be notified when a transaction spends to the passed address. |
|
func (m *wsNotificationManager) UnregisterTxOutAddressRequest(wsc *wsClient, addr string) { |
|
m.queueNotification <- ¬ificationUnregisterAddr{ |
|
wsc: wsc, |
|
addr: addr, |
|
} |
|
} |
|
|
|
// removeAddrRequest removes the websocket client wsc from the address to |
|
// client set addrs so it will no longer receive notification updates for |
|
// any transaction outputs send to addr. |
|
func (*wsNotificationManager) removeAddrRequest(addrs map[string]map[chan struct{}]*wsClient, |
|
wsc *wsClient, addr string) { |
|
|
|
// Remove the request tracking from the client. |
|
delete(wsc.addrRequests, addr) |
|
|
|
// Remove the client from the list to notify. |
|
cmap, ok := addrs[addr] |
|
if !ok { |
|
rpcsLog.Warnf("Attempt to remove nonexistent addr request "+ |
|
"<%s> for websocket client %s", addr, wsc.addr) |
|
return |
|
} |
|
delete(cmap, wsc.quit) |
|
|
|
// Remove the map entry altogether if there are no more clients |
|
// interested in it. |
|
if len(cmap) == 0 { |
|
delete(addrs, addr) |
|
} |
|
} |
|
|
|
// AddClient adds the passed websocket client to the notification manager. |
|
func (m *wsNotificationManager) AddClient(wsc *wsClient) { |
|
m.queueNotification <- (*notificationRegisterClient)(wsc) |
|
} |
|
|
|
// RemoveClient removes the passed websocket client and all notifications |
|
// registered for it. |
|
func (m *wsNotificationManager) RemoveClient(wsc *wsClient) { |
|
select { |
|
case m.queueNotification <- (*notificationUnregisterClient)(wsc): |
|
case <-m.quit: |
|
} |
|
} |
|
|
|
// Start starts the goroutines required for the manager to queue and process |
|
// websocket client notifications. |
|
func (m *wsNotificationManager) Start() { |
|
m.wg.Add(2) |
|
go m.queueHandler() |
|
go m.notificationHandler() |
|
} |
|
|
|
// WaitForShutdown blocks until all notification manager goroutines have |
|
// finished. |
|
func (m *wsNotificationManager) WaitForShutdown() { |
|
m.wg.Wait() |
|
} |
|
|
|
// Shutdown shuts down the manager, stopping the notification queue and |
|
// notification handler goroutines. |
|
func (m *wsNotificationManager) Shutdown() { |
|
close(m.quit) |
|
} |
|
|
|
// newWsNotificationManager returns a new notification manager ready for use. |
|
// See wsNotificationManager for more details. |
|
func newWsNotificationManager(server *rpcServer) *wsNotificationManager { |
|
return &wsNotificationManager{ |
|
server: server, |
|
queueNotification: make(chan interface{}), |
|
notificationMsgs: make(chan interface{}), |
|
numClients: make(chan int), |
|
quit: make(chan struct{}), |
|
} |
|
} |
|
|
|
// wsResponse pairs a message that is to be sent to a connected websocket
// client with a channel on which to reply once the message has been sent.
type wsResponse struct {
	// msg is the raw message to send.
	msg []byte

	// doneChan is the channel to reply on when msg has been sent.
	doneChan chan bool
}
|
|
|
// wsClient provides an abstraction for handling a websocket client. The |
|
// overall data flow is split into 3 main goroutines, a possible 4th goroutine |
|
// for long-running operations (only started if request is made), and a |
|
// websocket manager which is used to allow things such as broadcasting |
|
// requested notifications to all connected websocket clients. Inbound |
|
// messages are read via the inHandler goroutine and generally dispatched to |
|
// their own handler. However, certain potentially long-running operations such |
|
// as rescans, are sent to the asyncHander goroutine and are limited to one at a |
|
// time. There are two outbound message types - one for responding to client |
|
// requests and another for async notifications. Responses to client requests |
|
// use SendMessage which employs a buffered channel thereby limiting the number |
|
// of outstanding requests that can be made. Notifications are sent via |
|
// QueueNotification which implements a queue via notificationQueueHandler to |
|
// ensure sending notifications from other subsystems can't block. Ultimately, |
|
// all messages are sent via the outHandler. |
|
type wsClient struct { |
|
sync.Mutex |
|
|
|
// server is the RPC server that is servicing the client. |
|
server *rpcServer |
|
|
|
// conn is the underlying websocket connection. |
|
conn *websocket.Conn |
|
|
|
// disconnected indicated whether or not the websocket client is |
|
// disconnected. |
|
disconnected bool |
|
|
|
// addr is the remote address of the client. |
|
addr string |
|
|
|
// authenticated specifies whether a client has been authenticated |
|
// and therefore is allowed to communicated over the websocket. |
|
authenticated bool |
|
|
|
// isAdmin specifies whether a client may change the state of the server; |
|
// false means its access is only to the limited set of RPC calls. |
|
isAdmin bool |
|
|
|
// verboseTxUpdates specifies whether a client has requested verbose |
|
// information about all new transactions. |
|
verboseTxUpdates bool |
|
|
|
// addrRequests is a set of addresses the caller has requested to be |
|
// notified about. It is maintained here so all requests can be removed |
|
// when a wallet disconnects. Owned by the notification manager. |
|
addrRequests map[string]struct{} |
|
|
|
// spentRequests is a set of unspent Outpoints a wallet has requested |
|
// notifications for when they are spent by a processed transaction. |
|
// Owned by the notification manager. |
|
spentRequests map[wire.OutPoint]struct{} |
|
|
|
// Networking infrastructure. |
|
asyncStarted bool |
|
asyncChan chan *parsedRPCCmd |
|
ntfnChan chan []byte |
|
sendChan chan wsResponse |
|
quit chan struct{} |
|
wg sync.WaitGroup |
|
} |
|
|
|
// handleMessage is the main handler for incoming requests. It enforces |
|
// authentication, parses the incoming json, looks up and executes handlers |
|
// (including pass through for standard RPC commands), and sends the appropriate |
|
// response. It also detects commands which are marked as long-running and |
|
// sends them off to the asyncHander for processing. |
|
func (c *wsClient) handleMessage(msg []byte) { |
|
if !c.authenticated { |
|
// Disconnect immediately if the provided command fails to |
|
// parse when the client is not already authenticated. |
|
var request btcjson.Request |
|
if err := json.Unmarshal(msg, &request); err != nil { |
|
c.Disconnect() |
|
return |
|
} |
|
parsedCmd := parseCmd(&request) |
|
if parsedCmd.err != nil { |
|
c.Disconnect() |
|
return |
|
} |
|
|
|
// Disconnect immediately if the first command is not |
|
// authenticate when not already authenticated. |
|
authCmd, ok := parsedCmd.cmd.(*btcjson.AuthenticateCmd) |
|
if !ok { |
|
rpcsLog.Warnf("Unauthenticated websocket message " + |
|
"received") |
|
c.Disconnect() |
|
return |
|
} |
|
|
|
// Check credentials. |
|
login := authCmd.Username + ":" + authCmd.Passphrase |
|
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) |
|
authSha := fastsha256.Sum256([]byte(auth)) |
|
cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:]) |
|
limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:]) |
|
if cmp != 1 && limitcmp != 1 { |
|
rpcsLog.Warnf("Auth failure.") |
|
c.Disconnect() |
|
return |
|
} |
|
c.authenticated = true |
|
c.isAdmin = cmp == 1 |
|
|
|
// Marshal and send response. |
|
reply, err := createMarshalledReply(parsedCmd.id, nil, nil) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal authenticate reply: "+ |
|
"%v", err.Error()) |
|
return |
|
} |
|
c.SendMessage(reply, nil) |
|
return |
|
} |
|
|
|
// Attempt to parse the raw message into a JSON-RPC request. |
|
var request btcjson.Request |
|
if err := json.Unmarshal(msg, &request); err != nil { |
|
jsonErr := &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCParse.Code, |
|
Message: "Failed to parse request: " + err.Error(), |
|
} |
|
|
|
// Marshal and send response. |
|
reply, err := createMarshalledReply(nil, nil, jsonErr) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal parse failure "+ |
|
"reply: %v", err) |
|
return |
|
} |
|
c.SendMessage(reply, nil) |
|
return |
|
} |
|
// Requests with no ID (notifications) must not have a response per the |
|
// JSON-RPC spec. |
|
if request.ID == nil { |
|
return |
|
} |
|
|
|
// Check if the user is limited and disconnect client if unauthorized |
|
if !c.isAdmin { |
|
if _, ok := rpcLimited[request.Method]; !ok { |
|
jsonErr := &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCInvalidParams.Code, |
|
Message: "limited user not authorized for this method", |
|
} |
|
// Marshal and send response. |
|
reply, err := createMarshalledReply(request.ID, nil, jsonErr) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal parse failure "+ |
|
"reply: %v", err) |
|
return |
|
} |
|
c.SendMessage(reply, nil) |
|
return |
|
} |
|
} |
|
|
|
// Attempt to parse the JSON-RPC request into a known concrete command. |
|
cmd := parseCmd(&request) |
|
if cmd.err != nil { |
|
// Marshal and send response. |
|
reply, err := createMarshalledReply(cmd.id, nil, cmd.err) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal parse failure "+ |
|
"reply: %v", err) |
|
return |
|
} |
|
c.SendMessage(reply, nil) |
|
return |
|
} |
|
rpcsLog.Debugf("Received command <%s> from %s", cmd.method, c.addr) |
|
|
|
// Disconnect if already authenticated and another authenticate command |
|
// is received. |
|
if _, ok := cmd.cmd.(*btcjson.AuthenticateCmd); ok { |
|
rpcsLog.Warnf("Websocket client %s is already authenticated", |
|
c.addr) |
|
c.Disconnect() |
|
return |
|
} |
|
|
|
// When the command is marked as a long-running command, send it off |
|
// to the asyncHander goroutine for processing. |
|
if _, ok := wsAsyncHandlers[cmd.method]; ok { |
|
// Start up the async goroutine for handling long-running |
|
// requests asynchonrously if needed. |
|
if !c.asyncStarted { |
|
rpcsLog.Tracef("Starting async handler for %s", c.addr) |
|
c.wg.Add(1) |
|
go c.asyncHandler() |
|
c.asyncStarted = true |
|
} |
|
c.asyncChan <- cmd |
|
return |
|
} |
|
|
|
// Lookup the websocket extension for the command and if it doesn't |
|
// exist fallback to handling the command as a standard command. |
|
wsHandler, ok := wsHandlers[cmd.method] |
|
if !ok { |
|
// No websocket-specific handler so handle like a legacy |
|
// RPC connection. |
|
result, jsonErr := c.server.standardCmdResult(cmd, nil) |
|
reply, err := createMarshalledReply(cmd.id, result, jsonErr) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal reply for <%s> "+ |
|
"command: %v", cmd.method, err) |
|
return |
|
} |
|
|
|
c.SendMessage(reply, nil) |
|
return |
|
} |
|
|
|
// Invoke the handler and marshal and send response. |
|
result, jsonErr := wsHandler(c, cmd.cmd) |
|
reply, err := createMarshalledReply(cmd.id, result, jsonErr) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal reply for <%s> command: %v", |
|
cmd.method, err) |
|
return |
|
} |
|
c.SendMessage(reply, nil) |
|
} |
|
|
|
// inHandler handles all incoming messages for the websocket connection. It |
|
// must be run as a goroutine. |
|
func (c *wsClient) inHandler() { |
|
out: |
|
for { |
|
// Break out of the loop once the quit channel has been closed. |
|
// Use a non-blocking select here so we fall through otherwise. |
|
select { |
|
case <-c.quit: |
|
break out |
|
default: |
|
} |
|
|
|
_, msg, err := c.conn.ReadMessage() |
|
if err != nil { |
|
// Log the error if it's not due to disconnecting. |
|
if err != io.EOF { |
|
rpcsLog.Errorf("Websocket receive error from "+ |
|
"%s: %v", c.addr, err) |
|
} |
|
break out |
|
} |
|
c.handleMessage(msg) |
|
} |
|
|
|
// Ensure the connection is closed. |
|
c.Disconnect() |
|
c.wg.Done() |
|
rpcsLog.Tracef("Websocket client input handler done for %s", c.addr) |
|
} |
|
|
|
// notificationQueueHandler handles the queueing of outgoing notifications for |
|
// the websocket client. This runs as a muxer for various sources of input to |
|
// ensure that queueing up notifications to be sent will not block. Otherwise, |
|
// slow clients could bog down the other systems (such as the mempool or block |
|
// manager) which are queueing the data. The data is passed on to outHandler to |
|
// actually be written. It must be run as a goroutine. |
|
func (c *wsClient) notificationQueueHandler() { |
|
ntfnSentChan := make(chan bool, 1) // nonblocking sync |
|
|
|
// pendingNtfns is used as a queue for notifications that are ready to |
|
// be sent once there are no outstanding notifications currently being |
|
// sent. The waiting flag is used over simply checking for items in the |
|
// pending list to ensure cleanup knows what has and hasn't been sent |
|
// to the outHandler. Currently no special cleanup is needed, however |
|
// if something like a done channel is added to notifications in the |
|
// future, not knowing what has and hasn't been sent to the outHandler |
|
// (and thus who should respond to the done channel) would be |
|
// problematic without using this approach. |
|
pendingNtfns := list.New() |
|
waiting := false |
|
out: |
|
for { |
|
select { |
|
// This channel is notified when a message is being queued to |
|
// be sent across the network socket. It will either send the |
|
// message immediately if a send is not already in progress, or |
|
// queue the message to be sent once the other pending messages |
|
// are sent. |
|
case msg := <-c.ntfnChan: |
|
if !waiting { |
|
c.SendMessage(msg, ntfnSentChan) |
|
} else { |
|
pendingNtfns.PushBack(msg) |
|
} |
|
waiting = true |
|
|
|
// This channel is notified when a notification has been sent |
|
// across the network socket. |
|
case <-ntfnSentChan: |
|
// No longer waiting if there are no more messages in |
|
// the pending messages queue. |
|
next := pendingNtfns.Front() |
|
if next == nil { |
|
waiting = false |
|
continue |
|
} |
|
|
|
// Notify the outHandler about the next item to |
|
// asynchronously send. |
|
msg := pendingNtfns.Remove(next).([]byte) |
|
c.SendMessage(msg, ntfnSentChan) |
|
|
|
case <-c.quit: |
|
break out |
|
} |
|
} |
|
|
|
// Drain any wait channels before exiting so nothing is left waiting |
|
// around to send. |
|
cleanup: |
|
for { |
|
select { |
|
case <-c.ntfnChan: |
|
case <-ntfnSentChan: |
|
default: |
|
break cleanup |
|
} |
|
} |
|
c.wg.Done() |
|
rpcsLog.Tracef("Websocket client notification queue handler done "+ |
|
"for %s", c.addr) |
|
} |
|
|
|
// outHandler handles all outgoing messages for the websocket connection. It |
|
// must be run as a goroutine. It uses a buffered channel to serialize output |
|
// messages while allowing the sender to continue running asynchronously. It |
|
// must be run as a goroutine. |
|
func (c *wsClient) outHandler() { |
|
out: |
|
for { |
|
// Send any messages ready for send until the quit channel is |
|
// closed. |
|
select { |
|
case r := <-c.sendChan: |
|
err := c.conn.WriteMessage(websocket.TextMessage, r.msg) |
|
if err != nil { |
|
c.Disconnect() |
|
break out |
|
} |
|
if r.doneChan != nil { |
|
r.doneChan <- true |
|
} |
|
|
|
case <-c.quit: |
|
break out |
|
} |
|
} |
|
|
|
// Drain any wait channels before exiting so nothing is left waiting |
|
// around to send. |
|
cleanup: |
|
for { |
|
select { |
|
case r := <-c.sendChan: |
|
if r.doneChan != nil { |
|
r.doneChan <- false |
|
} |
|
default: |
|
break cleanup |
|
} |
|
} |
|
c.wg.Done() |
|
rpcsLog.Tracef("Websocket client output handler done for %s", c.addr) |
|
} |
|
|
|
// asyncHandler handles all long-running requests such as rescans which are |
|
// not run directly in the inHandler routine unlike most requests. This allows |
|
// normal quick requests to continue to be processed and responded to even while |
|
// lengthy operations are underway. Only one long-running operation is |
|
// permitted at a time, so multiple long-running requests are queued and |
|
// serialized. It must be run as a goroutine. Also, this goroutine is not |
|
// started until/if the first long-running request is made. |
|
func (c *wsClient) asyncHandler() { |
|
asyncHandlerDoneChan := make(chan struct{}, 1) // nonblocking sync |
|
pendingCmds := list.New() |
|
waiting := false |
|
|
|
// runHandler runs the handler for the passed command and sends the |
|
// reply. |
|
runHandler := func(parsedCmd *parsedRPCCmd) { |
|
wsHandler, ok := wsHandlers[parsedCmd.method] |
|
if !ok { |
|
rpcsLog.Warnf("No handler for command <%s>", |
|
parsedCmd.method) |
|
return |
|
} |
|
|
|
// Invoke the handler and marshal and send response. |
|
result, jsonErr := wsHandler(c, parsedCmd.cmd) |
|
reply, err := createMarshalledReply(parsedCmd.id, result, |
|
jsonErr) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal reply for <%s> "+ |
|
"command: %v", parsedCmd.method, err) |
|
return |
|
} |
|
c.SendMessage(reply, nil) |
|
} |
|
|
|
out: |
|
for { |
|
select { |
|
case cmd := <-c.asyncChan: |
|
if !waiting { |
|
c.wg.Add(1) |
|
go func(cmd *parsedRPCCmd) { |
|
runHandler(cmd) |
|
asyncHandlerDoneChan <- struct{}{} |
|
c.wg.Done() |
|
}(cmd) |
|
} else { |
|
pendingCmds.PushBack(cmd) |
|
} |
|
waiting = true |
|
|
|
case <-asyncHandlerDoneChan: |
|
// No longer waiting if there are no more messages in |
|
// the pending messages queue. |
|
next := pendingCmds.Front() |
|
if next == nil { |
|
waiting = false |
|
continue |
|
} |
|
|
|
// Notify the outHandler about the next item to |
|
// asynchronously send. |
|
element := pendingCmds.Remove(next) |
|
c.wg.Add(1) |
|
go func(cmd *parsedRPCCmd) { |
|
runHandler(cmd) |
|
asyncHandlerDoneChan <- struct{}{} |
|
c.wg.Done() |
|
}(element.(*parsedRPCCmd)) |
|
|
|
case <-c.quit: |
|
break out |
|
} |
|
} |
|
|
|
// Drain any wait channels before exiting so nothing is left waiting |
|
// around to send. |
|
cleanup: |
|
for { |
|
select { |
|
case <-c.asyncChan: |
|
case <-asyncHandlerDoneChan: |
|
default: |
|
break cleanup |
|
} |
|
} |
|
|
|
c.wg.Done() |
|
rpcsLog.Tracef("Websocket client async handler done for %s", c.addr) |
|
} |
|
|
|
// SendMessage sends the passed json to the websocket client. It is backed |
|
// by a buffered channel, so it will not block until the send channel is full. |
|
// Note however that QueueNotification must be used for sending async |
|
// notifications instead of the this function. This approach allows a limit to |
|
// the number of outstanding requests a client can make without preventing or |
|
// blocking on async notifications. |
|
func (c *wsClient) SendMessage(marshalledJSON []byte, doneChan chan bool) { |
|
// Don't send the message if disconnected. |
|
if c.Disconnected() { |
|
if doneChan != nil { |
|
doneChan <- false |
|
} |
|
return |
|
} |
|
|
|
c.sendChan <- wsResponse{msg: marshalledJSON, doneChan: doneChan} |
|
} |
|
|
|
// ErrClientQuit describes the error where a client send is not processed due |
|
// to the client having already been disconnected or dropped. |
|
var ErrClientQuit = errors.New("client quit") |
|
|
|
// QueueNotification queues the passed notification to be sent to the websocket |
|
// client. This function, as the name implies, is only intended for |
|
// notifications since it has additional logic to prevent other subsystems, such |
|
// as the memory pool and block manager, from blocking even when the send |
|
// channel is full. |
|
// |
|
// If the client is in the process of shutting down, this function returns |
|
// ErrClientQuit. This is intended to be checked by long-running notification |
|
// handlers to stop processing if there is no more work needed to be done. |
|
func (c *wsClient) QueueNotification(marshalledJSON []byte) error { |
|
// Don't queue the message if disconnected. |
|
if c.Disconnected() { |
|
return ErrClientQuit |
|
} |
|
|
|
c.ntfnChan <- marshalledJSON |
|
return nil |
|
} |
|
|
|
// Disconnected returns whether or not the websocket client is disconnected. |
|
func (c *wsClient) Disconnected() bool { |
|
c.Lock() |
|
defer c.Unlock() |
|
|
|
return c.disconnected |
|
} |
|
|
|
// Disconnect disconnects the websocket client. |
|
func (c *wsClient) Disconnect() { |
|
c.Lock() |
|
defer c.Unlock() |
|
|
|
// Nothing to do if already disconnected. |
|
if c.disconnected { |
|
return |
|
} |
|
|
|
rpcsLog.Tracef("Disconnecting websocket client %s", c.addr) |
|
close(c.quit) |
|
c.conn.Close() |
|
c.disconnected = true |
|
} |
|
|
|
// Start begins processing input and output messages. |
|
func (c *wsClient) Start() { |
|
rpcsLog.Tracef("Starting websocket client %s", c.addr) |
|
|
|
// Start processing input and output. |
|
c.wg.Add(3) |
|
go c.inHandler() |
|
go c.notificationQueueHandler() |
|
go c.outHandler() |
|
} |
|
|
|
// WaitForShutdown blocks until the websocket client goroutines are stopped |
|
// and the connection is closed. |
|
func (c *wsClient) WaitForShutdown() { |
|
c.wg.Wait() |
|
} |
|
|
|
// newWebsocketClient returns a new websocket client given the notification |
|
// manager, websocket connection, remote address, and whether or not the client |
|
// has already been authenticated (via HTTP Basic access authentication). The |
|
// returned client is ready to start. Once started, the client will process |
|
// incoming and outgoing messages in separate goroutines complete with queueing |
|
// and asynchrous handling for long-running operations. |
|
func newWebsocketClient(server *rpcServer, conn *websocket.Conn, |
|
remoteAddr string, authenticated bool, isAdmin bool) *wsClient { |
|
|
|
return &wsClient{ |
|
conn: conn, |
|
addr: remoteAddr, |
|
authenticated: authenticated, |
|
isAdmin: isAdmin, |
|
server: server, |
|
addrRequests: make(map[string]struct{}), |
|
spentRequests: make(map[wire.OutPoint]struct{}), |
|
ntfnChan: make(chan []byte, 1), // nonblocking sync |
|
asyncChan: make(chan *parsedRPCCmd, 1), // nonblocking sync |
|
sendChan: make(chan wsResponse, websocketSendBufferSize), |
|
quit: make(chan struct{}), |
|
} |
|
} |
|
|
|
// handleWebsocketHelp implements the help command for websocket connections. |
|
func handleWebsocketHelp(wsc *wsClient, icmd interface{}) (interface{}, error) { |
|
cmd, ok := icmd.(*btcjson.HelpCmd) |
|
if !ok { |
|
return nil, btcjson.ErrRPCInternal |
|
} |
|
|
|
// Provide a usage overview of all commands when no specific command |
|
// was specified. |
|
var command string |
|
if cmd.Command != nil { |
|
command = *cmd.Command |
|
} |
|
if command == "" { |
|
usage, err := wsc.server.helpCacher.rpcUsage(true) |
|
if err != nil { |
|
context := "Failed to generate RPC usage" |
|
return nil, internalRPCError(err.Error(), context) |
|
} |
|
return usage, nil |
|
} |
|
|
|
// Check that the command asked for is supported and implemented. |
|
// Search the list of websocket handlers as well as the main list of |
|
// handlers since help should only be provided for those cases. |
|
valid := true |
|
if _, ok := rpcHandlers[command]; !ok { |
|
if _, ok := wsHandlers[command]; !ok { |
|
valid = false |
|
} |
|
} |
|
if !valid { |
|
return nil, &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCInvalidParameter, |
|
Message: "Unknown command: " + command, |
|
} |
|
} |
|
|
|
// Get the help for the command. |
|
help, err := wsc.server.helpCacher.rpcMethodHelp(command) |
|
if err != nil { |
|
context := "Failed to generate help" |
|
return nil, internalRPCError(err.Error(), context) |
|
} |
|
return help, nil |
|
} |
|
|
|
// handleNotifyBlocks implements the notifyblocks command extension for |
|
// websocket connections. |
|
func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { |
|
wsc.server.ntfnMgr.RegisterBlockUpdates(wsc) |
|
return nil, nil |
|
} |
|
|
|
// handleStopNotifyBlocks implements the stopnotifyblocks command extension for |
|
// websocket connections. |
|
func handleStopNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { |
|
wsc.server.ntfnMgr.UnregisterBlockUpdates(wsc) |
|
return nil, nil |
|
} |
|
|
|
// handleNotifySpent implements the notifyspent command extension for |
|
// websocket connections. |
|
func handleNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) { |
|
cmd, ok := icmd.(*btcjson.NotifySpentCmd) |
|
if !ok { |
|
return nil, btcjson.ErrRPCInternal |
|
} |
|
|
|
outpoints, err := deserializeOutpoints(cmd.OutPoints) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
wsc.server.ntfnMgr.RegisterSpentRequests(wsc, outpoints) |
|
return nil, nil |
|
} |
|
|
|
// handleNotifyNewTransations implements the notifynewtransactions command |
|
// extension for websocket connections. |
|
func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) { |
|
cmd, ok := icmd.(*btcjson.NotifyNewTransactionsCmd) |
|
if !ok { |
|
return nil, btcjson.ErrRPCInternal |
|
} |
|
|
|
wsc.verboseTxUpdates = cmd.Verbose != nil && *cmd.Verbose |
|
wsc.server.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc) |
|
return nil, nil |
|
} |
|
|
|
// handleStopNotifyNewTransations implements the stopnotifynewtransactions |
|
// command extension for websocket connections. |
|
func handleStopNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) { |
|
wsc.server.ntfnMgr.UnregisterNewMempoolTxsUpdates(wsc) |
|
return nil, nil |
|
} |
|
|
|
// handleNotifyReceived implements the notifyreceived command extension for |
|
// websocket connections. |
|
func handleNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error) { |
|
cmd, ok := icmd.(*btcjson.NotifyReceivedCmd) |
|
if !ok { |
|
return nil, btcjson.ErrRPCInternal |
|
} |
|
|
|
// Decode addresses to validate input, but the strings slice is used |
|
// directly if these are all ok. |
|
err := checkAddressValidity(cmd.Addresses) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
wsc.server.ntfnMgr.RegisterTxOutAddressRequests(wsc, cmd.Addresses) |
|
return nil, nil |
|
} |
|
|
|
// handleStopNotifySpent implements the stopnotifyspent command extension for |
|
// websocket connections. |
|
func handleStopNotifySpent(wsc *wsClient, icmd interface{}) (interface{}, error) { |
|
cmd, ok := icmd.(*btcjson.StopNotifySpentCmd) |
|
if !ok { |
|
return nil, btcjson.ErrRPCInternal |
|
} |
|
|
|
outpoints, err := deserializeOutpoints(cmd.OutPoints) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
for _, outpoint := range outpoints { |
|
wsc.server.ntfnMgr.UnregisterSpentRequest(wsc, outpoint) |
|
} |
|
|
|
return nil, nil |
|
} |
|
|
|
// handleStopNotifyReceived implements the stopnotifyreceived command extension |
|
// for websocket connections. |
|
func handleStopNotifyReceived(wsc *wsClient, icmd interface{}) (interface{}, error) { |
|
cmd, ok := icmd.(*btcjson.StopNotifyReceivedCmd) |
|
if !ok { |
|
return nil, btcjson.ErrRPCInternal |
|
} |
|
|
|
// Decode addresses to validate input, but the strings slice is used |
|
// directly if these are all ok. |
|
err := checkAddressValidity(cmd.Addresses) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
for _, addr := range cmd.Addresses { |
|
wsc.server.ntfnMgr.UnregisterTxOutAddressRequest(wsc, addr) |
|
} |
|
|
|
return nil, nil |
|
} |
|
|
|
// checkAddressValidity checks the validity of each address in the passed |
|
// string slice. It does this by attempting to decode each address using the |
|
// current active network parameters. If any single address fails to decode |
|
// properly, the function returns an error. Otherwise, nil is returned. |
|
func checkAddressValidity(addrs []string) error { |
|
for _, addr := range addrs { |
|
_, err := btcutil.DecodeAddress(addr, activeNetParams.Params) |
|
if err != nil { |
|
return &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCInvalidAddressOrKey, |
|
Message: fmt.Sprintf("Invalid address or key: %v", |
|
addr), |
|
} |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
// deserializeOutpoints deserializes each serialized outpoint. |
|
func deserializeOutpoints(serializedOuts []btcjson.OutPoint) ([]*wire.OutPoint, error) { |
|
outpoints := make([]*wire.OutPoint, 0, len(serializedOuts)) |
|
for i := range serializedOuts { |
|
blockHash, err := wire.NewShaHashFromStr(serializedOuts[i].Hash) |
|
if err != nil { |
|
return nil, rpcDecodeHexError(serializedOuts[i].Hash) |
|
} |
|
index := serializedOuts[i].Index |
|
outpoints = append(outpoints, wire.NewOutPoint(blockHash, index)) |
|
} |
|
|
|
return outpoints, nil |
|
} |
|
|
|
type rescanKeys struct { |
|
fallbacks map[string]struct{} |
|
pubKeyHashes map[[ripemd160.Size]byte]struct{} |
|
scriptHashes map[[ripemd160.Size]byte]struct{} |
|
compressedPubKeys map[[33]byte]struct{} |
|
uncompressedPubKeys map[[65]byte]struct{} |
|
unspent map[wire.OutPoint]struct{} |
|
} |
|
|
|
// unspentSlice returns a slice of currently-unspent outpoints for the rescan |
|
// lookup keys. This is primarily intended to be used to register outpoints |
|
// for continuous notifications after a rescan has completed. |
|
func (r *rescanKeys) unspentSlice() []*wire.OutPoint { |
|
ops := make([]*wire.OutPoint, 0, len(r.unspent)) |
|
for op := range r.unspent { |
|
opCopy := op |
|
ops = append(ops, &opCopy) |
|
} |
|
return ops |
|
} |
|
|
|
// ErrRescanReorg defines the error that is returned when an unrecoverable |
|
// reorganize is detected during a rescan. |
|
var ErrRescanReorg = btcjson.RPCError{ |
|
Code: btcjson.ErrRPCDatabase, |
|
Message: "Reorganize", |
|
} |
|
|
|
// rescanBlock rescans all transactions in a single block. This is a helper |
|
// function for handleRescan. |
|
func rescanBlock(wsc *wsClient, lookups *rescanKeys, blk *btcutil.Block) { |
|
for _, tx := range blk.Transactions() { |
|
// Hexadecimal representation of this tx. Only created if |
|
// needed, and reused for later notifications if already made. |
|
var txHex string |
|
|
|
// All inputs and outputs must be iterated through to correctly |
|
// modify the unspent map, however, just a single notification |
|
// for any matching transaction inputs or outputs should be |
|
// created and sent. |
|
spentNotified := false |
|
recvNotified := false |
|
|
|
for _, txin := range tx.MsgTx().TxIn { |
|
if _, ok := lookups.unspent[txin.PreviousOutPoint]; ok { |
|
delete(lookups.unspent, txin.PreviousOutPoint) |
|
|
|
if spentNotified { |
|
continue |
|
} |
|
|
|
if txHex == "" { |
|
txHex = txHexString(tx) |
|
} |
|
marshalledJSON, err := newRedeemingTxNotification(txHex, tx.Index(), blk) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal redeemingtx notification: %v", err) |
|
continue |
|
} |
|
|
|
err = wsc.QueueNotification(marshalledJSON) |
|
// Stop the rescan early if the websocket client |
|
// disconnected. |
|
if err == ErrClientQuit { |
|
return |
|
} |
|
spentNotified = true |
|
} |
|
} |
|
|
|
for txOutIdx, txout := range tx.MsgTx().TxOut { |
|
_, addrs, _, _ := txscript.ExtractPkScriptAddrs( |
|
txout.PkScript, wsc.server.server.chainParams) |
|
|
|
for _, addr := range addrs { |
|
switch a := addr.(type) { |
|
case *btcutil.AddressPubKeyHash: |
|
if _, ok := lookups.pubKeyHashes[*a.Hash160()]; !ok { |
|
continue |
|
} |
|
|
|
case *btcutil.AddressScriptHash: |
|
if _, ok := lookups.scriptHashes[*a.Hash160()]; !ok { |
|
continue |
|
} |
|
|
|
case *btcutil.AddressPubKey: |
|
found := false |
|
switch sa := a.ScriptAddress(); len(sa) { |
|
case 33: // Compressed |
|
var key [33]byte |
|
copy(key[:], sa) |
|
if _, ok := lookups.compressedPubKeys[key]; ok { |
|
found = true |
|
} |
|
|
|
case 65: // Uncompressed |
|
var key [65]byte |
|
copy(key[:], sa) |
|
if _, ok := lookups.uncompressedPubKeys[key]; ok { |
|
found = true |
|
} |
|
|
|
default: |
|
rpcsLog.Warnf("Skipping rescanned pubkey of unknown "+ |
|
"serialized length %d", len(sa)) |
|
continue |
|
} |
|
|
|
// If the transaction output pays to the pubkey of |
|
// a rescanned P2PKH address, include it as well. |
|
if !found { |
|
pkh := a.AddressPubKeyHash() |
|
if _, ok := lookups.pubKeyHashes[*pkh.Hash160()]; !ok { |
|
continue |
|
} |
|
} |
|
|
|
default: |
|
// A new address type must have been added. Encode as a |
|
// payment address string and check the fallback map. |
|
addrStr := addr.EncodeAddress() |
|
_, ok := lookups.fallbacks[addrStr] |
|
if !ok { |
|
continue |
|
} |
|
} |
|
|
|
outpoint := wire.OutPoint{ |
|
Hash: *tx.Sha(), |
|
Index: uint32(txOutIdx), |
|
} |
|
lookups.unspent[outpoint] = struct{}{} |
|
|
|
if recvNotified { |
|
continue |
|
} |
|
|
|
if txHex == "" { |
|
txHex = txHexString(tx) |
|
} |
|
ntfn := btcjson.NewRecvTxNtfn(txHex, |
|
blockDetails(blk, tx.Index())) |
|
|
|
marshalledJSON, err := btcjson.MarshalCmd(nil, ntfn) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal recvtx notification: %v", err) |
|
return |
|
} |
|
|
|
err = wsc.QueueNotification(marshalledJSON) |
|
// Stop the rescan early if the websocket client |
|
// disconnected. |
|
if err == ErrClientQuit { |
|
return |
|
} |
|
recvNotified = true |
|
} |
|
} |
|
} |
|
} |
|
|
|
// recoverFromReorg attempts to recover from a detected reorganize during a |
|
// rescan. It fetches a new range of block shas from the database and |
|
// verifies that the new range of blocks is on the same fork as a previous |
|
// range of blocks. If this condition does not hold true, the JSON-RPC error |
|
// for an unrecoverable reorganize is returned. |
|
func recoverFromReorg(db database.Db, minBlock, maxBlock int64, |
|
lastBlock *wire.ShaHash) ([]wire.ShaHash, error) { |
|
|
|
hashList, err := db.FetchHeightRange(minBlock, maxBlock) |
|
if err != nil { |
|
rpcsLog.Errorf("Error looking up block range: %v", err) |
|
return nil, &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCDatabase, |
|
Message: "Database error: " + err.Error(), |
|
} |
|
} |
|
if lastBlock == nil || len(hashList) == 0 { |
|
return hashList, nil |
|
} |
|
blk, err := db.FetchBlockBySha(&hashList[0]) |
|
if err != nil { |
|
rpcsLog.Errorf("Error looking up possibly reorged block: %v", |
|
err) |
|
return nil, &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCDatabase, |
|
Message: "Database error: " + err.Error(), |
|
} |
|
} |
|
jsonErr := descendantBlock(lastBlock, blk) |
|
if jsonErr != nil { |
|
return nil, jsonErr |
|
} |
|
return hashList, nil |
|
} |
|
|
|
// descendantBlock returns the appropiate JSON-RPC error if a current block |
|
// fetched during a reorganize is not a direct child of the parent block hash. |
|
func descendantBlock(prevHash *wire.ShaHash, curBlock *btcutil.Block) error { |
|
curHash := &curBlock.MsgBlock().Header.PrevBlock |
|
if !prevHash.IsEqual(curHash) { |
|
rpcsLog.Errorf("Stopping rescan for reorged block %v "+ |
|
"(replaced by block %v)", prevHash, curHash) |
|
return &ErrRescanReorg |
|
} |
|
return nil |
|
} |
|
|
|
// handleRescan implements the rescan command extension for websocket |
|
// connections. |
|
// |
|
// NOTE: This does not smartly handle reorgs, and fixing requires database |
|
// changes (for safe, concurrent access to full block ranges, and support |
|
// for other chains than the best chain). It will, however, detect whether |
|
// a reorg removed a block that was previously processed, and result in the |
|
// handler erroring. Clients must handle this by finding a block still in |
|
// the chain (perhaps from a rescanprogress notification) to resume their |
|
// rescan. |
|
func handleRescan(wsc *wsClient, icmd interface{}) (interface{}, error) { |
|
cmd, ok := icmd.(*btcjson.RescanCmd) |
|
if !ok { |
|
return nil, btcjson.ErrRPCInternal |
|
} |
|
|
|
outpoints := make([]*wire.OutPoint, 0, len(cmd.OutPoints)) |
|
for i := range cmd.OutPoints { |
|
blockHash, err := wire.NewShaHashFromStr(cmd.OutPoints[i].Hash) |
|
if err != nil { |
|
return nil, rpcDecodeHexError(cmd.OutPoints[i].Hash) |
|
} |
|
index := cmd.OutPoints[i].Index |
|
outpoints = append(outpoints, wire.NewOutPoint(blockHash, index)) |
|
} |
|
|
|
numAddrs := len(cmd.Addresses) |
|
if numAddrs == 1 { |
|
rpcsLog.Info("Beginning rescan for 1 address") |
|
} else { |
|
rpcsLog.Infof("Beginning rescan for %d addresses", numAddrs) |
|
} |
|
|
|
// Build lookup maps. |
|
lookups := rescanKeys{ |
|
fallbacks: map[string]struct{}{}, |
|
pubKeyHashes: map[[ripemd160.Size]byte]struct{}{}, |
|
scriptHashes: map[[ripemd160.Size]byte]struct{}{}, |
|
compressedPubKeys: map[[33]byte]struct{}{}, |
|
uncompressedPubKeys: map[[65]byte]struct{}{}, |
|
unspent: map[wire.OutPoint]struct{}{}, |
|
} |
|
var compressedPubkey [33]byte |
|
var uncompressedPubkey [65]byte |
|
for _, addrStr := range cmd.Addresses { |
|
addr, err := btcutil.DecodeAddress(addrStr, activeNetParams.Params) |
|
if err != nil { |
|
jsonErr := btcjson.RPCError{ |
|
Code: btcjson.ErrRPCInvalidAddressOrKey, |
|
Message: "Rescan address " + addrStr + ": " + |
|
err.Error(), |
|
} |
|
return nil, &jsonErr |
|
} |
|
switch a := addr.(type) { |
|
case *btcutil.AddressPubKeyHash: |
|
lookups.pubKeyHashes[*a.Hash160()] = struct{}{} |
|
|
|
case *btcutil.AddressScriptHash: |
|
lookups.scriptHashes[*a.Hash160()] = struct{}{} |
|
|
|
case *btcutil.AddressPubKey: |
|
pubkeyBytes := a.ScriptAddress() |
|
switch len(pubkeyBytes) { |
|
case 33: // Compressed |
|
copy(compressedPubkey[:], pubkeyBytes) |
|
lookups.compressedPubKeys[compressedPubkey] = struct{}{} |
|
|
|
case 65: // Uncompressed |
|
copy(uncompressedPubkey[:], pubkeyBytes) |
|
lookups.uncompressedPubKeys[uncompressedPubkey] = struct{}{} |
|
|
|
default: |
|
jsonErr := btcjson.RPCError{ |
|
Code: btcjson.ErrRPCInvalidAddressOrKey, |
|
Message: "Pubkey " + addrStr + " is of unknown length", |
|
} |
|
return nil, &jsonErr |
|
} |
|
|
|
default: |
|
// A new address type must have been added. Use encoded |
|
// payment address string as a fallback until a fast path |
|
// is added. |
|
lookups.fallbacks[addrStr] = struct{}{} |
|
} |
|
} |
|
for _, outpoint := range outpoints { |
|
lookups.unspent[*outpoint] = struct{}{} |
|
} |
|
|
|
db := wsc.server.server.db |
|
|
|
minBlockSha, err := wire.NewShaHashFromStr(cmd.BeginBlock) |
|
if err != nil { |
|
return nil, rpcDecodeHexError(cmd.BeginBlock) |
|
} |
|
minBlock, err := db.FetchBlockHeightBySha(minBlockSha) |
|
if err != nil { |
|
return nil, &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCBlockNotFound, |
|
Message: "Error getting block: " + err.Error(), |
|
} |
|
} |
|
|
|
maxBlock := database.AllShas |
|
if cmd.EndBlock != nil { |
|
maxBlockSha, err := wire.NewShaHashFromStr(*cmd.EndBlock) |
|
if err != nil { |
|
return nil, rpcDecodeHexError(*cmd.EndBlock) |
|
} |
|
maxBlock, err = db.FetchBlockHeightBySha(maxBlockSha) |
|
if err != nil { |
|
return nil, &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCBlockNotFound, |
|
Message: "Error getting block: " + err.Error(), |
|
} |
|
} |
|
} |
|
|
|
// lastBlock and lastBlockHash track the previously-rescanned block. |
|
// They equal nil when no previous blocks have been rescanned. |
|
var lastBlock *btcutil.Block |
|
var lastBlockHash *wire.ShaHash |
|
|
|
// A ticker is created to wait at least 10 seconds before notifying the |
|
// websocket client of the current progress completed by the rescan. |
|
ticker := time.NewTicker(10 * time.Second) |
|
defer ticker.Stop() |
|
|
|
// FetchHeightRange may not return a complete list of block shas for |
|
// the given range, so fetch range as many times as necessary. |
|
fetchRange: |
|
for minBlock < maxBlock { |
|
hashList, err := db.FetchHeightRange(minBlock, maxBlock) |
|
if err != nil { |
|
rpcsLog.Errorf("Error looking up block range: %v", err) |
|
return nil, &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCDatabase, |
|
Message: "Database error: " + err.Error(), |
|
} |
|
} |
|
if len(hashList) == 0 { |
|
// The rescan is finished if no block hashes for this |
|
// range were successfully fetched and a stop block |
|
// was provided. |
|
if maxBlock != database.AllShas { |
|
break |
|
} |
|
|
|
// If the rescan is through the current block, set up |
|
// the client to continue to receive notifications |
|
// regarding all rescanned addresses and the current set |
|
// of unspent outputs. |
|
// |
|
// This is done safely by temporarily grabbing exclusive |
|
// access of the block manager. If no more blocks have |
|
// been attached between this pause and the fetch above, |
|
// then it is safe to register the websocket client for |
|
// continuous notifications if necessary. Otherwise, |
|
// continue the fetch loop again to rescan the new |
|
// blocks (or error due to an irrecoverable reorganize). |
|
pauseGuard := wsc.server.server.blockManager.Pause() |
|
curHash, _, err := db.NewestSha() |
|
again := true |
|
if err == nil && (lastBlockHash == nil || *lastBlockHash == *curHash) { |
|
again = false |
|
n := wsc.server.ntfnMgr |
|
n.RegisterSpentRequests(wsc, lookups.unspentSlice()) |
|
n.RegisterTxOutAddressRequests(wsc, cmd.Addresses) |
|
} |
|
close(pauseGuard) |
|
if err != nil { |
|
rpcsLog.Errorf("Error fetching best block "+ |
|
"hash: %v", err) |
|
return nil, &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCDatabase, |
|
Message: "Database error: " + |
|
err.Error(), |
|
} |
|
} |
|
if again { |
|
continue |
|
} |
|
break |
|
} |
|
|
|
loopHashList: |
|
for i := range hashList { |
|
blk, err := db.FetchBlockBySha(&hashList[i]) |
|
if err != nil { |
|
// Only handle reorgs if a block could not be |
|
// found for the hash. |
|
if err != database.ErrBlockShaMissing { |
|
rpcsLog.Errorf("Error looking up "+ |
|
"block: %v", err) |
|
return nil, &btcjson.RPCError{ |
|
Code: btcjson.ErrRPCDatabase, |
|
Message: "Database error: " + |
|
err.Error(), |
|
} |
|
} |
|
|
|
// If an absolute max block was specified, don't |
|
// attempt to handle the reorg. |
|
if maxBlock != database.AllShas { |
|
rpcsLog.Errorf("Stopping rescan for "+ |
|
"reorged block %v", |
|
cmd.EndBlock) |
|
return nil, &ErrRescanReorg |
|
} |
|
|
|
// If the lookup for the previously valid block |
|
// hash failed, there may have been a reorg. |
|
// Fetch a new range of block hashes and verify |
|
// that the previously processed block (if there |
|
// was any) still exists in the database. If it |
|
// doesn't, we error. |
|
// |
|
// A goto is used to branch execution back to |
|
// before the range was evaluated, as it must be |
|
// reevaluated for the new hashList. |
|
minBlock += int64(i) |
|
hashList, err = recoverFromReorg(db, minBlock, |
|
maxBlock, lastBlockHash) |
|
if err != nil { |
|
return nil, err |
|
} |
|
if len(hashList) == 0 { |
|
break fetchRange |
|
} |
|
goto loopHashList |
|
} |
|
if i == 0 && lastBlockHash != nil { |
|
// Ensure the new hashList is on the same fork |
|
// as the last block from the old hashList. |
|
jsonErr := descendantBlock(lastBlockHash, blk) |
|
if jsonErr != nil { |
|
return nil, jsonErr |
|
} |
|
} |
|
|
|
// A select statement is used to stop rescans if the |
|
// client requesting the rescan has disconnected. |
|
select { |
|
case <-wsc.quit: |
|
rpcsLog.Debugf("Stopped rescan at height %v "+ |
|
"for disconnected client", blk.Height()) |
|
return nil, nil |
|
default: |
|
rescanBlock(wsc, &lookups, blk) |
|
lastBlock = blk |
|
lastBlockHash = blk.Sha() |
|
} |
|
|
|
// Periodically notify the client of the progress |
|
// completed. Continue with next block if no progress |
|
// notification is needed yet. |
|
select { |
|
case <-ticker.C: // fallthrough |
|
default: |
|
continue |
|
} |
|
|
|
n := btcjson.NewRescanProgressNtfn(hashList[i].String(), |
|
int32(blk.Height()), |
|
blk.MsgBlock().Header.Timestamp.Unix()) |
|
mn, err := btcjson.MarshalCmd(nil, n) |
|
if err != nil { |
|
rpcsLog.Errorf("Failed to marshal rescan "+ |
|
"progress notification: %v", err) |
|
continue |
|
} |
|
|
|
if err = wsc.QueueNotification(mn); err == ErrClientQuit { |
|
// Finished if the client disconnected. |
|
rpcsLog.Debugf("Stopped rescan at height %v "+ |
|
"for disconnected client", blk.Height()) |
|
return nil, nil |
|
} |
|
} |
|
|
|
minBlock += int64(len(hashList)) |
|
} |
|
|
|
// Notify websocket client of the finished rescan. Due to how btcd |
|
// asynchronously queues notifications to not block calling code, |
|
// there is no guarantee that any of the notifications created during |
|
// rescan (such as rescanprogress, recvtx and redeemingtx) will be |
|
// received before the rescan RPC returns. Therefore, another method |
|
// is needed to safely inform clients that all rescan notifications have |
|
// been sent. |
|
n := btcjson.NewRescanFinishedNtfn(lastBlockHash.String(), |
|
int32(lastBlock.Height()), |
|
lastBlock.MsgBlock().Header.Timestamp.Unix()) |
|
if mn, err := btcjson.MarshalCmd(nil, n); err != nil { |
|
rpcsLog.Errorf("Failed to marshal rescan finished "+ |
|
"notification: %v", err) |
|
} else { |
|
// The rescan is finished, so we don't care whether the client |
|
// has disconnected at this point, so discard error. |
|
_ = wsc.QueueNotification(mn) |
|
} |
|
|
|
rpcsLog.Info("Finished rescan") |
|
return nil, nil |
|
} |
|
|
|
// init installs the websocket command handler table by pointing wsHandlers at
// wsHandlersBeforeInit. The assignment is done here rather than in the
// declaration of wsHandlers because the help handler references wsHandlers,
// so a direct initializer would create an initialization dependency loop.
// Both variables refer to the same underlying map afterwards; no copy is made.
func init() { |
|
wsHandlers = wsHandlersBeforeInit |
|
}
|
|
|