1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

//! `conn_notifs_channel` is a channel which delivers to the receiver only the last of N
//! messages that might have been sent by sender(s) since the last poll. Messages are grouped
//! by a key that the sender provides with each message, so "last" is tracked per key.
//!
//! It provides an mpsc channel which has two ends `conn_notifs_channel::Receiver`
//! and `conn_notifs_channel::Sender` which behave similarly to existing mpsc data structures.

use crate::peer_manager::ConnectionNotification;
use channel::{diem_channel, message_queues::QueueStyle};
use diem_types::PeerId;

/// Sending half of the connection-notifications channel; each `ConnectionNotification` is
/// pushed together with a `PeerId`, which serves as the key.
pub type Sender = diem_channel::Sender<PeerId, ConnectionNotification>;
/// Receiving half of the connection-notifications channel, yielding `ConnectionNotification`s.
pub type Receiver = diem_channel::Receiver<PeerId, ConnectionNotification>;

/// Creates a connected (`Sender`, `Receiver`) pair for connection notifications.
///
/// The underlying `diem_channel` is built with `QueueStyle::LIFO`, a size of 1 and `None` as
/// its last argument; per the module docs, the receiver then only observes the latest
/// notification sent for a key since its last poll.
pub fn new() -> (Sender, Receiver) {
    // NOTE(review): the second argument is presumably the per-key queue capacity and the third
    // an optional metrics counter -- confirm against `diem_channel::new`.
    let queue_size = 1;
    let counters = None;
    diem_channel::new(QueueStyle::LIFO, queue_size, counters)
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{peer::DisconnectReason, transport::ConnectionMetadata};
    use diem_config::network_id::NetworkContext;
    use futures::{executor::block_on, future::FutureExt, stream::StreamExt};

    /// Pushes a `NewPeer` notification for `connection`, keyed by its remote peer id.
    ///
    /// Panics if the push fails.
    fn send_new_peer(sender: &mut Sender, connection: ConnectionMetadata) {
        let key = connection.remote_peer_id;
        let context = NetworkContext::mock_with_peer_id(key);
        sender
            .push(key, ConnectionNotification::NewPeer(connection, context))
            .unwrap()
    }

    /// Pushes a `LostPeer` notification for `connection` carrying `reason`, keyed by its
    /// remote peer id.
    ///
    /// Panics if the push fails.
    fn send_lost_peer(
        sender: &mut Sender,
        connection: ConnectionMetadata,
        reason: DisconnectReason,
    ) {
        let key = connection.remote_peer_id;
        let context = NetworkContext::mock_with_peer_id(key);
        sender
            .push(
                key,
                ConnectionNotification::LostPeer(connection, context, reason),
            )
            .unwrap()
    }

    /// Several notifications pushed for one peer collapse into the most recent one, while
    /// notifications for distinct peers are each delivered.
    #[test]
    fn send_n_get_1() {
        let (mut tx, mut rx) = super::new();
        let first_peer = PeerId::random();
        let second_peer = PeerId::random();

        block_on(async move {
            let first_conn = ConnectionMetadata::mock(first_peer);
            let second_conn = ConnectionMetadata::mock(second_peer);

            // Four back-to-back notifications, all for the same peer.
            send_new_peer(&mut tx, first_conn.clone());
            send_lost_peer(
                &mut tx,
                first_conn.clone(),
                DisconnectReason::ConnectionLost,
            );
            send_new_peer(&mut tx, first_conn.clone());
            send_lost_peer(&mut tx, first_conn.clone(), DisconnectReason::Requested);

            // Only the most recently pushed notification must come out of the channel.
            let expected = ConnectionNotification::LostPeer(
                first_conn.clone(),
                NetworkContext::mock_with_peer_id(first_peer),
                DisconnectReason::Requested,
            );
            let received = rx.select_next_some().await;
            assert_eq!(received, expected);
            // Nothing else may be immediately available afterwards.
            assert_eq!(rx.select_next_some().now_or_never(), None);

            send_new_peer(&mut tx, first_conn);
            send_new_peer(&mut tx, second_conn);

            // One update per peer is expected, because the two pushes used different keys.
            for _ in 0..2 {
                let _ = rx.select_next_some().await;
            }
            // Again, nothing else may be immediately available.
            assert_eq!(rx.select_next_some().now_or_never(), None);
        });
    }
}