diff --git a/default.nix b/default.nix index e84fd2a..15f00c0 100644 --- a/default.nix +++ b/default.nix @@ -10,9 +10,9 @@ let mypy uv pytest - pytest-random-order - pytest-parallel + pytest-xdist pytest-timeout + multiprocess # Repeated here so MyPy sees them: cbor2 diff --git a/justfile b/justfile index d12fb41..ee1baa4 100644 --- a/justfile +++ b/justfile @@ -47,7 +47,7 @@ test-integration *TEST_ARGS: build-bin build-python cd libs/opsqueue_python source "./.setup_local_venv.sh" - pytest --color=yes {{TEST_ARGS}} + timeout 600 pytest --color=yes {{TEST_ARGS}} # Python integration test suite, using artefacts built through Nix. Args are forwarded to pytest [group('nix')] @@ -61,7 +61,7 @@ nix-test-integration *TEST_ARGS: nix-build-bin export OPSQUEUE_VIA_NIX=true export RUST_LOG="opsqueue=debug" - pytest --color=yes {{TEST_ARGS}} + timeout 600 pytest --color=yes {{TEST_ARGS}} # Run all linters, fast and slow [group('lint')] diff --git a/libs/opsqueue_python/pyproject.toml b/libs/opsqueue_python/pyproject.toml index 21be1d1..90c5481 100644 --- a/libs/opsqueue_python/pyproject.toml +++ b/libs/opsqueue_python/pyproject.toml @@ -38,29 +38,19 @@ module-name = "opsqueue.opsqueue_internal" # We specify test-dependencies here (and have them loaded through `.setup_local_venv.sh`) # because otherwise `pytest` won't be able to see the locally build python package! -# -# Extra notes: -# -# - We rely on `pytest-parallel` rather than `pytest-xdist` -# since we currently rely on `multiprocessing`'s `fork` strategy, -# which is incompatible -# since pytest-xdist always spawns threads internally -# c.f. 
https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods -# - We need to specify `py` as extra requirement because of this issue: https://github.com/kevlened/pytest-parallel/issues/118 [project.optional-dependencies] test = [ - "pytest==8.3.3", - "pytest-random-order==1.1.1", - "pytest-parallel==0.1.1", - # "pytest-timeout==2.4.0", - "py==1.11.0", # Needs to be manually specified because of this issue: https://github.com/kevlened/pytest-parallel/issues/118 + "pytest==9.0.2", + "pytest-xdist==3.8.0", + "multiprocess==0.70.19", + "pytest-timeout==2.4.0", ] [tool.pytest.ini_options] # We ensure tests never rely on global state, # by running them in a random order, and in parallel: -addopts = "--random-order --workers=4" -# # Individual tests should be very fast. They should never take multiple seconds -# # If after 20sec (accomodating for a toaster-like PC) there is no progress, -# # assume a deadlock -# timeout=20 +addopts = "-n 4 --dist=worksteal" +# Individual tests should be very fast. 
They should never take multiple seconds +# If after 120sec (accomodating for a toaster-like PC, or an overloaded Semaphore runner) +# there is no progress, assume a deadlock +# timeout=120 diff --git a/libs/opsqueue_python/src/common.rs b/libs/opsqueue_python/src/common.rs index ba5e8aa..401d86b 100644 --- a/libs/opsqueue_python/src/common.rs +++ b/libs/opsqueue_python/src/common.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::future::IntoFuture; use std::sync::Arc; use std::time::Duration; @@ -22,7 +23,7 @@ pub const SIGNAL_CHECK_INTERVAL: Duration = Duration::from_millis(100); #[cfg(not(debug_assertions))] pub const SIGNAL_CHECK_INTERVAL: Duration = Duration::from_secs(1); -#[pyclass(frozen, get_all, eq, ord, hash)] +#[pyclass(frozen, get_all, eq, ord, hash, module = "opsqueue")] #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct SubmissionId { pub id: u64, @@ -57,7 +58,7 @@ impl From for SubmissionId { } } -#[pyclass(frozen, get_all, eq, ord, hash)] +#[pyclass(frozen, get_all, eq, ord, hash, module = "opsqueue")] #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct ChunkIndex { pub id: u64, @@ -97,8 +98,7 @@ impl From for ChunkIndex { } } -#[pyclass(frozen, eq)] -#[derive(Debug)] +#[pyclass(frozen, eq, module = "opsqueue_internal")] pub enum Strategy { #[pyo3(constructor=())] Oldest(), @@ -113,6 +113,26 @@ pub enum Strategy { }, } +impl Debug for Strategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Strategy::Oldest() => f.debug_struct("Strategy.Oldest").finish(), + Strategy::Newest() => f.debug_struct("Strategy.Newest").finish(), + Strategy::Random() => f.debug_struct("Strategy.Random").finish(), + Strategy::PreferDistinct { + meta_key, + underlying, + } => Python::with_gil(|py| { + let underlying = underlying.borrow(py); + f.debug_struct("Strategy.PreferDistinct") + .field("meta_key", meta_key) + .field("underlying", &*underlying) + .finish() + }), + } + } +} + 
impl From for Strategy { fn from(value: strategy::Strategy) -> Self { match value { @@ -184,7 +204,7 @@ impl Eq for Strategy {} /// Wrapper for the internal Opsqueue Chunk datatype /// Note that it also includes some fields originating from the Submission -#[pyclass(frozen, get_all)] +#[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct Chunk { pub submission_id: SubmissionId, @@ -238,7 +258,7 @@ impl Chunk { /// Wrapper for the internal Opsqueue Chunk datatype /// Note that it also includes some fields originating from the Submission -#[pyclass(frozen, get_all)] +#[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct ChunkFailed { pub submission_id: SubmissionId, @@ -295,7 +315,7 @@ impl From for SubmissionFailed { } } -#[pyclass(frozen)] +#[pyclass(frozen, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub enum SubmissionStatus { InProgress { @@ -329,7 +349,7 @@ impl From for SubmissionStatus { } } -#[pyclass(frozen, get_all)] +#[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct Submission { pub id: SubmissionId, @@ -390,7 +410,7 @@ impl SubmissionFailed { } } -#[pyclass(frozen, get_all)] +#[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct SubmissionCompleted { pub id: SubmissionId, @@ -399,7 +419,7 @@ pub struct SubmissionCompleted { pub completed_at: DateTime, } -#[pyclass(frozen, get_all)] +#[pyclass(frozen, get_all, module = "opsqueue")] #[derive(Debug, Clone, PartialEq, Eq)] pub struct SubmissionFailed { pub id: SubmissionId, diff --git a/libs/opsqueue_python/src/consumer.rs b/libs/opsqueue_python/src/consumer.rs index 613ac5c..cc31118 100644 --- a/libs/opsqueue_python/src/consumer.rs +++ b/libs/opsqueue_python/src/consumer.rs @@ -34,7 +34,7 @@ use super::common::{Chunk, ChunkIndex, Strategy, SubmissionId}; create_exception!(opsqueue_internal, 
ConsumerClientError, PyException); -#[pyclass] +#[pyclass(module = "opsqueue")] #[derive(Debug)] pub struct ConsumerClient { client: ActualConsumerClient, diff --git a/libs/opsqueue_python/src/producer.rs b/libs/opsqueue_python/src/producer.rs index 7e16ba5..ff6d258 100644 --- a/libs/opsqueue_python/src/producer.rs +++ b/libs/opsqueue_python/src/producer.rs @@ -33,7 +33,7 @@ create_exception!(opsqueue_internal, ProducerClientError, PyException); const SUBMISSION_POLLING_INTERVAL: Duration = Duration::from_millis(5000); // NOTE: ProducerClient is reasonably cheap to clone, as most of its fields are behind Arcs. -#[pyclass] +#[pyclass(module = "opsqueue")] #[derive(Debug, Clone)] pub struct ProducerClient { producer_client: ActualClient, @@ -466,7 +466,7 @@ impl ProducerClient { pub type ChunksStream = BoxStream<'static, CPyResult, ChunkRetrievalError>>; -#[pyclass] +#[pyclass(module = "opsqueue")] pub struct PyChunksIter { stream: Arc>, runtime: Arc, @@ -515,7 +515,7 @@ impl PyChunksIter { } } -#[pyclass] +#[pyclass(module = "opsqueue")] pub struct PyChunksAsyncIter { stream: Arc>, runtime: Arc, diff --git a/libs/opsqueue_python/tests/conftest.py b/libs/opsqueue_python/tests/conftest.py index c0c9d37..4b0ebb4 100644 --- a/libs/opsqueue_python/tests/conftest.py +++ b/libs/opsqueue_python/tests/conftest.py @@ -2,7 +2,7 @@ import pickle from contextlib import contextmanager, ExitStack from typing import Generator, Callable, Any, Iterable -import multiprocessing +import multiprocess # type: ignore[import-untyped] import subprocess import os import pytest @@ -13,10 +13,12 @@ from opsqueue.common import SerializationFormat, json_as_bytes from opsqueue.consumer import Strategy -# @pytest.hookimpl(tryfirst=True) -# def pytest_configure(config: pytest.Config) -> None: -# print("A") -# multiprocessing.set_start_method('forkserver') + +@pytest.hookimpl(tryfirst=True) +def pytest_configure(config: pytest.Config) -> None: + print("A") + 
multiprocess.set_start_method("forkserver") + PROJECT_ROOT = Path(__file__).parents[3] @@ -106,8 +108,8 @@ def is_port_in_use(port: int) -> bool: def background_process( function: Callable[..., None], args: Iterable[Any] = (), -) -> Generator[multiprocessing.Process, None, None]: - proc = multiprocessing.Process(target=function, args=args) +) -> Generator[multiprocess.Process, None, None]: + proc = multiprocess.Process(target=function, args=args) try: proc.daemon = True proc.start() @@ -126,32 +128,34 @@ def multiple_background_processes( yield -basic_strategies = Strategy.Random(), Strategy.Newest(), Strategy.Oldest() -any_strategies = [ - *basic_strategies, - *(Strategy.PreferDistinct(meta_key="id", underlying=s) for s in basic_strategies), -] +type StrategyDescription = str | tuple[str, str, StrategyDescription] + +basic_strategies: Iterable[StrategyDescription] = ("Random", "Newest", "Oldest") +any_strategies: Iterable[StrategyDescription] = ( + *(basic_strategies), + *(("PreferDistinct", "id", s) for s in basic_strategies), +) @pytest.fixture( scope="function", - ids=lambda s: f"Strategy.{s}", + ids=lambda s: f"Strategy.{strategy_from_description(s)}", params=basic_strategies, ) def basic_consumer_strategy( request: pytest.FixtureRequest, -) -> Generator[Strategy, None, None]: +) -> Generator[StrategyDescription, None, None]: yield request.param @pytest.fixture( scope="function", - ids=lambda s: f"Strategy.{s}", + ids=lambda s: f"Strategy.{strategy_from_description(s)}", params=any_strategies, ) def any_consumer_strategy( request: pytest.FixtureRequest, -) -> Generator[Strategy, None, None]: +) -> Generator[StrategyDescription, None, None]: yield request.param @@ -160,3 +164,22 @@ def serialization_format( request: pytest.FixtureRequest, ) -> Generator[SerializationFormat, None, None]: yield request.param + + +def strategy_from_description(description: StrategyDescription) -> Strategy: + """ + PyO3 objects cannot currently be Pickle'd. 
+ This helper function allows us to pass a pickle-able description across `multiprocess.Process` borders,
@@ -34,7 +42,8 @@ def run_consumer() -> None: consumer_client = ConsumerClient( f"localhost:{opsqueue.port}", "file:///tmp/opsqueue/test_roundtrip" ) - consumer_client.run_each_op(increment, strategy=any_consumer_strategy) + strategy = strategy_from_description(any_consumer_strategy) + consumer_client.run_each_op(increment, strategy=strategy) with background_process(run_consumer) as _consumer: input_iter = range(0, 100) @@ -48,7 +57,7 @@ def run_consumer() -> None: def test_empty_submission( - opsqueue: OpsqueueProcess, any_consumer_strategy: Strategy + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: """ Empty submissions ought to be supported without problems. @@ -69,7 +78,7 @@ def test_empty_submission( def test_roundtrip_explicit_serialization_format( opsqueue: OpsqueueProcess, - any_consumer_strategy: Strategy, + any_consumer_strategy: StrategyDescription, serialization_format: SerializationFormat, ) -> None: """ @@ -86,9 +95,10 @@ def run_consumer() -> None: consumer_client = ConsumerClient( f"localhost:{opsqueue.port}", "file:///tmp/opsqueue/test_roundtrip" ) + strategy = strategy_from_description(any_consumer_strategy) consumer_client.run_each_op( increment, - strategy=any_consumer_strategy, + strategy=strategy, serialization_format=serialization_format, ) @@ -151,7 +161,7 @@ def broken_increment(input: int) -> float: def test_chunk_roundtrip( - opsqueue: OpsqueueProcess, basic_consumer_strategy: Strategy + opsqueue: OpsqueueProcess, basic_consumer_strategy: StrategyDescription ) -> None: """ Tests whether everything still works well @@ -172,7 +182,8 @@ def increment_list(ints: Sequence[int], _chunk: Chunk) -> Sequence[int]: f"localhost:{opsqueue.port}", "file:///tmp/opsqueue/test_chunk_roundtrip", ) - consumer_client.run_each_chunk(increment_list, strategy=basic_consumer_strategy) + strategy = strategy_from_description(basic_consumer_strategy) + consumer_client.run_each_chunk(increment_list, strategy=strategy) with 
background_process(run_consumer) as _consumer: input_iter = map(lambda i: cbor2.dumps([i, i, i]), range(0, 10)) @@ -188,7 +199,7 @@ def increment_list(ints: Sequence[int], _chunk: Chunk) -> Sequence[int]: def test_many_consumers( - opsqueue: OpsqueueProcess, any_consumer_strategy: Strategy + opsqueue: OpsqueueProcess, any_consumer_strategy: StrategyDescription ) -> None: """ Ensure the system still works if we have many consumers concurrently @@ -210,7 +221,8 @@ def run_consumer(consumer_id: int) -> None: consumer_client = ConsumerClient( f"localhost:{opsqueue.port}", "file:///tmp/opsqueue/test_many_consumers" ) - consumer_client.run_each_op(increment, strategy=any_consumer_strategy) + strategy = strategy_from_description(any_consumer_strategy) + consumer_client.run_each_op(increment, strategy=strategy) n_consumers = 16 with multiple_background_processes(run_consumer, n_consumers) as _consumers: