1use std::{
2 any::Any,
3 cell::RefCell,
4 fmt::Debug,
5 future::Future,
6 marker::PhantomData,
7 pin::Pin,
8 sync::{
9 mpsc,
10 mpsc::{Receiver, Sender},
11 },
12 vec::IntoIter,
13};
14
15use serde::{Deserialize, Serialize};
16use thiserror::Error;
17
18use crate::{
19 coords::WorldTileCoords,
20 define_label,
21 environment::{OffscreenKernel, OffscreenKernelConfig},
22 io::scheduler::Scheduler,
23 style::Style,
24};
25
26define_label!(MessageTag);
27
28impl MessageTag for u32 {
29 fn dyn_clone(&self) -> Box<dyn MessageTag> {
30 Box::new(*self)
31 }
32}
33
34#[derive(Error, Debug)]
35pub enum MessageError {
36 #[error("the message did not contain the expected data")]
37 CastError(Box<dyn Any>),
38}
39
40#[derive(Debug)]
43pub struct Message {
44 tag: &'static dyn MessageTag,
45 transferable: Box<dyn Any + Send>,
46}
47
48impl Message {
49 pub fn new(tag: &'static dyn MessageTag, transferable: Box<dyn Any + Send>) -> Self {
50 Self { tag, transferable }
51 }
52
53 pub fn into_transferable<T: 'static>(self) -> Box<T> {
54 self.transferable
55 .downcast::<T>()
56 .expect("message has wrong tag")
57 }
58
59 pub fn has_tag(&self, tag: &'static dyn MessageTag) -> bool {
60 self.tag == tag
61 }
62
63 pub fn tag(&self) -> &'static dyn MessageTag {
64 self.tag
65 }
66}
67
68pub trait IntoMessage {
69 fn into(self) -> Message;
70}
71
72#[derive(Clone, Serialize, Deserialize)]
74pub enum Input {
75 TileRequest {
76 coords: WorldTileCoords,
77 style: Style, },
79 NotYetImplemented, }
81
82#[derive(Error, Debug)]
83pub enum SendError {
84 #[error("could not transmit data")]
85 Transmission,
86}
87
88pub trait Context: 'static {
90 fn send_back<T: IntoMessage>(&self, message: T) -> Result<(), SendError>;
92}
93
94#[derive(Error, Debug)]
95pub enum ProcedureError {
96 #[error("provided input is not compatible with procedure")]
98 IncompatibleInput,
99 #[error("execution of procedure failed")]
100 Execution(Box<dyn std::error::Error>),
101 #[error("sending data failed")]
102 Send(SendError),
103}
104
105#[cfg(feature = "thread-safe-futures")]
106pub type AsyncProcedureFuture =
107 Pin<Box<(dyn Future<Output = Result<(), ProcedureError>> + Send + 'static)>>;
108#[cfg(not(feature = "thread-safe-futures"))]
109pub type AsyncProcedureFuture =
110 Pin<Box<(dyn Future<Output = Result<(), ProcedureError>> + 'static)>>;
111
112#[derive(Error, Debug)]
113pub enum CallError {
114 #[error("scheduling work failed")]
115 Schedule,
116 #[error("serializing data failed")]
117 Serialize(Box<dyn std::error::Error>),
118 #[error("deserializing failed")]
119 Deserialize(Box<dyn std::error::Error>),
120 #[error("deserializing input failed")]
121 DeserializeInput(Box<dyn std::error::Error>),
122}
123
124pub type AsyncProcedure<K, C> = fn(input: Input, context: C, kernel: K) -> AsyncProcedureFuture;
129
130pub trait AsyncProcedureCall<K: OffscreenKernel>: 'static {
171 type Context: Context + Send + Clone;
172
173 type ReceiveIterator<F: FnMut(&Message) -> bool>: Iterator<Item = Message>;
174
175 fn receive<F: FnMut(&Message) -> bool>(&self, filter: F) -> Self::ReceiveIterator<F>;
177
178 fn call(
181 &self,
182 input: Input,
183 procedure: AsyncProcedure<K, Self::Context>,
184 ) -> Result<(), CallError>;
185}
186
187#[derive(Clone)]
188pub struct SchedulerContext {
189 sender: Sender<Message>,
190}
191
192impl Context for SchedulerContext {
193 fn send_back<T: IntoMessage>(&self, message: T) -> Result<(), SendError> {
194 self.sender
195 .send(message.into())
196 .map_err(|_e| SendError::Transmission)
197 }
198}
199
200pub struct SchedulerAsyncProcedureCall<K: OffscreenKernel, S: Scheduler> {
203 channel: (Sender<Message>, Receiver<Message>),
204 buffer: RefCell<Vec<Message>>,
205 scheduler: S,
206 phantom_k: PhantomData<K>,
207 offscreen_kernel_config: OffscreenKernelConfig,
208}
209
210impl<K: OffscreenKernel, S: Scheduler> SchedulerAsyncProcedureCall<K, S> {
211 pub fn new(scheduler: S, offscreen_kernel_config: OffscreenKernelConfig) -> Self {
212 Self {
213 channel: mpsc::channel(),
214 buffer: RefCell::new(Vec::new()),
215 phantom_k: PhantomData::default(),
216 scheduler,
217 offscreen_kernel_config,
218 }
219 }
220}
221
222impl<K: OffscreenKernel, S: Scheduler> AsyncProcedureCall<K> for SchedulerAsyncProcedureCall<K, S> {
223 type Context = SchedulerContext;
224 type ReceiveIterator<F: FnMut(&Message) -> bool> = IntoIter<Message>;
225
226 fn receive<F: FnMut(&Message) -> bool>(&self, mut filter: F) -> Self::ReceiveIterator<F> {
227 let mut buffer = self.buffer.borrow_mut();
228 let mut ret = Vec::new();
229
230 let mut index = 0usize;
232 let mut max_len = buffer.len();
233 while index < max_len {
234 if filter(&buffer[index]) {
235 ret.push(buffer.swap_remove(index));
236 max_len -= 1;
237 }
238 index += 1;
239 }
240
241 while let Ok(message) = self.channel.1.try_recv() {
244 tracing::debug!("Data reached main thread: {message:?}");
245 log::debug!("Data reached main thread: {message:?}");
246
247 if filter(&message) {
248 ret.push(message);
249 } else {
250 buffer.push(message)
251 }
252 }
253
254 ret.into_iter()
255 }
256
257 fn call(
258 &self,
259 input: Input,
260 procedure: AsyncProcedure<K, Self::Context>,
261 ) -> Result<(), CallError> {
262 let sender = self.channel.0.clone();
263 let offscreen_kernel_config = self.offscreen_kernel_config.clone();
264
265 self.scheduler
266 .schedule(move || async move {
267 log::info!("Processing on thread: {:?}", std::thread::current().name());
268
269 let kernel = K::create(offscreen_kernel_config);
270 procedure(input, SchedulerContext { sender }, kernel)
271 .await
272 .unwrap();
273 })
274 .map_err(|_e| CallError::Schedule)
275 }
276}
277
278#[cfg(test)]
279pub mod tests {
280 use crate::io::apc::{Context, IntoMessage, SendError};
281
282 pub struct DummyContext;
283
284 impl Context for DummyContext {
285 fn send_back<T: IntoMessage>(&self, _message: T) -> Result<(), SendError> {
286 Ok(())
287 }
288 }
289}