Skip to main content

hydro_lang/sim/
compiled.rs

1//! Interfaces for compiled Hydro simulators and concrete simulation instances.
2
3use core::{fmt, panic};
4use std::cell::{Cell, RefCell};
5use std::collections::{HashMap, VecDeque};
6use std::fmt::Debug;
7use std::panic::RefUnwindSafe;
8use std::path::Path;
9use std::pin::{Pin, pin};
10use std::rc::Rc;
11use std::task::ready;
12
13use bytes::Bytes;
14use colored::Colorize;
15use dfir_rs::scheduled::context::DfirErased;
16use futures::{Stream, StreamExt};
17use libloading::Library;
18use serde::Serialize;
19use serde::de::DeserializeOwned;
20use tempfile::TempPath;
21use tokio::sync::mpsc::UnboundedSender;
22use tokio::sync::{Mutex, Notify};
23use tokio_stream::wrappers::UnboundedReceiverStream;
24
25use super::runtime::{Hooks, InlineHooks};
26use super::{SimClusterReceiver, SimClusterSender, SimReceiver, SimSender};
27use crate::compile::builder::ExternalPortId;
28use crate::live_collections::stream::{ExactlyOnce, NoOrder, Ordering, Retries, TotalOrder};
29use crate::location::dynamic::LocationId;
30use crate::sim::graph::{SimExternalPort, SimExternalPortRegistry};
31use crate::sim::runtime::SimHook;
32
33struct QuiescenceState {
34    /// Set to true when the scheduler reaches quiescence; reset to false when new input is sent.
35    quiescent: Cell<bool>,
36    /// Notified when the scheduler reaches quiescence (wakes receivers waiting for data).
37    quiescence_notify: Notify,
38    /// Notified when new input is sent, signaling the scheduler to resume.
39    resume_notify: Notify,
40}
41
42impl QuiescenceState {
43    /// Signal that new input has been sent, waking the scheduler if it was quiescent.
44    fn resume(&self) {
45        self.quiescent.set(false);
46        self.resume_notify.notify_waiters();
47    }
48
49    /// Whether the scheduler is currently quiescent (no more progress possible without input).
50    fn is_quiescent(&self) -> bool {
51        self.quiescent.get()
52    }
53
54    /// Returns a future that completes when the scheduler next reaches quiescence.
55    fn notified(&self) -> tokio::sync::futures::Notified<'_> {
56        self.quiescence_notify.notified()
57    }
58
59    /// Enter quiescence and wait for new input before continuing.
60    async fn wait_for_resume(&self) {
61        self.quiescent.set(true);
62        self.quiescence_notify.notify_waiters();
63        self.resume_notify.notified().await;
64        self.quiescent.set(false);
65    }
66}
67
/// Channel endpoints and coordination state connecting test code to one running simulation
/// instance.
///
/// An instance is installed in [`CURRENT_SIM_CONNECTIONS`] for the duration of a run, so that
/// sender and receiver handles can look up their channels by port.
struct SimConnections {
    /// Input channels of external ports, keyed by simulated port.
    input_senders: HashMap<SimExternalPort, Rc<UnboundedSender<Bytes>>>,
    /// Output channels of external ports. The `Mutex` serializes readers of the same stream.
    output_receivers: HashMap<SimExternalPort, Rc<Mutex<UnboundedReceiverStream<Bytes>>>>,
    /// Input channels of cluster external ports (presumably one sender per cluster member).
    cluster_input_senders: HashMap<SimExternalPort, Vec<Rc<UnboundedSender<Bytes>>>>,
    /// Output channels of cluster external ports (presumably one receiver per cluster member).
    cluster_output_receivers:
        HashMap<SimExternalPort, Vec<Rc<Mutex<UnboundedReceiverStream<Bytes>>>>>,
    /// Maps external port ids held by handles to the simulator's port ids.
    external_registered: HashMap<ExternalPortId, SimExternalPort>,
    /// Quiescence signalling shared with this instance's scheduler.
    quiescence: Rc<QuiescenceState>,
}

tokio::task_local! {
    // Connections of the simulation instance currently being run. The value is set with
    // `scope` in `CompiledSimInstance::run_without_launching`, and reading it outside that
    // scope panics.
    static CURRENT_SIM_CONNECTIONS: RefCell<SimConnections>;
}
81
/// A handle to a compiled Hydro simulation, which can be instantiated and run.
pub struct CompiledSim {
    /// Temporary file backing this handle (presumably the compiled library on disk). It is held
    /// only to keep the file alive, because a `TempPath` deletes its file when dropped.
    pub(super) _path: TempPath,
    /// The loaded dynamic library that exports the `__hydro_runtime` entry point.
    pub(super) lib: Library,
    /// Registry of external ports, cloned into every instance.
    pub(super) externals_port_registry: SimExternalPortRegistry,
    /// Number of random executions that [`CompiledSim::fuzz`] performs when it runs without
    /// `BOLERO_FUZZER` and no saved reproducer exists.
    pub(super) unit_test_fuzz_iterations: usize,
}
89
#[sealed::sealed]
/// A trait implemented by closures that can instantiate a compiled simulation.
///
/// This is needed to ensure [`RefUnwindSafe`] so instances can be created during fuzzing.
/// The trait is sealed, so the blanket implementation that follows is the only one.
pub trait Instantiator<'a>: RefUnwindSafe + Fn() -> CompiledSimInstance<'a> {}
// Every unwind-safe closure that produces a `CompiledSimInstance` is an `Instantiator`.
#[sealed::sealed]
impl<'a, T: RefUnwindSafe + Fn() -> CompiledSimInstance<'a>> Instantiator<'a> for T {}
97
/// Print handler given to the compiled library when logging is disabled: drops the message.
fn null_handler(_args: fmt::Arguments) {}

/// Print handler given to the compiled library when logging is enabled: writes the message,
/// followed by a newline, to this process's stdout.
fn println_handler(args: fmt::Arguments) {
    println!("{args}");
}

/// Error-print handler given to the compiled library when logging is enabled: writes the
/// message, followed by a newline, to this process's stderr.
fn eprintln_handler(args: fmt::Arguments) {
    eprintln!("{args}");
}
107
108/// Creates a simulation instance, returning:
109/// - A list of async DFIRs to run (all process / cluster logic outside a tick)
110/// - A list of tick DFIRs to run (where the &'static str is for the tick location id)
111/// - A mapping of hooks for non-deterministic decisions at tick-input boundaries
112/// - A mapping of inline hooks for non-deterministic decisions inside ticks
113type SimLoaded<'a> = libloading::Symbol<
114    'a,
115    unsafe extern "Rust" fn(
116        should_color: bool,
117        external_out: &mut HashMap<usize, UnboundedReceiverStream<Bytes>>,
118        external_in: &mut HashMap<usize, UnboundedSender<Bytes>>,
119        cluster_external_out: &mut HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>>,
120        cluster_external_in: &mut HashMap<usize, Vec<UnboundedSender<Bytes>>>,
121        println_handler: fn(fmt::Arguments<'_>),
122        eprintln_handler: fn(fmt::Arguments<'_>),
123    ) -> (
124        Vec<(&'static str, Option<u32>, DfirErased)>,
125        Vec<(&'static str, Option<u32>, DfirErased)>,
126        Hooks<&'static str>,
127        InlineHooks<&'static str>,
128    ),
129>;
130
impl CompiledSim {
    /// Executes the given closure with a single instance of the compiled simulation.
    ///
    /// Logging of tick executions and stream releases is always enabled for this instance.
    pub fn with_instance<T>(&self, thunk: impl FnOnce(CompiledSimInstance) -> T) -> T {
        self.with_instantiator(|instantiator| thunk(instantiator()), true)
    }

    /// Executes the given closure with an [`Instantiator`], which can be called to create
    /// independent instances of the simulation. This is useful for fuzzing, where we need to
    /// re-execute the simulation several times with different decisions.
    ///
    /// The `always_log` parameter controls whether to log tick executions and stream releases. If
    /// it is `true`, logging will always be enabled. If it is `false`, logging will only be
    /// enabled if the `HYDRO_SIM_LOG` environment variable is set to `1`.
    ///
    /// # Panics
    ///
    /// Panics if the compiled library does not export a `__hydro_runtime` symbol.
    pub fn with_instantiator<T>(
        &self,
        thunk: impl FnOnce(&dyn Instantiator) -> T,
        always_log: bool,
    ) -> T {
        // NOTE(review): soundness relies on `__hydro_runtime` having exactly the signature
        // described by `SimLoaded`.
        let func: SimLoaded = unsafe { self.lib.get(b"__hydro_runtime").unwrap() };
        // The environment is read once, so every instance created below shares this setting.
        let log = always_log || std::env::var("HYDRO_SIM_LOG").is_ok_and(|v| v == "1");
        thunk(
            // Each call produces a fresh, not-yet-set-up instance (`dylib_result` is `None`).
            &(|| CompiledSimInstance {
                func: func.clone(),
                externals_port_registry: self.externals_port_registry.clone(),
                dylib_result: None,
                log,
            }),
        )
    }

    /// Uses a fuzzing strategy to explore possible executions of the simulation. The provided
    /// closure will be repeatedly executed with instances of the Hydro program where the
    /// batching boundaries, order of messages, and retries are varied.
    ///
    /// During development, you should run the test that invokes this function with the `cargo sim`
    /// command, which will use `libfuzzer` to intelligently explore the execution space. If a
    /// failure is found, a minimized test case will be produced in a `sim-failures` directory.
    /// When running the test with `cargo test` (such as in CI), if a reproducer is found it will
    /// be executed, and if no reproducer is found a small number of random executions will be
    /// performed.
    ///
    /// The reproducer path is `sim-failures/<caller path with "::" replaced by "__">.bin`. The
    /// `sim-failures` directory sits next to the source file of the calling function.
    ///
    /// Environment variables:
    /// - `BOLERO_FUZZER`: when set, runs under libfuzzer, with the corpus in `.fuzz-corpus` under
    ///   the current directory.
    /// - `HYDRO_NO_FAILURE_OUTPUT=1`: when fuzzing, do not configure a reproducer output file.
    pub fn fuzz(&self, mut thunk: impl AsyncFn() + RefUnwindSafe) {
        // The first stack frame outside the simulator's own code is taken to be the user's test.
        // Its name and file determine where the reproducer is stored.
        let caller_fn = crate::compile::ir::backtrace::Backtrace::get_backtrace(0)
            .elements()
            .into_iter()
            .find(|e| {
                !e.fn_name.starts_with("hydro_lang::sim::compiled")
                    && !e.fn_name.starts_with("hydro_lang::sim::flow")
                    && !e.fn_name.starts_with("fuzz<")
                    && !e.fn_name.starts_with("<hydro_lang::sim")
            })
            .unwrap();

        let caller_path = Path::new(&caller_fn.filename.unwrap()).to_path_buf();
        let repro_folder = caller_path.parent().unwrap().join("sim-failures");

        let caller_fuzz_repro_path = repro_folder
            .join(caller_fn.fn_name.replace("::", "__"))
            .with_extension("bin");

        if std::env::var("BOLERO_FUZZER").is_ok() {
            // Fuzzing mode: libfuzzer is configured through environment variables read by bolero.
            let corpus_dir = std::env::current_dir().unwrap().join(".fuzz-corpus");
            std::fs::create_dir_all(&corpus_dir).unwrap();
            let libfuzzer_args = format!(
                "{} {} -artifact_prefix={}/ -handle_abrt=0",
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
                corpus_dir.to_str().unwrap(),
            );

            std::fs::create_dir_all(&repro_folder).unwrap();

            if !std::env::var("HYDRO_NO_FAILURE_OUTPUT").is_ok_and(|v| v == "1") {
                // SAFETY: NOTE(review): `set_var` is only sound if no other thread accesses the
                // environment concurrently; this assumes none does at this point — confirm.
                unsafe {
                    std::env::set_var(
                        "BOLERO_FAILURE_OUTPUT",
                        caller_fuzz_repro_path.to_str().unwrap(),
                    );
                }
            }

            // SAFETY: NOTE(review): same single-threaded-environment assumption as above.
            unsafe {
                std::env::set_var("BOLERO_LIBFUZZER_ARGS", libfuzzer_args);
            }

            self.with_instantiator(
                |instantiator| {
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: "",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .run_with_replay(move |is_replay| {
                        let mut instance = instantiator();

                        // The banner is printed before the replay override below, so it only
                        // appears when logging was already enabled.
                        if instance.log {
                            eprintln!(
                                "{}",
                                "\n==== New Simulation Instance ===="
                                    .color(colored::Color::Cyan)
                                    .bold()
                            );
                        }

                        // Replayed inputs always log their full trace.
                        if is_replay {
                            instance.log = true;
                        }

                        // Each execution gets its own single-threaded runtime.
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        } else if let Ok(existing_bytes) = std::fs::read(&caller_fuzz_repro_path) {
            // A saved reproducer exists: replay its decision bytes instead of fuzzing.
            self.fuzz_repro(existing_bytes, async |compiled| {
                compiled.launch();
                thunk().await
            });
        } else {
            // No fuzzer and no reproducer: fall back to a fixed number of random executions.
            eprintln!(
                "Running a fuzz test without `cargo sim` and no reproducer found at {}, using {} iterations with random inputs.",
                caller_fuzz_repro_path.display(),
                self.unit_test_fuzz_iterations,
            );
            self.with_instantiator(
                |instantiator| {
                    // NOTE(review): `file` is "." here but "" in the other targets — confirm intent.
                    bolero::test(bolero::TargetLocation {
                        package_name: "",
                        manifest_dir: "",
                        module_path: "",
                        file: ".",
                        line: 0,
                        item_path: "<unknown>::__bolero_item_path__",
                        test_name: None,
                    })
                    .with_iterations(self.unit_test_fuzz_iterations)
                    .run(move || {
                        let instance = instantiator();
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async { instance.run(&mut thunk).await })
                    })
                },
                false,
            );
        }
    }

    /// Executes the given closure with a single instance of the compiled simulation, using the
    /// provided bytes as the source of fuzzing decisions. This can be used to manually reproduce a
    /// failure found during fuzzing.
    ///
    /// `thunk` receives the instance before its scheduler has been started, so it is responsible
    /// for launching it (for example via `schedule_with_logger`). Logging is always enabled.
    pub fn fuzz_repro<'a>(
        &'a self,
        bytes: Vec<u8>,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        self.with_instance(|instance| {
            // Every `any` decision made while this scope is active is drawn from `bytes`.
            bolero::bolero_engine::any::scope::with(
                Box::new(bolero::bolero_engine::driver::object::Object(
                    bolero::bolero_engine::driver::bytes::Driver::new(bytes, &Default::default()),
                )),
                || {
                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run_without_launching(thunk).await })
                },
            )
        });
    }

    /// Exhaustively searches all possible executions of the simulation. The provided
    /// closure will be repeatedly executed with instances of the Hydro program where the
    /// batching boundaries, order of messages, and retries are varied.
    ///
    /// Exhaustive searching is feasible when the inputs to the Hydro program are finite and there
    /// are no dataflow loops that generate infinite messages. Exhaustive searching provides a
    /// stronger guarantee of correctness than fuzzing, but may take a long time to complete.
    /// Because no fuzzer is involved, you can run exhaustive tests with `cargo test`.
    ///
    /// Returns the number of distinct executions explored.
    ///
    /// Aborts the process if `BOLERO_FUZZER` is set (i.e. when invoked under a fuzzer).
    pub fn exhaustive(&self, mut thunk: impl AsyncFnMut() + RefUnwindSafe) -> usize {
        if std::env::var("BOLERO_FUZZER").is_ok() {
            eprintln!(
                "Cannot run exhaustive tests with a fuzzer. Please use `cargo test` instead of `cargo sim`."
            );
            std::process::abort();
        }

        let mut count = 0;
        let count_mut = &mut count;

        self.with_instantiator(
            |instantiator| {
                bolero::test(bolero::TargetLocation {
                    package_name: "",
                    manifest_dir: "",
                    module_path: "",
                    file: "",
                    line: 0,
                    item_path: "<unknown>::__bolero_item_path__",
                    test_name: None,
                })
                .exhaustive()
                .run_with_replay(move |is_replay| {
                    // Incremented once per executed run of the closure.
                    *count_mut += 1;

                    let mut instance = instantiator();
                    if instance.log {
                        eprintln!(
                            "{}",
                            "\n==== New Simulation Instance ===="
                                .color(colored::Color::Cyan)
                                .bold()
                        );
                    }

                    if is_replay {
                        instance.log = true;
                    }

                    tokio::runtime::Builder::new_current_thread()
                        .build()
                        .unwrap()
                        .block_on(async { instance.run(&mut thunk).await })
                })
            },
            false,
        );

        count
    }
}
371
// This must be a tuple because it is referenced from generated code in `graph.rs`.
/// Values returned by the `__hydro_runtime` entry point of a compiled simulation:
/// - async DFIRs;
/// - tick DFIRs;
/// - tick-input hooks;
/// - inline hooks.
///
/// Each DFIR is tagged with a JSON-serialized location id and an `Option<u32>` (presumably a
/// cluster member id).
type DylibResult = (
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Vec<(&'static str, Option<u32>, DfirErased)>,
    Hooks<&'static str>,
    InlineHooks<&'static str>,
);
379
/// A single instance of a compiled Hydro simulation, which provides methods to interactively
/// execute the simulation, feed inputs, and receive outputs.
pub struct CompiledSimInstance<'a> {
    /// The `__hydro_runtime` entry point, borrowed from the library owned by [`CompiledSim`].
    func: SimLoaded<'a>,
    /// Registry of external ports, used to wire up channels and port lookups.
    externals_port_registry: SimExternalPortRegistry,
    /// DFIRs and hooks returned by calling `func`. This is `None` until `run_without_launching`
    /// sets the instance up, and it is taken again when the scheduler is built.
    dylib_result: Option<DylibResult>,
    /// Whether tick executions, stream releases, and program print output are logged.
    log: bool,
}
388
impl<'a> CompiledSimInstance<'a> {
    /// Sets up the simulation, launches its scheduler, and then runs `thunk` to interact with it.
    async fn run(self, thunk: impl AsyncFnOnce() + RefUnwindSafe) {
        self.run_without_launching(async |instance| {
            instance.launch();
            thunk().await;
        })
        .await;
    }

    /// Instantiates the simulation from the compiled library and wires up its external channels.
    /// It then runs `thunk` with the prepared instance *without* starting the scheduler.
    ///
    /// `thunk` runs inside a [`tokio::task::LocalSet`] with [`CURRENT_SIM_CONNECTIONS`] set for
    /// this instance. It is responsible for starting the scheduler (e.g. via `launch` or
    /// `schedule_with_logger`).
    async fn run_without_launching(
        mut self,
        thunk: impl AsyncFnOnce(CompiledSimInstance) + RefUnwindSafe,
    ) {
        // Out-parameters that the compiled library fills with each external port's channel
        // endpoints, keyed by the raw `usize` port id.
        let mut external_out: HashMap<usize, UnboundedReceiverStream<Bytes>> = HashMap::new();
        let mut external_in: HashMap<usize, UnboundedSender<Bytes>> = HashMap::new();
        let mut cluster_external_out: HashMap<usize, Vec<UnboundedReceiverStream<Bytes>>> =
            HashMap::new();
        let mut cluster_external_in: HashMap<usize, Vec<UnboundedSender<Bytes>>> = HashMap::new();

        // NOTE(review): soundness relies on `__hydro_runtime` matching the `SimLoaded` signature.
        let dylib_result = unsafe {
            (self.func)(
                colored::control::SHOULD_COLORIZE.should_colorize(),
                &mut external_out,
                &mut external_in,
                &mut cluster_external_out,
                &mut cluster_external_in,
                // The compiled program's print handlers discard output unless logging is on.
                if self.log {
                    println_handler
                } else {
                    null_handler
                },
                if self.log {
                    eprintln_handler
                } else {
                    null_handler
                },
            )
        };

        let registered = &self.externals_port_registry.registered;

        // Shared between this instance's scheduler and its sender/receiver handles.
        let quiescence = Rc::new(QuiescenceState {
            quiescent: Cell::new(false),
            quiescence_notify: Notify::new(),
            resume_notify: Notify::new(),
        });

        // Re-key the endpoints by `SimExternalPort` and wrap them in `Rc`, plus a `Mutex` for
        // receivers, so every handle for the same port shares one channel.
        let mut input_senders = HashMap::new();
        let mut output_receivers = HashMap::new();
        let mut cluster_input_senders = HashMap::new();
        let mut cluster_output_receivers = HashMap::new();

        #[expect(
            clippy::disallowed_methods,
            reason = "inserts into maps also unordered"
        )]
        for sim_port in registered.values() {
            let usize_key = sim_port.into_inner();
            if let Some(sender) = external_in.remove(&usize_key) {
                input_senders.insert(*sim_port, Rc::new(sender));
            }
            if let Some(receiver) = external_out.remove(&usize_key) {
                output_receivers.insert(*sim_port, Rc::new(Mutex::new(receiver)));
            }
            if let Some(senders) = cluster_external_in.remove(&usize_key) {
                cluster_input_senders.insert(*sim_port, senders.into_iter().map(Rc::new).collect());
            }
            if let Some(receivers) = cluster_external_out.remove(&usize_key) {
                cluster_output_receivers.insert(
                    *sim_port,
                    receivers
                        .into_iter()
                        .map(|r| Rc::new(Mutex::new(r)))
                        .collect(),
                );
            }
        }

        // Stash the DFIRs and hooks; `schedule_with_maybe_logger` takes them when scheduling.
        self.dylib_result = Some(dylib_result);

        let local_set = tokio::task::LocalSet::new();
        local_set
            .run_until(CURRENT_SIM_CONNECTIONS.scope(
                RefCell::new(SimConnections {
                    input_senders,
                    output_receivers,
                    cluster_input_senders,
                    cluster_output_receivers,
                    external_registered: self.externals_port_registry.registered.clone(),
                    quiescence: quiescence.clone(),
                }),
                async move {
                    thunk(self).await;
                },
            ))
            .await;
    }

    /// Launches the simulation, which will asynchronously simulate the Hydro program. This should
    /// be invoked before receiving any messages.
    ///
    /// The scheduler is spawned with `spawn_local`, so this must run inside the `LocalSet` set up
    /// by `run_without_launching`. Logging, if enabled, goes to stderr.
    fn launch(self) {
        // `std::io::Empty` only fixes the unused writer type parameter, since `None` is passed.
        // NOTE(review): the returned `JoinHandle` is dropped (detaching the task), so a panic in
        // the scheduler is not propagated through it — confirm panics surface elsewhere.
        tokio::task::spawn_local(self.schedule_with_maybe_logger::<std::io::Empty>(None));
    }

    /// Returns a future that schedules simulation with the given logger for reporting the
    /// simulation trace.
    ///
    /// `log_writer` is only used when logging is enabled for this instance; otherwise
    /// `LogKind::Null` is used and the writer is ignored.
    ///
    /// # Panics
    ///
    /// Panics immediately, rather than when the future is polled, in two cases:
    /// - the call happens outside the scope established by `run_without_launching`;
    /// - the instance has not been set up, or was already scheduled.
    pub fn schedule_with_logger<W: std::io::Write>(
        self,
        log_writer: W,
    ) -> impl use<W> + Future<Output = ()> {
        self.schedule_with_maybe_logger(Some(log_writer))
    }

    /// Builds the scheduler state from the stashed dylib result and returns its driving future.
    /// `log_override` replaces stderr as the log sink when logging is enabled.
    fn schedule_with_maybe_logger<W: std::io::Write>(
        mut self,
        log_override: Option<W>,
    ) -> impl use<W> + Future<Output = ()> {
        let (async_dfirs, tick_dfirs, hooks, inline_hooks) = self.dylib_result.take().unwrap();

        // Location ids arrive as JSON strings from the compiled library and are parsed here.
        let not_ready_observation = async_dfirs
            .iter()
            .map(|(lid, c_id, _)| (serde_json::from_str(lid).unwrap(), *c_id))
            .collect();

        let quiescence = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            connections.quiescence.clone()
        });

        let mut launched = LaunchedSim {
            async_dfirs: async_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_ticks: vec![],
            not_ready_ticks: tick_dfirs
                .into_iter()
                .map(|(lid, c_id, dfir)| (serde_json::from_str(lid).unwrap(), c_id, dfir))
                .collect(),
            possibly_ready_observation: vec![],
            not_ready_observation,
            hooks: hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            inline_hooks: inline_hooks
                .into_iter()
                .map(|((lid, cid), hs)| ((serde_json::from_str(lid).unwrap(), cid), hs))
                .collect(),
            log: if self.log {
                if let Some(w) = log_override {
                    LogKind::Custom(w)
                } else {
                    LogKind::Stderr
                }
            } else {
                LogKind::Null
            },
            quiescence,
        };

        async move { launched.scheduler().await }
    }
}
553
// Implemented by hand rather than derived: `#[derive(Clone, Copy)]` would add `T: Clone` /
// `T: Copy` (and `O`/`R`) bounds, whereas a receiver is copyable regardless of its message
// type. Presumably it only holds a port id plus type markers.
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone for SimReceiver<T, O, R> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy for SimReceiver<T, O, R> {}
561
562impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimReceiver<T, O, R> {
563    async fn with_stream<Out>(
564        &self,
565        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
566    ) -> Out {
567        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
568            let connections = connections.borrow();
569            let port = connections.external_registered.get(&self.0).unwrap();
570            (
571                connections.output_receivers.get(port).unwrap().clone(),
572                connections.quiescence.clone(),
573            )
574        });
575
576        let mut receiver_stream = receiver.lock().await;
577        let mut notified_fut = pin!(quiescence.notified());
578        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
579            use std::task::Poll;
580            match receiver_stream.poll_next_unpin(cx) {
581                Poll::Ready(Some(bytes)) => {
582                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
583                }
584                Poll::Ready(None) => return Poll::Ready(None),
585                Poll::Pending => {}
586            }
587            if quiescence.is_quiescent() {
588                return Poll::Ready(None);
589            }
590            let () = ready!(notified_fut.as_mut().poll(cx));
591            notified_fut.set(quiescence.notified());
592            Poll::Ready(None)
593        });
594        thunk(&mut pin!(&mut quiescence_aware)).await
595    }
596
597    /// Asserts that the stream has ended and no more messages can possibly arrive.
598    pub fn assert_no_more(self) -> impl Future<Output = ()>
599    where
600        T: Debug,
601    {
602        FutureTrackingCaller {
603            future: async move {
604                self.with_stream(async |stream| {
605                    if let Some(next) = stream.next().await {
606                        return Err(format!(
607                            "Stream yielded unexpected message: {:?}, expected termination",
608                            next
609                        ));
610                    }
611                    Ok(())
612                })
613                .await
614            },
615        }
616    }
617}
618
619impl<T: Serialize + DeserializeOwned> SimReceiver<T, TotalOrder, ExactlyOnce> {
620    /// Receives the next message from the external bincode stream. This will wait until a message
621    /// is available, or return `None` if no more messages can possibly arrive.
622    pub async fn next(&self) -> Option<T> {
623        self.with_stream(async |stream| stream.next().await).await
624    }
625
626    /// Collects all remaining messages from the external bincode stream into a collection. This
627    /// will wait until no more messages can possibly arrive.
628    pub async fn collect<C: Default + Extend<T>>(self) -> C {
629        self.with_stream(async |stream| stream.collect().await)
630            .await
631    }
632
633    /// Asserts that the stream yields exactly the expected sequence of messages, in order.
634    /// This does not check that the stream ends, use [`Self::assert_yields_only`] for that.
635    pub fn assert_yields<T2: Debug, I: IntoIterator<Item = T2>>(
636        &self,
637        expected: I,
638    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
639    where
640        T: Debug + PartialEq<T2>,
641    {
642        FutureTrackingCaller {
643            future: async {
644                let mut expected: VecDeque<T2> = expected.into_iter().collect();
645
646                while !expected.is_empty() {
647                    if let Some(next) = self.next().await {
648                        let next_expected = expected.pop_front().unwrap();
649                        if next != next_expected {
650                            return Err(format!(
651                                "Stream yielded unexpected message: {:?}, expected: {:?}",
652                                next, next_expected
653                            ));
654                        }
655                    } else {
656                        return Err(format!(
657                            "Stream ended early, still expected: {:?}",
658                            expected
659                        ));
660                    }
661                }
662
663                Ok(())
664            },
665        }
666    }
667
668    /// Asserts that the stream yields only the expected sequence of messages, in order,
669    /// and then ends.
670    pub fn assert_yields_only<T2: Debug, I: IntoIterator<Item = T2>>(
671        &self,
672        expected: I,
673    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
674    where
675        T: Debug + PartialEq<T2>,
676    {
677        ChainedFuture {
678            first: self.assert_yields(expected),
679            second: self.assert_no_more(),
680            first_done: false,
681        }
682    }
683}
684
pin_project_lite::pin_project! {
    // A future that tracks the location of the `.await` call for better panic messages.
    //
    // `#[track_caller]` is important for us to create assertion methods because it makes
    // the panic backtrace show up at that method (instead of inside the call tree within
    // that method). This is e.g. what `Option::unwrap` uses. Unfortunately, `#[track_caller]`
    // does not work correctly for async methods (or `dyn Future` either), so we have to
    // create these concrete future types that (1) have `#[track_caller]` on their `poll()`
    // method and (2) have the `panic!` triggered in their `poll()` method (or in a directly
    // nested concrete future).
    struct FutureTrackingCaller<F: Future<Output = Result<(), String>>> {
        // The wrapped assertion. `Ok(())` means it passed; an `Err(message)` is turned into a
        // panic with that message when this future is polled.
        #[pin]
        future: F,
    }
}
700
701impl<F: Future<Output = Result<(), String>>> Future for FutureTrackingCaller<F> {
702    type Output = ();
703
704    #[track_caller]
705    fn poll(
706        mut self: Pin<&mut Self>,
707        cx: &mut std::task::Context<'_>,
708    ) -> std::task::Poll<Self::Output> {
709        match ready!(self.as_mut().project().future.poll(cx)) {
710            Ok(()) => std::task::Poll::Ready(()),
711            Err(e) => panic!("{}", e),
712        }
713    }
714}
715
pin_project_lite::pin_project! {
    // A future that first awaits the first future, then the second, propagating caller info.
    //
    // See [`FutureTrackingCaller`] for context.
    struct ChainedFuture<F1: Future<Output = ()>, F2: Future<Output = ()>> {
        // Polled until it completes.
        #[pin]
        first: F1,
        // Polled only once `first` has completed.
        #[pin]
        second: F2,
        // Records that `first` completed, so it is never polled again afterwards.
        first_done: bool,
    }
}
728
729impl<F1: Future<Output = ()>, F2: Future<Output = ()>> Future for ChainedFuture<F1, F2> {
730    type Output = ();
731
732    #[track_caller]
733    fn poll(
734        mut self: Pin<&mut Self>,
735        cx: &mut std::task::Context<'_>,
736    ) -> std::task::Poll<Self::Output> {
737        if !self.first_done {
738            ready!(self.as_mut().project().first.poll(cx));
739            *self.as_mut().project().first_done = true;
740        }
741
742        self.as_mut().project().second.poll(cx)
743    }
744}
745
746impl<T: Serialize + DeserializeOwned> SimReceiver<T, NoOrder, ExactlyOnce> {
747    /// Collects all remaining messages from the external bincode stream into a collection,
748    /// sorting them. This will wait until no more messages can possibly arrive.
749    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self) -> C
750    where
751        T: Ord,
752    {
753        self.with_stream(async |stream| {
754            let mut collected: C = stream.collect().await;
755            collected.as_mut().sort();
756            collected
757        })
758        .await
759    }
760
761    /// Asserts that the stream yields exactly the expected sequence of messages, in some order.
762    /// This does not check that the stream ends, use [`Self::assert_yields_only_unordered`] for that.
763    pub fn assert_yields_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
764        &self,
765        expected: I,
766    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
767    where
768        T: Debug + PartialEq<T2>,
769    {
770        FutureTrackingCaller {
771            future: async {
772                self.with_stream(async |stream| {
773                    let mut expected: Vec<T2> = expected.into_iter().collect();
774
775                    while !expected.is_empty() {
776                        if let Some(next) = stream.next().await {
777                            let idx = expected.iter().enumerate().find(|(_, e)| &next == *e);
778                            if let Some((i, _)) = idx {
779                                expected.swap_remove(i);
780                            } else {
781                                return Err(format!(
782                                    "Stream yielded unexpected message: {:?}",
783                                    next
784                                ));
785                            }
786                        } else {
787                            return Err(format!(
788                                "Stream ended early, still expected: {:?}",
789                                expected
790                            ));
791                        }
792                    }
793
794                    Ok(())
795                })
796                .await
797            },
798        }
799    }
800
801    /// Asserts that the stream yields only the expected sequence of messages, in some order,
802    /// and then ends.
803    pub fn assert_yields_only_unordered<T2: Debug, I: IntoIterator<Item = T2>>(
804        &self,
805        expected: I,
806    ) -> impl use<'_, T, T2, I> + Future<Output = ()>
807    where
808        T: Debug + PartialEq<T2>,
809    {
810        ChainedFuture {
811            first: self.assert_yields_unordered(expected),
812            second: self.assert_no_more(),
813            first_done: false,
814        }
815    }
816}
817
818impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimSender<T, O, R> {
819    fn with_sink<Out>(
820        &self,
821        thunk: impl FnOnce(&dyn Fn(T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>) -> Out,
822    ) -> Out {
823        let (sender, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
824            let connections = connections.borrow();
825            (
826                connections
827                    .input_senders
828                    .get(connections.external_registered.get(&self.0).unwrap())
829                    .unwrap()
830                    .clone(),
831                connections.quiescence.clone(),
832            )
833        });
834
835        thunk(&move |t| {
836            let res = sender.send(bincode::serialize(&t).unwrap().into());
837            quiescence.resume();
838            res
839        })
840    }
841}
842
843impl<T: Serialize + DeserializeOwned, O: Ordering> SimSender<T, O, ExactlyOnce> {
844    /// Sends several messages to the external bincode sink. The messages will be asynchronously
845    /// processed as part of the simulation, in non-deterministic order.
846    pub fn send_many_unordered<I: IntoIterator<Item = T>>(&self, iter: I) {
847        self.with_sink(|send| {
848            for t in iter {
849                send(t).unwrap();
850            }
851        })
852    }
853}
854
855impl<T: Serialize + DeserializeOwned> SimSender<T, TotalOrder, ExactlyOnce> {
856    /// Sends a message to the external bincode sink. The message will be asynchronously processed
857    /// as part of the simulation.
858    pub fn send(&self, t: T) {
859        self.with_sink(|send| send(t)).unwrap();
860    }
861
862    /// Sends several messages to the external bincode sink. The messages will be asynchronously
863    /// processed as part of the simulation.
864    pub fn send_many<I: IntoIterator<Item = T>>(&self, iter: I) {
865        self.with_sink(|send| {
866            for t in iter {
867                send(t).unwrap();
868            }
869        })
870    }
871}
872
873impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Clone
874    for SimClusterReceiver<T, O, R>
875{
876    fn clone(&self) -> Self {
877        *self
878    }
879}
880
881impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> Copy
882    for SimClusterReceiver<T, O, R>
883{
884}
885
impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterReceiver<T, O, R> {
    /// Runs `thunk` over a stream of deserialized values received from cluster member
    /// `member_id`, holding that member's receiver lock until `thunk` finishes.
    ///
    /// The stream first returns any value already available on the member's channel. When the
    /// channel has nothing ready, the stream ends (`None`) if the simulation is already
    /// quiescent. Otherwise it waits for the scheduler's next quiescence notification and then
    /// ends. A closed channel also ends the stream.
    ///
    /// # Panics
    /// Panics if this receiver's external port is not registered with the current simulation,
    /// if `member_id` is not a valid index into that port's per-member receivers, or if a
    /// received payload fails to deserialize with `bincode`.
    async fn with_member_stream<Out>(
        &self,
        member_id: u32,
        thunk: impl AsyncFnOnce(&mut Pin<&mut dyn Stream<Item = T>>) -> Out,
    ) -> Out {
        // The `borrow()` is scoped to this closure, so it is released before the `.await`s below.
        let (receiver, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
            let connections = connections.borrow();
            let port = connections.external_registered.get(&self.0).unwrap();
            let receivers = connections.cluster_output_receivers.get(port).unwrap();
            (
                receivers[member_id as usize].clone(),
                connections.quiescence.clone(),
            )
        });

        // Held for the whole `thunk`, so only one consumer reads this member's channel at a time.
        let mut lock = receiver.lock().await;
        // Future that completes when the scheduler next reaches quiescence; re-armed after it fires.
        let mut notified_fut = pin!(quiescence.notified());
        let mut quiescence_aware = futures::stream::poll_fn(|cx| {
            use std::task::Poll;
            // Prefer data already available on the channel.
            match lock.poll_next_unpin(cx) {
                Poll::Ready(Some(bytes)) => {
                    return Poll::Ready(Some(bincode::deserialize(&bytes).unwrap()));
                }
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => {}
            }
            // Nothing is buffered and the scheduler cannot progress without new input, so end.
            if quiescence.is_quiescent() {
                return Poll::Ready(None);
            }
            // Otherwise wait for the next quiescence signal. The pending channel poll above has
            // also registered `cx`, so data that arrives first re-polls this stream instead.
            // NOTE(review): once the notification fires this ends the stream without
            // re-checking `is_quiescent()`, so a `resume()` that lands after the notification
            // still yields `None` here. Confirm that is intended.
            let () = ready!(notified_fut.as_mut().poll(cx));
            notified_fut.set(quiescence.notified());
            Poll::Ready(None)
        });
        thunk(&mut pin!(&mut quiescence_aware)).await
    }
}
923
924impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, TotalOrder, ExactlyOnce> {
925    /// Receives the next value from a specific cluster member.
926    pub async fn next(&self, member_id: u32) -> Option<T> {
927        self.with_member_stream(member_id, async |stream| stream.next().await)
928            .await
929    }
930
931    /// Collects all remaining values from a specific cluster member into a collection.
932    pub async fn collect<C: Default + Extend<T>>(self, member_id: u32) -> C {
933        self.with_member_stream(member_id, async |stream| stream.collect().await)
934            .await
935    }
936}
937
938impl<T: Serialize + DeserializeOwned> SimClusterReceiver<T, NoOrder, ExactlyOnce> {
939    /// Collects all remaining values from a specific cluster member, sorted.
940    pub async fn collect_sorted<C: Default + Extend<T> + AsMut<[T]>>(self, member_id: u32) -> C
941    where
942        T: Ord,
943    {
944        self.with_member_stream(member_id, async |stream| {
945            let mut collected: C = stream.collect().await;
946            collected.as_mut().sort();
947            collected
948        })
949        .await
950    }
951}
952
953impl<T: Serialize + DeserializeOwned, O: Ordering, R: Retries> SimClusterSender<T, O, R> {
954    fn with_sink<Out>(
955        &self,
956        thunk: impl FnOnce(
957            &dyn Fn(u32, T) -> Result<(), tokio::sync::mpsc::error::SendError<Bytes>>,
958        ) -> Out,
959    ) -> Out {
960        let (senders, quiescence) = CURRENT_SIM_CONNECTIONS.with(|connections| {
961            let connections = connections.borrow();
962            (
963                connections
964                    .cluster_input_senders
965                    .get(connections.external_registered.get(&self.0).unwrap())
966                    .unwrap()
967                    .clone(),
968                connections.quiescence.clone(),
969            )
970        });
971
972        thunk(&move |member_id: u32, t: T| {
973            let payload = bincode::serialize(&t).unwrap();
974            let res = senders[member_id as usize].send(Bytes::from(payload));
975            quiescence.resume();
976            res
977        })
978    }
979}
980
981impl<T: Serialize + DeserializeOwned> SimClusterSender<T, TotalOrder, ExactlyOnce> {
982    /// Sends a value to a specific cluster member.
983    pub fn send(&self, member_id: u32, t: T) {
984        self.with_sink(|send| send(member_id, t)).unwrap();
985    }
986
987    /// Sends multiple values to specific cluster members.
988    pub fn send_many<I: IntoIterator<Item = (u32, T)>>(&self, iter: I) {
989        self.with_sink(|send| {
990            for (member_id, t) in iter {
991                send(member_id, t).unwrap();
992            }
993        })
994    }
995}
996
/// Destination for the simulator's scheduling log.
enum LogKind<W: std::io::Write> {
    /// Logging disabled: all text is discarded.
    Null,
    /// Text is printed to standard error.
    Stderr,
    /// Text is forwarded to a caller-supplied byte writer.
    Custom(W),
}

// Adapts an `std::io::Write` sink to `std::fmt::Write`, so the same log value can be driven by
// `write!`-style formatting whatever its destination. Pattern adapted from
// https://www.reddit.com/r/rust/comments/t69sld/is_there_a_way_to_allow_either_stdfmtwrite_or/
impl<W: std::io::Write> std::fmt::Write for LogKind<W> {
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        match self {
            // I/O failures collapse into the opaque `fmt::Error`.
            Self::Custom(sink) => sink.write_all(s.as_bytes()).map_err(|_| std::fmt::Error),
            Self::Stderr => {
                eprint!("{s}");
                Ok(())
            }
            Self::Null => Ok(()),
        }
    }
}
1016
/// A running simulation, which manages the async DFIRs, tick DFIRs, and hook-based
/// scheduling decisions for non-deterministic operators like `batch` and `assume_ordering`.
///
/// The scheduler loops between three kinds of work:
/// - **Async DFIRs**: long-running top-level dataflows (one per process/cluster member) that
///   produce data consumed by ticks and observations.
/// - **Ticks**: tick-scoped DFIRs that execute a single tick. Before running, their associated
///   hooks (e.g. from `batch`) are resolved to decide what data to release into the tick.
/// - **Observations**: top-level locations that have hooks (e.g. from `assume_ordering` on a
///   non-tick stream) needing decisions, but no tick DFIR to execute. The scheduler just
///   resolves their hooks.
///
/// Each work item is identified by `(location, cluster member id)`, where the member id is
/// `None` for non-cluster locations.
struct LaunchedSim<W: std::io::Write> {
    /// Top-level async DFIRs, one per process/cluster member. These run continuously and
    /// produce data that feeds into ticks and observations.
    async_dfirs: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Tick DFIRs whose parent async DFIR has made progress, so they may be ready to run.
    /// The scheduler further filters these by checking whether their hooks have pending decisions.
    possibly_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Tick DFIRs whose parent async DFIR has not yet made progress since they were last checked.
    not_ready_ticks: Vec<(LocationId, Option<u32>, DfirErased)>,
    /// Top-level locations whose async DFIR has made progress and whose hooks (from top-level
    /// `assume_ordering`) may have ordering decisions to resolve. Unlike ticks, these have no
    /// DFIR to execute — only hook resolution.
    possibly_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Top-level locations whose async DFIR has not yet made progress since they were last checked.
    not_ready_observation: Vec<(LocationId, Option<u32>)>,
    /// Hooks keyed by (location, cluster_member_id). These are resolved *before* a tick runs
    /// (for `batch` hooks) or standalone (for top-level `assume_ordering` hooks via observations).
    hooks: Hooks<LocationId>,
    /// Inline hooks keyed by (tick location, cluster_member_id). These are resolved *during*
    /// tick execution via a `tokio::select!` loop, for operators like `assume_ordering` inside
    /// a tick that block on ordering decisions while the tick DFIR is running.
    inline_hooks: InlineHooks<LocationId>,
    /// Destination for scheduling logs (tick banners and hook decisions): discarded, stderr,
    /// or a custom writer.
    log: LogKind<W>,
    /// Represents quiescence state of the simulation.
    quiescence: Rc<QuiescenceState>,
}
1054
1055impl<W: std::io::Write> LaunchedSim<W> {
1056    async fn scheduler(&mut self) {
1057        loop {
1058            tokio::task::yield_now().await;
1059            let mut any_made_progress = false;
1060            for (loc, c_id, dfir) in &mut self.async_dfirs {
1061                if dfir.run_tick().await {
1062                    any_made_progress = true;
1063                    let (now_ready, still_not_ready): (Vec<_>, Vec<_>) = self
1064                        .not_ready_ticks
1065                        .drain(..)
1066                        .partition(|(tick_loc, tick_c_id, _)| {
1067                            let LocationId::Tick(_, outer) = tick_loc else {
1068                                unreachable!()
1069                            };
1070                            outer.as_ref() == loc && tick_c_id == c_id
1071                        });
1072
1073                    self.possibly_ready_ticks.extend(now_ready);
1074                    self.not_ready_ticks.extend(still_not_ready);
1075
1076                    let (now_ready_obs, still_not_ready_obs): (Vec<_>, Vec<_>) = self
1077                        .not_ready_observation
1078                        .drain(..)
1079                        .partition(|(obs_loc, obs_c_id)| obs_loc == loc && obs_c_id == c_id);
1080
1081                    self.possibly_ready_observation.extend(now_ready_obs);
1082                    self.not_ready_observation.extend(still_not_ready_obs);
1083                }
1084            }
1085
1086            if any_made_progress {
1087                continue;
1088            } else {
1089                use bolero::generator::*;
1090
1091                let (ready_tick, mut not_ready_tick): (Vec<_>, Vec<_>) = self
1092                    .possibly_ready_ticks
1093                    .drain(..)
1094                    .partition(|(name, cid, _)| {
1095                        self.hooks
1096                            .get(&(name.clone(), *cid))
1097                            .unwrap()
1098                            .iter()
1099                            .any(|hook| {
1100                                hook.current_decision().unwrap_or(false)
1101                                    || hook.can_make_nontrivial_decision()
1102                            })
1103                    });
1104
1105                self.possibly_ready_ticks = ready_tick;
1106                self.not_ready_ticks.append(&mut not_ready_tick);
1107
1108                let (ready_obs, mut not_ready_obs): (Vec<_>, Vec<_>) = self
1109                    .possibly_ready_observation
1110                    .drain(..)
1111                    .partition(|(name, cid)| {
1112                        self.hooks
1113                            .get(&(name.clone(), *cid))
1114                            .into_iter()
1115                            .flatten()
1116                            .any(|hook| {
1117                                hook.current_decision().unwrap_or(false)
1118                                    || hook.can_make_nontrivial_decision()
1119                            })
1120                    });
1121
1122                self.possibly_ready_observation = ready_obs;
1123                self.not_ready_observation.append(&mut not_ready_obs);
1124
1125                if self.possibly_ready_ticks.is_empty()
1126                    && self.possibly_ready_observation.is_empty()
1127                {
1128                    // Signal quiescence and wait for new input.
1129                    self.quiescence.wait_for_resume().await;
1130                } else {
1131                    let next_tick_or_obs = (0..(self.possibly_ready_ticks.len()
1132                        + self.possibly_ready_observation.len()))
1133                        .any();
1134
1135                    if next_tick_or_obs < self.possibly_ready_ticks.len() {
1136                        let next_tick = next_tick_or_obs;
1137                        let mut removed = self.possibly_ready_ticks.remove(next_tick);
1138
1139                        match &mut self.log {
1140                            LogKind::Null => {}
1141                            LogKind::Stderr => {
1142                                if let Some(cid) = &removed.1 {
1143                                    eprintln!(
1144                                        "\n{}",
1145                                        format!("Running Tick (Cluster Member {})", cid)
1146                                            .color(colored::Color::Magenta)
1147                                            .bold()
1148                                    )
1149                                } else {
1150                                    eprintln!(
1151                                        "\n{}",
1152                                        "Running Tick".color(colored::Color::Magenta).bold()
1153                                    )
1154                                }
1155                            }
1156                            LogKind::Custom(writer) => {
1157                                writeln!(
1158                                    writer,
1159                                    "\n{}",
1160                                    "Running Tick".color(colored::Color::Magenta).bold()
1161                                )
1162                                .unwrap();
1163                            }
1164                        }
1165
1166                        let mut asterisk_indenter = |_line_no, write: &mut dyn std::fmt::Write| {
1167                            write.write_str(&"*".color(colored::Color::Magenta).bold())?;
1168                            write.write_str(" ")
1169                        };
1170
1171                        let mut tick_decision_writer = indenter::indented(&mut self.log)
1172                            .with_format(indenter::Format::Custom {
1173                                inserter: &mut asterisk_indenter,
1174                            });
1175
1176                        let hooks = self.hooks.get_mut(&(removed.0.clone(), removed.1)).unwrap();
1177                        run_hooks(&mut tick_decision_writer, hooks);
1178
1179                        let run_tick_future = removed.2.run_tick();
1180                        if let Some(inline_hooks) =
1181                            self.inline_hooks.get_mut(&(removed.0.clone(), removed.1))
1182                        {
1183                            let mut run_tick_future_pinned = pin!(run_tick_future);
1184
1185                            loop {
1186                                tokio::select! {
1187                                    biased;
1188                                    r = &mut run_tick_future_pinned => {
1189                                        assert!(r);
1190                                        break;
1191                                    }
1192                                    _ = async {} => {
1193                                        bolero_generator::any::scope::borrow_with(|driver| {
1194                                            for hook in inline_hooks.iter_mut() {
1195                                                if hook.pending_decision() {
1196                                                    if !hook.has_decision() {
1197                                                        hook.autonomous_decision(driver);
1198                                                    }
1199
1200                                                    hook.release_decision(&mut tick_decision_writer);
1201                                                }
1202                                            }
1203                                        });
1204                                    }
1205                                }
1206                            }
1207                        } else {
1208                            assert!(run_tick_future.await);
1209                        }
1210
1211                        self.possibly_ready_ticks.push(removed);
1212                    } else {
1213                        let next_obs = next_tick_or_obs - self.possibly_ready_ticks.len();
1214                        let mut default_hooks = vec![];
1215                        let hooks = self
1216                            .hooks
1217                            .get_mut(&self.possibly_ready_observation[next_obs])
1218                            .unwrap_or(&mut default_hooks);
1219
1220                        run_hooks(&mut self.log, hooks);
1221                    }
1222                }
1223            }
1224        }
1225    }
1226}
1227
1228fn run_hooks(tick_decision_writer: &mut impl std::fmt::Write, hooks: &mut Vec<Box<dyn SimHook>>) {
1229    let mut remaining_decision_count = hooks.len();
1230    let mut made_nontrivial_decision = false;
1231
1232    bolero::generator::bolero_generator::any::scope::borrow_with(|driver| {
1233        // first, scan manual decisions
1234        hooks.iter_mut().for_each(|hook| {
1235            if let Some(is_nontrivial) = hook.current_decision() {
1236                made_nontrivial_decision |= is_nontrivial;
1237                remaining_decision_count -= 1;
1238            } else if !hook.can_make_nontrivial_decision() {
1239                // if no nontrivial decision is possible, make a trivial one
1240                // (we need to do this in the first pass to force nontrivial decisions
1241                // on the remaining hooks)
1242                hook.autonomous_decision(driver, false);
1243                remaining_decision_count -= 1;
1244            }
1245        });
1246
1247        hooks.iter_mut().for_each(|hook| {
1248            if hook.current_decision().is_none() {
1249                made_nontrivial_decision |= hook.autonomous_decision(
1250                    driver,
1251                    !made_nontrivial_decision && remaining_decision_count == 1,
1252                );
1253                remaining_decision_count -= 1;
1254            }
1255
1256            hook.release_decision(tick_decision_writer);
1257        });
1258    });
1259}