summaryrefslogtreecommitdiff
path: root/server/src/git_root.rs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/git_root.rs')
-rw-r--r--server/src/git_root.rs465
1 files changed, 465 insertions, 0 deletions
diff --git a/server/src/git_root.rs b/server/src/git_root.rs
new file mode 100644
index 0000000..b1c533e
--- /dev/null
+++ b/server/src/git_root.rs
@@ -0,0 +1,465 @@
+use futures::{future::TryFutureExt, stream::TryStreamExt};
+use rmp_serde::{decode, Serializer};
+use rocket::fairing::{self, AdHoc};
+use rocket::serde::ser::Serialize;
+use rocket::serde::Deserialize;
+use rocket::{Build, Rocket};
+use rocket_db_pools::{sqlx, Database, Pool};
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::ops::Deref;
+use std::path::PathBuf;
+use std::sync::{Arc, Mutex};
+use tokio::net::{UnixListener, UnixStream};
+use tokio::task;
+
+use crate::api_model;
+use crate::fs_utils;
+use crate::git;
+use crate::git_socket;
+use crate::Db;
+
+type DbPool = <Db as Database>::Pool;
+type DbConnection = <DbPool as Pool>::Connection;
+
+#[derive(Debug, Deserialize)]
+pub struct Config<'a> {
+ git_server_root: Cow<'a, str>,
+}
+
+struct RootsData {
+ project_repo: HashMap<String, Arc<git::Repository>>,
+}
+
+pub struct Roots {
+ data: Mutex<RootsData>,
+}
+
+impl Roots {
+ pub async fn new_project(
+ &self,
+ config: &Config<'_>,
+ db: &Db,
+ project_id: &str,
+ remote: &str,
+ main_branch: &str,
+ ) -> Result<(), git::Error> {
+ let project_id = project_id.to_string();
+ let repo = setup_project_root(
+ config,
+ db,
+ &project_id,
+ remote.to_string(),
+ main_branch.to_string(),
+ )
+ .await?;
+
+ {
+ let mut data = self.data.lock().unwrap();
+ data.project_repo.insert(project_id, repo);
+ }
+
+ Ok(())
+ }
+}
+
+#[derive(Debug)]
+struct IoError {
+ message: String,
+}
+
+impl IoError {
+ fn new(message: impl Into<String>) -> Self {
+ Self {
+ message: message.into(),
+ }
+ }
+}
+
+impl std::fmt::Display for IoError {
+ fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+ write!(f, "{}", self.message)
+ }
+}
+
+impl std::error::Error for IoError {}
+
+const EMPTY: &str = "0000000000000000000000000000000000000000";
+
+fn is_printable(name: &str) -> bool {
+ name.as_bytes().iter().all(|c| c.is_ascii_graphic())
+}
+
+fn valid_branch_name(name: &str) -> bool {
+ name.starts_with("refs/heads/") && is_printable(name)
+}
+
+async fn git_process_prehook(
+ repo: &git::Repository,
+ mut db: DbConnection,
+ receive: &Vec<git_socket::GitReceive>,
+) -> Result<git_socket::GitHookResponse, IoError> {
+ let mut errors: Vec<String> = Vec::new();
+
+ for row in receive {
+ if !valid_branch_name(row.reference.as_str()) {
+ if row.reference.starts_with("refs/heads/") {
+ errors.push(format!(
+ "{}: Bad branch name",
+ row.reference.strip_prefix("refs/heads/").unwrap()
+ ));
+ } else {
+ errors.push(format!("{}: Only branches are allowed", row.reference));
+ }
+ continue;
+ }
+
+ let branch = row.reference.strip_prefix("refs/heads/").unwrap();
+
+ if row.old_value == EMPTY {
+ // Creating new review, nothing to check (yet).
+ continue;
+ }
+
+ let result = sqlx::query!(
+ "SELECT state, rewrite FROM reviews WHERE project=? AND branch=?",
+ repo.project_id().unwrap_or(""),
+ branch
+ )
+ .fetch_one(&mut *db)
+ .map_ok(|r| {
+ (
+ api_model::ReviewState::try_from(r.state).unwrap(),
+ api_model::Rewrite::try_from(r.rewrite).unwrap(),
+ )
+ })
+ .map_err(|_| IoError::new(format!("{branch}: Unknown branch")))
+ .await;
+
+ if row.new_value == EMPTY {
+ // Do not allow to delete branch if there is a review connected to the branch.
+ // All branches should be connected to a branch, but in case of errors this might
+ // be relevant.
+ if result.is_ok() {
+ errors.push(format!(
+ "{branch}: Not allowed to delete branch, delete review instead."
+ ));
+ }
+ continue;
+ }
+
+ let (state, rewrite) = result?;
+
+ match state {
+ api_model::ReviewState::Dropped => {
+ errors.push(format!("{branch}: Review is dropped, no pushes allowed"));
+ continue;
+ }
+ api_model::ReviewState::Closed => {
+ errors.push(format!("{branch}: Review is closed, no pushes allowed"));
+ continue;
+ }
+ api_model::ReviewState::Draft => {}
+ api_model::ReviewState::Open => {}
+ }
+
+ // Check for fast forward, if so we can skip the rest of the checks.
+ let is_fast_forward = repo
+ .is_ancestor(&row.old_value, &row.new_value)
+ .map_err(|e| IoError::new(format!("{branch}: {}", e)))
+ .await?;
+ if is_fast_forward {
+ continue;
+ }
+
+ match rewrite {
+ api_model::Rewrite::History => {
+ let equal_content = repo
+ .is_equal_content(&row.old_value, &row.new_value)
+ .map_err(|e| IoError::new(format!("{branch}: {}", e)))
+ .await?;
+ if equal_content {
+ continue;
+ }
+ errors.push(format!("{}: History rewrite not allowed as final result does not match. Please check locally with `git diff {} {}`", branch, row.old_value, row.new_value));
+ }
+ api_model::Rewrite::Rebase => {}
+ api_model::Rewrite::Disabled => {
+ errors.push(format!(
+ "Non fast-forward not allowed for review: {}",
+ row.reference
+ ));
+ }
+ }
+ }
+
+ Ok(if errors.is_empty() {
+ git_socket::GitHookResponse {
+ ok: true,
+ message: "".to_string(),
+ }
+ } else {
+ git_socket::GitHookResponse {
+ ok: false,
+ message: errors.join("\n"),
+ }
+ })
+}
+
+async fn git_process_posthook(
+ repo: &git::Repository,
+ mut db: DbConnection,
+ user_id: &String,
+ receive: &Vec<git_socket::GitReceive>,
+) -> git_socket::GitHookResponse {
+ let mut messages: Vec<String> = Vec::new();
+ let mut updated: Vec<u64> = Vec::new();
+
+ for row in receive {
+ let branch = row.reference.strip_prefix("refs/heads/").unwrap();
+
+ if row.old_value == EMPTY {
+ // Create review
+ match sqlx::query!(
+ "INSERT INTO reviews (project, owner, title, branch) VALUES (?, ?, ?, ?)",
+ repo.project_id(),
+ user_id,
+ "Unnamed",
+ branch
+ )
+ .execute(&mut *db)
+ .map_err(|e| IoError::new(format!("Database error: {e:?}")))
+ .await
+ {
+ Ok(result) => {
+ updated.push(result.last_insert_id());
+ messages.push(format!(
+ "{branch}: Review draft created, finalize at {}",
+ "TODO"
+ ));
+ }
+ Err(e) => {
+ messages.push(format!("{branch}: Error {e}",));
+ }
+ };
+ } else if row.new_value == EMPTY {
+ // Delete branch, prehook already checked that it is not connected to a branch.
+ } else {
+ match sqlx::query!(
+ "SELECT id, rewrite FROM reviews WHERE project=? AND branch=?",
+ repo.project_id().unwrap_or(""),
+ branch
+ )
+ .fetch_one(&mut *db)
+ .map_ok(|r| (r.id, api_model::Rewrite::try_from(r.rewrite).unwrap()))
+ .map_err(|_| IoError::new(format!("{branch}: Unknown branch")))
+ .await
+ {
+ Ok((id, rewrite)) => match rewrite {
+ api_model::Rewrite::History | api_model::Rewrite::Rebase => {
+ match sqlx::query!("UPDATE reviews SET rewrite=0 WHERE id=?", id)
+ .execute(&mut *db)
+ .map_err(|e| IoError::new(format!("Database error: {e:?}")))
+ .await
+ {
+ Ok(_) => {
+ messages.push(format!(
+ "{branch}: Review draft created, finalize at {}",
+ "TODO"
+ ));
+ updated.push(id);
+ }
+ Err(e) => {
+ messages.push(format!("{branch}: Error {e}",));
+ }
+ }
+ }
+ api_model::Rewrite::Disabled => {
+ updated.push(id);
+ }
+ },
+ Err(e) => {
+ messages.push(format!("{branch}: Error {e}",));
+ }
+ }
+ }
+ }
+
+ git_socket::GitHookResponse {
+ ok: true,
+ message: messages.join("\n"),
+ }
+}
+
+async fn git_socket_process(
+ repo: &git::Repository,
+ db: DbConnection,
+ stream: UnixStream,
+) -> Result<(), IoError> {
+ let std_stream = stream.into_std().map_err(|e| IoError::new(e.to_string()))?;
+ std_stream
+ .set_nonblocking(false)
+ .map_err(|e| IoError::new(e.to_string()))?;
+ let (request, std_stream) = task::spawn_blocking(move || {
+ let request: Result<git_socket::GitHookRequest, IoError> =
+ decode::from_read(&std_stream).map_err(|e| IoError::new(e.to_string()));
+ let _ = std_stream.shutdown(std::net::Shutdown::Read);
+ request.map(|r| (r, std_stream))
+ })
+ .map_err(|e| IoError::new(e.to_string()))
+ .await??;
+
+ let response = if request.pre {
+ git_process_prehook(repo, db, &request.receive).await?
+ } else {
+ git_process_posthook(repo, db, &request.user, &request.receive).await
+ };
+
+ task::spawn_blocking(move || {
+ let mut serializer = Serializer::new(&std_stream);
+ response
+ .serialize(&mut serializer)
+ .map_err(|e| IoError::new(e.to_string()))
+ })
+ .map_err(|e| IoError::new(e.to_string()))
+ .await??;
+
+ Ok(())
+}
+
+async fn git_socket_listen(
+ repo: Arc<git::Repository>,
+ db: <Db as Database>::Pool,
+) -> Result<(), std::io::Error> {
+ let socket = repo.socket().unwrap();
+ fs_utils::remove_file_allow_not_found(socket).await?;
+ let listener = UnixListener::bind(socket)?;
+
+ loop {
+ match listener.accept().await {
+ Ok((stream, _addr)) => {
+ match db.get().await {
+ Ok(conn) => {
+ let repo2 = repo.clone();
+ tokio::spawn(async move {
+ git_socket_process(repo2.as_ref(), conn, stream).await
+ });
+ }
+ Err(_) => { /* unable to access db */ }
+ }
+ }
+ Err(_) => { /* connection failed */ }
+ }
+ }
+}
+
+async fn setup_project_root(
+ config: &Config<'_>,
+ db: &Db,
+ project_id: &String,
+ remote: String,
+ main_branch: String,
+) -> Result<Arc<git::Repository>, git::Error> {
+ let mut path = PathBuf::from(config.git_server_root.to_string());
+ path.push(project_id);
+ let repo = Arc::new(git::Repository::new(
+ path,
+ true,
+ Some(remote),
+ Some(project_id),
+ ));
+ repo.setup().await?;
+
+ if !repo.remote().unwrap().is_empty() && !main_branch.is_empty() {
+ let bg_repo = repo.clone();
+ tokio::spawn(async move { bg_repo.fetch(main_branch).await });
+ }
+
+ let socket_repo = repo.clone();
+ let socket_db = db.deref().clone();
+ tokio::spawn(async move {
+ match git_socket_listen(socket_repo, socket_db).await {
+ Ok(()) => {}
+ Err(e) => {
+ // TODO: Log
+ print!("git_socket_listen returned {:?}", e)
+ }
+ }
+ });
+
+ Ok(repo)
+}
+
+#[derive(Debug)]
+#[allow(dead_code)]
+enum GitOrSqlOrIoError {
+ Git(git::Error),
+ Sql(sqlx::Error),
+ Io(std::io::Error),
+}
+
+async fn setup_projects_roots(
+ roots: &Roots,
+ config: &Config<'_>,
+ db: &Db,
+) -> Result<(), GitOrSqlOrIoError> {
+ fs_utils::create_dir_allow_existing(PathBuf::from(config.git_server_root.to_string()))
+ .map_err(GitOrSqlOrIoError::Io)
+ .await?;
+
+ let projects = sqlx::query!("SELECT id,remote,main_branch FROM projects")
+ .fetch(&**db)
+ .map_ok(|r| (r.id, r.remote, r.main_branch))
+ .map_err(GitOrSqlOrIoError::Sql)
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+
+ let mut project_repo: HashMap<String, Arc<git::Repository>> = HashMap::new();
+
+ for (id, remote, main_branch) in projects {
+ let repo = setup_project_root(config, db, &id, remote, main_branch)
+ .map_err(GitOrSqlOrIoError::Git)
+ .await?;
+ project_repo.insert(id, repo);
+ }
+
+ {
+ let mut data = roots.data.lock().unwrap();
+ data.project_repo = project_repo;
+ }
+
+ Ok(())
+}
+
+async fn setup_projects(rocket: Rocket<Build>) -> fairing::Result {
+ match rocket.state::<Config>() {
+ Some(config) => match rocket.state::<Roots>() {
+ Some(roots) => match Db::fetch(&rocket) {
+ Some(db) => match setup_projects_roots(roots, config, db).await {
+ Ok(_) => Ok(rocket),
+ Err(e) => {
+ println!("{:?}", e);
+ Err(rocket)
+ }
+ },
+ None => Err(rocket),
+ },
+ None => Err(rocket),
+ },
+ None => Err(rocket),
+ }
+}
+
+pub fn stage() -> AdHoc {
+ AdHoc::on_ignite("Git Root Stage", |rocket| async {
+ rocket
+ .manage(Roots {
+ data: Mutex::new(RootsData {
+ project_repo: HashMap::new(),
+ }),
+ })
+ .attach(AdHoc::config::<Config>())
+ .attach(AdHoc::try_on_ignite("Projects setup", setup_projects))
+ })
+}