diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java index 8231a007..dc354b63 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java @@ -17,17 +17,53 @@ import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers { - private final PersistenceInstanceStore store; + public static class Builder { + + private final PersistenceInstanceStore store; + private ExecutorService executorService; + private Duration closeTimeout; + + private Builder(PersistenceInstanceStore store) { + this.store = store; + } + + public Builder withExecutorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + public Builder withCloseTimeout(Duration closeTimeout) { + this.closeTimeout = closeTimeout; + return this; + } - public static DefaultPersistenceInstanceHandlers from(PersistenceInstanceStore store) { - return new DefaultPersistenceInstanceHandlers( - new DefaultPersistenceInstanceWriter(store), - new DefaultPersistenceInstanceReader(store), - store); + public PersistenceInstanceHandlers build() { + return new DefaultPersistenceInstanceHandlers( + new DefaultPersistenceInstanceWriter( + store, + Optional.ofNullable(executorService), + closeTimeout == null ? Duration.ofSeconds(1) : closeTimeout), + new DefaultPersistenceInstanceReader(store), + store); + } } + public static Builder builder(PersistenceInstanceStore store) { + return new Builder(store); + } + + public static PersistenceInstanceHandlers from(PersistenceInstanceStore store) { + return new Builder(store).build(); + } + + private final PersistenceInstanceStore store; + private DefaultPersistenceInstanceHandlers( PersistenceInstanceWriter writer, PersistenceInstanceReader reader, @@ -38,6 +74,7 @@ private DefaultPersistenceInstanceHandlers( @Override public void close() { + super.close(); safeClose(store); } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java index 19efa14a..2c1473d1 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceReader.java @@ -55,4 +55,7 @@ public Stream scanAll(WorkflowDefinition definition, String ap .onClose(() -> transaction.commit(definition)) .map(v -> new WorkflowPersistenceInstance(definition, v)); } + + @Override + public void close() {} } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java index e67a1cb4..f599a743 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java @@ -17,75 +17,123 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; +import io.serverlessworkflow.impl.WorkflowDefinitionData; import io.serverlessworkflow.impl.WorkflowStatus; +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter { private final PersistenceInstanceStore store; + private final Map> futuresMap = new ConcurrentHashMap<>(); + private final Optional executorService; + private final Duration closeTimeout; - protected DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) { + protected DefaultPersistenceInstanceWriter( + PersistenceInstanceStore store, + Optional executorService, + Duration closeTimeout) { this.store = store; + this.executorService = executorService; + this.closeTimeout = closeTimeout; } @Override - public void started(WorkflowContextData workflowContext) { - doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext); + public CompletableFuture started(WorkflowContextData workflowContext) { + return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext); } @Override - public void completed(WorkflowContextData workflowContext) { - removeProcessInstance(workflowContext); + public CompletableFuture completed(WorkflowContextData workflowContext) { + return removeProcessInstance(workflowContext); } @Override - public void failed(WorkflowContextData workflowContext, Throwable ex) { - removeProcessInstance(workflowContext); + public CompletableFuture failed(WorkflowContextData workflowContext, Throwable ex) { + return removeProcessInstance(workflowContext); } @Override - public void aborted(WorkflowContextData workflowContext) { - removeProcessInstance(workflowContext); + public CompletableFuture aborted(WorkflowContextData workflowContext) { + return removeProcessInstance(workflowContext); } - protected void removeProcessInstance(WorkflowContextData workflowContext) { - doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext); + protected CompletableFuture removeProcessInstance(WorkflowContextData workflowContext) { + return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext) + .thenRun(() -> futuresMap.remove(workflowContext.instanceData().id())); } @Override - public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) { - // not recording + public CompletableFuture taskStarted( + WorkflowContextData workflowContext, TaskContextData taskContext) { + return CompletableFuture.completedFuture(null); } @Override - public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext); + public CompletableFuture taskRetried( + WorkflowContextData workflowContext, TaskContextData taskContext) { + return doTransaction(t -> t.writeRetryTask(workflowContext, taskContext), workflowContext); } @Override - public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) { - doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext); + public CompletableFuture taskCompleted( + WorkflowContextData workflowContext, TaskContextData taskContext) { + return doTransaction(t -> t.writeCompletedTask(workflowContext, taskContext), workflowContext); } @Override - public void suspended(WorkflowContextData workflowContext) { - doTransaction(t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext); + public CompletableFuture suspended(WorkflowContextData workflowContext) { + return doTransaction( + t -> t.writeStatus(workflowContext, WorkflowStatus.SUSPENDED), workflowContext); } @Override - public void resumed(WorkflowContextData workflowContext) { - doTransaction(t -> t.clearStatus(workflowContext), workflowContext); + public CompletableFuture resumed(WorkflowContextData workflowContext) { + return doTransaction(t -> t.clearStatus(workflowContext), workflowContext); } - private void doTransaction( - Consumer operations, WorkflowContextData context) { + private CompletableFuture doTransaction( + Consumer operation, WorkflowContextData context) { + final ExecutorService service = + this.executorService.orElse(context.definition().application().executorService()); + final Runnable runnable = () -> executeTransaction(operation, context.definition()); + return futuresMap.compute( + context.instanceData().id(), + (k, v) -> + v == null + ? CompletableFuture.runAsync(runnable, service) + : v.thenRunAsync(runnable, service)); + } + + private void executeTransaction( + Consumer operation, WorkflowDefinitionData definition) { PersistenceInstanceTransaction transaction = store.begin(); try { - operations.accept(transaction); - transaction.commit(context.definition()); + operation.accept(transaction); + transaction.commit(definition); } catch (Exception ex) { - transaction.rollback(context.definition()); + transaction.rollback(definition); throw ex; } } + + @Override + public void close() { + futuresMap.clear(); + executorService.ifPresent( + e -> { + try { + e.awaitTermination(closeTimeout.toMillis(), TimeUnit.MILLISECONDS); + e.shutdown(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + }); + } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java index 84dd96c4..beec1c67 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceHandlers.java @@ -15,6 +15,8 @@ */ package io.serverlessworkflow.impl.persistence; +import static io.serverlessworkflow.impl.WorkflowUtils.safeClose; + public class PersistenceInstanceHandlers implements AutoCloseable { private final PersistenceInstanceWriter writer; @@ -35,5 +37,8 @@ public PersistenceInstanceReader reader() { } @Override - public void close() {} + public void close() { + safeClose(writer); + safeClose(reader); + } } diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java index 73e78879..d7736c8f 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceReader.java @@ -20,7 +20,7 @@ import java.util.Optional; import java.util.stream.Stream; -public interface PersistenceInstanceReader { +public interface PersistenceInstanceReader extends AutoCloseable { default Stream scanAll(WorkflowDefinition definition) { return scanAll(definition, definition.application().id()); diff --git a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java index 55f79faf..31ce235a 100644 --- a/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java +++ b/impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/PersistenceInstanceWriter.java @@ -17,24 +17,28 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; +import java.util.concurrent.CompletableFuture; -public interface PersistenceInstanceWriter { +public interface PersistenceInstanceWriter extends AutoCloseable { - void started(WorkflowContextData workflowContext); + CompletableFuture started(WorkflowContextData workflowContext); - void completed(WorkflowContextData workflowContext); + CompletableFuture completed(WorkflowContextData workflowContext); - void failed(WorkflowContextData workflowContext, Throwable ex); + CompletableFuture failed(WorkflowContextData workflowContext, Throwable ex); - void aborted(WorkflowContextData workflowContext); + CompletableFuture aborted(WorkflowContextData workflowContext); - void suspended(WorkflowContextData workflowContext); + CompletableFuture suspended(WorkflowContextData workflowContext); - void resumed(WorkflowContextData workflowContext); + CompletableFuture resumed(WorkflowContextData workflowContext); - void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext); + CompletableFuture taskRetried( + WorkflowContextData workflowContext, TaskContextData taskContext); - void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext); + CompletableFuture taskStarted( + WorkflowContextData workflowContext, TaskContextData taskContext); - void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext); + CompletableFuture taskCompleted( + WorkflowContextData workflowContext, TaskContextData taskContext); } diff --git a/impl/persistence/mvstore/README.md b/impl/persistence/mvstore/README.md index ff67da68..c20058ca 100644 --- a/impl/persistence/mvstore/README.md +++ b/impl/persistence/mvstore/README.md @@ -8,14 +8,14 @@ This document explains how to enable persistence using MVStore as underlying per To enable MVStore persistence, users should at least do the following things: - Initialize a MVStorePersistenceStore instance, passing the path of the file containing the persisted information -- Pass this MVStorePersitenceStore as argument of BytesMapPersistenceInstanceHandlers.builder. This will create PersistenceInstanceWriter and PersistenceInstanceReader. +- Pass this MVStorePersitenceStore as argument of DefaultPersistenceInstanceHandlers.from. This will create PersistenceInstanceWriter and PersistenceInstanceReader. - Use the PersistenceInstanceWriter created in the previous step to decorate the existing WorkflowApplication builder. The code will look like this ---- try (PersistenceInstanceHandlers handlers = - BytesMapPersistenceInstanceHandlers.builder(new MVStorePersistenceStore("test.db")) + DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore("test.db")) .build(); WorkflowApplication application = PersistenceApplicationBuilder.builder( @@ -33,7 +33,7 @@ If user wants to resume execution of all previously existing instances (typicall Once retrieved, calling `start` method will resume the execution after the latest completed task before the running JVM was stopped. ---- - handlers.reader().readAll(definition).values().forEach(WorkflowInstance::start); + handlers.reader().scanAll(definition).forEach(WorkflowInstance::start); ---- --- diff --git a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java index 0051dead..b0994eba 100644 --- a/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java +++ b/impl/persistence/tests/src/main/java/io/serverlessworkflow/impl/persistence/test/AbstractPersistenceTest.java @@ -41,6 +41,7 @@ import java.time.Instant; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executors; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -67,7 +68,10 @@ static void init() throws IOException { @BeforeEach void setup() { - handlers = DefaultPersistenceInstanceHandlers.from(persistenceStore()); + handlers = + DefaultPersistenceInstanceHandlers.builder(persistenceStore()) + .withExecutorService(Executors.newSingleThreadExecutor()) + .build(); context = app.modelFactory().fromNull(); workflowContext = mock(WorkflowContext.class); workflowInstance = mock(WorkflowInstance.class); @@ -117,8 +121,8 @@ void testWorkflowInstance() throws InterruptedException { final Map completedMap = Map.of("name", "fulanito"); - handlers.writer().started(workflowContext); - handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries)); + handlers.writer().started(workflowContext).join(); + handlers.writer().taskRetried(workflowContext, retriedTaskContext(position, numRetries)).join(); Optional optional = handlers.reader().find(definition, workflowInstance.id()); assertThat(optional).isPresent(); WorkflowPersistenceInstance instance = (WorkflowPersistenceInstance) optional.orElseThrow(); @@ -135,7 +139,10 @@ void testWorkflowInstance() throws InterruptedException { assertThat(retryAttempt.getValue()).isEqualTo(numRetries); // task completed - handlers.writer().taskCompleted(workflowContext, completedTaskContext(position, completedMap)); + handlers + .writer() + .taskCompleted(workflowContext, completedTaskContext(position, completedMap)) + .join(); instance = (WorkflowPersistenceInstance) handlers.reader().find(definition, workflowInstance.id()).orElseThrow(); @@ -157,7 +164,7 @@ void testWorkflowInstance() throws InterruptedException { assertThat(transition.getValue().isEndNode()).isTrue(); // workflow completed - handlers.writer().completed(workflowContext); + handlers.writer().completed(workflowContext).join(); assertThat(handlers.reader().find(definition, workflowInstance.id())).isEmpty(); } } diff --git a/impl/test/db-samples/running.db b/impl/test/db-samples/running.db index 461b7996..531b9ae9 100644 Binary files a/impl/test/db-samples/running.db and b/impl/test/db-samples/running.db differ diff --git a/impl/test/db-samples/running_v1.db b/impl/test/db-samples/running_v1.db index fa4c2beb..cc9a8610 100644 Binary files a/impl/test/db-samples/running_v1.db and b/impl/test/db-samples/running_v1.db differ diff --git a/impl/test/db-samples/suspended.db b/impl/test/db-samples/suspended.db index 0450b6af..7f47c283 100644 Binary files a/impl/test/db-samples/suspended.db and b/impl/test/db-samples/suspended.db differ diff --git a/impl/test/db-samples/suspended_v1.db b/impl/test/db-samples/suspended_v1.db index 1acb7355..135af823 100644 Binary files a/impl/test/db-samples/suspended_v1.db and b/impl/test/db-samples/suspended_v1.db differ diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index c1011a3b..7dcd7121 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -29,18 +29,23 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.Collection; import java.util.Map; +import java.util.concurrent.Executors; import java.util.stream.Stream; import org.junit.jupiter.api.Test; public class MvStorePersistenceTest { @Test - void testSimpleRun() throws IOException { + void testSimpleRun() throws Exception { final String dbName = "db-samples/simple.db"; try (PersistenceInstanceHandlers handlers = - DefaultPersistenceInstanceHandlers.from(new MVStorePersistenceStore(dbName)); + DefaultPersistenceInstanceHandlers.builder(new MVStorePersistenceStore(dbName)) + .withExecutorService(Executors.newSingleThreadExecutor()) + .withCloseTimeout(Duration.ofMillis(100)) + .build(); WorkflowApplication application = PersistenceApplicationBuilder.builder(WorkflowApplication.builder(), handlers.writer()) .build(); ) { @@ -49,6 +54,7 @@ void testSimpleRun() throws IOException { readWorkflowFromClasspath("workflows-samples/simple-expression.yaml")); assertNoInstance(handlers, definition); definition.instance(Map.of()).start().join(); + handlers.writer().close(); assertNoInstance(handlers, definition); } finally { Files.delete(Path.of(dbName));