mod create_remove;
mod get_path;
mod loaded_storage_backends;
mod metadata;
mod node_descriptor;
mod read_dir;
#[cfg(test)]
pub(crate) mod tests;
mod utils;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use anyhow::Context;
use path_absolutize::*;
use tokio::runtime::Runtime;
use wildland_corex::dfs::interface::events::{DfsCause, FileSystemOperation};
use wildland_corex::dfs::interface::{
AbortFlag,
DfsFrontend,
DfsFrontendError,
DirEntry,
EventReceiver,
FsStat,
IStream,
NodeStat,
OStream,
ProgressReporter,
SpaceUsage,
WlPermissions,
};
use wildland_corex::dfs::unix_timestamp::UnixTimestamp;
use wildland_corex::{PathResolver, Storage};
use self::loaded_storage_backends::{BackendFactories, LoadedStorageBackends};
use self::node_descriptor::NodeDescriptor;
use self::utils::{execute_container_operation, get_related_nodes};
use crate::events::RingEventSystem;
use crate::storage_backends::models::{
DownloadResponse,
RenameResponse,
SetPermissionsResponse,
StorageBackendError,
UploadResponse,
};
/// Newtype for a path that has been absolutized against the virtual root `/`
/// (produced by `DefaultDfs::abs_path` via `absolutize_virtually`), stored as
/// its lossy UTF-8 string form.
#[derive(Clone, Debug)]
pub struct AbsPath(String);
impl From<AbsPath> for String {
    /// Consumes the wrapper and yields the inner path string.
    fn from(value: AbsPath) -> Self {
        // Move the inner `String` out directly; the previous
        // `value.to_string()` allocated a needless clone of it.
        value.0
    }
}
/// `Display` is implemented instead of `ToString` directly (clippy:
/// `to_string_trait_impl`); the blanket `impl<T: Display> ToString for T`
/// keeps `AbsPath::to_string()` available for existing callers and returns
/// the same string as before.
impl std::fmt::Display for AbsPath {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
impl AbsPath {
    /// Borrows the absolutized path as a `&Path`.
    pub fn as_path(&self) -> &Path {
        self.0.as_ref()
    }
}
/// Synchronous DFS frontend that serves filesystem operations by resolving
/// exposed paths to storage-backed nodes and driving async storage backends
/// on an owned Tokio runtime.
pub struct DefaultDfs {
    // Resolves exposed paths to containers/storages — presumably the
    // component consulted by `get_related_nodes`; verify in utils module.
    path_resolver: Box<dyn PathResolver>,
    // Registry of instantiated storage backends, created lazily from
    // `BackendFactories` (see `new`).
    storage_backends: LoadedStorageBackends,
    // Event system whose receiver is exposed through `get_receiver`.
    event_system: Box<RingEventSystem>,
    // Runtime used to `block_on` async backend calls from the sync API.
    runtime: Arc<Runtime>,
}
impl DefaultDfs {
    /// Creates a DFS instance from the path resolver, the set of storage
    /// backend factories, and the Tokio runtime used to drive async backend
    /// operations from the synchronous `DfsFrontend` API.
    #[tracing::instrument(level = "debug", skip_all)]
    pub fn new(
        path_resolver: Box<dyn PathResolver>,
        storage_backend_factories: BackendFactories,
        runtime: Arc<Runtime>,
    ) -> Self {
        Self {
            path_resolver,
            storage_backends: LoadedStorageBackends::new(storage_backend_factories),
            event_system: Box::<RingEventSystem>::default(),
            runtime,
        }
    }

    /// Normalizes a caller-provided path into an absolute virtual path rooted
    /// at `/` (resolving `.`/`..` lexically, without touching the OS
    /// filesystem).
    ///
    /// Takes `&str` instead of the former `&String` (clippy: `ptr_arg`);
    /// existing `&String` call sites still coerce via deref.
    ///
    /// # Errors
    /// Returns a `PathResolutionError::Generic` (converted into
    /// `DfsFrontendError`) when the path cannot be absolutized.
    fn abs_path(path: &str) -> Result<AbsPath, DfsFrontendError> {
        let absolutized = PathBuf::from(path).absolutize_virtually("/").map_err(|_| {
            wildland_corex::PathResolutionError::Generic(format!(
                "Could not absolutize given path [{path}]"
            ))
        })?;
        Ok(AbsPath(absolutized.to_string_lossy().to_string()))
    }
}
impl DfsFrontend for DefaultDfs {
/// Lists directory entries at the given exposed path after normalizing it
/// to an absolute virtual path.
fn read_dir(&self, requested_path: String) -> Result<Vec<DirEntry>, DfsFrontendError> {
    read_dir::read_dir(self, Self::abs_path(&requested_path)?)
}
/// Resolves an opaque node identifier back to an exposed path. Delegates
/// directly to the `get_path` module; no absolutization is applied since
/// the input is an identifier, not a path.
fn get_path(&self, identifier: String) -> Result<String, DfsFrontendError> {
    get_path::get_path(self, identifier)
}
/// Fetches node metadata for the given exposed path after normalizing it.
fn metadata(&self, input_exposed_path: String) -> Result<NodeStat, DfsFrontendError> {
    metadata::metadata(self, Self::abs_path(&input_exposed_path)?)
}
/// Removes the file at the given exposed path after normalizing it.
fn remove_file(&self, input_exposed_path: String) -> Result<(), DfsFrontendError> {
    create_remove::remove_file(self, Self::abs_path(&input_exposed_path)?)
}
/// Creates a directory at the given exposed path after normalizing it.
fn create_dir(&self, requested_path: String) -> Result<(), DfsFrontendError> {
    create_remove::create_dir(self, Self::abs_path(&requested_path)?)
}
/// Removes the directory at the given exposed path, optionally recursively,
/// after normalizing the path.
fn remove_dir(
    &self,
    requested_path: String,
    is_recursive: bool,
) -> Result<(), DfsFrontendError> {
    let abs = Self::abs_path(&requested_path)?;
    create_remove::remove_dir(self, abs, is_recursive)
}
/// Renames/moves a node, but only within the container that owns it; a
/// target outside that container is rejected with `MoveBetweenContainers`,
/// and virtual (synthetic) nodes are rejected as read-only.
fn rename(&self, old_path: String, new_path: String) -> Result<(), DfsFrontendError> {
    let old_path = Self::abs_path(&old_path)?;
    let new_path = Self::abs_path(&new_path)?;
    let old_path = old_path.as_path();
    let nodes = get_related_nodes(self, old_path)?;
    let rename_node = |dfs: &DefaultDfs, node: &NodeDescriptor| match node {
        node @ NodeDescriptor::Physical { storage, .. } => {
            // Recover the container's root in the exposed namespace by
            // stripping the node's in-storage suffix from its absolute path.
            let abs_path = node.abs_path().to_string_lossy().to_string();
            let claimed_container_root_path = abs_path
                .strip_suffix(&storage.path_within_storage().to_string_lossy().to_string())
                .unwrap_or("");
            // NOTE(review): if the suffix does not match, the root falls back
            // to "" and the prefix check below always succeeds — confirm this
            // fallback is intended and cannot misclassify a cross-container
            // move as in-container.
            if let Some(new_path_within_storage) = new_path
                .to_string()
                .strip_prefix(claimed_container_root_path)
            {
                // Target is inside the same container: delegate to backend.
                execute_container_operation(dfs, storage, |backend| async move {
                    backend
                        .rename(
                            storage.path_within_storage(),
                            Path::new(new_path_within_storage),
                        )
                        .await
                })
                // Translate backend responses into frontend errors.
                .and_then(|response| match response {
                    RenameResponse::Renamed => Ok(()),
                    RenameResponse::NotFound => Err(DfsFrontendError::NoSuchPath),
                    RenameResponse::SourceIsParentOfTarget => {
                        Err(DfsFrontendError::SourceIsParentOfTarget)
                    }
                    RenameResponse::TargetPathAlreadyExists => {
                        Err(DfsFrontendError::PathAlreadyExists)
                    }
                    RenameResponse::InvalidTargetParentPath => {
                        Err(DfsFrontendError::InvalidParent)
                    }
                })
            } else {
                Err(DfsFrontendError::MoveBetweenContainers)
            }
        }
        NodeDescriptor::Virtual { .. } => Err(DfsFrontendError::ReadOnlyPath),
    };
    match nodes.as_slice() {
        // No node claims the source path.
        [] => Err(DfsFrontendError::NoSuchPath),
        // Exactly one node: perform the rename.
        [node] => rename_node(self, node),
        // Several containers claim the path — ambiguous, refuse.
        _ => Err(DfsFrontendError::PathConflict(
            old_path.to_string_lossy().into(),
        )),
    }
}
/// Applies `permissions` to the node at the given exposed path.
///
/// Fails with `ReadOnlyPath` for virtual nodes, `NoSuchPath` when nothing
/// maps to the path, and `PathConflict` when several nodes claim it.
fn set_permissions(
    &self,
    input_exposed_path: String,
    permissions: WlPermissions,
) -> Result<(), DfsFrontendError> {
    let abs = Self::abs_path(&input_exposed_path)?;
    let path = abs.as_path();
    let nodes = get_related_nodes(self, path)?;
    match nodes.as_slice() {
        [] => Err(DfsFrontendError::NoSuchPath),
        [NodeDescriptor::Virtual { .. }] => Err(DfsFrontendError::ReadOnlyPath),
        [NodeDescriptor::Physical { storage, .. }] => {
            execute_container_operation(self, storage, |backend| async move {
                backend
                    .set_permissions(storage.path_within_storage(), permissions)
                    .await
            })
            .and_then(|resp| match resp {
                SetPermissionsResponse::Set => Ok(()),
                SetPermissionsResponse::NotFound => Err(DfsFrontendError::NoSuchPath),
            })
        }
        _ => Err(DfsFrontendError::PathConflict(
            path.to_string_lossy().into(),
        )),
    }
}
/// Not implemented yet; unconditionally reports a generic error.
fn set_owner(&self, _path: String) -> Result<(), DfsFrontendError> {
    let msg = String::from("`set_owner` is not supported yet");
    Err(DfsFrontendError::Generic(msg))
}
/// Returns filesystem statistics for the storage backing the given exposed
/// path. Virtual nodes have no backing storage and yield `ReadOnlyPath`.
fn stat_fs(&self, input_exposed_path: String) -> Result<FsStat, DfsFrontendError> {
    let abs = Self::abs_path(&input_exposed_path)?;
    let path = abs.as_path();
    let nodes = get_related_nodes(self, path)?;
    match nodes.as_slice() {
        [] => Err(DfsFrontendError::NoSuchPath),
        [NodeDescriptor::Virtual { .. }] => Err(DfsFrontendError::ReadOnlyPath),
        [NodeDescriptor::Physical { storage, .. }] => {
            execute_container_operation(self, storage, |backend| async move {
                backend.stat_fs().await
            })
        }
        _ => Err(DfsFrontendError::PathConflict(
            path.to_string_lossy().into(),
        )),
    }
}
/// Hands out a new event receiver from the internal event system, wrapped
/// for shared, synchronized access.
fn get_receiver(&self) -> Arc<Mutex<dyn EventReceiver>> {
    let receiver = self.event_system.get_receiver();
    Arc::new(Mutex::new(receiver))
}
/// Instantiates (if needed) and mounts the backend for `storage`, blocking
/// on the internal runtime until the async mount completes.
#[tracing::instrument(level = "debug", err(Debug), skip_all)]
fn mount(&self, storage: &Storage) -> Result<(), DfsFrontendError> {
    let backend_type = storage.backend_type();
    self.runtime.block_on(async {
        // Backend construction failure is a configuration problem.
        let backend = self.storage_backends.get_backend(storage).map_err(|e| {
            tracing::error!(
                "Could not mount storage backend {}; Reason: {:?}",
                storage.backend_type(),
                e
            );
            DfsFrontendError::Generic(format!("{e}"))
        })?;
        // Mount failure is treated as the storage being unresponsive.
        backend.mount().await.map_err(|e| {
            let err_msg = format!(
                "Could not mount storage backend {}; Reason: {:?}",
                backend_type, e
            );
            tracing::error!("{}", &err_msg);
            DfsFrontendError::StorageNotResponsive(err_msg)
        })
    })
}
/// Queries the backend of `storage` for its space usage, blocking on the
/// internal runtime.
fn get_space_usage(&self, storage: &Storage) -> Result<SpaceUsage, DfsFrontendError> {
    self.runtime.block_on(async {
        let backend = self
            .storage_backends
            .get_backend(storage)
            .context("Error while retrieving backend for checking available space")?;
        backend.get_space_usage().await.map_err(|e| {
            DfsFrontendError::Generic(format!(
                "Error while checking available space: {}",
                e
            ))
        })
    })
}
/// Checks whether the backend of `storage` is reachable by probing the
/// metadata of its root path; a successful probe means accessible.
fn is_accessible(&self, storage: &Storage) -> Result<bool, DfsFrontendError> {
    self.runtime.block_on(async {
        let backend = self
            .storage_backends
            .get_backend(storage)
            .context("Error while retrieving backend for checking storage accessibility")?;
        match backend.metadata(Path::new("/")).await {
            Ok(_meta) => Ok(true),
            Err(e) => Err(DfsFrontendError::Generic(format!(
                "Error while checking storage accessibility: {}",
                e
            ))),
        }
    })
}
/// Streams the file at `path` from its storage backend into `output`,
/// reporting progress through `progress_reporter`. The transfer races
/// against `abort_flag` and returns `Aborted` when the flag fires first.
///
/// # Errors
/// `NotAFile` for virtual nodes or non-file targets, `NoSuchPath` when
/// nothing maps to the path, `PathConflict` on ambiguous paths, and
/// `StorageNotResponsive` on backend transport errors.
fn download(
    &self,
    path: String,
    output: Box<dyn OStream>,
    progress_reporter: Box<dyn ProgressReporter>,
    abort_flag: &AbortFlag,
) -> Result<(), DfsFrontendError> {
    let path = Self::abs_path(&path)?;
    let download_op = |dfs: &DefaultDfs, node: &NodeDescriptor| match node {
        NodeDescriptor::Virtual { .. } => Err(DfsFrontendError::NotAFile),
        NodeDescriptor::Physical { storage, .. } => {
            // Fix: propagate backend-construction failure as an error instead
            // of panicking via `unwrap`, mirroring `mount`'s handling.
            let backend = dfs
                .storage_backends
                .get_backend(storage.storage())
                .map_err(|e| DfsFrontendError::Generic(format!("{e}")))?;
            let operation_future = async {
                backend
                    .download(storage.path_within_storage(), output, progress_reporter)
                    .await
                    .map_err(|err| {
                        let msg = format!(
                            "Download error in {} backend: {}",
                            err.backend_type(),
                            err.context()
                        );
                        tracing::error!("{}", &msg);
                        DfsFrontendError::StorageNotResponsive(msg)
                    })
                    .and_then(|resp| match resp {
                        DownloadResponse::Success => Ok(()),
                        DownloadResponse::NotAFile => Err(DfsFrontendError::NotAFile),
                        DownloadResponse::NoSuchPath => Err(DfsFrontendError::NoSuchPath),
                    })
            };
            // Race the transfer against the abort flag.
            dfs.runtime.block_on(async {
                tokio::select! {
                    _ = abort_flag.wait() => Err(DfsFrontendError::Aborted),
                    result = operation_future => result
                }
            })
        }
    };
    let nodes = get_related_nodes(self, path.as_path())?;
    match nodes.as_slice() {
        [] => Err(DfsFrontendError::NoSuchPath),
        [node] => download_op(self, node),
        _ => Err(DfsFrontendError::PathConflict(path.into())),
    }
}
fn upload(
&self,
path: String,
input: Box<dyn IStream>,
progress_reporter: Box<dyn ProgressReporter>,
abort_flag: &AbortFlag,
creation_time: Option<UnixTimestamp>,
) -> Result<(), DfsFrontendError> {
let path = Self::abs_path(&path)?;
let upload_op = |dfs: &DefaultDfs, node: &NodeDescriptor| match node {
NodeDescriptor::Physical { storage, .. } => {
let backend = dfs.storage_backends.get_backend(storage.storage()).unwrap();
let operation_future = async {
backend
.upload(
storage.path_within_storage(),
input,
progress_reporter,
creation_time,
)
.await
.map_err(|err| match err {
StorageBackendError::InsufficientQuota { backend_type, requested_size } => {
tracing::warn!("Upload error in {} backend: insufficient quota to upload {} bytes", backend_type, requested_size);
DfsFrontendError::InsufficientQuota(requested_size)
},
_ => {
let msg = format!(
"Upload error in {} backend: {}",
err.backend_type(),
err.context()
);
tracing::error!("{}", &msg);
DfsFrontendError::StorageNotResponsive(msg)
}
} )
.and_then(|resp| match resp {
UploadResponse::Success => Ok(()),
UploadResponse::InvalidParent => Err(DfsFrontendError::InvalidParent),
UploadResponse::PathTakenByDir => {
Err(DfsFrontendError::PathAlreadyExists)
}
UploadResponse::NotPermitted => Err(DfsFrontendError::ReadOnlyPath),
})
};
dfs.runtime.block_on(async {
tokio::select! {
_ = abort_flag.wait() => Err(DfsFrontendError::Aborted),
result = operation_future => result
}
})
}
NodeDescriptor::Virtual { .. } => Err(DfsFrontendError::PathAlreadyExists),
};
let path = path.as_path();
let nodes = get_related_nodes(self, path)?;
match nodes.as_slice() {
[] => Err(DfsFrontendError::NoSuchPath),
[node] => upload_op(self, node),
_ => Err(DfsFrontendError::PathConflict(
path.to_string_lossy().into(),
)),
}
}
}