use futures::{future::TryFutureExt, stream::TryStreamExt}; use log::{trace, error}; use rmp_serde::{decode, Serializer}; use rocket::fairing::{self, AdHoc}; use rocket::serde::Deserialize; use rocket::serde::ser::Serialize; use rocket::{Build, Rocket}; use rocket_db_pools::{sqlx, Database, Pool}; use std::borrow::Cow; use std::collections::HashMap; use std::fs::Permissions; use std::ops::Deref; use std::os::unix::fs::PermissionsExt; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; use tokio::fs; use tokio::net::{UnixListener, UnixStream}; use tokio::task; use url::Url; use crate::api_model; use crate::fs_utils; use crate::git; use crate::git_socket; use crate::Db; type DbPool = ::Pool; type DbConnection = ::Connection; #[derive(Debug, Deserialize)] pub struct Config<'a> { git_server_root: Cow<'a, str>, git_hook: Cow<'a, str>, } struct RootsData { project_repo: HashMap>, } pub struct Roots { data: Mutex, } impl Roots { pub async fn new_project( &self, config: &Config<'_>, db: &Db, project_id: &str, remote: &str, remote_key: &str, main_branch: &str, ) -> Result<(), anyhow::Error> { let project_id = project_id.to_string(); let repo = setup_project_root( config, db, &project_id, remote.to_string(), remote_key.to_string(), main_branch.to_string(), ) .await?; { let mut data = self.data.lock().unwrap(); data.project_repo.insert(project_id, repo); } Ok(()) } pub async fn del_branch(&self, project_id: &str, branch: &str) -> Result<(), git::Error> { let repo; { let data = self.data.lock().unwrap(); if let Some(tmp_repo) = data.project_repo.get(project_id) { repo = tmp_repo.clone(); } else { return Ok(()); } } repo.delete_branch(branch).await } } #[derive(Debug)] struct IoError { message: String, } impl IoError { fn new(message: impl Into) -> Self { Self { message: message.into(), } } } impl std::fmt::Display for IoError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { write!(f, "{}", self.message) } } impl std::error::Error for IoError {} fn is_printable(name: &str) -> bool { name.as_bytes().iter().all(|c| c.is_ascii_graphic()) } fn valid_branch_name(name: &str) -> bool { name.starts_with("refs/heads/") && is_printable(name) } async fn git_process_prehook( repo: &git::Repository, mut db: DbConnection, receive: &Vec, ) -> Result { let mut errors: Vec = Vec::new(); for row in receive { trace!("prehook {} {} {}", row.old_value, row.new_value, row.reference); if !valid_branch_name(row.reference.as_str()) { if row.reference.starts_with("refs/heads/") { error!("{}: Bad branch name", row.reference); errors.push(format!( "{}: Bad branch name", row.reference.strip_prefix("refs/heads/").unwrap() )); } else { error!("{}: Only branches are allowed", row.reference); errors.push(format!("{}: Only branches are allowed", row.reference)); } continue; } let branch = row.reference.strip_prefix("refs/heads/").unwrap(); if row.new_value != git::EMPTY { match row.commiter { Some(ref commiter) => match sqlx::query!( "SELECT id FROM users WHERE id=? AND dn IS NOT NULL", commiter, ) .fetch_one(&mut *db) .map_err(|_| IoError::new(format!("{branch}: Unknown commiter {}", commiter))) .await { Ok(_) => {} Err(e) => { error!("{e:?}"); errors.push(e.message); continue; } }, None => { error!("{branch}: Missing commiter"); errors.push(format!("{branch}: Missing commiter")); continue; } } } if row.old_value == git::EMPTY { // Creating new review, nothing to check (yet). continue; } let result = sqlx::query!( "SELECT state, rewrite FROM reviews WHERE project=? AND branch=?", repo.project_id().unwrap_or(""), branch ) .fetch_one(&mut *db) .map_ok(|r| { ( api_model::ReviewState::try_from(r.state).unwrap(), api_model::Rewrite::try_from(r.rewrite).unwrap(), ) }) .map_err(|_| IoError::new(format!("{branch}: Unknown branch"))) .await; if row.new_value == git::EMPTY { // Do not allow to delete branch if there is a review connected to the branch. // All branches should be connected to a branch, but in case of errors this might // be relevant. if result.is_ok() { error!("{branch}: Not allowed to delete branch, delete review instead."); errors.push(format!( "{branch}: Not allowed to delete branch, delete review instead." )); } continue; } let (state, rewrite) = match result { Ok(data) => data, Err(e) => { error!("{e:?}"); errors.push(e.message); continue; } }; match state { api_model::ReviewState::Dropped => { error!("{branch}: Review is dropped, no pushes allowed"); errors.push(format!("{branch}: Review is dropped, no pushes allowed")); continue; } api_model::ReviewState::Closed => { error!("{branch}: Review is closed, no pushes allowed"); errors.push(format!("{branch}: Review is closed, no pushes allowed")); continue; } api_model::ReviewState::Draft => {} api_model::ReviewState::Open => {} } // Check for fast forward, if so we can skip the rest of the checks. let is_fast_forward = repo .is_ancestor(&row.old_value, &row.new_value) .map_err(|e| IoError::new(format!("{branch}: {}", e))) .await?; if is_fast_forward { continue; } match rewrite { api_model::Rewrite::History => { let equal_content = repo .is_equal_content(&row.old_value, &row.new_value) .map_err(|e| IoError::new(format!("{branch}: {}", e))) .await?; if equal_content { continue; } error!("{branch}: History rewrite not allowed"); errors.push(format!("{}: History rewrite not allowed as final result does not match. Please check locally with `git diff {} {}`", branch, row.old_value, row.new_value)); } api_model::Rewrite::Rebase => {} api_model::Rewrite::Disabled => { error!("{}: Non fast-forward not allowed", row.reference); errors.push(format!( "Non fast-forward not allowed for review: {}", row.reference )); } } } Ok(if errors.is_empty() { git_socket::GitHookResponse { ok: true, message: "".to_string(), } } else { git_socket::GitHookResponse { ok: false, message: errors.join("\n"), } }) } async fn git_process_posthook( repo: &git::Repository, mut db: DbConnection, receive: &Vec, ) -> git_socket::GitHookResponse { let mut messages: Vec = Vec::new(); let mut updated: Vec = Vec::new(); for row in receive { trace!("posthook {} {} {}", row.old_value, row.new_value, row.reference); let branch = row.reference.strip_prefix("refs/heads/").unwrap(); if row.old_value == git::EMPTY { let commiter = match repo.get_commiter(row.reference.as_str()).await { Ok(user) => user, Err(e) => { error!("{e:?}"); messages.push(format!("{branch}: {e}")); continue; } }; // Create review match sqlx::query!( "INSERT INTO reviews (project, owner, title, branch) VALUES (?, ?, ?, ?)", repo.project_id(), commiter.username, "Unnamed", branch ) .execute(&mut *db) .map_err(|e| IoError::new(format!("Database error: {e:?}"))) .await { Ok(result) => { updated.push(result.last_insert_id()); messages.push(format!( "{branch}: Review draft created, finalize at {}", "TODO" )); } Err(e) => { error!("{e:?}"); messages.push(format!("{branch}: Error {e}",)); } }; } else if row.new_value == git::EMPTY { // Delete branch, prehook already checked that it is not connected to a review. } else { match sqlx::query!( "SELECT id, rewrite FROM reviews WHERE project=? AND branch=?", repo.project_id().unwrap_or(""), branch ) .fetch_one(&mut *db) .map_ok(|r| (r.id, api_model::Rewrite::try_from(r.rewrite).unwrap())) .map_err(|_| IoError::new(format!("{branch}: Unknown branch"))) .await { Ok((id, rewrite)) => match rewrite { api_model::Rewrite::History | api_model::Rewrite::Rebase => { match sqlx::query!("UPDATE reviews SET rewrite=0 WHERE id=?", id) .execute(&mut *db) .map_err(|e| IoError::new(format!("Database error: {e:?}"))) .await { Ok(_) => { messages.push(format!( "{branch}: Review draft created, finalize at {}", "TODO" )); updated.push(id); } Err(e) => { error!("{e:?}"); messages.push(format!("{branch}: Error {e}",)); } } } api_model::Rewrite::Disabled => { updated.push(id); } }, Err(e) => { error!("{e:?}"); messages.push(format!("{branch}: Error {e}",)); } } } } git_socket::GitHookResponse { ok: true, message: messages.join("\n"), } } async fn git_socket_process( repo: &git::Repository, db: DbConnection, stream: UnixStream, ) -> Result<(), IoError> { let std_stream = stream.into_std().map_err(|e| IoError::new(e.to_string()))?; std_stream .set_nonblocking(false) .map_err(|e| IoError::new(e.to_string()))?; let (request, std_stream) = task::spawn_blocking(move || { let request: Result = decode::from_read(&std_stream).map_err(|e| IoError::new(e.to_string())); let _ = std_stream.shutdown(std::net::Shutdown::Read); request.map(|r| (r, std_stream)) }) .map_err(|e| IoError::new(e.to_string())) .await??; let response = if request.pre { git_process_prehook(repo, db, &request.receive).await? } else { git_process_posthook(repo, db, &request.receive).await }; task::spawn_blocking(move || { let mut serializer = Serializer::new(&std_stream); response .serialize(&mut serializer) .map_err(|e| IoError::new(e.to_string())) }) .map_err(|e| IoError::new(e.to_string())) .await??; Ok(()) } async fn git_socket_listen( repo: Arc, db: ::Pool, ) -> Result<(), std::io::Error> { let socket = repo.socket().unwrap(); fs_utils::remove_file_allow_not_found(socket).await?; let listener = UnixListener::bind(socket)?; loop { match listener.accept().await { Ok((stream, _addr)) => { match db.get().await { Ok(conn) => { let repo2 = repo.clone(); tokio::spawn(async move { git_socket_process(repo2.as_ref(), conn, stream).await }); } Err(_) => { /* unable to access db */ } } } Err(_) => { /* connection failed */ } } } } fn get_host(url: &str) -> String { match Url::parse(url) { Ok(u) => match u.host_str() { Some(h) => h.to_string(), None => String::new(), }, Err(_) => String::new(), } } async fn write_private_file, D: AsRef<[u8]>>(path: P, data: D) -> anyhow::Result<()> { fs::write(&path, data).await?; let permissions = Permissions::from_mode(0o600); fs::set_permissions(&path, permissions).await?; Ok(()) } async fn write_ssh_config(ssh_config: &Path, host: &str, remote_key: &str) -> anyhow::Result<()> { let host_pattern = if host.is_empty() { "*" } else { host }; let basedir = ssh_config.parent().unwrap(); let identity_file = basedir.join("ssh_identity"); let known_hosts = basedir.join("ssh_known_hosts"); let mut identity_data = String::from("-----BEGIN OPENSSH PRIVATE KEY-----\n"); { let mut left = remote_key; while left.len() > 70 { let (a, b) = left.split_at(70); identity_data.push_str(a); identity_data.push('\n'); left = b; } identity_data.push_str(left); identity_data.push('\n'); } identity_data.push_str("-----END OPENSSH PRIVATE KEY-----\n"); let config_data = format!("Host {host_pattern} IdentityFile ./{} PasswordAuthentication no StrictHostKeyChecking accept-new UpdateHostKeys yes UserKnownHostsFile ./{} ", identity_file.file_name().unwrap().to_str().unwrap(), known_hosts.file_name().unwrap().to_str().unwrap()); let (a, b, c) = tokio::join!( write_private_file(identity_file, identity_data), fs::write(known_hosts, "\n"), fs::write(ssh_config, config_data), ); a?; b?; c?; Ok(()) } async fn setup_project_root( config: &Config<'_>, db: &Db, project_id: &String, remote: String, remote_key: String, main_branch: String, ) -> Result, anyhow::Error> { let mut path = PathBuf::from(config.git_server_root.to_string()); path.push(project_id); info!("{project_id}: Setup repo at {path:?}"); let githook = PathBuf::from(config.git_hook.to_string()); let ssh_config = path.join("ssh_config"); let repo = Arc::new(git::Repository::new( path, true, Some(remote), Some(project_id), Some(githook), Some(ssh_config), )); repo.setup().await?; write_ssh_config(repo.ssh_config().unwrap(), get_host(repo.remote().unwrap()).as_str(), remote_key.as_str()).await?; if !repo.remote().unwrap().is_empty() && !main_branch.is_empty() { let bg_repo = repo.clone(); let bg_project_id = project_id.clone(); tokio::spawn(async move { match bg_repo.fetch(&main_branch).await { Ok(()) => {} Err(e) => { error!("{bg_project_id}: fetch {main_branch} returned {e:?}"); } } }); } let socket_repo = repo.clone(); let socket_db = db.deref().clone(); let bg_project_id = project_id.clone(); tokio::spawn(async move { match git_socket_listen(socket_repo, socket_db).await { Ok(()) => {} Err(e) => { error!("{bg_project_id}: git_socket_listen returned {e:?}"); } } }); Ok(repo) } async fn setup_projects_roots(roots: &Roots, config: &Config<'_>, db: &Db) -> anyhow::Result<()> { fs_utils::create_dir_allow_existing(PathBuf::from(config.git_server_root.to_string())).await?; let projects = sqlx::query!("SELECT id,remote,remote_key,main_branch FROM projects") .fetch(&**db) .map_ok(|r| (r.id, r.remote, r.remote_key, r.main_branch)) .try_collect::>() .await .unwrap(); let mut project_repo: HashMap> = HashMap::new(); for (id, remote, remote_key, main_branch) in projects { let repo = setup_project_root(config, db, &id, remote, remote_key, main_branch).await?; project_repo.insert(id, repo); } { let mut data = roots.data.lock().unwrap(); data.project_repo = project_repo; } Ok(()) } async fn setup_projects(rocket: Rocket) -> fairing::Result { match rocket.state::() { Some(config) => match rocket.state::() { Some(roots) => match Db::fetch(&rocket) { Some(db) => match setup_projects_roots(roots, config, db).await { Ok(_) => Ok(rocket), Err(e) => { error!("{e:?}"); Err(rocket) }, }, None => Err(rocket), }, None => Err(rocket), }, None => Err(rocket), } } pub fn stage() -> AdHoc { AdHoc::on_ignite("Git Root Stage", |rocket| async { rocket .manage(Roots { data: Mutex::new(RootsData { project_repo: HashMap::new(), }), }) .attach(AdHoc::config::()) .attach(AdHoc::try_on_ignite("Projects setup", setup_projects)) }) }