use futures::{future::TryFutureExt, stream::TryStreamExt}; use rmp_serde::{decode, Serializer}; use rocket::fairing::{self, AdHoc}; use rocket::serde::ser::Serialize; use rocket::serde::Deserialize; use rocket::{Build, Rocket}; use rocket_db_pools::{sqlx, Database, Pool}; use std::borrow::Cow; use std::collections::HashMap; use std::ops::Deref; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use tokio::net::{UnixListener, UnixStream}; use tokio::task; use crate::api_model; use crate::fs_utils; use crate::git; use crate::git_socket; use crate::Db; type DbPool = ::Pool; type DbConnection = ::Connection; #[derive(Debug, Deserialize)] pub struct Config<'a> { git_server_root: Cow<'a, str>, } struct RootsData { project_repo: HashMap>, } pub struct Roots { data: Mutex, } impl Roots { pub async fn new_project( &self, config: &Config<'_>, db: &Db, project_id: &str, remote: &str, main_branch: &str, ) -> Result<(), git::Error> { let project_id = project_id.to_string(); let repo = setup_project_root( config, db, &project_id, remote.to_string(), main_branch.to_string(), ) .await?; { let mut data = self.data.lock().unwrap(); data.project_repo.insert(project_id, repo); } Ok(()) } } #[derive(Debug)] struct IoError { message: String, } impl IoError { fn new(message: impl Into) -> Self { Self { message: message.into(), } } } impl std::fmt::Display for IoError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{}", self.message) } } impl std::error::Error for IoError {} fn is_printable(name: &str) -> bool { name.as_bytes().iter().all(|c| c.is_ascii_graphic()) } fn valid_branch_name(name: &str) -> bool { name.starts_with("refs/heads/") && is_printable(name) } async fn git_process_prehook( repo: &git::Repository, mut db: DbConnection, receive: &Vec, ) -> Result { let mut errors: Vec = Vec::new(); for row in receive { if !valid_branch_name(row.reference.as_str()) { if row.reference.starts_with("refs/heads/") { errors.push(format!( "{}: Bad branch name", row.reference.strip_prefix("refs/heads/").unwrap() )); } else { errors.push(format!("{}: Only branches are allowed", row.reference)); } continue; } let branch = row.reference.strip_prefix("refs/heads/").unwrap(); if row.new_value != git::EMPTY { match row.commiter { Some(ref commiter) => match sqlx::query!( "SELECT id FROM users WHERE id=? AND dn IS NOT NULL", commiter, ) .fetch_one(&mut *db) .map_err(|_| IoError::new(format!("{branch}: Unknown commiter {}", commiter))) .await { Ok(_) => {} Err(e) => { errors.push(e.message); continue; } }, None => { errors.push(format!("{branch}: Missing commiter")); continue; } } } if row.old_value == git::EMPTY { // Creating new review, nothing to check (yet). continue; } let result = sqlx::query!( "SELECT state, rewrite FROM reviews WHERE project=? AND branch=?", repo.project_id().unwrap_or(""), branch ) .fetch_one(&mut *db) .map_ok(|r| { ( api_model::ReviewState::try_from(r.state).unwrap(), api_model::Rewrite::try_from(r.rewrite).unwrap(), ) }) .map_err(|_| IoError::new(format!("{branch}: Unknown branch"))) .await; if row.new_value == git::EMPTY { // Do not allow to delete branch if there is a review connected to the branch. // All branches should be connected to a branch, but in case of errors this might // be relevant. if result.is_ok() { errors.push(format!( "{branch}: Not allowed to delete branch, delete review instead." )); } continue; } let (state, rewrite) = match result { Ok(data) => data, Err(e) => { errors.push(e.message); continue; } }; match state { api_model::ReviewState::Dropped => { errors.push(format!("{branch}: Review is dropped, no pushes allowed")); continue; } api_model::ReviewState::Closed => { errors.push(format!("{branch}: Review is closed, no pushes allowed")); continue; } api_model::ReviewState::Draft => {} api_model::ReviewState::Open => {} } // Check for fast forward, if so we can skip the rest of the checks. let is_fast_forward = repo .is_ancestor(&row.old_value, &row.new_value) .map_err(|e| IoError::new(format!("{branch}: {}", e))) .await?; if is_fast_forward { continue; } match rewrite { api_model::Rewrite::History => { let equal_content = repo .is_equal_content(&row.old_value, &row.new_value) .map_err(|e| IoError::new(format!("{branch}: {}", e))) .await?; if equal_content { continue; } errors.push(format!("{}: History rewrite not allowed as final result does not match. Please check locally with `git diff {} {}`", branch, row.old_value, row.new_value)); } api_model::Rewrite::Rebase => {} api_model::Rewrite::Disabled => { errors.push(format!( "Non fast-forward not allowed for review: {}", row.reference )); } } } Ok(if errors.is_empty() { git_socket::GitHookResponse { ok: true, message: "".to_string(), } } else { git_socket::GitHookResponse { ok: false, message: errors.join("\n"), } }) } async fn git_process_posthook( repo: &git::Repository, mut db: DbConnection, receive: &Vec, ) -> git_socket::GitHookResponse { let mut messages: Vec = Vec::new(); let mut updated: Vec = Vec::new(); for row in receive { let branch = row.reference.strip_prefix("refs/heads/").unwrap(); if row.old_value == git::EMPTY { let commiter = match repo.get_commiter(row.reference.as_str()).await { Ok(user) => user, Err(e) => { messages.push(format!("{branch}: {e}")); continue; } }; // Create review match sqlx::query!( "INSERT INTO reviews (project, owner, title, branch) VALUES (?, ?, ?, ?)", repo.project_id(), commiter.username, "Unnamed", branch ) .execute(&mut *db) .map_err(|e| IoError::new(format!("Database error: {e:?}"))) .await { Ok(result) => { updated.push(result.last_insert_id()); messages.push(format!( "{branch}: Review draft created, finalize at {}", "TODO" )); } Err(e) => { messages.push(format!("{branch}: Error {e}",)); } }; } else if row.new_value == git::EMPTY { // Delete branch, prehook already checked that it is not connected to a branch. } else { match sqlx::query!( "SELECT id, rewrite FROM reviews WHERE project=? AND branch=?", repo.project_id().unwrap_or(""), branch ) .fetch_one(&mut *db) .map_ok(|r| (r.id, api_model::Rewrite::try_from(r.rewrite).unwrap())) .map_err(|_| IoError::new(format!("{branch}: Unknown branch"))) .await { Ok((id, rewrite)) => match rewrite { api_model::Rewrite::History | api_model::Rewrite::Rebase => { match sqlx::query!("UPDATE reviews SET rewrite=0 WHERE id=?", id) .execute(&mut *db) .map_err(|e| IoError::new(format!("Database error: {e:?}"))) .await { Ok(_) => { messages.push(format!( "{branch}: Review draft created, finalize at {}", "TODO" )); updated.push(id); } Err(e) => { messages.push(format!("{branch}: Error {e}",)); } } } api_model::Rewrite::Disabled => { updated.push(id); } }, Err(e) => { messages.push(format!("{branch}: Error {e}",)); } } } } git_socket::GitHookResponse { ok: true, message: messages.join("\n"), } } async fn git_socket_process( repo: &git::Repository, db: DbConnection, stream: UnixStream, ) -> Result<(), IoError> { let std_stream = stream.into_std().map_err(|e| IoError::new(e.to_string()))?; std_stream .set_nonblocking(false) .map_err(|e| IoError::new(e.to_string()))?; let (request, std_stream) = task::spawn_blocking(move || { let request: Result = decode::from_read(&std_stream).map_err(|e| IoError::new(e.to_string())); let _ = std_stream.shutdown(std::net::Shutdown::Read); request.map(|r| (r, std_stream)) }) .map_err(|e| IoError::new(e.to_string())) .await??; let response = if request.pre { git_process_prehook(repo, db, &request.receive).await? } else { git_process_posthook(repo, db, &request.receive).await }; task::spawn_blocking(move || { let mut serializer = Serializer::new(&std_stream); response .serialize(&mut serializer) .map_err(|e| IoError::new(e.to_string())) }) .map_err(|e| IoError::new(e.to_string())) .await??; Ok(()) } async fn git_socket_listen( repo: Arc, db: ::Pool, ) -> Result<(), std::io::Error> { let socket = repo.socket().unwrap(); fs_utils::remove_file_allow_not_found(socket).await?; let listener = UnixListener::bind(socket)?; loop { match listener.accept().await { Ok((stream, _addr)) => { match db.get().await { Ok(conn) => { let repo2 = repo.clone(); tokio::spawn(async move { git_socket_process(repo2.as_ref(), conn, stream).await }); } Err(_) => { /* unable to access db */ } } } Err(_) => { /* connection failed */ } } } } async fn setup_project_root( config: &Config<'_>, db: &Db, project_id: &String, remote: String, main_branch: String, ) -> Result, git::Error> { let mut path = PathBuf::from(config.git_server_root.to_string()); path.push(project_id); let repo = Arc::new(git::Repository::new( path, true, Some(remote), Some(project_id), )); repo.setup().await?; if !repo.remote().unwrap().is_empty() && !main_branch.is_empty() { let bg_repo = repo.clone(); tokio::spawn(async move { bg_repo.fetch(main_branch).await }); } let socket_repo = repo.clone(); let socket_db = db.deref().clone(); tokio::spawn(async move { match git_socket_listen(socket_repo, socket_db).await { Ok(()) => {} Err(e) => { // TODO: Log print!("git_socket_listen returned {:?}", e) } } }); Ok(repo) } async fn setup_projects_roots(roots: &Roots, config: &Config<'_>, db: &Db) -> anyhow::Result<()> { fs_utils::create_dir_allow_existing(PathBuf::from(config.git_server_root.to_string())).await?; let projects = sqlx::query!("SELECT id,remote,main_branch FROM projects") .fetch(&**db) .map_ok(|r| (r.id, r.remote, r.main_branch)) .try_collect::>() .await .unwrap(); let mut project_repo: HashMap> = HashMap::new(); for (id, remote, main_branch) in projects { let repo = setup_project_root(config, db, &id, remote, main_branch).await?; project_repo.insert(id, repo); } { let mut data = roots.data.lock().unwrap(); data.project_repo = project_repo; } Ok(()) } async fn setup_projects(rocket: Rocket) -> fairing::Result { match rocket.state::() { Some(config) => match rocket.state::() { Some(roots) => match Db::fetch(&rocket) { Some(db) => match setup_projects_roots(roots, config, db).await { Ok(_) => Ok(rocket), Err(e) => { println!("{:?}", e); Err(rocket) } }, None => Err(rocket), }, None => Err(rocket), }, None => Err(rocket), } } pub fn stage() -> AdHoc { AdHoc::on_ignite("Git Root Stage", |rocket| async { rocket .manage(Roots { data: Mutex::new(RootsData { project_repo: HashMap::new(), }), }) .attach(AdHoc::config::()) .attach(AdHoc::try_on_ignite("Projects setup", setup_projects)) }) }