Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,38 +159,39 @@ private void runInitializers() {
Initializer initializer = sourceManager.getNextInitializerAndSetActive();
while(initializer != null) {
try {
FDv2SourceResult result = initializer.run().get();
switch (result.getResultType()) {
case CHANGE_SET:
dataSourceUpdates.apply(result.getChangeSet());
anyDataReceived = true;
if (!result.getChangeSet().getSelector().isEmpty()) {
// We received data with a selector, so we end the initialization process.
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
startFuture.complete(true);
return;
}
break;
case STATUS:
FDv2SourceResult.Status status = result.getStatus();
switch(status.getState()) {
case INTERRUPTED:
case TERMINAL_ERROR:
// The data source updates handler will filter the state during initializing, but this
// will make the error information available.
dataSourceUpdates.updateStatus(
// While the error was terminal to the individual initializer, it isn't terminal
// to the data source as a whole.
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case SHUTDOWN:
case GOODBYE:
// We don't need to inform anyone of these statuses.
logger.debug("Ignoring status {} from initializer", result.getStatus().getState());
break;
}
break;
try(FDv2SourceResult result = initializer.run().get()) {
switch (result.getResultType()) {
case CHANGE_SET:
dataSourceUpdates.apply(result.getChangeSet());
anyDataReceived = true;
if (!result.getChangeSet().getSelector().isEmpty()) {
// We received data with a selector, so we end the initialization process.
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
startFuture.complete(true);
return;
}
break;
case STATUS:
FDv2SourceResult.Status status = result.getStatus();
switch (status.getState()) {
case INTERRUPTED:
case TERMINAL_ERROR:
// The data source updates handler will filter the state during initializing, but this
// will make the error information available.
dataSourceUpdates.updateStatus(
// While the error was terminal to the individual initializer, it isn't terminal
// to the data source as a whole.
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case SHUTDOWN:
case GOODBYE:
// We don't need to inform anyone of these statuses.
logger.debug("Ignoring status {} from initializer", result.getStatus().getState());
break;
}
break;
}
}
} catch (ExecutionException | InterruptedException | CancellationException e) {
// We don't expect these conditions to happen in practice.
Expand All @@ -205,7 +206,7 @@ private void runInitializers() {
new Date().toInstant()));
logger.warn("Error running initializer: {}", e.toString());
}
initializer = sourceManager.getNextInitializerAndSetActive();
initializer = sourceManager.getNextInitializerAndSetActive();
}
// We received data without a selector, and we have exhausted initializers, so we are going to
// consider ourselves initialized.
Expand Down Expand Up @@ -286,55 +287,56 @@ private void runSynchronizers() {
continue;
}

FDv2SourceResult result = (FDv2SourceResult) res;
conditions.inform(result);
try (FDv2SourceResult result = (FDv2SourceResult) res) {
conditions.inform(result);

switch (result.getResultType()) {
case CHANGE_SET:
dataSourceUpdates.apply(result.getChangeSet());
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
// This could have been completed by any data source. But if it has not been completed before
// now, then we complete it.
startFuture.complete(true);
break;
case STATUS:
FDv2SourceResult.Status status = result.getStatus();
switch (status.getState()) {
case INTERRUPTED:
// Handled by conditions.
dataSourceUpdates.updateStatus(
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case SHUTDOWN:
// We should be overall shutting down.
logger.debug("Synchronizer shutdown.");
return;
case TERMINAL_ERROR:
sourceManager.blockCurrentSynchronizer();
running = false;
dataSourceUpdates.updateStatus(
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case GOODBYE:
// We let the synchronizer handle this internally.
break;
}
break;
}
// We have been requested to fall back to FDv1. We handle whatever message was associated,
// close the synchronizer, and then fallback.
// Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
if (
result.isFdv1Fallback() &&
sourceManager.hasFDv1Fallback() &&
// This shouldn't happen in practice, an FDv1 source shouldn't request fallback
// to FDv1. But if it does, then we will discard its request.
!sourceManager.isCurrentSynchronizerFDv1Fallback()
) {
sourceManager.fdv1Fallback();
running = false;
switch (result.getResultType()) {
case CHANGE_SET:
dataSourceUpdates.apply(result.getChangeSet());
dataSourceUpdates.updateStatus(DataSourceStatusProvider.State.VALID, null);
// This could have been completed by any data source. But if it has not been completed before
// now, then we complete it.
startFuture.complete(true);
break;
case STATUS:
FDv2SourceResult.Status status = result.getStatus();
switch (status.getState()) {
case INTERRUPTED:
// Handled by conditions.
dataSourceUpdates.updateStatus(
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case SHUTDOWN:
// We should be overall shutting down.
logger.debug("Synchronizer shutdown.");
return;
case TERMINAL_ERROR:
sourceManager.blockCurrentSynchronizer();
running = false;
dataSourceUpdates.updateStatus(
DataSourceStatusProvider.State.INTERRUPTED,
status.getErrorInfo());
break;
case GOODBYE:
// We let the synchronizer handle this internally.
break;
}
break;
}
// We have been requested to fall back to FDv1. We handle whatever message was associated,
// close the synchronizer, and then fallback.
// Only trigger fallback if we're not already running the FDv1 fallback synchronizer.
if (
result.isFdv1Fallback() &&
sourceManager.hasFDv1Fallback() &&
// This shouldn't happen in practice, an FDv1 source shouldn't request fallback
// to FDv1. But if it does, then we will discard its request.
!sourceManager.isCurrentSynchronizerFDv1Fallback()
) {
sourceManager.fdv1Fallback();
running = false;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
package com.launchdarkly.sdk.server.datasources;

import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes;

import java.io.Closeable;
import java.util.function.Function;

/**
* This type is currently experimental and not subject to semantic versioning.
* <p>
* The result type for FDv2 initializers and synchronizers. An FDv2 initializer produces a single result, while
* an FDv2 synchronizer produces a stream of results.
*/
public class FDv2SourceResult {
public class FDv2SourceResult implements Closeable {

public enum State {
/**
* The data source has encountered an interruption and will attempt to reconnect. This isn't intended to be used
Expand Down Expand Up @@ -67,49 +72,86 @@ public Status(State state, DataSourceStatusProvider.ErrorInfo errorInfo) {
private final Status status;

private final ResultType resultType;

private final boolean fdv1Fallback;

private FDv2SourceResult(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet, Status status, ResultType resultType, boolean fdv1Fallback) {
private final Function<Void, Void> completionCallback;

private FDv2SourceResult(
DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet,
Status status, ResultType resultType,
boolean fdv1Fallback,
Function<Void, Void> completionCallback
) {
this.changeSet = changeSet;
this.status = status;
this.resultType = resultType;
this.fdv1Fallback = fdv1Fallback;
this.completionCallback = completionCallback;
}

public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) {
return interrupted(errorInfo, fdv1Fallback, null);
}

public static FDv2SourceResult interrupted(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
return new FDv2SourceResult(
null,
new Status(State.INTERRUPTED, errorInfo),
ResultType.STATUS,
fdv1Fallback);
null,
new Status(State.INTERRUPTED, errorInfo),
ResultType.STATUS,
fdv1Fallback,
completionCallback);
}

public static FDv2SourceResult shutdown() {
return shutdown(null);
}

public static FDv2SourceResult shutdown(Function<Void, Void> completionCallback) {
return new FDv2SourceResult(null,
new Status(State.SHUTDOWN, null),
ResultType.STATUS,
false);
new Status(State.SHUTDOWN, null),
ResultType.STATUS,
false,
completionCallback);
}

public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback) {
return terminalError(errorInfo, fdv1Fallback, null);
}

public static FDv2SourceResult terminalError(DataSourceStatusProvider.ErrorInfo errorInfo, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
return new FDv2SourceResult(null,
new Status(State.TERMINAL_ERROR, errorInfo),
ResultType.STATUS,
fdv1Fallback);
new Status(State.TERMINAL_ERROR, errorInfo),
ResultType.STATUS,
fdv1Fallback,
completionCallback);
}

public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet, boolean fdv1Fallback) {
return new FDv2SourceResult(changeSet, null, ResultType.CHANGE_SET, fdv1Fallback);
return changeSet(changeSet, fdv1Fallback, null);
}

public static FDv2SourceResult changeSet(DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> changeSet, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
return new FDv2SourceResult(
changeSet,
null,
ResultType.CHANGE_SET,
fdv1Fallback,
completionCallback);
}

public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback) {
return goodbye(reason, fdv1Fallback, null);
}

public static FDv2SourceResult goodbye(String reason, boolean fdv1Fallback, Function<Void, Void> completionCallback) {
// TODO: Goodbye reason.
return new FDv2SourceResult(
null,
new Status(State.GOODBYE, null),
ResultType.STATUS,
fdv1Fallback);
null,
new Status(State.GOODBYE, null),
ResultType.STATUS,
fdv1Fallback,
completionCallback);
}

public ResultType getResultType() {
Expand All @@ -127,4 +169,31 @@ public DataStoreTypes.ChangeSet<DataStoreTypes.ItemDescriptor> getChangeSet() {
public boolean isFdv1Fallback() {
return fdv1Fallback;
}

/**
* Creates a new result wrapping this one with an additional completion callback.
* <p>
* The new completion callback will be invoked when the result is closed, followed by
* the original completion callback (if any).
*
* @param newCallback the completion callback to add
* @return a new FDv2SourceResult with the added completion callback
*/
public FDv2SourceResult withCompletion(Function<Void, Void> newCallback) {
Function<Void, Void> combinedCallback = v -> {
newCallback.apply(null);
if (completionCallback != null) {
completionCallback.apply(null);
}
return null;
};
return new FDv2SourceResult(changeSet, status, resultType, fdv1Fallback, combinedCallback);
}

@Override
public void close() {
if(completionCallback != null) {
completionCallback.apply(null);
}
}
}
Loading