maplibre/io/
apc.rs

1use std::{
2    any::Any,
3    cell::RefCell,
4    fmt::Debug,
5    future::Future,
6    marker::PhantomData,
7    pin::Pin,
8    sync::{
9        mpsc,
10        mpsc::{Receiver, Sender},
11    },
12    vec::IntoIter,
13};
14
15use serde::{Deserialize, Serialize};
16use thiserror::Error;
17
18use crate::{
19    coords::WorldTileCoords,
20    define_label,
21    environment::{OffscreenKernel, OffscreenKernelConfig},
22    io::scheduler::Scheduler,
23    style::Style,
24};
25
26define_label!(MessageTag);
27
28impl MessageTag for u32 {
29    fn dyn_clone(&self) -> Box<dyn MessageTag> {
30        Box::new(*self)
31    }
32}
33
34#[derive(Error, Debug)]
35pub enum MessageError {
36    #[error("the message did not contain the expected data")]
37    CastError(Box<dyn Any>),
38}
39
40/// The result of the tessellation of a tile. This is sent as a message from a worker to the caller
41/// of an [`AsyncProcedure`].
42#[derive(Debug)]
43pub struct Message {
44    tag: &'static dyn MessageTag,
45    transferable: Box<dyn Any + Send>,
46}
47
48impl Message {
49    pub fn new(tag: &'static dyn MessageTag, transferable: Box<dyn Any + Send>) -> Self {
50        Self { tag, transferable }
51    }
52
53    pub fn into_transferable<T: 'static>(self) -> Box<T> {
54        self.transferable
55            .downcast::<T>()
56            .expect("message has wrong tag")
57    }
58
59    pub fn has_tag(&self, tag: &'static dyn MessageTag) -> bool {
60        self.tag == tag
61    }
62
63    pub fn tag(&self) -> &'static dyn MessageTag {
64        self.tag
65    }
66}
67
68pub trait IntoMessage {
69    fn into(self) -> Message;
70}
71
72/// Inputs for an [`AsyncProcedure`]
73#[derive(Clone, Serialize, Deserialize)]
74pub enum Input {
75    TileRequest {
76        coords: WorldTileCoords,
77        style: Style, // TODO
78    },
79    NotYetImplemented, // TODO: Placeholder, should be removed when second input is added
80}
81
82#[derive(Error, Debug)]
83pub enum SendError {
84    #[error("could not transmit data")]
85    Transmission,
86}
87
88/// Allows sending messages from workers to back to the caller.
89pub trait Context: 'static {
90    /// Send a message back to the caller.
91    fn send_back<T: IntoMessage>(&self, message: T) -> Result<(), SendError>;
92}
93
94#[derive(Error, Debug)]
95pub enum ProcedureError {
96    /// The [`Input`] is not compatible with the procedure
97    #[error("provided input is not compatible with procedure")]
98    IncompatibleInput,
99    #[error("execution of procedure failed")]
100    Execution(Box<dyn std::error::Error>),
101    #[error("sending data failed")]
102    Send(SendError),
103}
104
105#[cfg(feature = "thread-safe-futures")]
106pub type AsyncProcedureFuture =
107    Pin<Box<(dyn Future<Output = Result<(), ProcedureError>> + Send + 'static)>>;
108#[cfg(not(feature = "thread-safe-futures"))]
109pub type AsyncProcedureFuture =
110    Pin<Box<(dyn Future<Output = Result<(), ProcedureError>> + 'static)>>;
111
112#[derive(Error, Debug)]
113pub enum CallError {
114    #[error("scheduling work failed")]
115    Schedule,
116    #[error("serializing data failed")]
117    Serialize(Box<dyn std::error::Error>),
118    #[error("deserializing failed")]
119    Deserialize(Box<dyn std::error::Error>),
120    #[error("deserializing input failed")]
121    DeserializeInput(Box<dyn std::error::Error>),
122}
123
124/// Type definitions for asynchronous procedure calls. These functions can be called in an
125/// [`AsyncProcedureCall`]. Functions of this type are required to be statically available at
126/// compile time. It is explicitly not possible to use closures, as they would require special
127/// serialization which is currently not supported.
128pub type AsyncProcedure<K, C> = fn(input: Input, context: C, kernel: K) -> AsyncProcedureFuture;
129
130/// APCs define an interface for performing work asynchronously.
131/// This work can be implemented through procedures which can be called asynchronously, hence the
132/// name AsyncProcedureCall or APC for short.
133///
134/// APCs serve as an abstraction for doing work on a separate thread, and then getting responses
135/// back. An asynchronous procedure call can for example be performed by using message passing. In
136/// fact this could theoretically work over a network socket.
137///
138/// It is possible to schedule work on a  remote host by calling [`AsyncProcedureCall::call()`]
139/// and getting the results back by calling the non-blocking function
140/// [`AsyncProcedureCall::receive()`]. The [`AsyncProcedureCall::receive()`] function returns a
141/// struct which implements [`Transferables`].
142///
143/// ## Transferables
144///
145/// Based on whether the current platform supports shared-memory or not, the implementation of APCs
146/// might want to send the whole data from the worker to the caller back or just pointers to that
147/// data. The [`Transferables`] trait allows developers to define that and use different data
148/// layouts for different platforms.
149///
150/// ## Message Passing vs APC
151///
152/// One might wonder why this is called [`AsyncProcedureCall`] instead of `MessagePassingInterface`.
153/// The reason for this is quite simple. We are actually referencing and calling procedures which
154/// are defined in different threads, processes or hosts. That means, that an [`AsyncProcedureCall`]
155/// is actually distinct from a `MessagePassingInterface`.
156///
157///
158/// ## Current Implementations
159///
160/// We currently have two implementation for APCs. One uses the Tokio async runtime on native
161/// targets in [`SchedulerAsyncProcedureCall`].
162/// For the web we implemented an alternative way to call APCs which is called
163/// [`PassingAsyncProcedureCall`]. This implementation does not depend on shared-memory compared to
164/// [`SchedulerAsyncProcedureCall`]. In fact, on the web we are currently not depending on
165/// shared-memory because that feature is hidden behind feature flags in browsers
166/// (see [here](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer)).
167///
168///
169// TODO: Rename to AsyncProcedureCaller?
170pub trait AsyncProcedureCall<K: OffscreenKernel>: 'static {
171    type Context: Context + Send + Clone;
172
173    type ReceiveIterator<F: FnMut(&Message) -> bool>: Iterator<Item = Message>;
174
175    /// Try to receive a message non-blocking.
176    fn receive<F: FnMut(&Message) -> bool>(&self, filter: F) -> Self::ReceiveIterator<F>;
177
178    /// Call an [`AsyncProcedure`] using some [`Input`]. This function is non-blocking and
179    /// returns immediately.
180    fn call(
181        &self,
182        input: Input,
183        procedure: AsyncProcedure<K, Self::Context>,
184    ) -> Result<(), CallError>;
185}
186
187#[derive(Clone)]
188pub struct SchedulerContext {
189    sender: Sender<Message>,
190}
191
192impl Context for SchedulerContext {
193    fn send_back<T: IntoMessage>(&self, message: T) -> Result<(), SendError> {
194        self.sender
195            .send(message.into())
196            .map_err(|_e| SendError::Transmission)
197    }
198}
199
200// An APC that uses a scheduler to execute work asynchronously.
201// An async sender and receiver to exchange return values of calls.
202pub struct SchedulerAsyncProcedureCall<K: OffscreenKernel, S: Scheduler> {
203    channel: (Sender<Message>, Receiver<Message>),
204    buffer: RefCell<Vec<Message>>,
205    scheduler: S,
206    phantom_k: PhantomData<K>,
207    offscreen_kernel_config: OffscreenKernelConfig,
208}
209
210impl<K: OffscreenKernel, S: Scheduler> SchedulerAsyncProcedureCall<K, S> {
211    pub fn new(scheduler: S, offscreen_kernel_config: OffscreenKernelConfig) -> Self {
212        Self {
213            channel: mpsc::channel(),
214            buffer: RefCell::new(Vec::new()),
215            phantom_k: PhantomData::default(),
216            scheduler,
217            offscreen_kernel_config,
218        }
219    }
220}
221
222impl<K: OffscreenKernel, S: Scheduler> AsyncProcedureCall<K> for SchedulerAsyncProcedureCall<K, S> {
223    type Context = SchedulerContext;
224    type ReceiveIterator<F: FnMut(&Message) -> bool> = IntoIter<Message>;
225
226    fn receive<F: FnMut(&Message) -> bool>(&self, mut filter: F) -> Self::ReceiveIterator<F> {
227        let mut buffer = self.buffer.borrow_mut();
228        let mut ret = Vec::new();
229
230        // FIXME tcs: Verify this!
231        let mut index = 0usize;
232        let mut max_len = buffer.len();
233        while index < max_len {
234            if filter(&buffer[index]) {
235                ret.push(buffer.swap_remove(index));
236                max_len -= 1;
237            }
238            index += 1;
239        }
240
241        // TODO: (optimize) Using while instead of if means that we are processing all that is
242        // TODO: available this might cause frame drops.
243        while let Ok(message) = self.channel.1.try_recv() {
244            tracing::debug!("Data reached main thread: {message:?}");
245            log::debug!("Data reached main thread: {message:?}");
246
247            if filter(&message) {
248                ret.push(message);
249            } else {
250                buffer.push(message)
251            }
252        }
253
254        ret.into_iter()
255    }
256
257    fn call(
258        &self,
259        input: Input,
260        procedure: AsyncProcedure<K, Self::Context>,
261    ) -> Result<(), CallError> {
262        let sender = self.channel.0.clone();
263        let offscreen_kernel_config = self.offscreen_kernel_config.clone();
264
265        self.scheduler
266            .schedule(move || async move {
267                log::info!("Processing on thread: {:?}", std::thread::current().name());
268
269                let kernel = K::create(offscreen_kernel_config);
270                procedure(input, SchedulerContext { sender }, kernel)
271                    .await
272                    .unwrap();
273            })
274            .map_err(|_e| CallError::Schedule)
275    }
276}
277
/// Helpers shared by the tests of this crate.
#[cfg(test)]
pub mod tests {
    use crate::io::apc::{Context, IntoMessage, SendError};

    /// A [`Context`] which discards every message and always reports success.
    pub struct DummyContext;

    impl Context for DummyContext {
        /// Drops `_message` and returns `Ok(())`.
        fn send_back<T>(&self, _message: T) -> Result<(), SendError>
        where
            T: IntoMessage,
        {
            Ok(())
        }
    }
}