Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -159,38 +159,39 @@ private void runInitializers() {
Initializer initializer = sourceManager.getNextInitializerAndSetActive();
while(initializer != null) {
try {
FDv2SourceResult result = initializer.run().get();
switch (result.getResultType()) {
case CHANGE_SET:
dataSourceUpdates.apply(result.getChangeSet());
anyDataReceived = true;
if (!result.getChangeSet().getSelector().isEmpty()) {
// We received data with a selector, so we end the initialization process.
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
startFuture.complete(true);
return;
}
break;
case STATUS:
FDv2SourceResult.Status status = result.getStatus();
switch(status.getState()) {
case INTERRUPTED:
case TERMINAL_ERROR:
// The data source updates handler will filter the state during initializing, but this
// will make the error information available.
dataSourceUpdates.updateStatus(
// While the error was terminal to the individual initializer, it isn't terminal
// to the data source as a whole.
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case SHUTDOWN:
case GOODBYE:
// We don't need to inform anyone of these statuses.
logger.debug("Ignoring status {} from initializer", result.getStatus().getState());
break;
}
break;
try(FDv2SourceResult result = initializer.run().get()) {
switch (result.getResultType()) {
case CHANGE_SET:
dataSourceUpdates.apply(result.getChangeSet());
anyDataReceived = true;
if (!result.getChangeSet().getSelector().isEmpty()) {
// We received data with a selector, so we end the initialization process.
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
startFuture.complete(true);
return;
}
break;
case STATUS:
FDv2SourceResult.Status status = result.getStatus();
switch (status.getState()) {
case INTERRUPTED:
case TERMINAL_ERROR:
// The data source updates handler will filter the state during initializing, but this
// will make the error information available.
dataSourceUpdates.updateStatus(
// While the error was terminal to the individual initializer, it isn't terminal
// to the data source as a whole.
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case SHUTDOWN:
case GOODBYE:
// We don't need to inform anyone of these statuses.
logger.debug("Ignoring status {} from initializer", result.getStatus().getState());
break;
}
break;
}
}
} catch (ExecutionException | InterruptedException | CancellationException e) {
// We don't expect these conditions to happen in practice.
Expand All @@ -205,7 +206,7 @@ private void runInitializers() {
new Date().toInstant()));
logger.warn("Error running initializer: {}", e.toString());
}
initializer = sourceManager.getNextInitializerAndSetActive();
initializer = sourceManager.getNextInitializerAndSetActive();
}
// We received data without a selector, and we have exhausted initializers, so we are going to
// consider ourselves initialized.
Expand Down Expand Up @@ -286,55 +287,56 @@ private void runSynchronizers() {
continue;
}

FDv2SourceResult result = (FDv2SourceResult) res;
conditions.inform(result);
try (FDv2SourceResult result = (FDv2SourceResult) res) {
conditions.inform(result);

switch (result.getResultType()) {
case CHANGE_SET:
dataSourceUpdates.apply(result.getChangeSet());
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
// This could have been completed by any data source. But if it has not been completed before
// now, then we complete it.
startFuture.complete(true);
break;
case STATUS:
FDv2SourceResult.Status status = result.getStatus();
switch (status.getState()) {
case INTERRUPTED:
// Handled by conditions.
dataSourceUpdates.updateStatus(
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case SHUTDOWN:
// We should be overall shutting down.
logger.debug("Synchronizer shutdown.");
return;
case TERMINAL_ERROR:
sourceManager.blockCurrentSynchronizer();
running = false;
dataSourceUpdates.updateStatus(
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case GOODBYE:
// We let the synchronizer handle this internally.
break;
}
break;
}
// We have been requested to fall back to FDv1. We handle whatever message was associated,
// close the synchronizer, and then fallback.
// Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
if (
result.isFdv1Fallback() &&
sourceManager.hasFDv1Fallback() &&
// This shouldn't happen in practice, an FDv1 source shouldn't request fallback
// to FDv1. But if it does, then we will discard its request.
!sourceManager.isCurrentSynchronizerFDv1Fallback()
) {
sourceManager.fdv1Fallback();
running = false;
switch (result.getResultType()) {
case CHANGE_SET:
dataSourceUpdates.apply(result.getChangeSet());
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
// This could have been completed by any data source. But if it has not been completed before
// now, then we complete it.
startFuture.complete(true);
break;
case STATUS:
FDv2SourceResult.Status status = result.getStatus();
switch (status.getState()) {
case INTERRUPTED:
// Handled by conditions.
dataSourceUpdates.updateStatus(
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case SHUTDOWN:
// We should be overall shutting down.
logger.debug("Synchronizer shutdown.");
return;
case TERMINAL_ERROR:
sourceManager.blockCurrentSynchronizer();
running = false;
dataSourceUpdates.updateStatus(
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case GOODBYE:
// We let the synchronizer handle this internally.
break;
}
break;
}
// We have been requested to fall back to FDv1. We handle whatever message was associated,
// close the synchronizer, and then fallback.
// Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
if (
result.isFdv1Fallback() &&
sourceManager.hasFDv1Fallback() &&
// This shouldn't happen in practice, an FDv1 source shouldn't request fallback
// to FDv1. But if it does, then we will discard its request.
!sourceManager.isCurrentSynchronizerFDv1Fallback()
) {
sourceManager.fdv1Fallback();
running = false;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package com.launchdarkly.sdk.server.datasources;

import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;

import java.io.Closeable;
import java.util.function.Function;

/**
* This type is currently experimental and not subject to semantic versioning.
* <p>
* The result type for FDv2 initializers and synchronizers. An FDv2 initializer produces a single result, while
* an FDv2 synchronizer produces a stream of results.
*/
public class FDv2SourceResult {
public class FDv2SourceResult implements Closeable {

public enum State {
/**
* The data source has encountered an interruption and will attempt to reconnect. This isn't intended to be used
Expand Down Expand Up @@ -67,49 +72,86 @@ public Status(State state, DataSourceStatusProvider.ErrorInfo errorInfo) {
private final Status status;

private final ResultType resultType;

private final boolean fdv1Fallback;

private FDv2SourceResult(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet, Status status, ResultType resultType, boolean fdv1Fallback) {
private final Function<Void, Void> completionCallback;

private FDv2SourceResult(
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet,
Status status, ResultType resultType,
boolean fdv1Fallback,
Function<Void, Void> completionCallback
) {
this.changeSet = changeSet;
this.status = status;
this.resultType = resultType;
this.fdv1Fallback = fdv1Fallback;
this.completionCallback = completionCallback;
}

public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) {
return interrupted(errorInfo, fdv1Fallback, null);
}

public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
return new FDv2SourceResult(
null,
new Status(State.INTERRUPTED, errorInfo),
ResultType.STATUS,
fdv1Fallback);
null,
new Status(State.INTERRUPTED, errorInfo),
ResultType.STATUS,
fdv1Fallback,
completionCallback);
}

public static FDv2SourceResult shutdown() {
return shutdown(null);
}

public static FDv2SourceResult shutdown(Function<Void, Void> completionCallback) {
return new FDv2SourceResult(null,
new Status(State.SHUTDOWN, null),
ResultType.STATUS,
false);
new Status(State.SHUTDOWN, null),
ResultType.STATUS,
false,
completionCallback);
}

public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) {
return terminalError(errorInfo, fdv1Fallback, null);
}

public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
return new FDv2SourceResult(null,
new Status(State.TERMINAL_ERROR, errorInfo),
ResultType.STATUS,
fdv1Fallback);
new Status(State.TERMINAL_ERROR, errorInfo),
ResultType.STATUS,
fdv1Fallback,
completionCallback);
}

public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet, boolean fdv1Fallback) {
return new FDv2SourceResult(changeSet, null, ResultType.CHANGE_SET, fdv1Fallback);
return changeSet(changeSet, fdv1Fallback, null);
}

public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
return new FDv2SourceResult(
changeSet,
null,
ResultType.CHANGE_SET,
fdv1Fallback,
completionCallback);
}

public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback) {
return goodbye(reason, fdv1Fallback, null);
}

public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
// TODO: Goodbye reason.
return new FDv2SourceResult(
null,
new Status(State.GOODBYE, null),
ResultType.STATUS,
fdv1Fallback);
null,
new Status(State.GOODBYE, null),
ResultType.STATUS,
fdv1Fallback,
completionCallback);
}

public ResultType getResultType() {
Expand All @@ -127,4 +169,31 @@ public DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> getChangeSet() {
public boolean isFdv1Fallback() {
return fdv1Fallback;
}

/**
* Creates a new result wrapping this one with an additional completion callback.
* <p>
* The new completion callback will be invoked when the result is closed, followed by
* the original completion callback (if any).
*
* @param newCallback the completion callback to add
* @return a new FDv2SourceResult with the added completion callback
*/
public FDv2SourceResult withCompletion(Function<Void, Void> newCallback) {
Function<Void, Void> combinedCallback = v -> {
newCallback.apply(null);
if (completionCallback != null) {
completionCallback.apply(null);
}
return null;
};
return new FDv2SourceResult(changeSet, status, resultType, fdv1Fallback, combinedCallback);
}

@Override
public void close() {
if(completionCallback != null) {
completionCallback.apply(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,8 @@ private void closedInstance(DataSourceImpl instance) {
}

/**
* A builder for feature flag configurations to be used with {@link TestData}.
*
* A builder for feature flag configurations to be used with {@link TestData} and {@link TestDataV2}.
*
* @see TestData#flag(String)
* @see TestData#update(FlagBuilder)
*/
Expand All @@ -269,18 +269,20 @@ public static final class FlagBuilder {
final Map<ContextKind, Map<Integer, ImmutableSet<String>>> targets = new TreeMap<>(); // TreeMap enforces ordering for test determinacy
final List<FlagRuleBuilder> rules = new ArrayList<>();

private FlagBuilder(String key) {
FlagBuilder(String key) {
this.key = key;
this.on = true;
this.variations = new CopyOnWriteArrayList<>();
}

private FlagBuilder(FlagBuilder from) {
FlagBuilder(FlagBuilder from) {
this.key = from.key;
this.offVariation = from.offVariation;
this.on = from.on;
this.fallthroughVariation = from.fallthroughVariation;
this.variations = new CopyOnWriteArrayList<>(from.variations);
this.samplingRatio = from.samplingRatio;
this.migrationCheckRatio = from.migrationCheckRatio;
for (ContextKind contextKind: from.targets.keySet()) {
this.targets.put(contextKind, new TreeMap<>(from.targets.get(contextKind)));
}
Expand Down Expand Up @@ -811,6 +813,8 @@ private static int variationForBoolean(boolean value) {
* {@link #thenReturn(int)} to finish defining the rule.
*/
public final class FlagRuleBuilder {
// TODO: Move FlagRuleBuilder to TestDataV2 when TestData is deprecated

final List<Clause> clauses = new ArrayList<>();
int variation;

Expand Down
Loading