Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ let
mypy
uv
pytest
pytest-random-order
pytest-parallel
pytest-xdist
pytest-timeout
multiprocess

# Repeated here so MyPy sees them:
cbor2
Expand Down
4 changes: 2 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ test-integration *TEST_ARGS: build-bin build-python
cd libs/opsqueue_python
source "./.setup_local_venv.sh"

pytest --color=yes {{TEST_ARGS}}
timeout 600 pytest --color=yes {{TEST_ARGS}}

# Python integration test suite, using artefacts built through Nix. Args are forwarded to pytest
[group('nix')]
Expand All @@ -61,7 +61,7 @@ nix-test-integration *TEST_ARGS: nix-build-bin
export OPSQUEUE_VIA_NIX=true
export RUST_LOG="opsqueue=debug"

pytest --color=yes {{TEST_ARGS}}
timeout 600 pytest --color=yes {{TEST_ARGS}}

# Run all linters, fast and slow
[group('lint')]
Expand Down
28 changes: 9 additions & 19 deletions libs/opsqueue_python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,19 @@ module-name = "opsqueue.opsqueue_internal"

# We specify test-dependencies here (and have them loaded through `.setup_local_venv.sh`)
# because otherwise `pytest` won't be able to see the locally build python package!
#
# Extra notes:
#
# - We rely on `pytest-parallel` rather than `pytest-xdist`
# since we currently rely on `multiprocessing`'s `fork` strategy,
# which is incompatible
# since pytest-xdist always spawns threads internally
# c.f. https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# - We need to specify `py` as extra requirement because of this issue: https://github.com/kevlened/pytest-parallel/issues/118
[project.optional-dependencies]
test = [
"pytest==8.3.3",
"pytest-random-order==1.1.1",
"pytest-parallel==0.1.1",
# "pytest-timeout==2.4.0",
"py==1.11.0", # Needs to be manually specified because of this issue: https://github.com/kevlened/pytest-parallel/issues/118
"pytest==9.0.2",
"pytest-xdist==3.8.0",
"multiprocess==0.70.19",
"pytest-timeout==2.4.0",
]

[tool.pytest.ini_options]
# We ensure tests never rely on global state,
# by running them in a random order, and in parallel:
addopts = "--random-order --workers=4"
# # Individual tests should be very fast. They should never take multiple seconds
# # If after 20sec (accomodating for a toaster-like PC) there is no progress,
# # assume a deadlock
# timeout=20
addopts = "-n 4 --dist=worksteal"
# Individual tests should be very fast. They should never take multiple seconds
# If after 120sec (accomodating for a toaster-like PC, or an overloaded Semaphore runner)
# there is no progress, assume a deadlock
# timeout=120
40 changes: 30 additions & 10 deletions libs/opsqueue_python/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::Debug;
use std::future::IntoFuture;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -22,7 +23,7 @@ pub const SIGNAL_CHECK_INTERVAL: Duration = Duration::from_millis(100);
#[cfg(not(debug_assertions))]
pub const SIGNAL_CHECK_INTERVAL: Duration = Duration::from_secs(1);

#[pyclass(frozen, get_all, eq, ord, hash)]
#[pyclass(frozen, get_all, eq, ord, hash, module = "opsqueue")]
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct SubmissionId {
pub id: u64,
Expand Down Expand Up @@ -57,7 +58,7 @@ impl From<submission::SubmissionId> for SubmissionId {
}
}

#[pyclass(frozen, get_all, eq, ord, hash)]
#[pyclass(frozen, get_all, eq, ord, hash, module = "opsqueue")]
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ChunkIndex {
pub id: u64,
Expand Down Expand Up @@ -97,8 +98,7 @@ impl From<u63> for ChunkIndex {
}
}

#[pyclass(frozen, eq)]
#[derive(Debug)]
#[pyclass(frozen, eq, module = "opsqueue_internal")]
pub enum Strategy {
#[pyo3(constructor=())]
Oldest(),
Expand All @@ -113,6 +113,26 @@ pub enum Strategy {
},
}

impl Debug for Strategy {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Strategy::Oldest() => f.debug_struct("Strategy.Oldest").finish(),
Strategy::Newest() => f.debug_struct("Strategy.Newest").finish(),
Strategy::Random() => f.debug_struct("Strategy.Random").finish(),
Strategy::PreferDistinct {
meta_key,
underlying,
} => Python::with_gil(|py| {
let underlying = underlying.borrow(py);
f.debug_struct("Strategy.PreferDistinct")
.field("meta_key", meta_key)
.field("underlying", &*underlying)
.finish()
}),
}
}
}

impl From<strategy::Strategy> for Strategy {
fn from(value: strategy::Strategy) -> Self {
match value {
Expand Down Expand Up @@ -184,7 +204,7 @@ impl Eq for Strategy {}

/// Wrapper for the internal Opsqueue Chunk datatype
/// Note that it also includes some fields originating from the Submission
#[pyclass(frozen, get_all)]
#[pyclass(frozen, get_all, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Chunk {
pub submission_id: SubmissionId,
Expand Down Expand Up @@ -238,7 +258,7 @@ impl Chunk {

/// Wrapper for the internal Opsqueue Chunk datatype
/// Note that it also includes some fields originating from the Submission
#[pyclass(frozen, get_all)]
#[pyclass(frozen, get_all, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkFailed {
pub submission_id: SubmissionId,
Expand Down Expand Up @@ -295,7 +315,7 @@ impl From<opsqueue::common::submission::SubmissionFailed> for SubmissionFailed {
}
}

#[pyclass(frozen)]
#[pyclass(frozen, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubmissionStatus {
InProgress {
Expand Down Expand Up @@ -329,7 +349,7 @@ impl From<opsqueue::common::submission::SubmissionStatus> for SubmissionStatus {
}
}

#[pyclass(frozen, get_all)]
#[pyclass(frozen, get_all, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Submission {
pub id: SubmissionId,
Expand Down Expand Up @@ -390,7 +410,7 @@ impl SubmissionFailed {
}
}

#[pyclass(frozen, get_all)]
#[pyclass(frozen, get_all, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubmissionCompleted {
pub id: SubmissionId,
Expand All @@ -399,7 +419,7 @@ pub struct SubmissionCompleted {
pub completed_at: DateTime<Utc>,
}

#[pyclass(frozen, get_all)]
#[pyclass(frozen, get_all, module = "opsqueue")]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubmissionFailed {
pub id: SubmissionId,
Expand Down
2 changes: 1 addition & 1 deletion libs/opsqueue_python/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use super::common::{Chunk, ChunkIndex, Strategy, SubmissionId};

create_exception!(opsqueue_internal, ConsumerClientError, PyException);

#[pyclass]
#[pyclass(module = "opsqueue")]
#[derive(Debug)]
pub struct ConsumerClient {
client: ActualConsumerClient,
Expand Down
6 changes: 3 additions & 3 deletions libs/opsqueue_python/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ create_exception!(opsqueue_internal, ProducerClientError, PyException);
const SUBMISSION_POLLING_INTERVAL: Duration = Duration::from_millis(5000);

// NOTE: ProducerClient is reasonably cheap to clone, as most of its fields are behind Arcs.
#[pyclass]
#[pyclass(module = "opsqueue")]
#[derive(Debug, Clone)]
pub struct ProducerClient {
producer_client: ActualClient,
Expand Down Expand Up @@ -466,7 +466,7 @@ impl ProducerClient {

pub type ChunksStream = BoxStream<'static, CPyResult<Vec<u8>, ChunkRetrievalError>>;

#[pyclass]
#[pyclass(module = "opsqueue")]
pub struct PyChunksIter {
stream: Arc<tokio::sync::Mutex<ChunksStream>>,
runtime: Arc<tokio::runtime::Runtime>,
Expand Down Expand Up @@ -515,7 +515,7 @@ impl PyChunksIter {
}
}

#[pyclass]
#[pyclass(module = "opsqueue")]
pub struct PyChunksAsyncIter {
stream: Arc<tokio::sync::Mutex<ChunksStream>>,
runtime: Arc<tokio::runtime::Runtime>,
Expand Down
55 changes: 39 additions & 16 deletions libs/opsqueue_python/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pickle
from contextlib import contextmanager, ExitStack
from typing import Generator, Callable, Any, Iterable
import multiprocessing
import multiprocess # type: ignore[import-untyped]
import subprocess
import os
import pytest
Expand All @@ -13,10 +13,12 @@
from opsqueue.common import SerializationFormat, json_as_bytes
from opsqueue.consumer import Strategy

# @pytest.hookimpl(tryfirst=True)
# def pytest_configure(config: pytest.Config) -> None:
# print("A")
# multiprocessing.set_start_method('forkserver')

@pytest.hookimpl(tryfirst=True)
def pytest_configure(config: pytest.Config) -> None:
print("A")
multiprocess.set_start_method("forkserver")


PROJECT_ROOT = Path(__file__).parents[3]

Expand Down Expand Up @@ -106,8 +108,8 @@ def is_port_in_use(port: int) -> bool:
def background_process(
function: Callable[..., None],
args: Iterable[Any] = (),
) -> Generator[multiprocessing.Process, None, None]:
proc = multiprocessing.Process(target=function, args=args)
) -> Generator[multiprocess.Process, None, None]:
proc = multiprocess.Process(target=function, args=args)
try:
proc.daemon = True
proc.start()
Expand All @@ -126,32 +128,34 @@ def multiple_background_processes(
yield


basic_strategies = Strategy.Random(), Strategy.Newest(), Strategy.Oldest()
any_strategies = [
*basic_strategies,
*(Strategy.PreferDistinct(meta_key="id", underlying=s) for s in basic_strategies),
]
type StrategyDescription = str | tuple[str, str, StrategyDescription]

basic_strategies: Iterable[StrategyDescription] = ("Random", "Newest", "Oldest")
any_strategies: Iterable[StrategyDescription] = (
*(basic_strategies),
*(("PreferDistinct", "id", s) for s in basic_strategies),
)


@pytest.fixture(
scope="function",
ids=lambda s: f"Strategy.{s}",
ids=lambda s: f"Strategy.{strategy_from_description(s)}",
params=basic_strategies,
)
def basic_consumer_strategy(
request: pytest.FixtureRequest,
) -> Generator[Strategy, None, None]:
) -> Generator[StrategyDescription, None, None]:
yield request.param


@pytest.fixture(
scope="function",
ids=lambda s: f"Strategy.{s}",
ids=lambda s: f"Strategy.{strategy_from_description(s)}",
params=any_strategies,
)
def any_consumer_strategy(
request: pytest.FixtureRequest,
) -> Generator[Strategy, None, None]:
) -> Generator[StrategyDescription, None, None]:
yield request.param


Expand All @@ -160,3 +164,22 @@ def serialization_format(
request: pytest.FixtureRequest,
) -> Generator[SerializationFormat, None, None]:
yield request.param


def strategy_from_description(description: StrategyDescription) -> Strategy:
"""
PyO3 objects cannot currently be Pickle'd.
This helper function allows us to pass a pickle-able description across `multiprocessing.Process` borders,
and then look up the actual Strategy inside the consumer.
"""
match description:
case "Random":
return Strategy.Random()
case "Newest":
return Strategy.Newest()
case "Oldest":
return Strategy.Oldest()
case ("PreferDistinct", key, underlying):
return Strategy.PreferDistinct(
meta_key=key, underlying=strategy_from_description(underlying)
)
Loading