From a26dc8ca59aed4549c9a8755acc696e24d65d350 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 3 Feb 2026 15:41:56 -0800 Subject: [PATCH 1/3] chore: Allow for completion reporting for FDv2SourceResult consumption. --- .../sdk/server/FDv2DataSource.java | 164 +++++++++--------- .../server/datasources/FDv2SourceResult.java | 105 +++++++++-- .../sdk/server/integrations/TestDataV2.java | 110 +++--------- .../TestDataV2WithClientTest.java | 144 +++++++++++++++ 4 files changed, 340 insertions(+), 183 deletions(-) create mode 100644 lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2WithClientTest.java diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java index a7d1c874..70475e23 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java @@ -159,38 +159,39 @@ private void runInitializers() { Initializer initializer = sourceManager.getNextInitializerAndSetActive(); while(initializer != null) { try { - FDv2SourceResult result = initializer.run().get(); - switch (result.getResultType()) { - case CHANGE_SET: - dataSourceUpdates.apply(result.getChangeSet()); - anyDataReceived = true; - if (!result.getChangeSet().getSelector().isEmpty()) { - // We received data with a selector, so we end the initialization process. - dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); - startFuture.complete(true); - return; - } - break; - case STATUS: - FDv2SourceResult.Status status = result.getStatus(); - switch(status.getState()) { - case INTERRUPTED: - case TERMINAL_ERROR: - // The data source updates handler will filter the state during initializing, but this - // will make the error information available. - dataSourceUpdates.updateStatus( - // While the error was terminal to the individual initializer, it isn't terminal - // to the data source as a whole. - DataSourceStatusProvider.State.INTERRUPTED, - status.getErrorInfo()); - break; - case SHUTDOWN: - case GOODBYE: - // We don't need to inform anyone of these statuses. - logger.debug("Ignoring status {} from initializer", result.getStatus().getState()); - break; - } - break; + try(FDv2SourceResult result = initializer.run().get()) { + switch (result.getResultType()) { + case CHANGE_SET: + dataSourceUpdates.apply(result.getChangeSet()); + anyDataReceived = true; + if (!result.getChangeSet().getSelector().isEmpty()) { + // We received data with a selector, so we end the initialization process. + dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); + startFuture.complete(true); + return; + } + break; + case STATUS: + FDv2SourceResult.Status status = result.getStatus(); + switch (status.getState()) { + case INTERRUPTED: + case TERMINAL_ERROR: + // The data source updates handler will filter the state during initializing, but this + // will make the error information available. + dataSourceUpdates.updateStatus( + // While the error was terminal to the individual initializer, it isn't terminal + // to the data source as a whole. + DataSourceStatusProvider.State.INTERRUPTED, + status.getErrorInfo()); + break; + case SHUTDOWN: + case GOODBYE: + // We don't need to inform anyone of these statuses. + logger.debug("Ignoring status {} from initializer", result.getStatus().getState()); + break; + } + break; + } } } catch (ExecutionException | InterruptedException | CancellationException e) { // We don't expect these conditions to happen in practice. @@ -205,7 +206,7 @@ private void runInitializers() { new Date().toInstant())); logger.warn("Error running initializer: {}", e.toString()); } - initializer = sourceManager.getNextInitializerAndSetActive(); + initializer = sourceManager.getNextInitializerAndSetActive(); } // We received data without a selector, and we have exhausted initializers, so we are going to // consider ourselves initialized. @@ -286,55 +287,56 @@ private void runSynchronizers() { continue; } - FDv2SourceResult result = (FDv2SourceResult) res; - conditions.inform(result); + try (FDv2SourceResult result = (FDv2SourceResult) res) { + conditions.inform(result); - switch (result.getResultType()) { - case CHANGE_SET: - dataSourceUpdates.apply(result.getChangeSet()); - dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); - // This could have been completed by any data source. But if it has not been completed before - // now, then we complete it. - startFuture.complete(true); - break; - case STATUS: - FDv2SourceResult.Status status = result.getStatus(); - switch (status.getState()) { - case INTERRUPTED: - // Handled by conditions. - dataSourceUpdates.updateStatus( - DataSourceStatusProvider.State.INTERRUPTED, - status.getErrorInfo()); - break; - case SHUTDOWN: - // We should be overall shutting down. - logger.debug("Synchronizer shutdown."); - return; - case TERMINAL_ERROR: - sourceManager.blockCurrentSynchronizer(); - running = false; - dataSourceUpdates.updateStatus( - DataSourceStatusProvider.State.INTERRUPTED, - status.getErrorInfo()); - break; - case GOODBYE: - // We let the synchronizer handle this internally. - break; - } - break; - } - // We have been requested to fall back to FDv1. We handle whatever message was associated, - // close the synchronizer, and then fallback. - // Only trigger fallback if we're not already running the FDv1 fallback synchronizer. - if ( - result.isFdv1Fallback() && - sourceManager.hasFDv1Fallback() && - // This shouldn't happen in practice, an FDv1 source shouldn't request fallback - // to FDv1. But if it does, then we will discard its request. - !sourceManager.isCurrentSynchronizerFDv1Fallback() - ) { - sourceManager.fdv1Fallback(); - running = false; + switch (result.getResultType()) { + case CHANGE_SET: + dataSourceUpdates.apply(result.getChangeSet()); + dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null); + // This could have been completed by any data source. But if it has not been completed before + // now, then we complete it. + startFuture.complete(true); + break; + case STATUS: + FDv2SourceResult.Status status = result.getStatus(); + switch (status.getState()) { + case INTERRUPTED: + // Handled by conditions. + dataSourceUpdates.updateStatus( + DataSourceStatusProvider.State.INTERRUPTED, + status.getErrorInfo()); + break; + case SHUTDOWN: + // We should be overall shutting down. + logger.debug("Synchronizer shutdown."); + return; + case TERMINAL_ERROR: + sourceManager.blockCurrentSynchronizer(); + running = false; + dataSourceUpdates.updateStatus( + DataSourceStatusProvider.State.INTERRUPTED, + status.getErrorInfo()); + break; + case GOODBYE: + // We let the synchronizer handle this internally. + break; + } + break; + } + // We have been requested to fall back to FDv1. We handle whatever message was associated, + // close the synchronizer, and then fallback. + // Only trigger fallback if we're not already running the FDv1 fallback synchronizer. + if ( + result.isFdv1Fallback() && + sourceManager.hasFDv1Fallback() && + // This shouldn't happen in practice, an FDv1 source shouldn't request fallback + // to FDv1. But if it does, then we will discard its request. + !sourceManager.isCurrentSynchronizerFDv1Fallback() + ) { + sourceManager.fdv1Fallback(); + running = false; + } } } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java index 6f440c63..acd5e121 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java @@ -1,14 +1,19 @@ package com.launchdarkly.sdk.server.datasources; + import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes; +import java.io.Closeable; +import java.util.function.Function; + /** * This type is currently experimental and not subject to semantic versioning. *

* The result type for FDv2 initializers and synchronizers. An FDv2 initializer produces a single result, while * an FDv2 synchronizer produces a stream of results. */ -public class FDv2SourceResult { +public class FDv2SourceResult implements Closeable { + public enum State { /** * The data source has encountered an interruption and will attempt to reconnect. This isn't intended to be used @@ -67,49 +72,86 @@ public Status(State state, DataSourceStatusProvider.ErrorInfo errorInfo) { private final Status status; private final ResultType resultType; - + private final boolean fdv1Fallback; - private FDv2SourceResult(DataStoreTypes.ChangeSet changeSet, Status status, ResultType resultType, boolean fdv1Fallback) { + private final Function completionCallback; + + private FDv2SourceResult( + DataStoreTypes.ChangeSet changeSet, + Status status, ResultType resultType, + boolean fdv1Fallback, + Function completionCallback + ) { this.changeSet = changeSet; this.status = status; this.resultType = resultType; this.fdv1Fallback = fdv1Fallback; + this.completionCallback = completionCallback; } public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) { + return interrupted(errorInfo, fdv1Fallback, null); + } + + public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback, Function completionCallback) { return new FDv2SourceResult( - null, - new Status(State.INTERRUPTED, errorInfo), - ResultType.STATUS, - fdv1Fallback); + null, + new Status(State.INTERRUPTED, errorInfo), + ResultType.STATUS, + fdv1Fallback, + completionCallback); } public static FDv2SourceResult shutdown() { + return shutdown(null); + } + + public static FDv2SourceResult shutdown(Function completionCallback) { return new FDv2SourceResult(null, - new Status(State.SHUTDOWN, null), - ResultType.STATUS, - false); + new Status(State.SHUTDOWN, null), + ResultType.STATUS, + false, + completionCallback); } public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) { + return terminalError(errorInfo, fdv1Fallback, null); + } + + public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback, Function completionCallback) { return new FDv2SourceResult(null, - new Status(State.TERMINAL_ERROR, errorInfo), - ResultType.STATUS, - fdv1Fallback); + new Status(State.TERMINAL_ERROR, errorInfo), + ResultType.STATUS, + fdv1Fallback, + completionCallback); } public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet changeSet, boolean fdv1Fallback) { - return new FDv2SourceResult(changeSet, null, ResultType.CHANGE_SET, fdv1Fallback); + return changeSet(changeSet, fdv1Fallback, null); + } + + public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet changeSet, boolean fdv1Fallback, Function completionCallback) { + return new FDv2SourceResult( + changeSet, + null, + ResultType.CHANGE_SET, + fdv1Fallback, + completionCallback); } public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback) { + return goodbye(reason, fdv1Fallback, null); + } + + public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback, Function completionCallback) { // TODO: Goodbye reason. return new FDv2SourceResult( - null, - new Status(State.GOODBYE, null), - ResultType.STATUS, - fdv1Fallback); + null, + new Status(State.GOODBYE, null), + ResultType.STATUS, + fdv1Fallback, + completionCallback); } public ResultType getResultType() { @@ -127,4 +169,31 @@ public DataStoreTypes.ChangeSet getChangeSet() { public boolean isFdv1Fallback() { return fdv1Fallback; } + + /** + * Creates a new result wrapping this one with an additional completion callback. + *

+ * The new completion callback will be invoked when the result is closed, followed by + * the original completion callback (if any). + * + * @param newCallback the completion callback to add + * @return a new FDv2SourceResult with the added completion callback + */ + public FDv2SourceResult withCompletion(Function newCallback) { + Function combinedCallback = v -> { + newCallback.apply(null); + if (completionCallback != null) { + completionCallback.apply(null); + } + return null; + }; + return new FDv2SourceResult(changeSet, status, resultType, fdv1Fallback, combinedCallback); + } + + @Override + public void close() { + if(completionCallback != null) { + completionCallback.apply(null); + } + } } diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java index 1e17e8e0..143ad7f8 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java @@ -1,31 +1,28 @@ package com.launchdarkly.sdk.server.integrations; import com.google.common.collect.ImmutableMap; +import com.launchdarkly.sdk.internal.collections.IterableAsyncQueue; import com.launchdarkly.sdk.internal.fdv2.sources.Selector; import com.launchdarkly.sdk.server.DataModel; import com.launchdarkly.sdk.server.datasources.FDv2SourceResult; import com.launchdarkly.sdk.server.datasources.Synchronizer; import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider; -import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State; import com.launchdarkly.sdk.server.subsystems.DataSourceBuildInputs; import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType; -import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor; import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems; import java.time.Instant; import java.util.AbstractMap; import java.util.Collections; -import java.util.LinkedList; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; /** @@ -149,7 +146,7 @@ public TestDataV2 delete(String key) { *

* Any subsequent changes to this {@link TestData.FlagBuilder} instance do not affect the test data, * unless you call {@link #update(TestData.FlagBuilder)} again. - * + * * @param flagBuilder a flag configuration builder * @return the same {@code TestDataV2} instance * @see #flag(String) @@ -158,7 +155,7 @@ public TestDataV2 update(TestData.FlagBuilder flagBuilder) { String key = flagBuilder.key; TestData.FlagBuilder clonedBuilder = new TestData.FlagBuilder(flagBuilder); ItemDescriptor newItem = null; - + synchronized (lock) { ItemDescriptor oldItem = currentFlags.get(key); int oldVersion = oldItem == null ? 0 : oldItem.getVersion(); @@ -166,7 +163,7 @@ public TestDataV2 update(TestData.FlagBuilder flagBuilder) { currentFlags.put(key, newItem); currentBuilders.put(key, clonedBuilder); } - + pushToSynchronizers(FDv2SourceResult.changeSet(makePartialChangeSet(key, newItem), false)); return this; @@ -211,45 +208,6 @@ public TestDataV2 updateStatus(DataSourceStatusProvider.State newState, DataSour } return this; } - - /** - * Waits until the given condition returns true or the timeout elapses. - *

- * Use this after calling {@link #update(TestData.FlagBuilder)}, {@link #delete(String)}, or - * {@link #updateStatus(DataSourceStatusProvider.State, DataSourceStatusProvider.ErrorInfo)} - * when using TestDataV2 with an {@code LDClient}, so that your assertions see the updated state. - * The synchronizer may apply updates asynchronously. - * - * @param timeout maximum time to wait - * @param unit unit for the timeout - * @param condition condition to poll; when it returns true, this method returns - * @throws InterruptedException if the current thread is interrupted while waiting - * @throws AssertionError if the condition does not become true before the timeout - */ - public void awaitPropagation(long timeout, TimeUnit unit, BooleanSupplier condition) - throws InterruptedException { - long deadlineMs = System.currentTimeMillis() + unit.toMillis(timeout); - while (System.currentTimeMillis() < deadlineMs) { - if (condition.getAsBoolean()) { - return; - } - Thread.sleep(20); - } - throw new AssertionError("Update did not propagate within " + timeout + " " + unit); - } - - /** - * Waits until the given condition returns true or the default timeout (5 seconds) elapses. - *

- * Equivalent to {@link #awaitPropagation(long, TimeUnit, BooleanSupplier)} with a 5-second timeout. - * - * @param condition condition to poll; when it returns true, this method returns - * @throws InterruptedException if the current thread is interrupted while waiting - * @throws AssertionError if the condition does not become true before the timeout - */ - public void awaitPropagation(BooleanSupplier condition) throws InterruptedException { - awaitPropagation(5, TimeUnit.SECONDS, condition); - } /** * Configures whether test data should be persisted to persistent stores. @@ -289,7 +247,12 @@ public Synchronizer build(DataSourceBuildInputs context) { private void pushToSynchronizers(FDv2SourceResult result) { for (TestDataV2SynchronizerImpl sync : synchronizerInstances) { - sync.put(result); + CompletableFuture completion = new CompletableFuture<>(); + FDv2SourceResult wrappedResult = result.withCompletion(v -> { + completion.complete(null); + return null; + }); + sync.put(wrappedResult, completion); } } @@ -328,54 +291,33 @@ private void closedSynchronizerInstance(TestDataV2SynchronizerImpl synchronizer) * Synchronizer implementation that queues initial and incremental change sets from TestDataV2. */ private final class TestDataV2SynchronizerImpl implements Synchronizer { - private final Object queueLock = new Object(); - private final LinkedList queue = new LinkedList<>(); - private final LinkedList> pendingFutures = new LinkedList<>(); + private final IterableAsyncQueue resultQueue = new IterableAsyncQueue<>(); private final CompletableFuture shutdownFuture = new CompletableFuture<>(); - private volatile boolean closed; - private volatile boolean initialSent; - void put(FDv2SourceResult result) { - synchronized (queueLock) { - if (closed) return; - CompletableFuture waiter = pendingFutures.pollFirst(); - if (waiter != null) { - waiter.complete(result); - } else { - queue.addLast(result); - } + private final AtomicBoolean initialSent = new AtomicBoolean(false); + + void put(FDv2SourceResult result, CompletableFuture completion) { + resultQueue.put(result); + try { + CompletableFuture.anyOf(completion, shutdownFuture).get(); + } catch (Exception e) { + // Completion interrupted or canceled } } @Override public CompletableFuture next() { - synchronized (queueLock) { - if (!initialSent) { - initialSent = true; - // Prepend full changeset so it is delivered before any partial changesets that - // accumulated from update()/delete() calls made before next() was first called. - if (!closed) { - queue.addFirst(FDv2SourceResult.changeSet(makeFullChangeSet(), false)); - } - } - if (!queue.isEmpty()) { - return CompletableFuture.completedFuture(queue.removeFirst()); - } - CompletableFuture future = new CompletableFuture<>(); - pendingFutures.addLast(future); - if (closed) { - future.complete(FDv2SourceResult.shutdown()); - } - return CompletableFuture.anyOf(shutdownFuture, future).thenApply(r -> (FDv2SourceResult) r); + if (!initialSent.getAndSet(true)) { + // Send full changeset first, before any partial changesets that + // accumulated from update()/delete() calls made before next() was first called. + resultQueue.put(FDv2SourceResult.changeSet(makeFullChangeSet(), false)); } + return CompletableFuture.anyOf(shutdownFuture, resultQueue.take()) + .thenApply(r -> (FDv2SourceResult) r); } @Override public void close() { - synchronized (queueLock) { - if (closed) return; - closed = true; - } shutdownFuture.complete(FDv2SourceResult.shutdown()); closedSynchronizerInstance(this); } diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2WithClientTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2WithClientTest.java new file mode 100644 index 00000000..ac236afb --- /dev/null +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2WithClientTest.java @@ -0,0 +1,144 @@ +package com.launchdarkly.sdk.server.integrations; + +import com.launchdarkly.sdk.EvaluationDetail; +import com.launchdarkly.sdk.EvaluationReason; +import com.launchdarkly.sdk.LDContext; +import com.launchdarkly.sdk.LDValue; +import com.launchdarkly.sdk.server.Components; +import com.launchdarkly.sdk.server.LDClient; +import com.launchdarkly.sdk.server.LDConfig; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorInfo; +import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State; + +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +@SuppressWarnings("javadoc") +public class TestDataV2WithClientTest { + private static final String SDK_KEY = "sdk-key"; + + private TestDataV2 td = TestDataV2.synchronizer(); + private LDConfig config = new LDConfig.Builder() + .dataSystem(new DataSystemBuilder().synchronizers(td)) + .events(Components.noEvents()) + .build(); + + @Test + public void initializesWithEmptyData() throws Exception { + try (LDClient client = new LDClient(SDK_KEY, config)) { + assertThat(client.isInitialized(), is(true)); + } + } + + @Test + public void initializesWithFlag() throws Exception { + td.update(td.flag("flag").on(true)); + + try (LDClient client = new LDClient(SDK_KEY, config)) { + assertThat(client.boolVariation("flag", LDContext.create("user"), false), is(true)); + } + } + + @Test + public void updatesFlag() throws Exception { + td.update(td.flag("flag").on(false)); + + try (LDClient client = new LDClient(SDK_KEY, config)) { + assertThat(client.boolVariation("flag", LDContext.create("user"), false), is(false)); + + td.update(td.flag("flag").on(true)); + + assertThat(client.boolVariation("flag", LDContext.create("user"), false), is(true)); + } + } + + @Test + public void deletesFlag() throws Exception { + td.update(td.flag("flag").on(true)); + + try (LDClient client = new LDClient(SDK_KEY, config)) { + assertThat(client.boolVariation("flag", LDContext.create("user"), false), is(true)); + + td.delete("flag"); + + final EvaluationDetail detail = client.boolVariationDetail("flag", LDContext.create("user"), false); + assertThat(detail.getValue(), is(false)); + assertThat(detail.isDefaultValue(), is(true)); + assertThat(detail.getReason().getErrorKind(), is(EvaluationReason.ErrorKind.FLAG_NOT_FOUND)); + } + } + + @Test + public void usesTargets() throws Exception { + td.update(td.flag("flag").fallthroughVariation(false).variationForUser("user1", true)); + + try (LDClient client = new LDClient(SDK_KEY, config)) { + assertThat(client.boolVariation("flag", LDContext.create("user1"), false), is(true)); + assertThat(client.boolVariation("flag", LDContext.create("user2"), false), is(false)); + } + } + + @Test + public void usesRules() throws Exception { + td.update(td.flag("flag").fallthroughVariation(false) + .ifMatch("name", LDValue.of("Lucy")).thenReturn(true) + .ifMatch("name", LDValue.of("Mina")).thenReturn(true)); + + try (LDClient client = new LDClient(SDK_KEY, config)) { + assertThat(client.boolVariation("flag", LDContext.builder("user1").name("Lucy").build(), false), is(true)); + assertThat(client.boolVariation("flag", LDContext.builder("user2").name("Mina").build(), false), is(true)); + assertThat(client.boolVariation("flag", LDContext.builder("user3").name("Quincy").build(), false), is(false)); + } + } + + @Test + public void nonBooleanFlags() throws Exception { + td.update(td.flag("flag").variations(LDValue.of("red"), LDValue.of("green"), LDValue.of("blue")) + .offVariation(0).fallthroughVariation(2) + .variationForUser("user1", 1) + .ifMatch("name", LDValue.of("Mina")).thenReturn(1)); + + try (LDClient client = new LDClient(SDK_KEY, config)) { + assertThat(client.stringVariation("flag", LDContext.builder("user1").name("Lucy").build(), ""), equalTo("green")); + assertThat(client.stringVariation("flag", LDContext.builder("user2").name("Mina").build(), ""), equalTo("green")); + assertThat(client.stringVariation("flag", LDContext.builder("user3").name("Quincy").build(), ""), equalTo("blue")); + + td.update(td.flag("flag").on(false)); + + assertThat(client.stringVariation("flag", LDContext.builder("user1").name("Lucy").build(), ""), equalTo("red")); + } + } + + @Test + public void canUpdateStatus() throws Exception { + try (LDClient client = new LDClient(SDK_KEY, config)) { + assertThat(client.getDataSourceStatusProvider().getStatus().getState(), equalTo(State.VALID)); + + ErrorInfo ei = ErrorInfo.fromHttpError(500); + td.updateStatus(State.INTERRUPTED, ei); + + assertThat(client.getDataSourceStatusProvider().getStatus().getState(), equalTo(State.INTERRUPTED)); + assertThat(client.getDataSourceStatusProvider().getStatus().getLastError(), equalTo(ei)); + } + } + + @Test + public void dataSourcePropagatesToMultipleClients() throws Exception { + td.update(td.flag("flag").on(true)); + + try (LDClient client1 = new LDClient(SDK_KEY, config)) { + try (LDClient client2 = new LDClient(SDK_KEY, config)) { + assertThat(client1.boolVariation("flag", LDContext.create("user"), false), is(true)); + assertThat(client2.boolVariation("flag", LDContext.create("user"), false), is(true)); + + td.update(td.flag("flag").on(false)); + + assertThat(client1.boolVariation("flag", LDContext.create("user"), false), is(false)); + assertThat(client2.boolVariation("flag", LDContext.create("user"), false), is(false)); + } + } + } +} From ec2f44419532bb9d8570a8917968eccdcbe4f335 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 3 Feb 2026 16:32:46 -0800 Subject: [PATCH 2/3] Fix tests --- .../server/integrations/TestDataV2Test.java | 77 ++++++++++++++++--- 1 file changed, 66 insertions(+), 11 deletions(-) diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2Test.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2Test.java index ce0b3a6d..cd1dd13c 100644 --- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2Test.java +++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2Test.java @@ -52,6 +52,41 @@ public class TestDataV2Test { private final CapturingDataSourceUpdates updates = new CapturingDataSourceUpdates(); + /** + * Helper class that consumes FDv2SourceResult objects in a background thread. + * This is necessary because update() and delete() block until results are closed, + * so we can't call sync.next() on the same thread. + */ + private static class ResultConsumer { + private final BlockingQueue results = new LinkedBlockingQueue<>(); + private final Thread consumerThread; + private volatile boolean stopped = false; + + ResultConsumer(Synchronizer sync) { + consumerThread = new Thread(() -> { + while (!stopped) { + try { + FDv2SourceResult result = sync.next().get(5, TimeUnit.SECONDS); + result.close(); // Close immediately to unblock put() + results.put(result); + } catch (Exception e) { + break; + } + } + }); + consumerThread.start(); + } + + FDv2SourceResult next() throws Exception { + return results.poll(5, TimeUnit.SECONDS); + } + + void stop() { + stopped = true; + consumerThread.interrupt(); + } + } + private DataSourceBuildInputs dataSourceBuildInputs() { ClientContext context = clientContext("", new LDConfig.Builder().build(), updates); SelectorSource selectorSource = () -> Selector.EMPTY; @@ -70,8 +105,9 @@ private DataSourceBuildInputs dataSourceBuildInputs() { public void initializesWithEmptyData() throws Exception { TestDataV2 td = TestDataV2.synchronizer(); Synchronizer sync = td.build(dataSourceBuildInputs()); + ResultConsumer consumer = new ResultConsumer(sync); - FDv2SourceResult result = sync.next().get(5, TimeUnit.SECONDS); + FDv2SourceResult result = consumer.next(); assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); ChangeSet changeSet = result.getChangeSet(); @@ -80,6 +116,8 @@ public void initializesWithEmptyData() throws Exception { assertThat(changeSet.getData(), iterableWithSize(1)); assertThat(get(changeSet.getData(), 0).getKey(), equalTo(DataModel.FEATURES)); assertThat(get(changeSet.getData(), 0).getValue().getItems(), emptyIterable()); + + consumer.stop(); } @Test @@ -89,7 +127,9 @@ public void initializesWithFlags() throws Exception { .update(td.flag("flag2").on(false)); Synchronizer sync = td.build(dataSourceBuildInputs()); - FDv2SourceResult result = sync.next().get(5, TimeUnit.SECONDS); + ResultConsumer consumer = new ResultConsumer(sync); + + FDv2SourceResult result = consumer.next(); assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); ChangeSet changeSet = result.getChangeSet(); @@ -113,20 +153,23 @@ public void initializesWithFlags() throws Exception { assertJsonEquals(flagJson(expectedFlag1, 1), flagJson(flag1)); assertJsonEquals(flagJson(expectedFlag2, 1), flagJson(flag2)); + + consumer.stop(); } @Test public void addsFlag() throws Exception { TestDataV2 td = TestDataV2.synchronizer(); Synchronizer sync = td.build(dataSourceBuildInputs()); + ResultConsumer consumer = new ResultConsumer(sync); - FDv2SourceResult initResult = sync.next().get(5, TimeUnit.SECONDS); + FDv2SourceResult initResult = consumer.next(); assertThat(initResult.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); assertThat(initResult.getChangeSet().getType(), equalTo(ChangeSetType.Full)); td.update(td.flag("flag1").on(true)); - FDv2SourceResult updateResult = sync.next().get(5, TimeUnit.SECONDS); + FDv2SourceResult updateResult = consumer.next(); assertThat(updateResult.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); ChangeSet changeSet = updateResult.getChangeSet(); assertThat(changeSet.getType(), equalTo(ChangeSetType.Partial)); @@ -141,6 +184,8 @@ public void addsFlag() throws Exception { ModelBuilders.FlagBuilder expectedFlag = flagBuilder("flag1").version(1).salt("") .on(true).offVariation(1).fallthroughVariation(0).variations(true, false); assertJsonEquals(flagJson(expectedFlag, 1), flagJson(flag1)); + + consumer.stop(); } @Test @@ -152,12 +197,14 @@ public void updatesFlag() throws Exception { .ifMatch("name", LDValue.of("Lucy")).thenReturn(true)); Synchronizer sync = td.build(dataSourceBuildInputs()); - FDv2SourceResult initResult = sync.next().get(5, TimeUnit.SECONDS); + ResultConsumer consumer = new ResultConsumer(sync); + + FDv2SourceResult initResult = consumer.next(); assertThat(initResult.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); td.update(td.flag("flag1").on(true)); - FDv2SourceResult updateResult = sync.next().get(5, TimeUnit.SECONDS); + FDv2SourceResult updateResult = consumer.next(); ChangeSet changeSet = updateResult.getChangeSet(); Map items = ImmutableMap.copyOf(get(changeSet.getData(), 0).getValue().getItems()); ItemDescriptor flag1 = items.get("flag1"); @@ -168,29 +215,33 @@ public void updatesFlag() throws Exception { .addTarget(0, "a").addContextTarget(ContextKind.DEFAULT, 0) .addRule("rule0", 0, "{\"contextKind\":\"user\",\"attribute\":\"name\",\"op\":\"in\",\"values\":[\"Lucy\"]}"); assertJsonEquals(flagJson(expectedFlag, 2), flagJson(flag1)); + + consumer.stop(); } @Test public void deletesFlag() throws Exception { TestDataV2 td = TestDataV2.synchronizer(); Synchronizer sync = td.build(dataSourceBuildInputs()); + ResultConsumer consumer = new ResultConsumer(sync); - sync.next().get(5, TimeUnit.SECONDS); + consumer.next(); // Consume initial result td.update(td.flag("foo").on(false).valueForAll(LDValue.of("bar"))); - FDv2SourceResult addResult = sync.next().get(5, TimeUnit.SECONDS); + FDv2SourceResult addResult = consumer.next(); assertThat(addResult.getChangeSet().getType(), equalTo(ChangeSetType.Partial)); Map addItems = ImmutableMap.copyOf(get(addResult.getChangeSet().getData(), 0).getValue().getItems()); assertThat(addItems.get("foo").getVersion(), equalTo(1)); assertThat(addItems.get("foo").getItem(), notNullValue()); td.delete("foo"); - FDv2SourceResult deleteResult = sync.next().get(5, TimeUnit.SECONDS); + FDv2SourceResult deleteResult = consumer.next(); assertThat(deleteResult.getChangeSet().getType(), equalTo(ChangeSetType.Partial)); Map deleteItems = ImmutableMap.copyOf(get(deleteResult.getChangeSet().getData(), 0).getValue().getItems()); assertThat(deleteItems.get("foo").getVersion(), equalTo(2)); assertThat(deleteItems.get("foo").getItem(), nullValue()); + consumer.stop(); sync.close(); } @@ -248,17 +299,21 @@ private void verifyFlag( TestDataV2 td = TestDataV2.synchronizer(); Synchronizer sync = td.build(dataSourceBuildInputs()); - sync.next().get(5, TimeUnit.SECONDS); + ResultConsumer consumer = new ResultConsumer(sync); + + consumer.next(); // Consume initial result td.update(configureFlag.apply(td.flag("flagkey"))); - FDv2SourceResult result = sync.next().get(5, TimeUnit.SECONDS); + FDv2SourceResult result = consumer.next(); assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET)); ChangeSet changeSet = result.getChangeSet(); Map items = ImmutableMap.copyOf(get(changeSet.getData(), 0).getValue().getItems()); ItemDescriptor flag = items.get("flagkey"); assertThat(flag.getVersion(), equalTo(1)); assertJsonEquals(flagJson(expectedFlag, 1), flagJson(flag)); + + consumer.stop(); } private static String flagJson(ModelBuilders.FlagBuilder flagBuilder, int version) { From 425a9e7a7ce670514b9e30174f458dc326ec0c3a Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 3 Feb 2026 16:36:11 -0800 Subject: [PATCH 3/3] Remove unused imports. --- .../com/launchdarkly/sdk/server/integrations/TestDataV2.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java index 143ad7f8..21af15fc 100644 --- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java +++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java @@ -18,12 +18,10 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.HashMap; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BooleanSupplier; /** * A mechanism for providing dynamically updatable feature flag state as a {@link Synchronizer}