From 9e80b8cd1e44fcf863d926055d9fa458db46e0d3 Mon Sep 17 00:00:00 2001 From: Joel Klinghed Date: Sun, 26 Jan 2025 21:58:42 +0100 Subject: Add basic git support Pushing a commit to a new branch creates a review. Each project has its own git directory, with githooks installed that talkes with server process via unix sockets. --- server/src/git_root.rs | 465 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 465 insertions(+) create mode 100644 server/src/git_root.rs (limited to 'server/src/git_root.rs') diff --git a/server/src/git_root.rs b/server/src/git_root.rs new file mode 100644 index 0000000..b1c533e --- /dev/null +++ b/server/src/git_root.rs @@ -0,0 +1,465 @@ +use futures::{future::TryFutureExt, stream::TryStreamExt}; +use rmp_serde::{decode, Serializer}; +use rocket::fairing::{self, AdHoc}; +use rocket::serde::ser::Serialize; +use rocket::serde::Deserialize; +use rocket::{Build, Rocket}; +use rocket_db_pools::{sqlx, Database, Pool}; +use std::borrow::Cow; +use std::collections::HashMap; +use std::ops::Deref; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use tokio::net::{UnixListener, UnixStream}; +use tokio::task; + +use crate::api_model; +use crate::fs_utils; +use crate::git; +use crate::git_socket; +use crate::Db; + +type DbPool = ::Pool; +type DbConnection = ::Connection; + +#[derive(Debug, Deserialize)] +pub struct Config<'a> { + git_server_root: Cow<'a, str>, +} + +struct RootsData { + project_repo: HashMap>, +} + +pub struct Roots { + data: Mutex, +} + +impl Roots { + pub async fn new_project( + &self, + config: &Config<'_>, + db: &Db, + project_id: &str, + remote: &str, + main_branch: &str, + ) -> Result<(), git::Error> { + let project_id = project_id.to_string(); + let repo = setup_project_root( + config, + db, + &project_id, + remote.to_string(), + main_branch.to_string(), + ) + .await?; + + { + let mut data = self.data.lock().unwrap(); + data.project_repo.insert(project_id, repo); + } + + Ok(()) + } +} + +#[derive(Debug)] +struct IoError { + message: String, +} + +impl IoError { + fn new(message: impl Into) -> Self { + Self { + message: message.into(), + } + } +} + +impl std::fmt::Display for IoError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} + +impl std::error::Error for IoError {} + +const EMPTY: &str = "0000000000000000000000000000000000000000"; + +fn is_printable(name: &str) -> bool { + name.as_bytes().iter().all(|c| c.is_ascii_graphic()) +} + +fn valid_branch_name(name: &str) -> bool { + name.starts_with("refs/heads/") && is_printable(name) +} + +async fn git_process_prehook( + repo: &git::Repository, + mut db: DbConnection, + receive: &Vec, +) -> Result { + let mut errors: Vec = Vec::new(); + + for row in receive { + if !valid_branch_name(row.reference.as_str()) { + if row.reference.starts_with("refs/heads/") { + errors.push(format!( + "{}: Bad branch name", + row.reference.strip_prefix("refs/heads/").unwrap() + )); + } else { + errors.push(format!("{}: Only branches are allowed", row.reference)); + } + continue; + } + + let branch = row.reference.strip_prefix("refs/heads/").unwrap(); + + if row.old_value == EMPTY { + // Creating new review, nothing to check (yet). + continue; + } + + let result = sqlx::query!( + "SELECT state, rewrite FROM reviews WHERE project=? AND branch=?", + repo.project_id().unwrap_or(""), + branch + ) + .fetch_one(&mut *db) + .map_ok(|r| { + ( + api_model::ReviewState::try_from(r.state).unwrap(), + api_model::Rewrite::try_from(r.rewrite).unwrap(), + ) + }) + .map_err(|_| IoError::new(format!("{branch}: Unknown branch"))) + .await; + + if row.new_value == EMPTY { + // Do not allow to delete branch if there is a review connected to the branch. + // All branches should be connected to a branch, but in case of errors this might + // be relevant. + if result.is_ok() { + errors.push(format!( + "{branch}: Not allowed to delete branch, delete review instead." + )); + } + continue; + } + + let (state, rewrite) = result?; + + match state { + api_model::ReviewState::Dropped => { + errors.push(format!("{branch}: Review is dropped, no pushes allowed")); + continue; + } + api_model::ReviewState::Closed => { + errors.push(format!("{branch}: Review is closed, no pushes allowed")); + continue; + } + api_model::ReviewState::Draft => {} + api_model::ReviewState::Open => {} + } + + // Check for fast forward, if so we can skip the rest of the checks. + let is_fast_forward = repo + .is_ancestor(&row.old_value, &row.new_value) + .map_err(|e| IoError::new(format!("{branch}: {}", e))) + .await?; + if is_fast_forward { + continue; + } + + match rewrite { + api_model::Rewrite::History => { + let equal_content = repo + .is_equal_content(&row.old_value, &row.new_value) + .map_err(|e| IoError::new(format!("{branch}: {}", e))) + .await?; + if equal_content { + continue; + } + errors.push(format!("{}: History rewrite not allowed as final result does not match. Please check locally with `git diff {} {}`", branch, row.old_value, row.new_value)); + } + api_model::Rewrite::Rebase => {} + api_model::Rewrite::Disabled => { + errors.push(format!( + "Non fast-forward not allowed for review: {}", + row.reference + )); + } + } + } + + Ok(if errors.is_empty() { + git_socket::GitHookResponse { + ok: true, + message: "".to_string(), + } + } else { + git_socket::GitHookResponse { + ok: false, + message: errors.join("\n"), + } + }) +} + +async fn git_process_posthook( + repo: &git::Repository, + mut db: DbConnection, + user_id: &String, + receive: &Vec, +) -> git_socket::GitHookResponse { + let mut messages: Vec = Vec::new(); + let mut updated: Vec = Vec::new(); + + for row in receive { + let branch = row.reference.strip_prefix("refs/heads/").unwrap(); + + if row.old_value == EMPTY { + // Create review + match sqlx::query!( + "INSERT INTO reviews (project, owner, title, branch) VALUES (?, ?, ?, ?)", + repo.project_id(), + user_id, + "Unnamed", + branch + ) + .execute(&mut *db) + .map_err(|e| IoError::new(format!("Database error: {e:?}"))) + .await + { + Ok(result) => { + updated.push(result.last_insert_id()); + messages.push(format!( + "{branch}: Review draft created, finalize at {}", + "TODO" + )); + } + Err(e) => { + messages.push(format!("{branch}: Error {e}",)); + } + }; + } else if row.new_value == EMPTY { + // Delete branch, prehook already checked that it is not connected to a branch. + } else { + match sqlx::query!( + "SELECT id, rewrite FROM reviews WHERE project=? AND branch=?", + repo.project_id().unwrap_or(""), + branch + ) + .fetch_one(&mut *db) + .map_ok(|r| (r.id, api_model::Rewrite::try_from(r.rewrite).unwrap())) + .map_err(|_| IoError::new(format!("{branch}: Unknown branch"))) + .await + { + Ok((id, rewrite)) => match rewrite { + api_model::Rewrite::History | api_model::Rewrite::Rebase => { + match sqlx::query!("UPDATE reviews SET rewrite=0 WHERE id=?", id) + .execute(&mut *db) + .map_err(|e| IoError::new(format!("Database error: {e:?}"))) + .await + { + Ok(_) => { + messages.push(format!( + "{branch}: Review draft created, finalize at {}", + "TODO" + )); + updated.push(id); + } + Err(e) => { + messages.push(format!("{branch}: Error {e}",)); + } + } + } + api_model::Rewrite::Disabled => { + updated.push(id); + } + }, + Err(e) => { + messages.push(format!("{branch}: Error {e}",)); + } + } + } + } + + git_socket::GitHookResponse { + ok: true, + message: messages.join("\n"), + } +} + +async fn git_socket_process( + repo: &git::Repository, + db: DbConnection, + stream: UnixStream, +) -> Result<(), IoError> { + let std_stream = stream.into_std().map_err(|e| IoError::new(e.to_string()))?; + std_stream + .set_nonblocking(false) + .map_err(|e| IoError::new(e.to_string()))?; + let (request, std_stream) = task::spawn_blocking(move || { + let request: Result = + decode::from_read(&std_stream).map_err(|e| IoError::new(e.to_string())); + let _ = std_stream.shutdown(std::net::Shutdown::Read); + request.map(|r| (r, std_stream)) + }) + .map_err(|e| IoError::new(e.to_string())) + .await??; + + let response = if request.pre { + git_process_prehook(repo, db, &request.receive).await? + } else { + git_process_posthook(repo, db, &request.user, &request.receive).await + }; + + task::spawn_blocking(move || { + let mut serializer = Serializer::new(&std_stream); + response + .serialize(&mut serializer) + .map_err(|e| IoError::new(e.to_string())) + }) + .map_err(|e| IoError::new(e.to_string())) + .await??; + + Ok(()) +} + +async fn git_socket_listen( + repo: Arc, + db: ::Pool, +) -> Result<(), std::io::Error> { + let socket = repo.socket().unwrap(); + fs_utils::remove_file_allow_not_found(socket).await?; + let listener = UnixListener::bind(socket)?; + + loop { + match listener.accept().await { + Ok((stream, _addr)) => { + match db.get().await { + Ok(conn) => { + let repo2 = repo.clone(); + tokio::spawn(async move { + git_socket_process(repo2.as_ref(), conn, stream).await + }); + } + Err(_) => { /* unable to access db */ } + } + } + Err(_) => { /* connection failed */ } + } + } +} + +async fn setup_project_root( + config: &Config<'_>, + db: &Db, + project_id: &String, + remote: String, + main_branch: String, +) -> Result, git::Error> { + let mut path = PathBuf::from(config.git_server_root.to_string()); + path.push(project_id); + let repo = Arc::new(git::Repository::new( + path, + true, + Some(remote), + Some(project_id), + )); + repo.setup().await?; + + if !repo.remote().unwrap().is_empty() && !main_branch.is_empty() { + let bg_repo = repo.clone(); + tokio::spawn(async move { bg_repo.fetch(main_branch).await }); + } + + let socket_repo = repo.clone(); + let socket_db = db.deref().clone(); + tokio::spawn(async move { + match git_socket_listen(socket_repo, socket_db).await { + Ok(()) => {} + Err(e) => { + // TODO: Log + print!("git_socket_listen returned {:?}", e) + } + } + }); + + Ok(repo) +} + +#[derive(Debug)] +#[allow(dead_code)] +enum GitOrSqlOrIoError { + Git(git::Error), + Sql(sqlx::Error), + Io(std::io::Error), +} + +async fn setup_projects_roots( + roots: &Roots, + config: &Config<'_>, + db: &Db, +) -> Result<(), GitOrSqlOrIoError> { + fs_utils::create_dir_allow_existing(PathBuf::from(config.git_server_root.to_string())) + .map_err(GitOrSqlOrIoError::Io) + .await?; + + let projects = sqlx::query!("SELECT id,remote,main_branch FROM projects") + .fetch(&**db) + .map_ok(|r| (r.id, r.remote, r.main_branch)) + .map_err(GitOrSqlOrIoError::Sql) + .try_collect::>() + .await + .unwrap(); + + let mut project_repo: HashMap> = HashMap::new(); + + for (id, remote, main_branch) in projects { + let repo = setup_project_root(config, db, &id, remote, main_branch) + .map_err(GitOrSqlOrIoError::Git) + .await?; + project_repo.insert(id, repo); + } + + { + let mut data = roots.data.lock().unwrap(); + data.project_repo = project_repo; + } + + Ok(()) +} + +async fn setup_projects(rocket: Rocket) -> fairing::Result { + match rocket.state::() { + Some(config) => match rocket.state::() { + Some(roots) => match Db::fetch(&rocket) { + Some(db) => match setup_projects_roots(roots, config, db).await { + Ok(_) => Ok(rocket), + Err(e) => { + println!("{:?}", e); + Err(rocket) + } + }, + None => Err(rocket), + }, + None => Err(rocket), + }, + None => Err(rocket), + } +} + +pub fn stage() -> AdHoc { + AdHoc::on_ignite("Git Root Stage", |rocket| async { + rocket + .manage(Roots { + data: Mutex::new(RootsData { + project_repo: HashMap::new(), + }), + }) + .attach(AdHoc::config::()) + .attach(AdHoc::try_on_ignite("Projects setup", setup_projects)) + }) +} -- cgit v1.2.3-70-g09d2