1#[cfg(feature = "runtime_support")]
5use std::marker::Unpin;
6#[cfg(feature = "runtime_support")]
7use std::panic::AssertUnwindSafe;
8use std::time::Duration;
9#[cfg(feature = "runtime_support")]
10use std::time::SystemTime;
11
12#[cfg(feature = "runtime_support")]
13use dfir_rs::Never;
14#[cfg(feature = "runtime_support")]
15use dfir_rs::scheduled::metrics::{DfirMetrics, DfirMetricsIntervals};
16#[cfg(feature = "runtime_support")]
17use futures::FutureExt;
18use quote::quote;
19#[cfg(feature = "runtime_support")]
20use serde_json::json;
21use syn::parse_quote;
22#[cfg(feature = "runtime_support")]
23use tokio::io::{AsyncWrite, AsyncWriteExt};
24#[cfg(feature = "runtime_support")]
25use tokio_metrics::RuntimeMetrics;
26
27use crate::location::{LocationKey, LocationType};
28use crate::staging_util::get_this_crate;
29use crate::telemetry::Sidecar;
30
/// File that metric records are appended to when [`RecordMetricsSidecar`] is built without an
/// explicit `file_path`.
pub const DEFAULT_FILE_PATH: &str = "/var/log/hydro/metrics.log";
/// Delay between metric snapshots (thirty seconds) when [`RecordMetricsSidecar`] is built without
/// an explicit `interval`.
pub const DEFAULT_INTERVAL: Duration = Duration::from_millis(30_000);
35
/// Sidecar that periodically appends DFIR and tokio runtime metrics to a file as CloudWatch
/// Embedded Metric Format (EMF) JSON lines.
///
/// Construct it through [`RecordMetricsSidecar::new`] (or its buildstructor-generated builder);
/// unset options fall back to [`DEFAULT_FILE_PATH`] and [`DEFAULT_INTERVAL`].
#[derive(Debug, Clone)]
pub struct RecordMetricsSidecar {
    /// Path of the file that metric records are appended to; the file (and its parent
    /// directory) is created if it does not exist.
    file_path: String,
    /// How long the sidecar sleeps between consecutive metric snapshots.
    interval: Duration,
}
41
42#[buildstructor::buildstructor]
43impl RecordMetricsSidecar {
44 #[builder]
46 pub fn new(file_path: Option<String>, interval: Option<Duration>) -> Self {
47 Self {
48 file_path: file_path.unwrap_or_else(|| DEFAULT_FILE_PATH.to_owned()),
49 interval: interval.unwrap_or(DEFAULT_INTERVAL),
50 }
51 }
52}
53
54impl Sidecar for RecordMetricsSidecar {
55 fn to_expr(
56 &self,
57 flow_name: &str,
58 _location_key: LocationKey,
59 _location_type: LocationType,
60 location_name: &str,
61 dfir_ident: &syn::Ident,
62 ) -> syn::Expr {
63 let Self {
64 file_path,
65 interval,
66 } = self;
67
68 let root = get_this_crate();
69 let namespace = flow_name.replace(char::is_whitespace, "_");
70 let interval: proc_macro2::TokenStream = {
71 let secs = interval.as_secs();
72 let nanos = interval.subsec_nanos();
73 quote!(::std::time::Duration::new(#secs, #nanos))
74 };
75
76 parse_quote! {
77 #root::telemetry::emf::record_metrics_sidecar(#dfir_ident.metrics_intervals(), #namespace, #location_name, #file_path, #interval)
78 }
79 }
80}
81
82#[cfg(feature = "runtime_support")]
84#[doc(hidden)]
85pub fn record_metrics_sidecar(
86 mut dfir_intervals: DfirMetricsIntervals,
87 namespace: &'static str,
88 location_name: &'static str,
89 file_path: &'static str,
90 interval: Duration,
91) -> impl 'static + Future<Output = Never> {
92 assert!(!namespace.contains(char::is_whitespace));
93
94 async move {
95 if let Some(parent_dir) = std::path::Path::new(file_path).parent()
97 && let Err(e) = tokio::fs::create_dir_all(parent_dir).await
98 {
99 eprintln!("Failed to create log file directory for EMF metrics: {}", e);
101 }
102
103 let rt_monitor = tokio_metrics::RuntimeMonitor::new(&tokio::runtime::Handle::current());
105 let mut rt_intervals = rt_monitor.intervals();
106
107 loop {
108 let _ = tokio::time::sleep(interval).await;
109
110 let dfir_metrics = dfir_intervals.take_interval();
111 let rt_metrics = rt_intervals.next().unwrap();
112
113 let unwind_result = AssertUnwindSafe(async {
114 let timestamp = SystemTime::now();
115
116 let file = tokio::fs::OpenOptions::new()
117 .write(true)
118 .create(true)
119 .truncate(false)
120 .append(true)
121 .open(file_path)
122 .await
123 .expect("Failed to open log file for EMF metrics.");
124 let mut writer = tokio::io::BufWriter::new(file);
125
126 record_metrics_dfir(
127 namespace,
128 location_name,
129 timestamp,
130 dfir_metrics,
131 &mut writer,
132 )
133 .await
134 .unwrap();
135
136 record_metrics_tokio(namespace, location_name, timestamp, rt_metrics, &mut writer)
137 .await
138 .unwrap();
139
140 writer.shutdown().await.unwrap();
141 })
142 .catch_unwind()
143 .await;
144
145 if let Err(panic_reason) = unwind_result {
146 eprintln!("Panic in metrics sidecar: {panic_reason:?}");
148 }
149 }
150 }
151}
152
#[cfg(feature = "runtime_support")]
/// Writes one EMF JSON line per handoff and one per subgraph in `metrics` to `writer`.
///
/// Every line is stamped with `timestamp` (milliseconds since the UNIX epoch) and carries a
/// `LocationName` dimension. Handoff lines add a `HandoffId` dimension; subgraph lines add a
/// `SubgraphId` dimension. Durations are reported in microseconds.
///
/// # Errors
/// Returns the first I/O error reported by `writer`.
///
/// # Panics
/// Panics if `timestamp` is earlier than the UNIX epoch.
async fn record_metrics_dfir<W>(
    namespace: &str,
    location_name: &str,
    timestamp: SystemTime,
    metrics: DfirMetrics,
    writer: &mut W,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    /// Serializes `record` and writes it followed by a newline.
    async fn write_json_line<Out>(
        out: &mut Out,
        record: serde_json::Value,
    ) -> Result<(), std::io::Error>
    where
        Out: AsyncWrite + Unpin,
    {
        out.write_all(record.to_string().as_bytes()).await?;
        out.write_u8(b'\n').await
    }

    let since_epoch = timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap();
    let epoch_millis = since_epoch.as_millis();

    for (handoff_id, handoff) in metrics.handoffs.iter() {
        let record = json!({
            "_aws": {
                "Timestamp": epoch_millis,
                "CloudWatchMetrics": [
                    {
                        "Namespace": namespace,
                        "Dimensions": [["LocationName"], ["LocationName", "HandoffId"]],
                        "Metrics": [
                            {"Name": "CurrItemsCount", "Unit": Unit::Count},
                            {"Name": "TotalItemsCount", "Unit": Unit::Count},
                        ]
                    }
                ]
            },
            "LocationName": location_name,
            "HandoffId": format!("{handoff_id:?}"),
            "CurrItemsCount": handoff.curr_items_count(),
            "TotalItemsCount": handoff.total_items_count(),
        });
        write_json_line(&mut *writer, record).await?;
    }

    for (subgraph_id, subgraph) in metrics.subgraphs.iter() {
        let record = json!({
            "_aws": {
                "Timestamp": epoch_millis,
                "CloudWatchMetrics": [
                    {
                        "Namespace": namespace,
                        "Dimensions": [["LocationName"], ["LocationName", "SubgraphId"]],
                        "Metrics": [
                            {"Name": "TotalRunCount", "Unit": Unit::Count},
                            {"Name": "TotalPollDuration", "Unit": Unit::Microseconds},
                            {"Name": "TotalPollCount", "Unit": Unit::Count},
                            {"Name": "TotalIdleDuration", "Unit": Unit::Microseconds},
                            {"Name": "TotalIdleCount", "Unit": Unit::Count},
                        ]
                    }
                ]
            },
            "LocationName": location_name,
            "SubgraphId": format!("{subgraph_id:?}"),
            "TotalRunCount": subgraph.total_run_count(),
            "TotalPollDuration": subgraph.total_poll_duration().as_micros(),
            "TotalPollCount": subgraph.total_poll_count(),
            "TotalIdleDuration": subgraph.total_idle_duration().as_micros(),
            "TotalIdleCount": subgraph.total_idle_count(),
        });
        write_json_line(&mut *writer, record).await?;
    }

    Ok(())
}
230
#[cfg(feature = "runtime_support")]
/// Writes a single EMF JSON line describing the tokio runtime snapshot `rt_metrics` to `writer`.
///
/// The line is stamped with `timestamp` (milliseconds since the UNIX epoch), uses only the
/// `LocationName` dimension, and reports the total busy duration in microseconds plus the
/// global queue depth.
///
/// # Errors
/// Returns the first I/O error reported by `writer`.
///
/// # Panics
/// Panics if `timestamp` is earlier than the UNIX epoch.
async fn record_metrics_tokio<W>(
    namespace: &str,
    location_name: &str,
    timestamp: SystemTime,
    rt_metrics: RuntimeMetrics,
    writer: &mut W,
) -> Result<(), std::io::Error>
where
    W: AsyncWrite + Unpin,
{
    let epoch_millis = timestamp.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
    let busy_micros = rt_metrics.total_busy_duration.as_micros();
    let queue_depth = rt_metrics.global_queue_depth;

    let line = json!({
        "_aws": {
            "Timestamp": epoch_millis,
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    "Dimensions": [["LocationName"]],
                    "Metrics": [
                        {"Name": "TotalBusyDuration", "Unit": Unit::Microseconds},
                        {"Name": "GlobalQueueDepth", "Unit": Unit::Count},
                    ]
                }
            ]
        },
        "LocationName": location_name,
        "TotalBusyDuration": busy_micros,
        "GlobalQueueDepth": queue_depth,
    })
    .to_string();

    writer.write_all(line.as_bytes()).await?;
    writer.write_u8(b'\n').await?;
    Ok(())
}
276
#[expect(missing_docs, reason = "self-explanatory")]
/// Unit attached to a metric definition in the `"Metrics"` list of an EMF record's
/// `CloudWatchMetrics` section (serialized into its `"Unit"` field).
///
/// Variants serialize as their own name, except the per-second rates, which are renamed to the
/// `<Unit>/Second` form via `serde(rename)`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub enum Unit {
    // `None` is the `Default` value.
    #[default]
    None,
    Seconds,
    Microseconds,
    Milliseconds,
    Bytes,
    Kilobytes,
    Megabytes,
    Gigabytes,
    Terabytes,
    Bits,
    Kilobits,
    Megabits,
    Gigabits,
    Terabits,
    Percent,
    Count,
    // Rate units: their wire names contain `/`, so they need explicit renames.
    #[serde(rename = "Bytes/Second")]
    BytesPerSecond,
    #[serde(rename = "Kilobytes/Second")]
    KilobytesPerSecond,
    #[serde(rename = "Megabytes/Second")]
    MegabytesPerSecond,
    #[serde(rename = "Gigabytes/Second")]
    GigabytesPerSecond,
    #[serde(rename = "Terabytes/Second")]
    TerabytesPerSecond,
    #[serde(rename = "Bits/Second")]
    BitsPerSecond,
    #[serde(rename = "Kilobits/Second")]
    KilobitsPerSecond,
    #[serde(rename = "Megabits/Second")]
    MegabitsPerSecond,
    #[serde(rename = "Gigabits/Second")]
    GigabitsPerSecond,
    #[serde(rename = "Terabits/Second")]
    TerabitsPerSecond,
    #[serde(rename = "Count/Second")]
    CountPerSecond,
}
323}