use core::net::IpAddr;
use core::net::SocketAddr;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use bitcoin::p2p::address::AddrV2;
use bitcoin::p2p::address::AddrV2Message;
use bitcoin::p2p::message_blockdata::Inventory;
use bitcoin::p2p::ServiceFlags;
use bitcoin::Transaction;
use floresta_chain::ChainBackend;
use floresta_common::service_flags;
use rand::distributions::Distribution;
use rand::distributions::WeightedIndex;
use rand::prelude::SliceRandom;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use super::ConnectionKind;
use super::InflightRequests;
use super::LocalPeerView;
use super::NodeRequest;
use super::PeerStatus;
use super::UtreexoNode;
use crate::address_man::AddressState;
use crate::address_man::LocalAddress;
use crate::block_proof::Bitmap;
use crate::node::running_ctx::RunningNode;
use crate::node::try_and_log;
use crate::node_context::NodeContext;
use crate::node_context::PeerId;
use crate::node_interface::NodeResponse;
use crate::node_interface::PeerInfo;
use crate::node_interface::UserRequest;
use crate::p2p_wire::error::WireError;
use crate::p2p_wire::peer::PeerMessages;
use crate::p2p_wire::peer::Version;
#[derive(Debug, Clone)]
pub struct AddedPeerInfo {
pub(crate) address: AddrV2,
pub(crate) port: u16,
pub(crate) v1_fallback: bool,
}
impl<T, Chain> UtreexoNode<Chain, T>
where
T: 'static + Default + NodeContext,
Chain: ChainBackend + 'static,
WireError: From<Chain::Error>,
{
fn choose_peer_by_latency(&self, service: ServiceFlags) -> Option<(&PeerId, &LocalPeerView)> {
const EPS: f64 = 1e-9;
let candidates: Vec<(&PeerId, &LocalPeerView, f64)> = self
.peers
.iter()
.filter(|(_, peer)| peer.services.has(service) && peer.state == PeerStatus::Ready)
.filter_map(|(id, peer)| {
let Some(t) = peer.message_times.value() else {
error!("Peer {peer:?} has no message times");
return None;
};
Some((id, peer, t.max(EPS)))
})
.collect();
let lowest_time = candidates.iter().map(|(_, _, t)| *t).reduce(f64::min)?;
let weights: Vec<f64> = candidates
.iter()
.map(|(_, _, time)| lowest_time / time)
.collect();
let dist = WeightedIndex::new(&weights).ok()?;
let idx = dist.sample(&mut rand::thread_rng());
let (id, peer, _) = candidates[idx];
Some((id, peer))
}
pub(crate) fn connected_peers(&self) -> usize {
self.peers
.values()
.filter(|p| p.state == PeerStatus::Ready && p.is_long_lived())
.count()
}
pub(crate) fn send_to_fast_peer(
&self,
request: NodeRequest,
required_service: ServiceFlags,
) -> Result<PeerId, WireError> {
let (peer_id, peer) = self
.choose_peer_by_latency(required_service)
.ok_or(WireError::NoPeersAvailable)?;
peer.channel.send(request)?;
Ok(*peer_id)
}
#[inline]
pub(crate) fn send_to_random_peer(
&mut self,
req: NodeRequest,
required_service: ServiceFlags,
) -> Result<u32, WireError> {
if self.peers.is_empty() {
return Err(WireError::NoPeersAvailable);
}
let peers = match required_service {
ServiceFlags::NONE => &self.peer_ids,
_ => self
.peer_by_service
.get(&required_service)
.ok_or(WireError::NoPeersAvailable)?,
};
if peers.is_empty() {
return Err(WireError::NoPeersAvailable);
}
let peer = peers
.choose(&mut rand::thread_rng())
.expect("infallible: we checked that peers isn't empty");
self.peers
.get(peer)
.ok_or(WireError::NoPeersAvailable)?
.channel
.send(req)
.map_err(WireError::ChannelSend)?;
Ok(*peer)
}
pub(crate) fn send_to_peer(&self, peer_id: u32, req: NodeRequest) -> Result<(), WireError> {
if let Some(peer) = &self.peers.get(&peer_id) {
if peer.state == PeerStatus::Awaiting {
return Ok(());
}
peer.channel.send(req)?;
}
Ok(())
}
pub(crate) fn broadcast_to_peers(&mut self, request: NodeRequest) {
for peer in self.peers.values() {
if peer.state != PeerStatus::Ready {
continue;
}
if let Err(err) = peer.channel.send(request.clone()) {
warn!("Failed to send request to peer {}: {err}", peer.address);
}
}
}
pub(crate) fn ask_for_addresses(&mut self) -> Result<(), WireError> {
let _ = self.send_to_random_peer(NodeRequest::GetAddresses, ServiceFlags::NONE)?;
Ok(())
}
fn is_peer_good(peer: &LocalPeerView, needs: ServiceFlags) -> bool {
if peer.state == PeerStatus::Banned {
return false;
}
peer.services.has(needs)
}
pub(crate) fn handle_peer_ready(
&mut self,
peer: u32,
mut version: Version,
) -> Result<(), WireError> {
self.inflight.remove(&InflightRequests::Connect(peer));
self.peers.entry(peer).and_modify(|p| {
p.state = PeerStatus::Ready;
});
self.send_to_peer(peer, NodeRequest::GetAddresses)?;
self.inflight
.insert(InflightRequests::GetAddresses, (peer, Instant::now()));
let good_peers_count = self.connected_peers();
if good_peers_count > T::MAX_OUTGOING_PEERS {
let is_utreexo_peer = matches!(version.kind, ConnectionKind::Regular(services) if services.has(service_flags::UTREEXO.into()));
let is_manual_peer = version.kind == ConnectionKind::Manual;
let is_extra = version.kind == ConnectionKind::Extra;
if !(is_utreexo_peer || is_manual_peer || is_extra) {
debug!(
"Already have {} peers, disconnecting peer to avoid blowing up our max of {}",
good_peers_count,
T::MAX_OUTGOING_PEERS
);
self.peers.entry(peer).and_modify(|p| {
p.kind = ConnectionKind::Feeler;
});
version.kind = ConnectionKind::Feeler;
}
}
if version.kind == ConnectionKind::Feeler {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
self.address_man
.update_set_service_flag(version.address_id, version.services)
.update_set_state(version.address_id, AddressState::Tried(now));
return Ok(());
}
if version.kind == ConnectionKind::Extra {
let locator = self.chain.get_block_locator()?;
self.send_to_peer(peer, NodeRequest::GetHeaders(locator))?;
self.inflight
.insert(InflightRequests::Headers, (peer, Instant::now()));
return Ok(());
}
info!(
"New peer id={} version={} blocks={} services={}",
version.id, version.user_agent, version.blocks, version.services
);
if let Some(peer_data) = self.common.peers.get_mut(&peer) {
peer_data.services = version.services;
peer_data.user_agent.clone_from(&version.user_agent);
peer_data.height = version.blocks;
peer_data.transport_protocol = version.transport_protocol;
if let ConnectionKind::Regular(needs) = version.kind {
if !Self::is_peer_good(peer_data, needs) {
info!(
"Disconnecting peer {peer} for not having the required services. has={} needs={}", peer_data.services, needs
);
peer_data.channel.send(NodeRequest::Shutdown)?;
self.address_man.update_set_state(
version.address_id,
AddressState::Tried(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs(),
),
);
self.address_man
.update_set_service_flag(version.address_id, version.services);
return Ok(());
}
};
if peer_data.services.has(service_flags::UTREEXO.into()) {
self.common
.peer_by_service
.entry(service_flags::UTREEXO.into())
.or_default()
.push(peer);
}
if peer_data.services.has(ServiceFlags::COMPACT_FILTERS) {
self.common
.peer_by_service
.entry(ServiceFlags::COMPACT_FILTERS)
.or_default()
.push(peer);
}
if peer_data
.services
.has(service_flags::UTREEXO_ARCHIVE.into())
{
self.common
.peer_by_service
.entry(service_flags::UTREEXO_ARCHIVE.into())
.or_default()
.push(peer);
}
if peer_data.services.has(ServiceFlags::NETWORK) {
self.common
.peer_by_service
.entry(ServiceFlags::NETWORK)
.or_default()
.push(peer);
}
self.address_man
.update_set_state(version.address_id, AddressState::Connected)
.update_set_service_flag(version.address_id, version.services);
self.peer_ids.push(peer);
}
#[cfg(feature = "metrics")]
self.update_peer_metrics();
Ok(())
}
pub(crate) fn handle_notfound_msg(&mut self, inv: Inventory) -> Result<(), WireError> {
match inv {
Inventory::Error => {}
Inventory::Block(block)
| Inventory::WitnessBlock(block)
| Inventory::CompactBlock(block) => {
if let Some(request) = self
.inflight_user_requests
.remove(&UserRequest::Block(block))
{
request
.2
.send(NodeResponse::Block(None))
.map_err(|_| WireError::ResponseSendError)?;
}
}
Inventory::WitnessTransaction(tx) | Inventory::Transaction(tx) => {
if let Some(request) = self
.inflight_user_requests
.remove(&UserRequest::MempoolTransaction(tx))
{
request
.2
.send(NodeResponse::MempoolTransaction(None))
.map_err(|_| WireError::ResponseSendError)?;
}
}
_ => {}
}
Ok(())
}
pub(crate) fn handle_tx_msg(&mut self, tx: Transaction) -> Result<(), WireError> {
let txid = tx.compute_txid();
debug!("saw a mempool transaction with txid={txid}");
if let Some(request) = self
.inflight_user_requests
.remove(&UserRequest::MempoolTransaction(txid))
{
request
.2
.send(NodeResponse::MempoolTransaction(Some(tx)))
.map_err(|_| WireError::ResponseSendError)?;
}
Ok(())
}
pub(crate) fn handle_peer_msg_common(
&mut self,
msg: PeerMessages,
peer: PeerId,
) -> Result<Option<PeerMessages>, WireError> {
match msg {
PeerMessages::Addr(addresses) => {
self.handle_addresses_from_peer(peer, addresses)?;
Ok(None)
}
PeerMessages::NotFound(inv) => {
self.handle_notfound_msg(inv)?;
Ok(None)
}
PeerMessages::Transaction(tx) => {
self.handle_tx_msg(tx)?;
Ok(None)
}
PeerMessages::UtreexoState(_) => {
warn!("Utreexo state received from peer {peer}, but we didn't ask");
self.increase_banscore(peer, 5)?;
Ok(None)
}
_ => Ok(Some(msg)),
}
}
pub(crate) fn handle_disconnection(&mut self, peer: u32, idx: usize) -> Result<(), WireError> {
if let Some(p) = self.peers.remove(&peer) {
if p.is_long_lived() && p.state == PeerStatus::Ready {
info!("Peer disconnected: {peer}");
}
std::mem::drop(p.channel);
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
match p.state {
PeerStatus::Ready => {
self.address_man
.update_set_state(idx, AddressState::Tried(now));
}
PeerStatus::Awaiting => {
self.address_man
.update_set_state(idx, AddressState::Failed(now));
}
PeerStatus::Banned => {
self.address_man
.update_set_state(idx, AddressState::Banned(RunningNode::BAN_TIME));
}
}
}
self.peer_ids.retain(|&id| id != peer);
for (_, v) in self.peer_by_service.iter_mut() {
v.retain(|&id| id != peer);
}
let inflight = self
.inflight
.clone()
.into_iter()
.filter(|(_k, v)| v.0 == peer)
.collect::<Vec<_>>();
for req in inflight {
self.inflight.remove(&req.0);
if let Err(e) = self.redo_inflight_request(&req.0) {
self.inflight.insert(req.0, req.1);
return Err(e);
}
}
#[cfg(feature = "metrics")]
self.update_peer_metrics();
Ok(())
}
pub(crate) fn increase_banscore(&mut self, peer_id: u32, factor: u32) -> Result<(), WireError> {
let Some(peer) = self.common.peers.get_mut(&peer_id) else {
return Ok(());
};
if peer.is_manual_peer() {
return Ok(());
}
peer.banscore += factor;
let is_misbehaving = peer.banscore >= self.common.max_banscore;
let is_extra = peer.kind == ConnectionKind::Extra;
if is_misbehaving || is_extra {
warn!("banning peer {peer_id} for misbehaving");
self.disconnect_and_ban(peer_id)?;
return Ok(());
}
debug!("increasing banscore for peer {peer_id}");
Ok(())
}
pub(crate) fn disconnect_and_ban(&mut self, peer: PeerId) -> Result<(), WireError> {
if let Some(peer) = self.peers.get_mut(&peer) {
if peer.is_manual_peer() {
return Ok(());
}
peer.state = PeerStatus::Banned;
}
self.send_to_peer(peer, NodeRequest::Shutdown)?;
Ok(())
}
pub(crate) fn check_for_timeout(&mut self) -> Result<(), WireError> {
let now = Instant::now();
let timed_out_fn = |req: &InflightRequests, time: &Instant| match req {
InflightRequests::Connect(_)
if now.duration_since(*time).as_secs() > T::CONNECTION_TIMEOUT =>
{
Some(req.clone())
}
_ if now.duration_since(*time).as_secs() > T::REQUEST_TIMEOUT => Some(req.clone()),
_ => None,
};
let timed_out = self
.inflight
.iter()
.filter_map(|(req, (_, time))| timed_out_fn(req, time))
.collect::<Vec<_>>();
for req in timed_out {
let Some((peer, time)) = self.inflight.remove(&req) else {
continue;
};
if let Some(peer_data) = self.peers.get(&peer) {
if peer_data.kind == ConnectionKind::Feeler {
debug!("Feeler peer {peer} timed out request");
self.send_to_peer(peer, NodeRequest::Shutdown)?;
self.peers.remove(&peer);
continue;
}
}
if let InflightRequests::Connect(_) = req {
let _ = self.send_to_peer(peer, NodeRequest::Shutdown);
self.peers.remove(&peer);
continue;
}
debug!("Request timed out: {req:?}");
try_and_log!(self.increase_banscore(peer, 1));
if let Err(e) = self.redo_inflight_request(&req) {
self.inflight.insert(req, (peer, time));
return Err(e);
}
}
Ok(())
}
pub(crate) fn handle_addresses_from_peer(
&mut self,
peer: u32,
addresses: Vec<AddrV2Message>,
) -> Result<(), WireError> {
self.inflight.remove(&InflightRequests::GetAddresses);
debug!("Got {} addresses from peer {}", addresses.len(), peer);
let addresses: Vec<_> = addresses.into_iter().map(|addr| addr.into()).collect();
self.address_man.push_addresses(&addresses);
let Some(peer_data) = self.peers.get(&peer) else {
return Ok(());
};
if matches!(peer_data.kind, ConnectionKind::Feeler) {
self.send_to_peer(peer, NodeRequest::Shutdown)?;
}
Ok(())
}
pub(crate) fn redo_inflight_request(
&mut self,
req: &InflightRequests,
) -> Result<(), WireError> {
match req {
InflightRequests::UtreexoProof(block_hash) => {
if !self.has_utreexo_peers() {
return Ok(());
}
if !self.blocks.contains_key(block_hash) {
return Ok(());
}
if self.inflight.contains_key(req) {
return Ok(());
}
let peer = self.send_to_fast_peer(
NodeRequest::GetBlockProof((*block_hash, Bitmap::new(), Bitmap::new())),
service_flags::UTREEXO.into(),
)?;
self.inflight.insert(
InflightRequests::UtreexoProof(*block_hash),
(peer, Instant::now()),
);
}
InflightRequests::Blocks(block) => {
self.request_blocks(vec![*block])?;
}
InflightRequests::Headers => {
let locator = self.chain.get_block_locator()?;
let peer =
self.send_to_fast_peer(NodeRequest::GetHeaders(locator), ServiceFlags::NONE)?;
self.inflight
.insert(InflightRequests::Headers, (peer, Instant::now()));
}
InflightRequests::UtreexoState(_) => {
let peer = self.send_to_fast_peer(
NodeRequest::GetUtreexoState((self.chain.get_block_hash(0).unwrap(), 0)),
service_flags::UTREEXO.into(),
)?;
self.inflight
.insert(InflightRequests::UtreexoState(peer), (peer, Instant::now()));
}
InflightRequests::GetFilters => {
if !self.has_compact_filters_peer() {
return Ok(());
}
let peer = self.send_to_fast_peer(
NodeRequest::GetFilter((self.chain.get_block_hash(0).unwrap(), 0)),
ServiceFlags::COMPACT_FILTERS,
)?;
self.inflight
.insert(InflightRequests::GetFilters, (peer, Instant::now()));
}
InflightRequests::Connect(_) | InflightRequests::GetAddresses => {
}
}
Ok(())
}
pub(crate) fn save_peers(&self) -> Result<(), WireError> {
self.address_man
.dump_peers(&self.datadir)
.map_err(WireError::Io)
}
pub(crate) fn save_utreexo_peers(&self) -> Result<(), WireError> {
let peers: &Vec<u32> = self
.peer_by_service
.get(&service_flags::UTREEXO.into())
.ok_or(WireError::NoUtreexoPeersAvailable)?;
let peers_usize: Vec<usize> = peers.iter().map(|&peer| peer as usize).collect();
if peers_usize.is_empty() {
warn!("No connected Utreexo peers to save to disk");
return Ok(());
}
info!("Saving utreexo peers to disk...");
self.address_man
.dump_utreexo_peers(&self.datadir, &peers_usize)
.map_err(WireError::Io)
}
pub(crate) fn register_message_time(
&mut self,
notification: &PeerMessages,
peer: PeerId,
read_at: Instant,
) -> Option<()> {
let sent_at = match notification {
PeerMessages::Block(block) => {
let inflight = self
.inflight
.get(&InflightRequests::Blocks(block.block_hash()))?;
inflight.1
}
PeerMessages::Ready(_) => {
let inflight = self.inflight.get(&InflightRequests::Connect(peer))?;
inflight.1
}
PeerMessages::Headers(_) => {
let inflight = self.inflight.get(&InflightRequests::Headers)?;
inflight.1
}
PeerMessages::BlockFilter((_, _)) => {
let inflight = self.inflight.get(&InflightRequests::GetFilters)?;
inflight.1
}
PeerMessages::UtreexoState(_) => {
let inflight = self.inflight.get(&InflightRequests::UtreexoState(peer))?;
inflight.1
}
_ => return None,
};
let elapsed = read_at.duration_since(sent_at).as_secs_f64();
if let Some(peer) = self.peers.get_mut(&peer) {
peer.message_times.add(elapsed * 1_000.0); }
#[cfg(feature = "metrics")]
{
use metrics::get_metrics;
let metrics = get_metrics();
metrics.message_times.observe(elapsed);
}
Some(())
}
#[cfg(feature = "metrics")]
pub(crate) fn update_peer_metrics(&self) {
use metrics::get_metrics;
let metrics = get_metrics();
metrics.peer_count.set(self.peer_ids.len() as f64);
}
pub(crate) fn has_utreexo_peers(&self) -> bool {
!self
.peer_by_service
.get(&service_flags::UTREEXO.into())
.unwrap_or(&Vec::new())
.is_empty()
}
pub(crate) fn has_compact_filters_peer(&self) -> bool {
self.peer_by_service
.get(&ServiceFlags::COMPACT_FILTERS)
.map(|peers| !peers.is_empty())
.unwrap_or(false)
}
pub(crate) fn get_peer_info(&self, peer_id: &u32) -> Option<PeerInfo> {
let peer = self.peers.get(peer_id)?;
Some(PeerInfo {
id: *peer_id,
address: SocketAddr::new(peer.address, peer.port),
services: peer.services,
user_agent: peer.user_agent.clone(),
initial_height: peer.height,
state: peer.state,
kind: peer.kind,
transport_protocol: peer.transport_protocol,
})
}
pub(crate) fn to_addr_v2(&self, addr: IpAddr) -> AddrV2 {
match addr {
IpAddr::V4(addr) => AddrV2::Ipv4(addr),
IpAddr::V6(addr) => AddrV2::Ipv6(addr),
}
}
pub fn handle_addnode_add_peer(
&mut self,
addr: IpAddr,
port: u16,
v2_transport: bool,
) -> Result<(), WireError> {
debug!("Adding node {addr}:{port}");
let address = self.to_addr_v2(addr);
if self
.added_peers
.iter()
.any(|info| address == info.address && port == info.port)
{
return Err(WireError::PeerAlreadyExists(addr, port));
}
let mut local_address = LocalAddress::from(address.clone());
local_address.set_services(ServiceFlags::NETWORK_LIMITED | ServiceFlags::WITNESS);
self.address_man.push_addresses(&[local_address]);
self.added_peers.push(AddedPeerInfo {
address,
port,
v1_fallback: !v2_transport,
});
self.maybe_open_connection_with_added_peers()
}
pub fn handle_addnode_remove_peer(&mut self, addr: IpAddr, port: u16) -> Result<(), WireError> {
debug!("Trying to remove peer {addr}:{port}");
let address = self.to_addr_v2(addr);
let index = self
.added_peers
.iter()
.position(|info| address == info.address && port == info.port);
match index {
Some(peer_id) => self.added_peers.remove(peer_id),
None => return Err(WireError::PeerNotFoundAtAddress(addr, port)),
};
Ok(())
}
pub fn handle_disconnect_peer(&mut self, addr: IpAddr, port: u16) -> Result<(), WireError> {
let index = self
.peers
.iter()
.find(|(_, peer)| addr == peer.address && port == peer.port)
.map(|(&peer_id, _)| peer_id);
match index {
Some(peer_id) => {
self.send_to_peer(peer_id, NodeRequest::Shutdown)?;
Ok(())
}
None => Err(WireError::PeerNotFoundAtAddress(addr, port)),
}
}
pub fn handle_addnode_onetry_peer(
&mut self,
addr: IpAddr,
port: u16,
v2_transport: bool,
) -> Result<(), WireError> {
debug!("Creating an one-try connection with {addr}:{port}");
if self
.peers
.iter()
.any(|(_, peer)| addr == peer.address && port == peer.port)
{
return Err(WireError::PeerAlreadyExists(addr, port));
}
let kind = ConnectionKind::Manual;
let address_v2 = self.to_addr_v2(addr);
let mut local_address = LocalAddress::from(address_v2.clone());
local_address.set_port(port);
local_address.set_services(ServiceFlags::NETWORK_LIMITED | ServiceFlags::WITNESS);
self.address_man.push_addresses(&[local_address.clone()]);
self.open_connection(kind, local_address.id, local_address, !v2_transport)
}
}