use std::fmt::Display;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Notify;
/// Error type carried by the `Err` variant of [`OStreamResult`] and [`IStreamResult`].
///
/// Its `Display` output is `Stream Error {code}: {msg}`, as configured by the
/// `#[error]` attribute below.
#[derive(Error, Debug, Clone)]
#[error("Stream Error {code}: {msg}")]
pub struct StreamErr {
/// Numeric error code; its meaning is defined by whoever constructs the error.
pub code: i32,
/// Message text included in the `Display` output.
pub msg: String,
}
/// Result of an [`OStream`] operation: `Ok(())` on success, otherwise a [`StreamErr`].
pub type OStreamResult = Result<(), StreamErr>;
/// Byte sink; implementors must be `Send`.
pub trait OStream: Send {
/// Writes `bytes` to the stream, taking ownership of the buffer.
fn write(&mut self, bytes: Vec<u8>) -> OStreamResult;
/// Consumes the boxed stream to flush it.
///
/// The default implementation does nothing and returns `Ok(())`.
fn flush(self: Box<Self>) -> OStreamResult {
Ok(())
}
}
/// In-memory [`OStream`]: every write is appended to the vector.
impl OStream for Vec<u8> {
    /// Appends `bytes` to the end of the vector and returns `Ok(())`.
    fn write(&mut self, bytes: Vec<u8>) -> OStreamResult {
        self.extend_from_slice(&bytes);
        Ok(())
    }
}
/// Result of an [`IStream`] read: the bytes that were read, otherwise a [`StreamErr`].
pub type IStreamResult = Result<Vec<u8>, StreamErr>;
/// Byte source; implementors must be `Send + Sync`.
pub trait IStream: Send + Sync {
/// Reads from the stream; `bytes_count` is the number of bytes requested.
///
/// The iterators defined in this file treat an empty `Ok` result as the end
/// of the stream.
fn read(&mut self, bytes_count: usize) -> IStreamResult;
/// Returns the size of the stream.
// NOTE(review): presumably a byte count; whether it is the full size or only
// what remains unread is not specified here — confirm against implementors.
fn total_size(&self) -> usize; }
/// Iterator adapter that reads a boxed [`IStream`] in chunks of up to
/// `chunk_size` bytes.
pub struct BoxedChunks {
/// Stream the chunks are read from.
stream: Box<dyn IStream>,
/// Number of bytes each chunk tries to reach.
chunk_size: usize,
}
/// Conversion into a [`BoxedChunks`] iterator.
pub trait IntoBoxedChunks {
/// Consumes `self` and returns a [`BoxedChunks`] configured with `chunk_size`.
fn chunks(self, chunk_size: usize) -> BoxedChunks;
}
/// Lets a boxed [`IStream`] be turned into a chunked iterator.
impl IntoBoxedChunks for Box<dyn IStream> {
    /// Wraps the boxed stream in a [`BoxedChunks`] that targets `chunk_size`
    /// bytes per item.
    fn chunks(self, chunk_size: usize) -> BoxedChunks {
        let stream = self;
        BoxedChunks { stream, chunk_size }
    }
}
impl Iterator for BoxedChunks {
    type Item = IStreamResult;

    /// Assembles the next chunk by reading from the stream until `chunk_size`
    /// bytes are collected or a read comes back empty.
    ///
    /// Returns `None` when nothing could be read, `Some(Ok(chunk))` otherwise
    /// (the chunk is shorter than `chunk_size` if an empty read cut it short),
    /// and `Some(Err(_))` as soon as a read fails. Bytes gathered for the
    /// current chunk before a failing read are dropped with the partial buffer.
    fn next(&mut self) -> Option<Self::Item> {
        let mut chunk = Vec::with_capacity(self.chunk_size);
        loop {
            // How many bytes the chunk still lacks; zero means it is full.
            let missing = self.chunk_size.saturating_sub(chunk.len());
            if missing == 0 {
                break;
            }
            match self.stream.read(missing) {
                // An empty read is treated as the end of the stream.
                Ok(part) if part.is_empty() => break,
                Ok(part) => chunk.extend(part),
                failure => return Some(failure),
            }
        }
        if chunk.is_empty() {
            None
        } else {
            Some(Ok(chunk))
        }
    }
}
/// Iterates over a boxed stream, requesting `u16::MAX` bytes per step.
impl Iterator for Box<dyn IStream> {
    type Item = IStreamResult;

    /// Requests `u16::MAX` bytes from the stream.
    ///
    /// An empty read ends the iteration (`None`); any other outcome, including
    /// an error, is passed through as `Some(..)`.
    fn next(&mut self) -> Option<Self::Item> {
        match self.read(usize::from(u16::MAX)) {
            Ok(bytes) if bytes.is_empty() => None,
            outcome => Some(outcome),
        }
    }
}
/// In-memory byte source backed by a `Vec<u8>`.
///
/// Derives `Debug` in addition to `Clone` so the public type can be
/// debug-printed, e.g. in assertions and log output.
#[derive(Debug, Clone)]
pub struct DummyIStream {
    /// Bytes held by the stream.
    bytes: Vec<u8>,
}

impl DummyIStream {
    /// Creates a heap-allocated `DummyIStream` that owns `bytes`.
    pub fn boxed(bytes: Vec<u8>) -> Box<Self> {
        Box::new(Self { bytes })
    }
}
impl IStream for DummyIStream {
    /// Removes up to `bytes_count` bytes from the front of the buffer and
    /// returns them.
    ///
    /// Yields an empty vector once the buffer is exhausted; always returns `Ok`.
    fn read(&mut self, bytes_count: usize) -> IStreamResult {
        let available = self.bytes.len();
        let take = std::cmp::min(bytes_count, available);
        let head: Vec<u8> = self.bytes.drain(..take).collect();
        Ok(head)
    }

    /// Returns the number of bytes currently held in the buffer.
    // NOTE(review): this shrinks as data is read, so it reports what remains
    // rather than the original length — confirm callers expect that.
    fn total_size(&self) -> usize {
        self.bytes.len()
    }
}
/// Unit in which progress values are expressed.
///
/// The enum is passed by value to `ProgressReporter::report`, so it derives
/// `Clone`/`Copy` to let callers reuse one value across calls, plus
/// `Debug`/`PartialEq`/`Eq` so it can be printed and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProgressUnit {
    /// Progress is counted in bytes.
    Bytes,
}

impl Display for ProgressUnit {
    /// Writes the lowercase unit name (`bytes`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Bytes => "bytes",
        };
        f.write_str(label)
    }
}
/// Receives progress updates; implementors must be `Send + Sync`.
pub trait ProgressReporter: Send + Sync {
/// Reports progress as `completed` out of `total`, both expressed in `unit`.
fn report(&self, completed: usize, total: usize, unit: ProgressUnit);
}
/// Progress reporter that carries no state.
///
/// Derives `Debug` and `Default` in addition to `Clone` so the public type can
/// be debug-printed and built through `Default::default()`.
#[derive(Debug, Clone, Default)]
pub struct DummyProgressReporter {}

impl DummyProgressReporter {
    /// Creates a heap-allocated `DummyProgressReporter`.
    pub fn boxed() -> Box<Self> {
        Box::new(Self {})
    }
}
/// No-op [`ProgressReporter`]: every update is discarded.
impl ProgressReporter for DummyProgressReporter {
    /// Ignores all arguments and does nothing.
    fn report(&self, _completed: usize, _total: usize, _unit: ProgressUnit) {
        // Deliberately empty: this reporter swallows progress updates.
    }
}
/// Abort signal that can be polled with `is_set` or awaited with `wait`.
///
/// Both fields are `Arc`s, so clones produced by the derived `Clone` share the
/// same underlying flag and notifier.
#[derive(Debug, Clone, Default)]
pub struct AbortFlag {
/// Used to wake tasks suspended in `wait` when the flag gets set.
notify: Arc<Notify>,
/// Becomes `true` when `set` is called; nothing in this type resets it.
is_set: Arc<AtomicBool>,
}
impl AbortFlag {
    /// Creates a flag via `Default`, i.e. with a fresh notifier and the
    /// boolean at its default value (`false`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Raises the flag and notifies the tasks waiting in [`AbortFlag::wait`].
    ///
    /// Only the call that flips the flag from unset to set sends the
    /// notification; later calls return without notifying again.
    // NOTE(review): the flag uses `Relaxed` ordering, which on its own does not
    // order other memory accesses around it — confirm callers do not rely on
    // the flag to publish additional data.
    pub fn set(&self) {
        let already_set = self.is_set.swap(true, std::sync::atomic::Ordering::Relaxed);
        if already_set {
            return;
        }
        self.notify.notify_waiters();
    }

    /// Returns `true` if the flag has been set.
    pub fn is_set(&self) -> bool {
        self.is_set.load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Completes once the flag is set; returns immediately if it already is.
    pub async fn wait(&self) {
        if self.is_set() {
            return;
        }
        // Enable the notification future *before* checking the flag again, so
        // that a `set()` racing with this call is either seen by the re-check
        // or delivered to the already-enabled future.
        let notified = self.notify.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();
        if !self.is_set() {
            notified.await;
        }
    }
}