diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java
index a7d1c874..70475e23 100644
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/FDv2DataSource.java
@@ -159,38 +159,39 @@ private void runInitializers() {
Initializer initializer = sourceManager.getNextInitializerAndSetActive();
while(initializer != null) {
try {
- FDv2SourceResult result = initializer.run().get();
- switch (result.getResultType()) {
- case CHANGE_SET:
- dataSourceUpdates.apply(result.getChangeSet());
- anyDataReceived = true;
- if (!result.getChangeSet().getSelector().isEmpty()) {
- // We received data with a selector, so we end the initialization process.
- dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
- startFuture.complete(true);
- return;
- }
- break;
- case STATUS:
- FDv2SourceResult.Status status = result.getStatus();
- switch(status.getState()) {
- case INTERRUPTED:
- case TERMINAL_ERROR:
- // The data source updates handler will filter the state during initializing, but this
- // will make the error information available.
- dataSourceUpdates.updateStatus(
- // While the error was terminal to the individual initializer, it isn't terminal
- // to the data source as a whole.
- DataSourceStatusProvider.State.INTERRUPTED,
- status.getErrorInfo());
- break;
- case SHUTDOWN:
- case GOODBYE:
- // We don't need to inform anyone of these statuses.
- logger.debug("Ignoring status {} from initializer", result.getStatus().getState());
- break;
- }
- break;
+ try(FDv2SourceResult result = initializer.run().get()) {
+ switch (result.getResultType()) {
+ case CHANGE_SET:
+ dataSourceUpdates.apply(result.getChangeSet());
+ anyDataReceived = true;
+ if (!result.getChangeSet().getSelector().isEmpty()) {
+ // We received data with a selector, so we end the initialization process.
+ dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
+ startFuture.complete(true);
+ return;
+ }
+ break;
+ case STATUS:
+ FDv2SourceResult.Status status = result.getStatus();
+ switch (status.getState()) {
+ case INTERRUPTED:
+ case TERMINAL_ERROR:
+ // The data source updates handler will filter the state during initializing, but this
+ // will make the error information available.
+ dataSourceUpdates.updateStatus(
+ // While the error was terminal to the individual initializer, it isn't terminal
+ // to the data source as a whole.
+ DataSourceStatusProvider.State.INTERRUPTED,
+ status.getErrorInfo());
+ break;
+ case SHUTDOWN:
+ case GOODBYE:
+ // We don't need to inform anyone of these statuses.
+ logger.debug("Ignoring status {} from initializer", result.getStatus().getState());
+ break;
+ }
+ break;
+ }
}
} catch (ExecutionException | InterruptedException | CancellationException e) {
// We don't expect these conditions to happen in practice.
@@ -205,7 +206,7 @@ private void runInitializers() {
new Date().toInstant()));
logger.warn("Error running initializer: {}", e.toString());
}
- initializer = sourceManager.getNextInitializerAndSetActive();
+ initializer = sourceManager.getNextInitializerAndSetActive();
}
// We received data without a selector, and we have exhausted initializers, so we are going to
// consider ourselves initialized.
@@ -286,55 +287,56 @@ private void runSynchronizers() {
continue;
}
- FDv2SourceResult result = (FDv2SourceResult) res;
- conditions.inform(result);
+ try (FDv2SourceResult result = (FDv2SourceResult) res) {
+ conditions.inform(result);
- switch (result.getResultType()) {
- case CHANGE_SET:
- dataSourceUpdates.apply(result.getChangeSet());
- dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
- // This could have been completed by any data source. But if it has not been completed before
- // now, then we complete it.
- startFuture.complete(true);
- break;
- case STATUS:
- FDv2SourceResult.Status status = result.getStatus();
- switch (status.getState()) {
- case INTERRUPTED:
- // Handled by conditions.
- dataSourceUpdates.updateStatus(
- DataSourceStatusProvider.State.INTERRUPTED,
- status.getErrorInfo());
- break;
- case SHUTDOWN:
- // We should be overall shutting down.
- logger.debug("Synchronizer shutdown.");
- return;
- case TERMINAL_ERROR:
- sourceManager.blockCurrentSynchronizer();
- running = false;
- dataSourceUpdates.updateStatus(
- DataSourceStatusProvider.State.INTERRUPTED,
- status.getErrorInfo());
- break;
- case GOODBYE:
- // We let the synchronizer handle this internally.
- break;
- }
- break;
- }
- // We have been requested to fall back to FDv1. We handle whatever message was associated,
- // close the synchronizer, and then fallback.
- // Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
- if (
- result.isFdv1Fallback() &&
- sourceManager.hasFDv1Fallback() &&
- // This shouldn't happen in practice, an FDv1 source shouldn't request fallback
- // to FDv1. But if it does, then we will discard its request.
- !sourceManager.isCurrentSynchronizerFDv1Fallback()
- ) {
- sourceManager.fdv1Fallback();
- running = false;
+ switch (result.getResultType()) {
+ case CHANGE_SET:
+ dataSourceUpdates.apply(result.getChangeSet());
+ dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
+ // This could have been completed by any data source. But if it has not been completed before
+ // now, then we complete it.
+ startFuture.complete(true);
+ break;
+ case STATUS:
+ FDv2SourceResult.Status status = result.getStatus();
+ switch (status.getState()) {
+ case INTERRUPTED:
+ // Handled by conditions.
+ dataSourceUpdates.updateStatus(
+ DataSourceStatusProvider.State.INTERRUPTED,
+ status.getErrorInfo());
+ break;
+ case SHUTDOWN:
+ // We should be overall shutting down.
+ logger.debug("Synchronizer shutdown.");
+ return;
+ case TERMINAL_ERROR:
+ sourceManager.blockCurrentSynchronizer();
+ running = false;
+ dataSourceUpdates.updateStatus(
+ DataSourceStatusProvider.State.INTERRUPTED,
+ status.getErrorInfo());
+ break;
+ case GOODBYE:
+ // We let the synchronizer handle this internally.
+ break;
+ }
+ break;
+ }
+ // We have been requested to fall back to FDv1. We handle whatever message was associated,
+ // close the synchronizer, and then fallback.
+ // Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
+ if (
+ result.isFdv1Fallback() &&
+ sourceManager.hasFDv1Fallback() &&
+ // This shouldn't happen in practice, an FDv1 source shouldn't request fallback
+ // to FDv1. But if it does, then we will discard its request.
+ !sourceManager.isCurrentSynchronizerFDv1Fallback()
+ ) {
+ sourceManager.fdv1Fallback();
+ running = false;
+ }
}
}
}
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java
index 6f440c63..acd5e121 100644
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/datasources/FDv2SourceResult.java
@@ -1,14 +1,19 @@
package com.launchdarkly.sdk.server.datasources;
+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;
+import java.io.Closeable;
+import java.util.function.Function;
+
/**
* This type is currently experimental and not subject to semantic versioning.
*
* The result type for FDv2 initializers and synchronizers. An FDv2 initializer produces a single result, while
* an FDv2 synchronizer produces a stream of results.
*/
-public class FDv2SourceResult {
+public class FDv2SourceResult implements Closeable {
+
public enum State {
/**
* The data source has encountered an interruption and will attempt to reconnect. This isn't intended to be used
@@ -67,49 +72,86 @@ public Status(State state, DataSourceStatusProvider.ErrorInfo errorInfo) {
private final Status status;
private final ResultType resultType;
-
+
private final boolean fdv1Fallback;
- private FDv2SourceResult(DataStoreTypes.ChangeSet changeSet, Status status, ResultType resultType, boolean fdv1Fallback) {
+ private final Function completionCallback;
+
+ private FDv2SourceResult(
+ DataStoreTypes.ChangeSet changeSet,
+ Status status, ResultType resultType,
+ boolean fdv1Fallback,
+ Function completionCallback
+ ) {
this.changeSet = changeSet;
this.status = status;
this.resultType = resultType;
this.fdv1Fallback = fdv1Fallback;
+ this.completionCallback = completionCallback;
}
public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) {
+ return interrupted(errorInfo, fdv1Fallback, null);
+ }
+
+ public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback, Function completionCallback) {
return new FDv2SourceResult(
- null,
- new Status(State.INTERRUPTED, errorInfo),
- ResultType.STATUS,
- fdv1Fallback);
+ null,
+ new Status(State.INTERRUPTED, errorInfo),
+ ResultType.STATUS,
+ fdv1Fallback,
+ completionCallback);
}
public static FDv2SourceResult shutdown() {
+ return shutdown(null);
+ }
+
+ public static FDv2SourceResult shutdown(Function completionCallback) {
return new FDv2SourceResult(null,
- new Status(State.SHUTDOWN, null),
- ResultType.STATUS,
- false);
+ new Status(State.SHUTDOWN, null),
+ ResultType.STATUS,
+ false,
+ completionCallback);
}
public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) {
+ return terminalError(errorInfo, fdv1Fallback, null);
+ }
+
+ public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback, Function completionCallback) {
return new FDv2SourceResult(null,
- new Status(State.TERMINAL_ERROR, errorInfo),
- ResultType.STATUS,
- fdv1Fallback);
+ new Status(State.TERMINAL_ERROR, errorInfo),
+ ResultType.STATUS,
+ fdv1Fallback,
+ completionCallback);
}
public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet changeSet, boolean fdv1Fallback) {
- return new FDv2SourceResult(changeSet, null, ResultType.CHANGE_SET, fdv1Fallback);
+ return changeSet(changeSet, fdv1Fallback, null);
+ }
+
+ public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet changeSet, boolean fdv1Fallback, Function completionCallback) {
+ return new FDv2SourceResult(
+ changeSet,
+ null,
+ ResultType.CHANGE_SET,
+ fdv1Fallback,
+ completionCallback);
}
public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback) {
+ return goodbye(reason, fdv1Fallback, null);
+ }
+
+ public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback, Function completionCallback) {
// TODO: Goodbye reason.
return new FDv2SourceResult(
- null,
- new Status(State.GOODBYE, null),
- ResultType.STATUS,
- fdv1Fallback);
+ null,
+ new Status(State.GOODBYE, null),
+ ResultType.STATUS,
+ fdv1Fallback,
+ completionCallback);
}
public ResultType getResultType() {
@@ -127,4 +169,31 @@ public DataStoreTypes.ChangeSet getChangeSet() {
public boolean isFdv1Fallback() {
return fdv1Fallback;
}
+
+ /**
+ * Creates a new result wrapping this one with an additional completion callback.
+ *
+ * The new completion callback will be invoked when the result is closed, followed by
+ * the original completion callback (if any).
+ *
+ * @param newCallback the completion callback to add
+ * @return a new FDv2SourceResult with the added completion callback
+ */
+ public FDv2SourceResult withCompletion(Function newCallback) {
+ Function combinedCallback = v -> {
+ newCallback.apply(null);
+ if (completionCallback != null) {
+ completionCallback.apply(null);
+ }
+ return null;
+ };
+ return new FDv2SourceResult(changeSet, status, resultType, fdv1Fallback, combinedCallback);
+ }
+
+ @Override
+ public void close() {
+ if(completionCallback != null) {
+ completionCallback.apply(null);
+ }
+ }
}
diff --git a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java
index 1e17e8e0..21af15fc 100644
--- a/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java
+++ b/lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/integrations/TestDataV2.java
@@ -1,32 +1,27 @@
package com.launchdarkly.sdk.server.integrations;
import com.google.common.collect.ImmutableMap;
+import com.launchdarkly.sdk.internal.collections.IterableAsyncQueue;
import com.launchdarkly.sdk.internal.fdv2.sources.Selector;
import com.launchdarkly.sdk.server.DataModel;
import com.launchdarkly.sdk.server.datasources.FDv2SourceResult;
import com.launchdarkly.sdk.server.datasources.Synchronizer;
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
-import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State;
import com.launchdarkly.sdk.server.subsystems.DataSourceBuildInputs;
import com.launchdarkly.sdk.server.subsystems.DataSourceBuilder;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType;
-import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.function.BooleanSupplier;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* A mechanism for providing dynamically updatable feature flag state as a {@link Synchronizer}
@@ -149,7 +144,7 @@ public TestDataV2 delete(String key) {
*
* Any subsequent changes to this {@link TestData.FlagBuilder} instance do not affect the test data,
* unless you call {@link #update(TestData.FlagBuilder)} again.
- *
+ *
* @param flagBuilder a flag configuration builder
* @return the same {@code TestDataV2} instance
* @see #flag(String)
@@ -158,7 +153,7 @@ public TestDataV2 update(TestData.FlagBuilder flagBuilder) {
String key = flagBuilder.key;
TestData.FlagBuilder clonedBuilder = new TestData.FlagBuilder(flagBuilder);
ItemDescriptor newItem = null;
-
+
synchronized (lock) {
ItemDescriptor oldItem = currentFlags.get(key);
int oldVersion = oldItem == null ? 0 : oldItem.getVersion();
@@ -166,7 +161,7 @@ public TestDataV2 update(TestData.FlagBuilder flagBuilder) {
currentFlags.put(key, newItem);
currentBuilders.put(key, clonedBuilder);
}
-
+
pushToSynchronizers(FDv2SourceResult.changeSet(makePartialChangeSet(key, newItem), false));
return this;
@@ -211,45 +206,6 @@ public TestDataV2 updateStatus(DataSourceStatusProvider.State newState, DataSour
}
return this;
}
-
- /**
- * Waits until the given condition returns true or the timeout elapses.
- *
- * Use this after calling {@link #update(TestData.FlagBuilder)}, {@link #delete(String)}, or
- * {@link #updateStatus(DataSourceStatusProvider.State, DataSourceStatusProvider.ErrorInfo)}
- * when using TestDataV2 with an {@code LDClient}, so that your assertions see the updated state.
- * The synchronizer may apply updates asynchronously.
- *
- * @param timeout maximum time to wait
- * @param unit unit for the timeout
- * @param condition condition to poll; when it returns true, this method returns
- * @throws InterruptedException if the current thread is interrupted while waiting
- * @throws AssertionError if the condition does not become true before the timeout
- */
- public void awaitPropagation(long timeout, TimeUnit unit, BooleanSupplier condition)
- throws InterruptedException {
- long deadlineMs = System.currentTimeMillis() + unit.toMillis(timeout);
- while (System.currentTimeMillis() < deadlineMs) {
- if (condition.getAsBoolean()) {
- return;
- }
- Thread.sleep(20);
- }
- throw new AssertionError("Update did not propagate within " + timeout + " " + unit);
- }
-
- /**
- * Waits until the given condition returns true or the default timeout (5 seconds) elapses.
- *
- * Equivalent to {@link #awaitPropagation(long, TimeUnit, BooleanSupplier)} with a 5-second timeout.
- *
- * @param condition condition to poll; when it returns true, this method returns
- * @throws InterruptedException if the current thread is interrupted while waiting
- * @throws AssertionError if the condition does not become true before the timeout
- */
- public void awaitPropagation(BooleanSupplier condition) throws InterruptedException {
- awaitPropagation(5, TimeUnit.SECONDS, condition);
- }
/**
* Configures whether test data should be persisted to persistent stores.
@@ -289,7 +245,12 @@ public Synchronizer build(DataSourceBuildInputs context) {
private void pushToSynchronizers(FDv2SourceResult result) {
for (TestDataV2SynchronizerImpl sync : synchronizerInstances) {
- sync.put(result);
+ CompletableFuture completion = new CompletableFuture<>();
+ FDv2SourceResult wrappedResult = result.withCompletion(v -> {
+ completion.complete(null);
+ return null;
+ });
+ sync.put(wrappedResult, completion);
}
}
@@ -328,54 +289,33 @@ private void closedSynchronizerInstance(TestDataV2SynchronizerImpl synchronizer)
* Synchronizer implementation that queues initial and incremental change sets from TestDataV2.
*/
private final class TestDataV2SynchronizerImpl implements Synchronizer {
- private final Object queueLock = new Object();
- private final LinkedList queue = new LinkedList<>();
- private final LinkedList> pendingFutures = new LinkedList<>();
+ private final IterableAsyncQueue resultQueue = new IterableAsyncQueue<>();
private final CompletableFuture shutdownFuture = new CompletableFuture<>();
- private volatile boolean closed;
- private volatile boolean initialSent;
- void put(FDv2SourceResult result) {
- synchronized (queueLock) {
- if (closed) return;
- CompletableFuture waiter = pendingFutures.pollFirst();
- if (waiter != null) {
- waiter.complete(result);
- } else {
- queue.addLast(result);
- }
+ private final AtomicBoolean initialSent = new AtomicBoolean(false);
+
+ void put(FDv2SourceResult result, CompletableFuture completion) {
+ resultQueue.put(result);
+ try {
+ CompletableFuture.anyOf(completion, shutdownFuture).get();
+ } catch (Exception e) {
+ // Completion interrupted or canceled
}
}
@Override
public CompletableFuture next() {
- synchronized (queueLock) {
- if (!initialSent) {
- initialSent = true;
- // Prepend full changeset so it is delivered before any partial changesets that
- // accumulated from update()/delete() calls made before next() was first called.
- if (!closed) {
- queue.addFirst(FDv2SourceResult.changeSet(makeFullChangeSet(), false));
- }
- }
- if (!queue.isEmpty()) {
- return CompletableFuture.completedFuture(queue.removeFirst());
- }
- CompletableFuture future = new CompletableFuture<>();
- pendingFutures.addLast(future);
- if (closed) {
- future.complete(FDv2SourceResult.shutdown());
- }
- return CompletableFuture.anyOf(shutdownFuture, future).thenApply(r -> (FDv2SourceResult) r);
+ if (!initialSent.getAndSet(true)) {
+ // Send full changeset first, before any partial changesets that
+ // accumulated from update()/delete() calls made before next() was first called.
+ resultQueue.put(FDv2SourceResult.changeSet(makeFullChangeSet(), false));
}
+ return CompletableFuture.anyOf(shutdownFuture, resultQueue.take())
+ .thenApply(r -> (FDv2SourceResult) r);
}
@Override
public void close() {
- synchronized (queueLock) {
- if (closed) return;
- closed = true;
- }
shutdownFuture.complete(FDv2SourceResult.shutdown());
closedSynchronizerInstance(this);
}
diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2Test.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2Test.java
index ce0b3a6d..cd1dd13c 100644
--- a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2Test.java
+++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2Test.java
@@ -52,6 +52,41 @@ public class TestDataV2Test {
private final CapturingDataSourceUpdates updates = new CapturingDataSourceUpdates();
+ /**
+ * Helper class that consumes FDv2SourceResult objects in a background thread.
+ * This is necessary because update() and delete() block until results are closed,
+ * so we can't call sync.next() on the same thread.
+ */
+ private static class ResultConsumer {
+ private final BlockingQueue results = new LinkedBlockingQueue<>();
+ private final Thread consumerThread;
+ private volatile boolean stopped = false;
+
+ ResultConsumer(Synchronizer sync) {
+ consumerThread = new Thread(() -> {
+ while (!stopped) {
+ try {
+ FDv2SourceResult result = sync.next().get(5, TimeUnit.SECONDS);
+ result.close(); // Close immediately to unblock put()
+ results.put(result);
+ } catch (Exception e) {
+ break;
+ }
+ }
+ });
+ consumerThread.start();
+ }
+
+ FDv2SourceResult next() throws Exception {
+ return results.poll(5, TimeUnit.SECONDS);
+ }
+
+ void stop() {
+ stopped = true;
+ consumerThread.interrupt();
+ }
+ }
+
private DataSourceBuildInputs dataSourceBuildInputs() {
ClientContext context = clientContext("", new LDConfig.Builder().build(), updates);
SelectorSource selectorSource = () -> Selector.EMPTY;
@@ -70,8 +105,9 @@ private DataSourceBuildInputs dataSourceBuildInputs() {
public void initializesWithEmptyData() throws Exception {
TestDataV2 td = TestDataV2.synchronizer();
Synchronizer sync = td.build(dataSourceBuildInputs());
+ ResultConsumer consumer = new ResultConsumer(sync);
- FDv2SourceResult result = sync.next().get(5, TimeUnit.SECONDS);
+ FDv2SourceResult result = consumer.next();
assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET));
ChangeSet changeSet = result.getChangeSet();
@@ -80,6 +116,8 @@ public void initializesWithEmptyData() throws Exception {
assertThat(changeSet.getData(), iterableWithSize(1));
assertThat(get(changeSet.getData(), 0).getKey(), equalTo(DataModel.FEATURES));
assertThat(get(changeSet.getData(), 0).getValue().getItems(), emptyIterable());
+
+ consumer.stop();
}
@Test
@@ -89,7 +127,9 @@ public void initializesWithFlags() throws Exception {
.update(td.flag("flag2").on(false));
Synchronizer sync = td.build(dataSourceBuildInputs());
- FDv2SourceResult result = sync.next().get(5, TimeUnit.SECONDS);
+ ResultConsumer consumer = new ResultConsumer(sync);
+
+ FDv2SourceResult result = consumer.next();
assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET));
ChangeSet changeSet = result.getChangeSet();
@@ -113,20 +153,23 @@ public void initializesWithFlags() throws Exception {
assertJsonEquals(flagJson(expectedFlag1, 1), flagJson(flag1));
assertJsonEquals(flagJson(expectedFlag2, 1), flagJson(flag2));
+
+ consumer.stop();
}
@Test
public void addsFlag() throws Exception {
TestDataV2 td = TestDataV2.synchronizer();
Synchronizer sync = td.build(dataSourceBuildInputs());
+ ResultConsumer consumer = new ResultConsumer(sync);
- FDv2SourceResult initResult = sync.next().get(5, TimeUnit.SECONDS);
+ FDv2SourceResult initResult = consumer.next();
assertThat(initResult.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET));
assertThat(initResult.getChangeSet().getType(), equalTo(ChangeSetType.Full));
td.update(td.flag("flag1").on(true));
- FDv2SourceResult updateResult = sync.next().get(5, TimeUnit.SECONDS);
+ FDv2SourceResult updateResult = consumer.next();
assertThat(updateResult.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET));
ChangeSet changeSet = updateResult.getChangeSet();
assertThat(changeSet.getType(), equalTo(ChangeSetType.Partial));
@@ -141,6 +184,8 @@ public void addsFlag() throws Exception {
ModelBuilders.FlagBuilder expectedFlag = flagBuilder("flag1").version(1).salt("")
.on(true).offVariation(1).fallthroughVariation(0).variations(true, false);
assertJsonEquals(flagJson(expectedFlag, 1), flagJson(flag1));
+
+ consumer.stop();
}
@Test
@@ -152,12 +197,14 @@ public void updatesFlag() throws Exception {
.ifMatch("name", LDValue.of("Lucy")).thenReturn(true));
Synchronizer sync = td.build(dataSourceBuildInputs());
- FDv2SourceResult initResult = sync.next().get(5, TimeUnit.SECONDS);
+ ResultConsumer consumer = new ResultConsumer(sync);
+
+ FDv2SourceResult initResult = consumer.next();
assertThat(initResult.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET));
td.update(td.flag("flag1").on(true));
- FDv2SourceResult updateResult = sync.next().get(5, TimeUnit.SECONDS);
+ FDv2SourceResult updateResult = consumer.next();
ChangeSet changeSet = updateResult.getChangeSet();
Map items = ImmutableMap.copyOf(get(changeSet.getData(), 0).getValue().getItems());
ItemDescriptor flag1 = items.get("flag1");
@@ -168,29 +215,33 @@ public void updatesFlag() throws Exception {
.addTarget(0, "a").addContextTarget(ContextKind.DEFAULT, 0)
.addRule("rule0", 0, "{\"contextKind\":\"user\",\"attribute\":\"name\",\"op\":\"in\",\"values\":[\"Lucy\"]}");
assertJsonEquals(flagJson(expectedFlag, 2), flagJson(flag1));
+
+ consumer.stop();
}
@Test
public void deletesFlag() throws Exception {
TestDataV2 td = TestDataV2.synchronizer();
Synchronizer sync = td.build(dataSourceBuildInputs());
+ ResultConsumer consumer = new ResultConsumer(sync);
- sync.next().get(5, TimeUnit.SECONDS);
+ consumer.next(); // Consume initial result
td.update(td.flag("foo").on(false).valueForAll(LDValue.of("bar")));
- FDv2SourceResult addResult = sync.next().get(5, TimeUnit.SECONDS);
+ FDv2SourceResult addResult = consumer.next();
assertThat(addResult.getChangeSet().getType(), equalTo(ChangeSetType.Partial));
Map addItems = ImmutableMap.copyOf(get(addResult.getChangeSet().getData(), 0).getValue().getItems());
assertThat(addItems.get("foo").getVersion(), equalTo(1));
assertThat(addItems.get("foo").getItem(), notNullValue());
td.delete("foo");
- FDv2SourceResult deleteResult = sync.next().get(5, TimeUnit.SECONDS);
+ FDv2SourceResult deleteResult = consumer.next();
assertThat(deleteResult.getChangeSet().getType(), equalTo(ChangeSetType.Partial));
Map deleteItems = ImmutableMap.copyOf(get(deleteResult.getChangeSet().getData(), 0).getValue().getItems());
assertThat(deleteItems.get("foo").getVersion(), equalTo(2));
assertThat(deleteItems.get("foo").getItem(), nullValue());
+ consumer.stop();
sync.close();
}
@@ -248,17 +299,21 @@ private void verifyFlag(
TestDataV2 td = TestDataV2.synchronizer();
Synchronizer sync = td.build(dataSourceBuildInputs());
- sync.next().get(5, TimeUnit.SECONDS);
+ ResultConsumer consumer = new ResultConsumer(sync);
+
+ consumer.next(); // Consume initial result
td.update(configureFlag.apply(td.flag("flagkey")));
- FDv2SourceResult result = sync.next().get(5, TimeUnit.SECONDS);
+ FDv2SourceResult result = consumer.next();
assertThat(result.getResultType(), equalTo(FDv2SourceResult.ResultType.CHANGE_SET));
ChangeSet changeSet = result.getChangeSet();
Map items = ImmutableMap.copyOf(get(changeSet.getData(), 0).getValue().getItems());
ItemDescriptor flag = items.get("flagkey");
assertThat(flag.getVersion(), equalTo(1));
assertJsonEquals(flagJson(expectedFlag, 1), flagJson(flag));
+
+ consumer.stop();
}
private static String flagJson(ModelBuilders.FlagBuilder flagBuilder, int version) {
diff --git a/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2WithClientTest.java b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2WithClientTest.java
new file mode 100644
index 00000000..ac236afb
--- /dev/null
+++ b/lib/sdk/server/src/test/java/com/launchdarkly/sdk/server/integrations/TestDataV2WithClientTest.java
@@ -0,0 +1,144 @@
+package com.launchdarkly.sdk.server.integrations;
+
+import com.launchdarkly.sdk.EvaluationDetail;
+import com.launchdarkly.sdk.EvaluationReason;
+import com.launchdarkly.sdk.LDContext;
+import com.launchdarkly.sdk.LDValue;
+import com.launchdarkly.sdk.server.Components;
+import com.launchdarkly.sdk.server.LDClient;
+import com.launchdarkly.sdk.server.LDConfig;
+import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorInfo;
+import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+@SuppressWarnings("javadoc")
+public class TestDataV2WithClientTest {
+ private static final String SDK_KEY = "sdk-key";
+
+ private TestDataV2 td = TestDataV2.synchronizer();
+ private LDConfig config = new LDConfig.Builder()
+ .dataSystem(new DataSystemBuilder().synchronizers(td))
+ .events(Components.noEvents())
+ .build();
+
+ @Test
+ public void initializesWithEmptyData() throws Exception {
+ try (LDClient client = new LDClient(SDK_KEY, config)) {
+ assertThat(client.isInitialized(), is(true));
+ }
+ }
+
+ @Test
+ public void initializesWithFlag() throws Exception {
+ td.update(td.flag("flag").on(true));
+
+ try (LDClient client = new LDClient(SDK_KEY, config)) {
+ assertThat(client.boolVariation("flag", LDContext.create("user"), false), is(true));
+ }
+ }
+
+ @Test
+ public void updatesFlag() throws Exception {
+ td.update(td.flag("flag").on(false));
+
+ try (LDClient client = new LDClient(SDK_KEY, config)) {
+ assertThat(client.boolVariation("flag", LDContext.create("user"), false), is(false));
+
+ td.update(td.flag("flag").on(true));
+
+ assertThat(client.boolVariation("flag", LDContext.create("user"), false), is(true));
+ }
+ }
+
+ @Test
+ public void deletesFlag() throws Exception {
+ td.update(td.flag("flag").on(true));
+
+ try (LDClient client = new LDClient(SDK_KEY, config)) {
+ assertThat(client.boolVariation("flag", LDContext.create("user"), false), is(true));
+
+ td.delete("flag");
+
+ final EvaluationDetail detail = client.boolVariationDetail("flag", LDContext.create("user"), false);
+ assertThat(detail.getValue(), is(false));
+ assertThat(detail.isDefaultValue(), is(true));
+ assertThat(detail.getReason().getErrorKind(), is(EvaluationReason.ErrorKind.FLAG_NOT_FOUND));
+ }
+ }
+
+ @Test
+ public void usesTargets() throws Exception {
+ td.update(td.flag("flag").fallthroughVariation(false).variationForUser("user1", true));
+
+ try (LDClient client = new LDClient(SDK_KEY, config)) {
+ assertThat(client.boolVariation("flag", LDContext.create("user1"), false), is(true));
+ assertThat(client.boolVariation("flag", LDContext.create("user2"), false), is(false));
+ }
+ }
+
+ @Test
+ public void usesRules() throws Exception {
+ td.update(td.flag("flag").fallthroughVariation(false)
+ .ifMatch("name", LDValue.of("Lucy")).thenReturn(true)
+ .ifMatch("name", LDValue.of("Mina")).thenReturn(true));
+
+ try (LDClient client = new LDClient(SDK_KEY, config)) {
+ assertThat(client.boolVariation("flag", LDContext.builder("user1").name("Lucy").build(), false), is(true));
+ assertThat(client.boolVariation("flag", LDContext.builder("user2").name("Mina").build(), false), is(true));
+ assertThat(client.boolVariation("flag", LDContext.builder("user3").name("Quincy").build(), false), is(false));
+ }
+ }
+
+ @Test
+ public void nonBooleanFlags() throws Exception {
+ td.update(td.flag("flag").variations(LDValue.of("red"), LDValue.of("green"), LDValue.of("blue"))
+ .offVariation(0).fallthroughVariation(2)
+ .variationForUser("user1", 1)
+ .ifMatch("name", LDValue.of("Mina")).thenReturn(1));
+
+ try (LDClient client = new LDClient(SDK_KEY, config)) {
+ assertThat(client.stringVariation("flag", LDContext.builder("user1").name("Lucy").build(), ""), equalTo("green"));
+ assertThat(client.stringVariation("flag", LDContext.builder("user2").name("Mina").build(), ""), equalTo("green"));
+ assertThat(client.stringVariation("flag", LDContext.builder("user3").name("Quincy").build(), ""), equalTo("blue"));
+
+ td.update(td.flag("flag").on(false));
+
+ assertThat(client.stringVariation("flag", LDContext.builder("user1").name("Lucy").build(), ""), equalTo("red"));
+ }
+ }
+
+ @Test
+ public void canUpdateStatus() throws Exception {
+ try (LDClient client = new LDClient(SDK_KEY, config)) {
+ assertThat(client.getDataSourceStatusProvider().getStatus().getState(), equalTo(State.VALID));
+
+ ErrorInfo ei = ErrorInfo.fromHttpError(500);
+ td.updateStatus(State.INTERRUPTED, ei);
+
+ assertThat(client.getDataSourceStatusProvider().getStatus().getState(), equalTo(State.INTERRUPTED));
+ assertThat(client.getDataSourceStatusProvider().getStatus().getLastError(), equalTo(ei));
+ }
+ }
+
+ @Test
+ public void dataSourcePropagatesToMultipleClients() throws Exception {
+ td.update(td.flag("flag").on(true));
+
+ try (LDClient client1 = new LDClient(SDK_KEY, config)) {
+ try (LDClient client2 = new LDClient(SDK_KEY, config)) {
+ assertThat(client1.boolVariation("flag", LDContext.create("user"), false), is(true));
+ assertThat(client2.boolVariation("flag", LDContext.create("user"), false), is(true));
+
+ td.update(td.flag("flag").on(false));
+
+ assertThat(client1.boolVariation("flag", LDContext.create("user"), false), is(false));
+ assertThat(client2.boolVariation("flag", LDContext.create("user"), false), is(false));
+ }
+ }
+ }
+}