1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{
    connectivity_manager::{ConnectivityManager, ConnectivityRequest},
    counters,
    peer_manager::{conn_notifs_channel, ConnectionRequestSender},
};
use diem_config::{config::PeerSet, network_id::NetworkContext};
use diem_infallible::RwLock;
use diem_time_service::TimeService;
use std::{sync::Arc, time::Duration};
use tokio::runtime::Handle;
use tokio_retry::strategy::ExponentialBackoff;

pub type ConnectivityManagerService = ConnectivityManager<ExponentialBackoff>;

pub struct ConnectivityManagerBuilder {
    connectivity_manager: Option<ConnectivityManagerService>,
    conn_mgr_reqs_tx: channel::Sender<ConnectivityRequest>,
}

impl ConnectivityManagerBuilder {
    pub fn create(
        network_context: Arc<NetworkContext>,
        time_service: TimeService,
        eligible: Arc<RwLock<PeerSet>>,
        seeds: PeerSet,
        connectivity_check_interval_ms: u64,
        backoff_base: u64,
        max_connection_delay_ms: u64,
        channel_size: usize,
        connection_reqs_tx: ConnectionRequestSender,
        connection_notifs_rx: conn_notifs_channel::Receiver,
        outbound_connection_limit: Option<usize>,
        mutual_authentication: bool,
    ) -> Self {
        let (conn_mgr_reqs_tx, conn_mgr_reqs_rx) = channel::new(
            channel_size,
            &counters::PENDING_CONNECTIVITY_MANAGER_REQUESTS,
        );

        Self {
            conn_mgr_reqs_tx,
            connectivity_manager: Some(ConnectivityManager::new(
                network_context,
                time_service,
                eligible,
                seeds,
                connection_reqs_tx,
                connection_notifs_rx,
                conn_mgr_reqs_rx,
                Duration::from_millis(connectivity_check_interval_ms),
                ExponentialBackoff::from_millis(backoff_base).factor(1000),
                Duration::from_millis(max_connection_delay_ms),
                outbound_connection_limit,
                mutual_authentication,
            )),
        }
    }

    pub fn conn_mgr_reqs_tx(&self) -> channel::Sender<ConnectivityRequest> {
        self.conn_mgr_reqs_tx.clone()
    }

    pub fn start(&mut self, executor: &Handle) {
        let conn_mgr = self
            .connectivity_manager
            .take()
            .expect("Service Must be present");
        executor.spawn(conn_mgr.start());
    }
}