1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

mod commit_check;
mod debug_interface_log_tail;
mod fullnode_check;
mod liveness_check;
mod log_tail;

use crate::cluster::Cluster;
use anyhow::{bail, Result};
use async_trait::async_trait;
pub use commit_check::CommitHistoryHealthCheck;
pub use debug_interface_log_tail::DebugPortLogWorker;
pub use fullnode_check::FullNodeHealthCheck;
use itertools::Itertools;
pub use liveness_check::LivenessHealthCheck;
pub use log_tail::LogTail;
use std::{
    collections::{HashMap, HashSet},
    env, fmt,
    time::{Duration, Instant},
};
use termion::color::*;

/// A single commit reported by a validator.
#[derive(Clone, Debug)]
pub struct Commit {
    /// Identifier of this commit (presumably a hash string; confirm against the log parser).
    commit: String,
    /// Epoch in which the commit happened.
    epoch: u64,
    /// Round in which the commit happened.
    round: u64,
    /// Identifier of the parent commit (presumably the same format as `commit`).
    parent: String,
}

impl Commit {
    /// Returns the `(epoch, round)` pair of this commit.
    pub fn epoch_and_round(&self) -> (u64, u64) {
        let Self { epoch, round, .. } = self;
        (*epoch, *round)
    }
}

/// Kinds of events that health checks receive, wrapped in a [`ValidatorEvent`].
#[derive(Clone, Debug)]
pub enum Event {
    /// A commit was observed; carries the commit details.
    Commit(Commit),
    /// Consensus started (no payload).
    ConsensusStarted,
}

/// An [`Event`] attributed to a specific validator, with its timestamps.
#[derive(Clone)]
pub struct ValidatorEvent {
    /// Name of the validator the event belongs to.
    validator: String,
    /// Timestamp of the event itself
    /// (presumably taken on the validator side; confirm with the producers).
    timestamp: Duration,
    /// Timestamp at which the event was received
    /// (presumably by the test harness; confirm with the producers).
    received_timestamp: Duration,
    /// The event payload.
    event: Event,
}

impl fmt::Debug for ValidatorEvent {
    /// Renders as `recv: <received ms>; <timestamp ms> <validator> <event>`,
    /// with both timestamps in whole milliseconds and the event in `Debug` form.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self {
            validator,
            timestamp,
            received_timestamp,
            event,
        } = self;
        let received_ms = received_timestamp.as_millis();
        let event_ms = timestamp.as_millis();
        f.write_fmt(format_args!(
            "recv: {}; {} {} {:?}",
            received_ms, event_ms, validator, event
        ))
    }
}

/// A pluggable health check driven by [`HealthCheckRunner`].
///
/// On every run the runner feeds each event to `on_event` and then calls
/// `verify` once. Failures are reported through the [`HealthCheckContext`]
/// passed to both methods.
#[async_trait]
pub trait HealthCheck: Send {
    /// Inspects a single validator event. The default implementation does nothing.
    fn on_event(&mut self, _event: &ValidatorEvent, _ctx: &mut HealthCheckContext) {}
    /// Periodic verification; invoked on every run, even when no events were produced.
    async fn verify(&mut self, _ctx: &mut HealthCheckContext);
    /// Optionally marks a validator as failed, so that at least one new event from it is
    /// required before it is considered healthy again. The default implementation does nothing.
    fn invalidate(&mut self, _validator: &str) {}
    /// Invoked when the cluster is wiped.
    /// Checks that keep history (e.g. the commit history check) should drop their internal
    /// state here. The default implementation does nothing.
    fn clear(&mut self) {}

    /// Static name of this check; the runner includes it in its debug timing messages.
    fn name(&self) -> &'static str;
}

/// Runs a set of [`HealthCheck`]s against a cluster and aggregates per-validator health.
pub struct HealthCheckRunner {
    /// Cluster whose validator instances seed the per-node health map.
    cluster: Cluster,
    /// Checks to execute, in order, on every run.
    health_checks: Vec<Box<dyn HealthCheck>>,
    /// When true, per-check timing messages are added to the run output.
    /// Set from the presence of the `HEALTH_CHECK_DEBUG` environment variable.
    debug: bool,
}

impl HealthCheckRunner {
    /// Creates a runner for `cluster` that executes `health_checks` in the given order.
    ///
    /// Debug timing output is enabled when the `HEALTH_CHECK_DEBUG` environment
    /// variable is set.
    pub fn new(cluster: Cluster, health_checks: Vec<Box<dyn HealthCheck>>) -> Self {
        Self {
            cluster,
            health_checks,
            debug: env::var("HEALTH_CHECK_DEBUG").is_ok(),
        }
    }

    /// Creates a runner with the commit history, liveness and full node checks.
    pub fn new_all(cluster: Cluster) -> Self {
        let liveness_health_check = LivenessHealthCheck::new(&cluster);
        let fullnode_check = FullNodeHealthCheck::new(cluster.clone());
        Self::new(
            cluster,
            vec![
                Box::new(CommitHistoryHealthCheck::new()),
                Box::new(liveness_health_check),
                Box::new(fullnode_check),
            ],
        )
    }

    /// Feeds `events` to every health check, runs their verification and
    /// aggregates the reported failures per validator.
    ///
    /// `affected_validators_set` names the validators that are expected to
    /// fail (e.g. because they are part of the experiment). `print_failures`
    /// controls whether the collected messages are printed to stdout.
    ///
    /// Returns the sorted list of ALL failed validators when every failure
    /// was expected.
    ///
    /// # Errors
    ///
    /// Returns an error whose message is the comma-separated, sorted list of
    /// failed validators that are not in `affected_validators_set`.
    pub async fn run(
        &mut self,
        events: &[ValidatorEvent],
        affected_validators_set: &HashSet<String>,
        print_failures: PrintFailures,
    ) -> Result<Vec<String>> {
        use std::fmt::Write as _;

        // Every validator of the cluster starts out healthy.
        let mut node_health = HashMap::new();
        for instance in self.cluster.validator_instances() {
            node_health.insert(instance.peer_name().clone(), true);
        }
        let mut messages = vec![];

        let mut context = HealthCheckContext::new();
        for health_check in self.health_checks.iter_mut() {
            let on_event_start = Instant::now();
            for event in events {
                health_check.on_event(event, &mut context);
            }
            let on_event_time = on_event_start.elapsed();

            let verify_start = Instant::now();
            health_check.verify(&mut context).await;
            let verify_time = verify_start.elapsed();

            if self.debug {
                messages.push(format!(
                    "{} {}, on_event time: {}ms, verify time: {}ms, events: {}",
                    diem_infallible::duration_since_epoch().as_millis(),
                    health_check.name(),
                    on_event_time.as_millis(),
                    verify_time.as_millis(),
                    events.len(),
                ));
            }
        }

        // Any reported error marks its validator as unhealthy. The message is
        // formatted first so the validator name can be moved instead of cloned.
        for err in context.err_acc {
            messages.push(format!(
                "{} {:?}",
                diem_infallible::duration_since_epoch().as_millis(),
                err
            ));
            node_health.insert(err.validator, false);
        }

        // Colored one-line-per-15-nodes overview; `failed` ends up sorted by
        // validator name because the map entries are visited in sorted order.
        let mut failed = vec![];
        let mut validators_message = String::new();
        for (i, (node, healthy)) in node_health.into_iter().sorted().enumerate() {
            // Writing into a `String` cannot fail, so the result is ignored.
            if healthy {
                let _ = write!(
                    validators_message,
                    "{}* {}{}   ",
                    Fg(Green),
                    node,
                    Fg(Reset)
                );
            } else {
                let _ = write!(validators_message, "{}* {}{}   ", Fg(Red), node, Fg(Reset));
                failed.push(node);
            }
            if (i + 1) % 15 == 0 {
                validators_message.push('\n');
            }
        }
        messages.push(validators_message);
        messages.push(String::new());
        messages.push(String::new());

        // Filtering the sorted `failed` list keeps the reported order
        // deterministic and avoids building intermediate hash sets.
        let unexpected: Vec<&str> = failed
            .iter()
            .map(String::as_str)
            .filter(|node| !affected_validators_set.contains(*node))
            .collect();
        let has_unexpected_failures = !unexpected.is_empty();

        if print_failures.should_print(has_unexpected_failures) {
            for message in &messages {
                println!("{}", message);
            }
        }

        if has_unexpected_failures {
            let unexpected_failures = unexpected.join(",");
            bail!(unexpected_failures);
        }
        Ok(failed)
    }

    /// Forwards the invalidation of `validator` to every health check.
    pub fn invalidate(&mut self, validator: &str) {
        for hc in &mut self.health_checks {
            hc.invalidate(validator);
        }
    }

    /// Asks every health check to drop its internal state.
    pub fn clear(&mut self) {
        for hc in &mut self.health_checks {
            hc.clear();
        }
    }
}

/// Verbosity of the health check output printed by the runner.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PrintFailures {
    /// Never print the collected messages.
    None,
    /// Print the collected messages only when there are unexpected failures.
    UnexpectedOnly,
    /// Always print the collected messages.
    All,
}

impl PrintFailures {
    /// Decides whether the collected messages should be printed, given whether
    /// the run produced unexpected failures.
    fn should_print(&self, has_unexpected_failures: bool) -> bool {
        match *self {
            Self::None => false,
            Self::UnexpectedOnly => has_unexpected_failures,
            Self::All => true,
        }
    }
}

/// State shared by all health checks during a single run: a fixed timestamp
/// and the failures reported so far.
pub struct HealthCheckContext {
    /// Timestamp captured once when the context is created.
    now: Duration,
    /// Failures accumulated through `report_failure`.
    err_acc: Vec<HealthCheckError>,
}

/// A failure reported by a health check for one validator.
#[derive(Debug)]
pub struct HealthCheckError {
    /// Name of the validator the failure is attributed to.
    pub validator: String,
    /// Description of the failure.
    pub message: String,
}

impl HealthCheckContext {
    /// Creates an empty context whose timestamp is the value returned by
    /// `diem_infallible::duration_since_epoch()` at construction time.
    pub fn new() -> Self {
        Self {
            now: diem_infallible::duration_since_epoch(),
            err_acc: Vec::new(),
        }
    }

    /// Returns the timestamp captured when the context was created.
    pub fn now(&self) -> Duration {
        self.now
    }

    /// Records a failure `message` for `validator`.
    pub fn report_failure(&mut self, validator: String, message: String) {
        let failure = HealthCheckError { validator, message };
        self.err_acc.push(failure);
    }
}

impl Default for HealthCheckContext {
    fn default() -> Self {
        Self::new()
    }
}