diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java index 5d7593af..81d64d74 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowApplication.java @@ -29,6 +29,7 @@ import io.serverlessworkflow.impl.events.EventConsumer; import io.serverlessworkflow.impl.events.EventPublisher; import io.serverlessworkflow.impl.events.InMemoryEvents; +import io.serverlessworkflow.impl.executors.CallableTaskProxyBuilder; import io.serverlessworkflow.impl.executors.DefaultTaskExecutorFactory; import io.serverlessworkflow.impl.executors.TaskExecutorFactory; import io.serverlessworkflow.impl.expressions.ExpressionFactory; @@ -48,6 +49,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.ServiceLoader; @@ -83,6 +85,7 @@ public class WorkflowApplication implements AutoCloseable { private final Optional templateResolver; private final Optional functionReader; private final URI defaultCatalogURI; + private final Collection callableProxyBuilders; private WorkflowApplication(Builder builder) { this.taskFactory = builder.taskFactory; @@ -110,6 +113,7 @@ private WorkflowApplication(Builder builder) { this.functionReader = builder.functionReader; this.defaultCatalogURI = builder.defaultCatalogURI; this.id = builder.id; + this.callableProxyBuilders = builder.callableProxyBuilders; } public TaskExecutorFactory taskFactory() { @@ -170,10 +174,10 @@ public SchemaValidator getValidator(SchemaInline inline) { private String id; private TaskExecutorFactory taskFactory; private Collection exprFactories = new HashSet<>(); - private Collection listeners = - ServiceLoader.load(WorkflowExecutionListener.class).stream() - .map(Provider::get) - .collect(Collectors.toList()); + private List listeners = + loadFromServiceLoader(WorkflowExecutionListener.class); + private List callableProxyBuilders = + loadFromServiceLoader(CallableTaskProxyBuilder.class); private ResourceLoaderFactory resourceLoaderFactory = DefaultResourceLoaderFactory.get(); private SchemaValidatorFactory schemaValidatorFactory; private WorkflowPositionFactory positionFactory = () -> new QueueWorkflowPosition(); @@ -211,6 +215,11 @@ public Builder withListener(WorkflowExecutionListener listener) { return this; } + public Builder withCallableProxy(CallableTaskProxyBuilder builder) { + callableProxyBuilders.add(builder); + return this; + } + public Builder withTaskExecutorFactory(TaskExecutorFactory factory) { this.taskFactory = factory; return this; @@ -369,11 +378,17 @@ public WorkflowApplication build() { if (defaultCatalogURI == null) { defaultCatalogURI = URI.create("https://github.com/serverlessworkflow/catalog"); } + Collections.sort(listeners); + Collections.sort(callableProxyBuilders); if (id == null) { id = idFactory.get(); } return new WorkflowApplication(this); } + + private List loadFromServiceLoader(Class clazz) { + return ServiceLoader.load(clazz).stream().map(Provider::get).collect(Collectors.toList()); + } } public Map workflowDefinitions() { @@ -474,4 +489,8 @@ public Optional additionalObject( return Optional.ofNullable(additionalObjects.get(name)) .map(v -> (T) v.apply(workflowContext, taskContext)); } + + public Collection callableProxyBuilders() { + return callableProxyBuilders; + } } diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java index 23d4cc17..b28d2ad8 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallTaskExecutor.java @@ -21,6 +21,7 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowModel; import io.serverlessworkflow.impl.WorkflowMutablePosition; +import java.util.List; import java.util.concurrent.CompletableFuture; public class CallTaskExecutor extends RegularTaskExecutor { @@ -29,27 +30,37 @@ public class CallTaskExecutor extends RegularTaskExecutor public static class CallTaskExecutorBuilder extends RegularTaskExecutorBuilder { - private final CallableTaskBuilder callable; + private CallableTaskBuilder callableBuilder; + private List callableProxyBuilders; + private CallableTask callable; protected CallTaskExecutorBuilder( WorkflowMutablePosition position, T task, WorkflowDefinition definition, - CallableTaskBuilder callable) { + CallableTaskBuilder callableBuilder) { super(position, task, definition); - this.callable = callable; - callable.init(task, definition, position); + this.callableProxyBuilders = + definition.application().callableProxyBuilders().stream() + .filter(t -> t.accept(task)) + .toList(); + callableBuilder.init(task, definition, position); + this.callableBuilder = callableBuilder; } @Override public CallTaskExecutor buildInstance() { + this.callable = callableBuilder.build(); + for (CallableTaskProxyBuilder callableBuilder : callableProxyBuilders) { + this.callable = callableBuilder.build(callable); + } return new CallTaskExecutor<>(this); } } protected CallTaskExecutor(CallTaskExecutorBuilder builder) { super(builder); - this.callable = builder.callable.build(); + this.callable = builder.callable; } @Override diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTaskProxyBuilder.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTaskProxyBuilder.java new file mode 100644 index 00000000..8cf5bd3a --- /dev/null +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/CallableTaskProxyBuilder.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors; + +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.impl.ServicePriority; + +public interface CallableTaskProxyBuilder extends ServicePriority { + + default boolean accept(TaskBase taskBase) { + return true; + } + + CallableTask build(CallableTask delegate); +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java index d8fd1825..bb63f06a 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/WorkflowExecutionListener.java @@ -15,7 +15,9 @@ */ package io.serverlessworkflow.impl.lifecycle; -public interface WorkflowExecutionListener extends AutoCloseable { +import io.serverlessworkflow.impl.ServicePriority; + +public interface WorkflowExecutionListener extends AutoCloseable, ServicePriority { default void onWorkflowStarted(WorkflowStartedEvent ev) {} diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/HTTPWorkflowDefinitionTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/HTTPWorkflowDefinitionTest.java index 0842c8cf..e94646b7 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/HTTPWorkflowDefinitionTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/HTTPWorkflowDefinitionTest.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import mockwebserver3.MockResponse; import mockwebserver3.MockWebServer; @@ -37,15 +38,34 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HTTPWorkflowDefinitionTest { private static WorkflowApplication appl; private static MockWebServer mockServer; + private static final Logger logger = LoggerFactory.getLogger(HTTPWorkflowDefinitionTest.class); @BeforeAll static void init() { - appl = WorkflowApplication.builder().build(); + appl = + WorkflowApplication.builder() + .withCallableProxy( + delegate -> + (w, t, n) -> { + long init = System.currentTimeMillis(); + CompletableFuture result = delegate.apply(w, t, n); + if (logger.isDebugEnabled()) { + result.thenAccept( + x -> + logger.debug( + "Http calls takes {} milliseconds", + System.currentTimeMillis() - init)); + } + return result; + }) + .build(); } @AfterAll