hydro_lang/builder/
mod.rs

1use std::any::type_name;
2use std::cell::RefCell;
3use std::collections::HashMap;
4use std::marker::PhantomData;
5use std::rc::Rc;
6
7#[cfg(feature = "build")]
8use compiled::CompiledFlow;
9#[cfg(feature = "build")]
10use deploy::{DeployFlow, DeployResult};
11use stageleft::*;
12
13#[cfg(feature = "build")]
14use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, IntoProcessSpec, LocalDeploy};
15use crate::ir::HydroLeaf;
16use crate::location::{Cluster, ExternalProcess, Process};
17use crate::staging_util::Invariant;
18
19#[cfg(feature = "build")]
20pub mod built;
21#[cfg(feature = "build")]
22pub mod compiled;
23#[cfg(feature = "build")]
24pub mod deploy;
25
26pub struct FlowStateInner {
27    /// Tracks the leaves of the dataflow IR. This is referenced by
28    /// `Stream` and `HfCycle` to build the IR. The inner option will
29    /// be set to `None` when this builder is finalized.
30    pub(crate) leaves: Option<Vec<HydroLeaf>>,
31
32    /// Counter for generating unique external output identifiers.
33    pub(crate) next_external_out: usize,
34
35    /// Counters for generating identifiers for cycles.
36    pub(crate) cycle_counts: HashMap<usize, usize>,
37
38    /// Counters for clock IDs.
39    pub(crate) next_clock_id: usize,
40
41    /// Counter for unique HydroNode IDs.
42    pub(crate) next_node_id: usize,
43}
44
45pub type FlowState = Rc<RefCell<FlowStateInner>>;
46
47pub const FLOW_USED_MESSAGE: &str = "Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.";
48
49pub struct FlowBuilder<'a> {
50    flow_state: FlowState,
51    processes: RefCell<Vec<(usize, String)>>,
52    clusters: RefCell<Vec<(usize, String)>>,
53    externals: RefCell<Vec<(usize, String)>>,
54
55    next_location_id: RefCell<usize>,
56
57    /// Tracks whether this flow has been finalized; it is an error to
58    /// drop without finalizing.
59    finalized: bool,
60
61    /// 'a on a FlowBuilder is used to ensure that staged code does not
62    /// capture more data that it is allowed to; 'a is generated at the
63    /// entrypoint of the staged code and we keep it invariant here
64    /// to enforce the appropriate constraints
65    _phantom: Invariant<'a>,
66}
67
68impl Drop for FlowBuilder<'_> {
69    fn drop(&mut self) {
70        if !self.finalized {
71            panic!(
72                "Dropped FlowBuilder without finalizing, you may have forgotten to call `with_default_optimize`, `optimize_with`, or `finalize`."
73            );
74        }
75    }
76}
77
78impl QuotedContext for FlowBuilder<'_> {
79    fn create() -> Self {
80        FlowBuilder::new()
81    }
82}
83
84impl<'a> FlowBuilder<'a> {
85    #[expect(
86        clippy::new_without_default,
87        reason = "call `new` explicitly, not `default`"
88    )]
89    pub fn new() -> FlowBuilder<'a> {
90        FlowBuilder {
91            flow_state: Rc::new(RefCell::new(FlowStateInner {
92                leaves: Some(vec![]),
93                next_external_out: 0,
94                cycle_counts: HashMap::new(),
95                next_clock_id: 0,
96                next_node_id: 0,
97            })),
98            processes: RefCell::new(vec![]),
99            clusters: RefCell::new(vec![]),
100            externals: RefCell::new(vec![]),
101            next_location_id: RefCell::new(0),
102            finalized: false,
103            _phantom: PhantomData,
104        }
105    }
106
107    #[cfg(feature = "build")]
108    pub fn finalize(mut self) -> built::BuiltFlow<'a> {
109        self.finalized = true;
110
111        built::BuiltFlow {
112            ir: self.flow_state.borrow_mut().leaves.take().unwrap(),
113            process_id_name: self.processes.replace(vec![]),
114            cluster_id_name: self.clusters.replace(vec![]),
115            external_id_name: self.externals.replace(vec![]),
116            used: false,
117            _phantom: PhantomData,
118        }
119    }
120
121    #[cfg(feature = "build")]
122    pub fn with_default_optimize<D: LocalDeploy<'a>>(self) -> DeployFlow<'a, D> {
123        self.finalize().with_default_optimize()
124    }
125
126    #[cfg(feature = "build")]
127    pub fn optimize_with(self, f: impl FnOnce(&mut [HydroLeaf])) -> built::BuiltFlow<'a> {
128        self.finalize().optimize_with(f)
129    }
130
131    pub fn flow_state(&self) -> &FlowState {
132        &self.flow_state
133    }
134
135    pub fn process<P>(&self) -> Process<'a, P> {
136        let mut next_location_id = self.next_location_id.borrow_mut();
137        let id = *next_location_id;
138        *next_location_id += 1;
139
140        self.processes
141            .borrow_mut()
142            .push((id, type_name::<P>().to_string()));
143
144        Process {
145            id,
146            flow_state: self.flow_state().clone(),
147            _phantom: PhantomData,
148        }
149    }
150
151    pub fn external_process<P>(&self) -> ExternalProcess<'a, P> {
152        let mut next_location_id = self.next_location_id.borrow_mut();
153        let id = *next_location_id;
154        *next_location_id += 1;
155
156        self.externals
157            .borrow_mut()
158            .push((id, type_name::<P>().to_string()));
159
160        ExternalProcess {
161            id,
162            flow_state: self.flow_state().clone(),
163            _phantom: PhantomData,
164        }
165    }
166
167    pub fn cluster<C>(&self) -> Cluster<'a, C> {
168        let mut next_location_id = self.next_location_id.borrow_mut();
169        let id = *next_location_id;
170        *next_location_id += 1;
171
172        self.clusters
173            .borrow_mut()
174            .push((id, type_name::<C>().to_string()));
175
176        Cluster {
177            id,
178            flow_state: self.flow_state().clone(),
179            _phantom: PhantomData,
180        }
181    }
182
183    #[cfg(feature = "build")]
184    pub fn with_process<P, D: LocalDeploy<'a>>(
185        self,
186        process: &Process<P>,
187        spec: impl IntoProcessSpec<'a, D>,
188    ) -> DeployFlow<'a, D> {
189        self.with_default_optimize().with_process(process, spec)
190    }
191
192    #[cfg(feature = "build")]
193    pub fn with_remaining_processes<D: LocalDeploy<'a>, S: IntoProcessSpec<'a, D> + 'a>(
194        self,
195        spec: impl Fn() -> S,
196    ) -> DeployFlow<'a, D> {
197        self.with_default_optimize().with_remaining_processes(spec)
198    }
199
200    #[cfg(feature = "build")]
201    pub fn with_external<P, D: LocalDeploy<'a>>(
202        self,
203        process: &ExternalProcess<P>,
204        spec: impl ExternalSpec<'a, D>,
205    ) -> DeployFlow<'a, D> {
206        self.with_default_optimize().with_external(process, spec)
207    }
208
209    #[cfg(feature = "build")]
210    pub fn with_remaining_externals<D: LocalDeploy<'a>, S: ExternalSpec<'a, D> + 'a>(
211        self,
212        spec: impl Fn() -> S,
213    ) -> DeployFlow<'a, D> {
214        self.with_default_optimize().with_remaining_externals(spec)
215    }
216
217    #[cfg(feature = "build")]
218    pub fn with_cluster<C, D: LocalDeploy<'a>>(
219        self,
220        cluster: &Cluster<C>,
221        spec: impl ClusterSpec<'a, D>,
222    ) -> DeployFlow<'a, D> {
223        self.with_default_optimize().with_cluster(cluster, spec)
224    }
225
226    #[cfg(feature = "build")]
227    pub fn with_remaining_clusters<D: LocalDeploy<'a>, S: ClusterSpec<'a, D> + 'a>(
228        self,
229        spec: impl Fn() -> S,
230    ) -> DeployFlow<'a, D> {
231        self.with_default_optimize().with_remaining_clusters(spec)
232    }
233
234    #[cfg(feature = "build")]
235    pub fn compile<D: Deploy<'a>>(self, env: &D::CompileEnv) -> CompiledFlow<'a, D::GraphId> {
236        self.with_default_optimize::<D>().compile(env)
237    }
238
239    #[cfg(feature = "build")]
240    pub fn compile_no_network<D: LocalDeploy<'a>>(self) -> CompiledFlow<'a, D::GraphId> {
241        self.with_default_optimize::<D>().compile_no_network()
242    }
243
244    #[cfg(feature = "build")]
245    pub fn deploy<D: Deploy<'a, CompileEnv = ()>>(
246        self,
247        env: &mut D::InstantiateEnv,
248    ) -> DeployResult<'a, D> {
249        self.with_default_optimize().deploy(env)
250    }
251}