1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use thiserror::Error;
use tokio::sync::watch;
use tower::{timeout::Timeout, Service, ServiceExt};
use zebra_network as zn;
use zebra_state::ChainTipChange;
use crate::BoxError;
use super::{SyncStatus, TIPS_RESPONSE_TIMEOUT};
use BlockGossipError::*;
#[derive(Error, Debug)]
pub enum BlockGossipError {
#[error("chain tip sender was dropped")]
TipChange(watch::error::RecvError),
#[error("sync status sender was dropped")]
SyncStatus(watch::error::RecvError),
#[error("permanent peer set failure")]
PeerSetReadiness(zn::BoxError),
}
pub async fn gossip_best_tip_block_hashes<ZN>(
mut sync_status: SyncStatus,
mut chain_state: ChainTipChange,
broadcast_network: ZN,
) -> Result<(), BlockGossipError>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
{
info!("initializing block gossip task");
let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);
loop {
let tip_action = chain_state.wait_for_tip_change().await.map_err(TipChange)?;
sync_status
.wait_until_close_to_tip()
.await
.map_err(SyncStatus)?;
let tip_action = chain_state.last_tip_change().unwrap_or(tip_action);
let request = zn::Request::AdvertiseBlock(tip_action.best_tip_hash());
let height = tip_action.best_tip_height();
debug!(?height, ?request, "sending committed block broadcast");
let _ = broadcast_network
.ready()
.await
.map_err(PeerSetReadiness)?
.call(request)
.await;
}
}