Skip to content

Not all logs are captured when a container is dropped #719

@inahga

Description

@inahga

When a container is created with a LogConsumer, the ContainerAsync does not wait for the logs to stop while dropping. If the process ends quickly, e.g. when running a single cargo test, some logs may not be captured by the time the process ends.

See this example failing test. Note that by its nature it's a flaky test, so you may need to increase the $(seq 20) argument to reproduce it, though on my relatively fast machine the counter never gets past 1 or 2.

#[cfg(test)]
mod tests {
    use std::{
        ops::AddAssign,
        sync::{Arc, Mutex},
    };

    use futures::{future::BoxFuture, FutureExt};
    use testcontainers::{
        core::logs::{consumer::LogConsumer, LogFrame},
        runners::AsyncRunner,
        GenericImage, ImageExt,
    };

    /// Log consumer that tallies every frame it is handed in a shared counter.
    struct TestLogConsumer(Arc<Mutex<u64>>);

    impl LogConsumer for TestLogConsumer {
        /// Adds one to the shared counter for each frame; the frame itself is not inspected.
        fn accept<'a>(&'a self, _: &'a LogFrame) -> BoxFuture<'a, ()> {
            async move {
                // The guard is released at the end of this block; no await happens while it is held.
                let mut seen = self.0.lock().unwrap();
                seen.add_assign(1);
            }
            .boxed()
        }
    }

    /// Reproduces the issue: starts a container that echoes 20 lines, lets it drop right away,
    /// then checks how many log frames the consumer counted.
    ///
    /// Flaky by nature: per the report, the count usually stops at 1 or 2 instead of 20.
    #[tokio::test]
    async fn bad_test() {
        let counter = Arc::new(Mutex::new(0u64));

        {
            // The started container is never bound to a variable, so it is dropped as soon as
            // this statement finishes.
            GenericImage::new("docker.io/library/ubuntu", "latest")
                .with_cmd(["/bin/bash", "-c", "for i in $(seq 20); do echo $i; done"])
                .with_log_consumer(TestLogConsumer(Arc::clone(&counter)))
                .start()
                .await
                .unwrap();
        }

        // The report expects one counted frame per echoed line, i.e. 20 in total.
        assert_eq!(counter.lock().unwrap().to_owned(), 20);
    }
}

I'm able to work around this by wrapping the container and avoiding using LogConsumer at all:

Details
#[cfg(test)]
mod tests {
    use std::{
        ops::AddAssign,
        pin::Pin,
        sync::{Arc, Mutex},
    };

    use testcontainers::{runners::AsyncRunner, ContainerAsync, GenericImage, ImageExt};
    use tokio::{
        io::{AsyncBufRead, AsyncBufReadExt},
        runtime::Handle,
        task::JoinHandle,
    };

    /// Boxed, pinned async reader; used here for a container's stdout and stderr streams.
    type Buffer = Pin<Box<dyn AsyncBufRead + Send>>;

    /// Counts log lines on background tasks and keeps their join handles so that `Drop` can
    /// wait for them to finish.
    struct TestLogConsumer {
        stdout_handle: Option<JoinHandle<()>>,
        stderr_handle: Option<JoinHandle<()>>,
    }

    impl TestLogConsumer {
        /// Spawns one line-counting task per stream (stdout first, then stderr); both tasks
        /// add to the same `counter`.
        fn new(counter: Arc<Mutex<u64>>, stdout: Buffer, stderr: Buffer) -> Self {
            Self {
                stdout_handle: Some(Self::spawn_line_counter(Arc::clone(&counter), stdout)),
                stderr_handle: Some(Self::spawn_line_counter(counter, stderr)),
            }
        }

        /// Spawns a task that adds one to `total` per line read from `stream`, stopping at
        /// end of stream or on the first read error.
        fn spawn_line_counter(total: Arc<Mutex<u64>>, stream: Buffer) -> JoinHandle<()> {
            tokio::spawn(async move {
                let mut reader = stream.lines();
                while let Ok(Some(_)) = reader.next_line().await {
                    total.lock().unwrap().add_assign(1);
                }
            })
        }
    }

    impl Drop for TestLogConsumer {
        /// Blocks until both reader tasks have finished, so every line has been counted
        /// before the consumer goes away.
        ///
        /// # Panics
        ///
        /// `Handle::current()` panics when called outside a Tokio runtime, and
        /// `block_in_place` panics on a current-thread runtime, so this type must be dropped
        /// on a multi-threaded runtime.
        fn drop(&mut self) {
            let runtime = Handle::current();
            // Take the handles out of `self` up front; `Option::into_iter` skips any that
            // are already `None`.
            let pending = self
                .stdout_handle
                .take()
                .into_iter()
                .chain(self.stderr_handle.take());
            tokio::task::block_in_place(|| {
                for task in pending {
                    // Best effort: a `JoinError` (the reader task panicked or was cancelled)
                    // is deliberately ignored rather than turned into a panic inside `drop`.
                    let _ = runtime.block_on(task);
                }
            });
        }
    }

    /// Pairs a running container with the consumer that reads its output streams.
    ///
    /// Fields drop in declaration order: `container` first, then `log_consumer`.
    struct TestContainer {
        container: ContainerAsync<GenericImage>,
        log_consumer: TestLogConsumer,
    }

    impl TestContainer {
        /// Starts an Ubuntu container that echoes the numbers 1 through 20 and attaches a
        /// line-counting consumer to its stdout and stderr.
        async fn new(counter: Arc<Mutex<u64>>) -> Self {
            let request = GenericImage::new("docker.io/library/ubuntu", "latest")
                .with_cmd(["/bin/bash", "-c", "for i in $(seq 20); do echo $i; done"]);
            let container = request.start().await.unwrap();

            // Open stdout before stderr, then hand both readers to the consumer.
            let stdout = container.stdout(true);
            let stderr = container.stderr(true);
            let log_consumer = TestLogConsumer::new(counter, stdout, stderr);

            Self {
                container,
                log_consumer,
            }
        }
    }

    /// Workaround variant of the repro: the wrapper's consumer joins its reader tasks on
    /// drop, so the counter is checked only after both streams have been read to the end.
    ///
    /// The multi-threaded flavor is needed because the consumer's `Drop` calls
    /// `tokio::task::block_in_place`, which panics on a current-thread runtime.
    #[tokio::test(flavor = "multi_thread")]
    async fn good_test() {
        let counter = Arc::new(Mutex::new(0u64));

        {
            // The wrapper is never bound to a variable, so it is dropped as soon as this
            // statement finishes.
            TestContainer::new(Arc::clone(&counter)).await;
        }

        // One counted line per echoed number, i.e. 20 in total.
        assert_eq!(counter.lock().unwrap().to_owned(), 20);
    }
}

But that's unfortunate. I think if someone is using a LogConsumer they'd prefer that all logs are captured.

I think the problem is that the log consumer task is created with tokio::spawn, and the corresponding handle is never joined. https://github.com/testcontainers/testcontainers-rs/blob/main/testcontainers/src/core/containers/async_container.rs#L71.

I'm more than happy to fix this myself, but wanted to gauge maintainer interest first, since adding more blocking calls to a Drop is generally unpleasant.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions