Connect the user back to any channels they were previously connected to
Diff
src/client.rs | 18 ++++++++++++++++++
src/main.rs | 7 ++++++-
src/persistence.rs | 28 ++++++++++++++++++++++++++++
src/persistence/events.rs | 7 +++++++
4 files changed, 58 insertions(+), 2 deletions(-)
@@ -19,6 +19,7 @@
ServerDisconnect, ServerFetchMotd, UserKickedFromChannel, UserNickChange,
UserNickChangeInternal,
},
persistence::{events::FetchUserChannels, Persistence},
server::Server,
SERVER_NAME,
};
@@ -44,6 +45,8 @@
pub server_leave_reason: Option<String>,
pub persistence: Addr<Persistence>,
pub span: Span,
}
@@ -71,6 +74,21 @@
command: Command::PING(SERVER_NAME.to_string(), None),
});
});
ctx.spawn(
self.persistence
.send(FetchUserChannels {
username: self.connection.user.to_string(),
span: Span::current(),
})
.into_actor(self)
.map(move |res, this, ctx| {
ctx.notify(JoinChannelRequest {
channels: res.unwrap(),
span: this.span.clone(),
});
}),
);
}
@@ -79,7 +79,7 @@
let server_arbiter = Arbiter::new();
let persistence = {
let persistence_addr = {
let database = database.clone();
Supervisor::start_in_arbiter(&server_arbiter.handle(), move |_ctx| Persistence {
@@ -87,6 +87,7 @@
})
};
let persistence = persistence_addr.clone();
let server = Supervisor::start_in_arbiter(&server_arbiter.handle(), move |_ctx| Server {
channels: HashMap::default(),
clients: HashMap::default(),
@@ -100,6 +101,7 @@
actix_rt::spawn(start_tcp_acceptor_loop(
listener,
database,
persistence_addr,
server,
client_threads,
));
@@ -117,6 +119,7 @@
async fn start_tcp_acceptor_loop(
listener: TcpListener,
database: sqlx::Pool<sqlx::Any>,
persistence: Addr<Persistence>,
server: Addr<Server>,
client_threads: usize,
) {
@@ -131,6 +134,7 @@
let database = database.clone();
let server = server.clone();
let client_arbiters = client_arbiters.clone();
let persistence = persistence.clone();
actix_rt::spawn(async move {
@@ -170,6 +174,7 @@
graceful_shutdown: false,
server_leave_reason: None,
span,
persistence,
}
})
};
@@ -1,9 +1,9 @@
pub mod events;
use actix::{Context, Handler, ResponseFuture};
use tracing::instrument;
use crate::persistence::events::{ChannelCreated, ChannelJoined, ChannelParted};
use crate::persistence::events::{ChannelCreated, ChannelJoined, ChannelParted, FetchUserChannels};
pub struct Persistence {
@@ -78,6 +78,32 @@
.execute(&conn)
.await
.unwrap();
})
}
}
impl Handler<FetchUserChannels> for Persistence {
type Result = ResponseFuture<Vec<String>>;
fn handle(&mut self, msg: FetchUserChannels, _ctx: &mut Self::Context) -> Self::Result {
let conn = self.database.clone();
Box::pin(async move {
sqlx::query_as(
"SELECT channels.name
FROM channel_users
INNER JOIN channels
ON channels.id = channel_users.channel
WHERE user = (SELECT id FROM users WHERE username = ?)
AND in_channel = true",
)
.bind(msg.username)
.fetch_all(&conn)
.await
.unwrap()
.into_iter()
.map(|(v,)| v)
.collect()
})
}
}
@@ -22,3 +22,10 @@
pub username: String,
pub span: Span,
}
#[derive(Message)]
#[rtype(result = "Vec<String>")]
pub struct FetchUserChannels {
pub username: String,
pub span: Span,
}