diff options
Diffstat (limited to 'server/src')
| -rw-r--r-- | server/src/api_model.rs | 34 | ||||
| -rw-r--r-- | server/src/db_utils.rs | 4 | ||||
| -rw-r--r-- | server/src/fs_utils.rs | 54 | ||||
| -rw-r--r-- | server/src/git.rs | 451 | ||||
| -rw-r--r-- | server/src/git_root.rs | 465 | ||||
| -rw-r--r-- | server/src/git_socket.rs | 21 | ||||
| -rw-r--r-- | server/src/githook.rs | 108 | ||||
| -rw-r--r-- | server/src/main.rs | 109 |
8 files changed, 1201 insertions, 45 deletions
diff --git a/server/src/api_model.rs b/server/src/api_model.rs index 3e94d6c..2dc20f1 100644 --- a/server/src/api_model.rs +++ b/server/src/api_model.rs @@ -9,6 +9,40 @@ pub enum ReviewState { Closed, } +impl TryFrom<u8> for ReviewState { + type Error = &'static str; + + fn try_from(value: u8) -> Result<Self, Self::Error> { + match value { + 0 => Ok(ReviewState::Draft), + 1 => Ok(ReviewState::Open), + 2 => Ok(ReviewState::Dropped), + 3 => Ok(ReviewState::Closed), + _ => Err("Invalid review state"), + } + } +} + +#[derive(Copy, Clone, Deserialize, Serialize, ToSchema)] +pub enum Rewrite { + Disabled, + History, + Rebase, +} + +impl TryFrom<u8> for Rewrite { + type Error = &'static str; + + fn try_from(value: u8) -> Result<Self, Self::Error> { + match value { + 0 => Ok(Rewrite::Disabled), + 1 => Ok(Rewrite::History), + 2 => Ok(Rewrite::Rebase), + _ => Err("Invalid review state"), + } + } +} + #[derive(Copy, Clone, Debug, Deserialize, PartialEq, Serialize, ToSchema)] pub enum UserReviewRole { Reviewer, diff --git a/server/src/db_utils.rs b/server/src/db_utils.rs index b7e1fd5..ccc07ff 100644 --- a/server/src/db_utils.rs +++ b/server/src/db_utils.rs @@ -95,11 +95,11 @@ where pub fn ok(&self) -> bool { self.sanity_check(); - return !self + !self .names .as_ref() .expect("BUG: names taken already") - .is_empty(); + .is_empty() } pub fn build(&mut self) -> (String, <DB as HasArguments<'args>>::Arguments) { diff --git a/server/src/fs_utils.rs b/server/src/fs_utils.rs new file mode 100644 index 0000000..7905d01 --- /dev/null +++ b/server/src/fs_utils.rs @@ -0,0 +1,54 @@ +#![allow(dead_code)] + +use std::io; +use std::path::Path; +use tokio::fs; + +pub async fn create_dir_allow_existing(path: impl AsRef<Path>) -> io::Result<()> { + match fs::create_dir(path).await { + Ok(_) => Ok(()), + Err(e) => { + if e.kind() == io::ErrorKind::AlreadyExists { + Ok(()) + } else { + Err(e) + } + } + } +} + +pub async fn remove_file_allow_not_found(path: impl AsRef<Path>) -> io::Result<()> { + match fs::remove_file(path).await { + Ok(_) => Ok(()), + Err(e) => { + if e.kind() == io::ErrorKind::NotFound { + Ok(()) + } else { + Err(e) + } + } + } +} + +pub async fn symlink_update_existing( + src: impl AsRef<Path>, + dst: impl AsRef<Path>, +) -> io::Result<()> { + let src = src.as_ref(); + let dst = dst.as_ref(); + match fs::symlink(&src, &dst).await { + Ok(_) => Ok(()), + Err(e) => { + if e.kind() == io::ErrorKind::AlreadyExists { + let path = fs::read_link(&dst).await?; + if path == src { + return Ok(()); + } + fs::remove_file(&dst).await?; + fs::symlink(&src, &dst).await + } else { + Err(e) + } + } + } +} diff --git a/server/src/git.rs b/server/src/git.rs new file mode 100644 index 0000000..652eb29 --- /dev/null +++ b/server/src/git.rs @@ -0,0 +1,451 @@ +use futures::future::TryFutureExt; +use pathdiff::diff_paths; +use std::collections::HashMap; +use std::fmt; +use std::path::{Path, PathBuf}; +use std::process::Stdio; +use tokio::fs; +use tokio::process::Command; +use tokio::sync::{RwLock, Semaphore}; + +use crate::fs_utils; + +#[derive(Debug)] +pub struct Error { + pub message: String, +} + +impl Error { + fn new(message: impl Into<String>) -> Self { + Self { + message: message.into(), + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.message) + } +} + +impl std::error::Error for Error {} + +struct RepoData { + // Only one fetch at a time, and they should be in queue + fetch_semaphore: Semaphore, + config_cache: HashMap<String, String>, +} + +pub struct Repository { + path: PathBuf, + bare: bool, + + remote: Option<String>, + project_id: Option<String>, + socket: Option<PathBuf>, + + // Lock for any repo task, 90% of all tasks are readers but there are some writers + // where nothing else may be done. + lock: RwLock<RepoData>, +} + +fn io_err(action: &str, e: std::io::Error) -> Error { + Error::new(format!("{action}: {e}")) +} + +impl RepoData { + fn new() -> Self { + Self { + fetch_semaphore: Semaphore::new(1), + config_cache: HashMap::new(), + } + } + + async fn fetch(&self, repo: &Repository, branch: String) -> Result<(), Error> { + if repo.remote.is_none() { + return Err(Error::new("No remote set")); + } + + let _permit = self.fetch_semaphore.acquire().await; + + let mut cmd = self.git_cmd(repo); + cmd.arg("fetch"); + // Use an atomic transaction to update local refs. + cmd.arg("--atomic"); + // Print the output to standard output in an easy-to-parse format for scripts. + cmd.arg("--porcelain"); + // This option disables this automatic tag following. + cmd.arg("--no-tags"); + cmd.arg("origin"); + // <+ force update><remote branch>:<local branch> + cmd.arg(format!("+{branch}:{branch}")); + + self.output(&mut cmd).await?; + + Ok(()) + } + + async fn init(&mut self, repo: &Repository) -> Result<(), Error> { + fs_utils::create_dir_allow_existing(repo.path()) + .map_err(|e| Error::new(format!("{e}"))) + .await?; + + let mut cmd = self.git_cmd(repo); + cmd.arg("init"); + if repo.is_bare() { + cmd.arg("--bare"); + } + + self.run(&mut cmd).await?; + + Ok(()) + } + + async fn sync_config(&mut self, repo: &Repository) -> Result<(), Error> { + self.config_fill_cache(repo).await?; + + if let Some(remote) = repo.remote() { + self.config_set(repo, "remote.origin.url", remote).await?; + } + if let Some(socket) = repo.socket() { + let relative = diff_paths(socket, repo.path()).unwrap(); + self.config_set(repo, "eyeballs.socket", relative.to_str().unwrap()) + .await?; + } + Ok(()) + } + + async fn sync_hooks(&mut self, repo: &Repository) -> Result<(), Error> { + let server_exe = + std::env::current_exe().map_err(|e| io_err("unable to get current exe", e))?; + + let hook = server_exe.parent().unwrap().join("eyeballs-githook"); + + let hooks = if repo.is_bare() { + repo.path().join("hooks") + } else { + repo.path().join(".git/hooks") + }; + + fs_utils::create_dir_allow_existing(&hooks) + .map_err(|e| io_err("unable to create hooks", e)) + .await?; + + let pre_receive = hooks.join("pre-receive"); + let update = hooks.join("update"); + let post_receive = hooks.join("post-receive"); + + fs_utils::remove_file_allow_not_found(update) + .map_err(|e| io_err("unable to remove update hook", e)) + .await?; + + // Must be hard links, symbolic links doesn't allow the hook + // the lookup how it's called using std::env::current_exe(). + fs_utils::remove_file_allow_not_found(&pre_receive) + .map_err(|e| io_err("unable to remove pre-receive hook", e)) + .await?; + fs::hard_link(hook.as_path(), pre_receive) + .map_err(|e| io_err("unable to link pre-receive hook", e)) + .await?; + fs_utils::remove_file_allow_not_found(&post_receive) + .map_err(|e| io_err("unable to remove post-receive hook", e)) + .await?; + fs::hard_link(hook.as_path(), post_receive) + .map_err(|e| io_err("unable to link post-receive hook", e)) + .await + } + + async fn config_get(&self, repo: &Repository, name: &str) -> Result<String, Error> { + if let Some(value) = self.config_cache.get(name) { + return Ok(value.clone()); + } + + // Note, want to keep this method non-mutable so we can't update the cache here, should be + // edge case to end up here anyway. + + let mut cmd = self.git_cmd(repo); + cmd.arg("config") + .arg("get") + // End value with the null character and use newline as delimiter between key and value + .arg("--null") + .arg("--default=") + .arg(name); + let data = self.output(&mut cmd).await?; + match data.as_str().split_once('\0') { + Some((value, _)) => Ok(value.to_string()), + None => Err(Error::new("Invalid output from git config get")), + } + } + + async fn config_fill_cache(&mut self, repo: &Repository) -> Result<(), Error> { + self.config_cache.clear(); + + let mut cmd = self.git_cmd(repo); + cmd.arg("config") + .arg("list") + // read only from the repository .git/config, + .arg("--local") + // End value with the null character and use newline as delimiter between key and value + .arg("--null"); + let data = self.output(&mut cmd).await?; + for key_value in data.split_terminator('\0') { + match key_value.split_once('\n') { + Some((key, value)) => self.config_cache.insert(key.to_string(), value.to_string()), + None => return Err(Error::new("Invalid output from git config list")), + }; + } + Ok(()) + } + + async fn config_set( + &mut self, + repo: &Repository, + name: &str, + value: &str, + ) -> Result<(), Error> { + if let Some(cached_value) = self.config_cache.get(name) { + if cached_value == value { + return Ok(()); + } + } + + let mut cmd = self.git_cmd(repo); + cmd.arg("config").arg("set").arg(name).arg(value); + self.run(&mut cmd).await?; + + self.config_cache + .insert(name.to_string(), value.to_string()); + + Ok(()) + } + + async fn is_ancestor( + &self, + repo: &Repository, + ancestor: &str, + commit: &str, + ) -> Result<bool, Error> { + let mut cmd = self.git_cmd(repo); + cmd.arg("merge-base") + .arg("--is-ancestor") + .arg(ancestor) + .arg(commit); + self.check(&mut cmd).await + } + + async fn is_equal_content( + &self, + repo: &Repository, + commit1: &str, + commit2: &str, + ) -> Result<bool, Error> { + let mut cmd = self.git_cmd(repo); + cmd.arg("diff") + .arg("--quiet") + .arg("--no-renames") + .arg(commit1) + .arg(commit2); + self.check(&mut cmd).await + } + + fn git_cmd(&self, repo: &Repository) -> Command { + let mut cmd = Command::new("git"); + // Run as if git was started in <path> instead of the current working directory. + cmd.arg("-C").arg(repo.path().to_str().unwrap()); + // Disable all advice hints from being printed. + cmd.arg("--no-advice"); + // Do not pipe Git output into a pager. + cmd.arg("--no-pager"); + // Do not perform optional operations that require locks. + cmd.arg("--no-optional-locks"); + + cmd + } + + async fn run(&self, cmd: &mut Command) -> Result<(), Error> { + cmd.stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::piped()); + + let child = cmd + .spawn() + .map_err(|e| Error::new(format!("git command failed to start: {e}")))?; + + let output = child + .wait_with_output() + .map_err(|e| Error::new(format!("git command failed to execute: {e}"))) + .await?; + + if output.status.success() { + Ok(()) + } else { + Err(Error::new(format!( + "git command failed with exitcode: {}\n{:?}\n{}", + output.status, + cmd.as_std().get_args(), + std::str::from_utf8(output.stderr.as_slice()).unwrap_or(""), + ))) + } + } + + async fn check(&self, cmd: &mut Command) -> Result<bool, Error> { + cmd.stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::piped()); + + let child = cmd + .spawn() + .map_err(|e| Error::new(format!("git command failed to start: {e}")))?; + + let output = child + .wait_with_output() + .map_err(|e| Error::new(format!("git command failed to execute: {e}"))) + .await?; + + if output.status.success() { + Ok(true) + } else { + match output.status.code() { + Some(1) => Ok(false), + _ => Err(Error::new(format!( + "git command failed with exitcode: {}\n{:?}\n{}", + output.status, + cmd.as_std().get_args(), + std::str::from_utf8(output.stderr.as_slice()).unwrap_or(""), + ))), + } + } + } + + async fn output(&self, cmd: &mut Command) -> Result<String, Error> { + cmd.stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + + let child = cmd + .spawn() + .map_err(|e| Error::new(format!("git command failed to start: {e}")))?; + + let output = child + .wait_with_output() + .map_err(|e| Error::new(format!("git command failed to execute: {e}"))) + .await?; + + if output.status.success() { + let output_utf8 = String::from_utf8(output.stdout) + .map_err(|e| Error::new(format!("git command had invalid output: {e}")))?; + Ok(output_utf8) + } else { + Err(Error::new(format!( + "git command failed with exitcode: {}", + output.status + ))) + } + } +} + +#[allow(dead_code)] +impl Repository { + pub fn new( + path: impl Into<PathBuf>, + bare: bool, + remote: Option<impl Into<String>>, + project_id: Option<impl Into<String>>, + ) -> Self { + let path = path.into(); + let project_id = project_id.map(|x| x.into()); + let socket: Option<PathBuf>; + if let Some(project_id) = &project_id { + socket = Some( + path.parent() + .unwrap() + .join(format!("{}.socket", project_id)), + ); + } else { + socket = None; + } + + Self { + remote: remote.map(|x| x.into()), + project_id, + path, + socket, + bare, + lock: RwLock::new(RepoData::new()), + } + } + + pub fn remote(&self) -> Option<&str> { + self.remote.as_deref() + } + + pub fn project_id(&self) -> Option<&str> { + self.project_id.as_deref() + } + + pub fn path(&self) -> &Path { + self.path.as_path() + } + + pub fn socket(&self) -> Option<&Path> { + self.socket.as_deref() + } + + pub fn is_bare(&self) -> bool { + self.bare + } + + pub async fn setup(&self) -> Result<(), Error> { + let mut data = self.lock.write().await; + + data.init(self).await?; + data.sync_config(self).await?; + if self.socket.is_some() { + data.sync_hooks(self).await?; + } + + Ok(()) + } + + pub async fn fetch(&self, branch: impl Into<String>) -> Result<(), Error> { + let branch = branch.into(); + let data = self.lock.read().await; + + data.fetch(self, branch).await + } + + pub async fn config_get(&self, name: impl Into<String>) -> Result<String, Error> { + let name = name.into(); + let data = self.lock.read().await; + + data.config_get(self, name.as_str()).await + } + + pub async fn is_ancestor( + &self, + ancestor: impl Into<String>, + commit: impl Into<String>, + ) -> Result<bool, Error> { + let ancestor = ancestor.into(); + let commit = commit.into(); + + let data = self.lock.read().await; + + data.is_ancestor(self, ancestor.as_str(), commit.as_str()) + .await + } + + pub async fn is_equal_content( + &self, + commit1: impl Into<String>, + commit2: impl Into<String>, + ) -> Result<bool, Error> { + let commit1 = commit1.into(); + let commit2 = commit2.into(); + let data = self.lock.read().await; + + data.is_equal_content(self, commit1.as_str(), commit2.as_str()) + .await + } +} diff --git a/server/src/git_root.rs b/server/src/git_root.rs new file mode 100644 index 0000000..b1c533e --- /dev/null +++ b/server/src/git_root.rs @@ -0,0 +1,465 @@ +use futures::{future::TryFutureExt, stream::TryStreamExt}; +use rmp_serde::{decode, Serializer}; +use rocket::fairing::{self, AdHoc}; +use rocket::serde::ser::Serialize; +use rocket::serde::Deserialize; +use rocket::{Build, Rocket}; +use rocket_db_pools::{sqlx, Database, Pool}; +use std::borrow::Cow; +use std::collections::HashMap; +use std::ops::Deref; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use tokio::net::{UnixListener, UnixStream}; +use tokio::task; + +use crate::api_model; +use crate::fs_utils; +use crate::git; +use crate::git_socket; +use crate::Db; + +type DbPool = <Db as Database>::Pool; +type DbConnection = <DbPool as Pool>::Connection; + +#[derive(Debug, Deserialize)] +pub struct Config<'a> { + git_server_root: Cow<'a, str>, +} + +struct RootsData { + project_repo: HashMap<String, Arc<git::Repository>>, +} + +pub struct Roots { + data: Mutex<RootsData>, +} + +impl Roots { + pub async fn new_project( + &self, + config: &Config<'_>, + db: &Db, + project_id: &str, + remote: &str, + main_branch: &str, + ) -> Result<(), git::Error> { + let project_id = project_id.to_string(); + let repo = setup_project_root( + config, + db, + &project_id, + remote.to_string(), + main_branch.to_string(), + ) + .await?; + + { + let mut data = self.data.lock().unwrap(); + data.project_repo.insert(project_id, repo); + } + + Ok(()) + } +} + +#[derive(Debug)] +struct IoError { + message: String, +} + +impl IoError { + fn new(message: impl Into<String>) -> Self { + Self { + message: message.into(), + } + } +} + +impl std::fmt::Display for IoError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} + +impl std::error::Error for IoError {} + +const EMPTY: &str = "0000000000000000000000000000000000000000"; + +fn is_printable(name: &str) -> bool { + name.as_bytes().iter().all(|c| c.is_ascii_graphic()) +} + +fn valid_branch_name(name: &str) -> bool { + name.starts_with("refs/heads/") && is_printable(name) +} + +async fn git_process_prehook( + repo: &git::Repository, + mut db: DbConnection, + receive: &Vec<git_socket::GitReceive>, +) -> Result<git_socket::GitHookResponse, IoError> { + let mut errors: Vec<String> = Vec::new(); + + for row in receive { + if !valid_branch_name(row.reference.as_str()) { + if row.reference.starts_with("refs/heads/") { + errors.push(format!( + "{}: Bad branch name", + row.reference.strip_prefix("refs/heads/").unwrap() + )); + } else { + errors.push(format!("{}: Only branches are allowed", row.reference)); + } + continue; + } + + let branch = row.reference.strip_prefix("refs/heads/").unwrap(); + + if row.old_value == EMPTY { + // Creating new review, nothing to check (yet). + continue; + } + + let result = sqlx::query!( + "SELECT state, rewrite FROM reviews WHERE project=? AND branch=?", + repo.project_id().unwrap_or(""), + branch + ) + .fetch_one(&mut *db) + .map_ok(|r| { + ( + api_model::ReviewState::try_from(r.state).unwrap(), + api_model::Rewrite::try_from(r.rewrite).unwrap(), + ) + }) + .map_err(|_| IoError::new(format!("{branch}: Unknown branch"))) + .await; + + if row.new_value == EMPTY { + // Do not allow to delete branch if there is a review connected to the branch. + // All branches should be connected to a branch, but in case of errors this might + // be relevant. + if result.is_ok() { + errors.push(format!( + "{branch}: Not allowed to delete branch, delete review instead." + )); + } + continue; + } + + let (state, rewrite) = result?; + + match state { + api_model::ReviewState::Dropped => { + errors.push(format!("{branch}: Review is dropped, no pushes allowed")); + continue; + } + api_model::ReviewState::Closed => { + errors.push(format!("{branch}: Review is closed, no pushes allowed")); + continue; + } + api_model::ReviewState::Draft => {} + api_model::ReviewState::Open => {} + } + + // Check for fast forward, if so we can skip the rest of the checks. + let is_fast_forward = repo + .is_ancestor(&row.old_value, &row.new_value) + .map_err(|e| IoError::new(format!("{branch}: {}", e))) + .await?; + if is_fast_forward { + continue; + } + + match rewrite { + api_model::Rewrite::History => { + let equal_content = repo + .is_equal_content(&row.old_value, &row.new_value) + .map_err(|e| IoError::new(format!("{branch}: {}", e))) + .await?; + if equal_content { + continue; + } + errors.push(format!("{}: History rewrite not allowed as final result does not match. Please check locally with `git diff {} {}`", branch, row.old_value, row.new_value)); + } + api_model::Rewrite::Rebase => {} + api_model::Rewrite::Disabled => { + errors.push(format!( + "Non fast-forward not allowed for review: {}", + row.reference + )); + } + } + } + + Ok(if errors.is_empty() { + git_socket::GitHookResponse { + ok: true, + message: "".to_string(), + } + } else { + git_socket::GitHookResponse { + ok: false, + message: errors.join("\n"), + } + }) +} + +async fn git_process_posthook( + repo: &git::Repository, + mut db: DbConnection, + user_id: &String, + receive: &Vec<git_socket::GitReceive>, +) -> git_socket::GitHookResponse { + let mut messages: Vec<String> = Vec::new(); + let mut updated: Vec<u64> = Vec::new(); + + for row in receive { + let branch = row.reference.strip_prefix("refs/heads/").unwrap(); + + if row.old_value == EMPTY { + // Create review + match sqlx::query!( + "INSERT INTO reviews (project, owner, title, branch) VALUES (?, ?, ?, ?)", + repo.project_id(), + user_id, + "Unnamed", + branch + ) + .execute(&mut *db) + .map_err(|e| IoError::new(format!("Database error: {e:?}"))) + .await + { + Ok(result) => { + updated.push(result.last_insert_id()); + messages.push(format!( + "{branch}: Review draft created, finalize at {}", + "TODO" + )); + } + Err(e) => { + messages.push(format!("{branch}: Error {e}",)); + } + }; + } else if row.new_value == EMPTY { + // Delete branch, prehook already checked that it is not connected to a branch. + } else { + match sqlx::query!( + "SELECT id, rewrite FROM reviews WHERE project=? AND branch=?", + repo.project_id().unwrap_or(""), + branch + ) + .fetch_one(&mut *db) + .map_ok(|r| (r.id, api_model::Rewrite::try_from(r.rewrite).unwrap())) + .map_err(|_| IoError::new(format!("{branch}: Unknown branch"))) + .await + { + Ok((id, rewrite)) => match rewrite { + api_model::Rewrite::History | api_model::Rewrite::Rebase => { + match sqlx::query!("UPDATE reviews SET rewrite=0 WHERE id=?", id) + .execute(&mut *db) + .map_err(|e| IoError::new(format!("Database error: {e:?}"))) + .await + { + Ok(_) => { + messages.push(format!( + "{branch}: Review draft created, finalize at {}", + "TODO" + )); + updated.push(id); + } + Err(e) => { + messages.push(format!("{branch}: Error {e}",)); + } + } + } + api_model::Rewrite::Disabled => { + updated.push(id); + } + }, + Err(e) => { + messages.push(format!("{branch}: Error {e}",)); + } + } + } + } + + git_socket::GitHookResponse { + ok: true, + message: messages.join("\n"), + } +} + +async fn git_socket_process( + repo: &git::Repository, + db: DbConnection, + stream: UnixStream, +) -> Result<(), IoError> { + let std_stream = stream.into_std().map_err(|e| IoError::new(e.to_string()))?; + std_stream + .set_nonblocking(false) + .map_err(|e| IoError::new(e.to_string()))?; + let (request, std_stream) = task::spawn_blocking(move || { + let request: Result<git_socket::GitHookRequest, IoError> = + decode::from_read(&std_stream).map_err(|e| IoError::new(e.to_string())); + let _ = std_stream.shutdown(std::net::Shutdown::Read); + request.map(|r| (r, std_stream)) + }) + .map_err(|e| IoError::new(e.to_string())) + .await??; + + let response = if request.pre { + git_process_prehook(repo, db, &request.receive).await? + } else { + git_process_posthook(repo, db, &request.user, &request.receive).await + }; + + task::spawn_blocking(move || { + let mut serializer = Serializer::new(&std_stream); + response + .serialize(&mut serializer) + .map_err(|e| IoError::new(e.to_string())) + }) + .map_err(|e| IoError::new(e.to_string())) + .await??; + + Ok(()) +} + +async fn git_socket_listen( + repo: Arc<git::Repository>, + db: <Db as Database>::Pool, +) -> Result<(), std::io::Error> { + let socket = repo.socket().unwrap(); + fs_utils::remove_file_allow_not_found(socket).await?; + let listener = UnixListener::bind(socket)?; + + loop { + match listener.accept().await { + Ok((stream, _addr)) => { + match db.get().await { + Ok(conn) => { + let repo2 = repo.clone(); + tokio::spawn(async move { + git_socket_process(repo2.as_ref(), conn, stream).await + }); + } + Err(_) => { /* unable to access db */ } + } + } + Err(_) => { /* connection failed */ } + } + } +} + +async fn setup_project_root( + config: &Config<'_>, + db: &Db, + project_id: &String, + remote: String, + main_branch: String, +) -> Result<Arc<git::Repository>, git::Error> { + let mut path = PathBuf::from(config.git_server_root.to_string()); + path.push(project_id); + let repo = Arc::new(git::Repository::new( + path, + true, + Some(remote), + Some(project_id), + )); + repo.setup().await?; + + if !repo.remote().unwrap().is_empty() && !main_branch.is_empty() { + let bg_repo = repo.clone(); + tokio::spawn(async move { bg_repo.fetch(main_branch).await }); + } + + let socket_repo = repo.clone(); + let socket_db = db.deref().clone(); + tokio::spawn(async move { + match git_socket_listen(socket_repo, socket_db).await { + Ok(()) => {} + Err(e) => { + // TODO: Log + print!("git_socket_listen returned {:?}", e) + } + } + }); + + Ok(repo) +} + +#[derive(Debug)] +#[allow(dead_code)] +enum GitOrSqlOrIoError { + Git(git::Error), + Sql(sqlx::Error), + Io(std::io::Error), +} + +async fn setup_projects_roots( + roots: &Roots, + config: &Config<'_>, + db: &Db, +) -> Result<(), GitOrSqlOrIoError> { + fs_utils::create_dir_allow_existing(PathBuf::from(config.git_server_root.to_string())) + .map_err(GitOrSqlOrIoError::Io) + .await?; + + let projects = sqlx::query!("SELECT id,remote,main_branch FROM projects") + .fetch(&**db) + .map_ok(|r| (r.id, r.remote, r.main_branch)) + .map_err(GitOrSqlOrIoError::Sql) + .try_collect::<Vec<_>>() + .await + .unwrap(); + + let mut project_repo: HashMap<String, Arc<git::Repository>> = HashMap::new(); + + for (id, remote, main_branch) in projects { + let repo = setup_project_root(config, db, &id, remote, main_branch) + .map_err(GitOrSqlOrIoError::Git) + .await?; + project_repo.insert(id, repo); + } + + { + let mut data = roots.data.lock().unwrap(); + data.project_repo = project_repo; + } + + Ok(()) +} + +async fn setup_projects(rocket: Rocket<Build>) -> fairing::Result { + match rocket.state::<Config>() { + Some(config) => match rocket.state::<Roots>() { + Some(roots) => match Db::fetch(&rocket) { + Some(db) => match setup_projects_roots(roots, config, db).await { + Ok(_) => Ok(rocket), + Err(e) => { + println!("{:?}", e); + Err(rocket) + } + }, + None => Err(rocket), + }, + None => Err(rocket), + }, + None => Err(rocket), + } +} + +pub fn stage() -> AdHoc { + AdHoc::on_ignite("Git Root Stage", |rocket| async { + rocket + .manage(Roots { + data: Mutex::new(RootsData { + project_repo: HashMap::new(), + }), + }) + .attach(AdHoc::config::<Config>()) + .attach(AdHoc::try_on_ignite("Projects setup", setup_projects)) + }) +} diff --git a/server/src/git_socket.rs b/server/src/git_socket.rs new file mode 100644 index 0000000..90f9dc2 --- /dev/null +++ b/server/src/git_socket.rs @@ -0,0 +1,21 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize)] +pub struct GitReceive { + pub old_value: String, + pub new_value: String, + pub reference: String, +} + +#[derive(Deserialize, Serialize)] +pub struct GitHookRequest { + pub pre: bool, + pub user: String, + pub receive: Vec<GitReceive>, +} + +#[derive(Deserialize, Serialize)] +pub struct GitHookResponse { + pub ok: bool, + pub message: String, +} diff --git a/server/src/githook.rs b/server/src/githook.rs new file mode 100644 index 0000000..057ee47 --- /dev/null +++ b/server/src/githook.rs @@ -0,0 +1,108 @@ +use rmp_serde::{decode, Serializer}; +use serde::ser::Serialize; +use std::error::Error; +use std::fmt; +use std::os::unix::net::UnixStream; +use std::path::PathBuf; +use tokio::io::{self, AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::task; +use users::get_current_username; + +mod fs_utils; +mod git; +mod git_socket; + +#[derive(Debug)] +struct IoError { + message: String, +} + +impl IoError { + fn new(message: impl Into<String>) -> Self { + Self { + message: message.into(), + } + } +} + +impl fmt::Display for IoError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.message) + } +} + +impl Error for IoError {} + +async fn get_socket() -> Result<String, git::Error> { + let repo = git::Repository::new(PathBuf::from("."), true, None::<String>, None::<String>); + repo.config_get("eyeballs.socket").await +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn Error>> { + let pre = match std::env::current_exe()? + .file_name() + .and_then(|x| x.to_str()) + { + Some("pre-receive") => true, + Some("post-receive") => false, + _ => return Err(Box::<dyn Error>::from("Invalid hook executable name")), + }; + + let user = match get_current_username() { + Some(username) => match username.into_string() { + Ok(valid_username) => valid_username, + Err(_) => return Err(Box::<dyn Error>::from("Invalid username for current user")), + }, + None => { + return Err(Box::<dyn Error>::from( + "Unable to get username of current user", + )) + } + }; + + let input = io::stdin(); + let reader = BufReader::new(input); + let mut lines = reader.lines(); + + let mut request = git_socket::GitHookRequest { + pre, + user, + receive: Vec::new(), + }; + while let Some(line) = lines.next_line().await? { + let data: Vec<&str> = line.split(' ').collect(); + + if data.len() == 3 { + request.receive.push(git_socket::GitReceive { + old_value: data[0].to_string(), + new_value: data[1].to_string(), + reference: data[2].to_string(), + }) + } + } + + let socket = PathBuf::from(get_socket().await?); + + let response = task::spawn_blocking(move || { + let stream = UnixStream::connect(socket).map_err(|e| IoError::new(e.to_string()))?; + let mut serializer = Serializer::new(&stream); + request + .serialize(&mut serializer) + .map_err(|e| IoError::new(e.to_string()))?; + let result: Result<git_socket::GitHookResponse, IoError> = + decode::from_read(stream).map_err(|e| IoError::new(e.to_string())); + result + }) + .await? + .map_err(Box::<dyn Error>::from)?; + + let mut output = io::stdout(); + output.write_all(response.message.as_bytes()).await?; + + if response.ok { + Ok(()) + } else { + Err(Box::<dyn Error>::from("Hook failed")) + } +} diff --git a/server/src/main.rs b/server/src/main.rs index d70c4e7..d0547c1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -5,10 +5,10 @@ use futures::{future::TryFutureExt, stream::TryStreamExt}; use rocket::fairing::{self, AdHoc}; use rocket::figment::Figment; use rocket::http::Status; -use rocket::response::status::{Conflict, Custom, NotFound}; +use rocket::response::status::{Custom, NotFound}; use rocket::serde::json::Json; -use rocket::{futures, Build, Rocket}; -use rocket_db_pools::{sqlx, Connection, Database}; +use rocket::{futures, Build, Rocket, State}; +use rocket_db_pools::{sqlx, Connection, Database, Pool}; use sqlx::Acquire; use std::path::PathBuf; use utoipa::OpenApi; @@ -20,6 +20,10 @@ mod tests; mod api_model; mod auth; mod db_utils; +mod fs_utils; +mod git; +mod git_root; +mod git_socket; use auth::AuthApiAddon; @@ -68,20 +72,6 @@ impl From<api_model::UserReviewRole> for u8 { } } -impl TryFrom<u8> for api_model::ReviewState { - type Error = &'static str; - - fn try_from(value: u8) -> Result<Self, Self::Error> { - match value { - 0 => Ok(api_model::ReviewState::Draft), - 1 => Ok(api_model::ReviewState::Open), - 2 => Ok(api_model::ReviewState::Dropped), - 3 => Ok(api_model::ReviewState::Closed), - _ => Err("Invalid review state"), - } - } -} - #[utoipa::path( responses( (status = 200, description = "Get all projects", body = api_model::Projects), @@ -132,13 +122,13 @@ async fn projects( } async fn get_project( - db: &mut Connection<Db>, + db: &mut <<Db as Database>::Pool as Pool>::Connection, projectid: &str, ) -> Result<Json<api_model::Project>, NotFound<&'static str>> { let users = sqlx::query!( "SELECT id, name, dn, default_role, maintainer FROM users JOIN project_users ON project_users.user=users.id WHERE project_users.project=?", projectid) - .fetch(&mut ***db) + .fetch(&mut **db) .map_ok(|r| api_model::ProjectUserEntry { user: api_model::User { id: r.id, @@ -156,7 +146,7 @@ async fn get_project( "SELECT id,title,description,remote,main_branch FROM projects WHERE id=?", projectid ) - .fetch_one(&mut ***db) + .fetch_one(&mut **db) .map_ok(|r| api_model::Project { id: r.id, title: r.title, @@ -182,11 +172,12 @@ async fn get_project( )] #[get("/project/<projectid>")] async fn project( - mut db: Connection<Db>, + db: &Db, _session: auth::Session, projectid: &str, ) -> Result<Json<api_model::Project>, NotFound<&'static str>> { - get_project(&mut db, projectid).await + let mut conn = db.get().await.unwrap(); + get_project(&mut conn, projectid).await } #[utoipa::path( @@ -200,24 +191,30 @@ async fn project( )] #[post("/project/<projectid>/new", data = "<data>")] async fn project_new( - mut db: Connection<Db>, + db: &Db, + git_roots_config: &State<git_root::Config<'_>>, + roots_state: &State<git_root::Roots>, session: auth::Session, projectid: &str, data: Json<api_model::ProjectData<'_>>, -) -> Result<Json<api_model::Project>, Conflict<&'static str>> { +) -> Result<Json<api_model::Project>, Custom<String>> { + let remote = data.remote.unwrap_or(""); + let main_branch = data.main_branch.unwrap_or("main"); + + let mut conn = db.get().await.unwrap(); { - let mut tx = db.begin().await.unwrap(); + let mut tx = conn.begin().await.unwrap(); sqlx::query!( "INSERT INTO projects (id, title, description, remote, main_branch) VALUES (?, ?, ?, ?, ?)", projectid, data.title.unwrap_or("Unnamed"), data.description.unwrap_or(""), - data.remote.unwrap_or(""), - data.main_branch.unwrap_or("main"), + remote, + main_branch, ) .execute(&mut *tx) - .map_err(|_| Conflict("Project with id already exists")) + .map_err(|_| Custom(Status::Conflict, "Project with id already exists".to_string())) .await?; sqlx::query!( @@ -230,7 +227,12 @@ async fn project_new( tx.commit().await.unwrap(); } - Ok(get_project(&mut db, projectid).await.unwrap()) + roots_state + .new_project(git_roots_config, db, projectid, remote, main_branch) + .map_err(|e| Custom(Status::InternalServerError, e.message)) + .await?; + + Ok(get_project(&mut conn, projectid).await.unwrap()) } async fn project_check_maintainer( @@ -504,25 +506,14 @@ async fn reviews( })) } -#[utoipa::path( - responses( - (status = 200, description = "Get review", body = api_model::Review), - (status = 404, description = "No such review"), - ), - security( - ("session" = []), - ), -)] -#[get("/review/<projectid>/<branch..>")] -async fn review( +async fn get_review_from_branch( mut db: Connection<Db>, - _session: auth::Session, projectid: &str, - branch: PathBuf, + branch: &str, ) -> Result<Json<api_model::Review>, NotFound<&'static str>> { let mut review = sqlx::query!( "SELECT reviews.id AS id,title,description,state,progress,branch,archived,users.id AS user_id,users.name AS name,users.dn AS user_dn FROM reviews JOIN users ON users.id=owner WHERE project=? AND branch=?", - projectid, branch.as_path().to_str().unwrap()) + projectid, branch) .fetch_one(&mut **db) .map_ok(|r| { api_model::Review { @@ -594,6 +585,36 @@ async fn review( #[utoipa::path( responses( + (status = 200, description = "Get review", body = api_model::Review), + (status = 404, description = "No such review"), + ), + security( + ("session" = []), + ), +)] +#[get("/review/<projectid>/<branch..>")] +async fn review( + db: Connection<Db>, + _session: auth::Session, + projectid: &str, + branch: PathBuf, +) -> Result<Json<api_model::Review>, NotFound<&'static str>> { + get_review_from_branch(db, projectid, branch.as_path().to_str().unwrap()).await +} + +// Backup for the above. Matches if <branch> ends up encoded (with / as %2f) +#[get("/review/<projectid>/<branch>", rank = 1)] +async fn review_encoded_path( + db: Connection<Db>, + _session: auth::Session, + projectid: &str, + branch: &str, +) -> Result<Json<api_model::Review>, NotFound<&'static str>> { + get_review_from_branch(db, projectid, branch).await +} + +#[utoipa::path( + responses( (status = 200, description = "Get all users", body = api_model::Users), ), security( @@ -672,10 +693,12 @@ fn rocket_from_config(figment: Figment) -> Rocket<Build> { project_user_del, reviews, review, + review_encoded_path, users, ], ) .attach(auth::stage(basepath)) + .attach(git_root::stage()) } #[rocket::main] |
