Skip to content

Basic concepts overview

Quanzheng Long edited this page Sep 15, 2023 · 18 revisions

The iWF top level concept is WorkflowDefinitionwhich consists of the components shown below:

Name Description
WorkflowState A basic asyn/background execution unit as a "workflow". A State consists of one or two steps: waitUntil (optional) and execute with retry
RPC API for application to interact with the workflow. It can access to persistence, internal channel, and state execution
Persistence A Kev-Value storage out-of-box to storing data. Can be accessed by RPC/WorkflowState implementation.
DurableTimer The waitUntil API can return a timer command to wait for certain time as a durable timer -- it is persisted by server and will not be lost.
InternalChannel The waitUntil API can return some command for "Internal Channel" -- An internal message queue workflow
Signal Channel Legacy concept and deprecated. Use InternalChannel + RPC instead. A message queue for the workflowState to receive messages from external sources

SDK

A user application defines an ObjectWorkflow by implementing:

Once workflow is implemented, register the workflows into Registry of SDK, and expose an RESTful endpoint for iWF server to call using WorkerService of the SDK.

Underneath, SDK will invoke the corresponding Workflow/WorkflowState/RPC code when being called by iWF server:

The Java interface has default implementation of all methods. So you can skip if you don't need any of them. For example, if a workflow doesn't need persistence, then just skip the persistenceSchema.

An example of Java workflow definition:

public class UserSignupWorkflow implements ObjectWorkflow {

    public static final String DA_FORM = "Form";

    public static final String DA_Status = "Status";
    public static final String VERIFY_CHANNEL = "Verify";

    private MyDependencyService myService;

    public UserSignupWorkflow(MyDependencyService myService) {
        this.myService = myService;
    }

    @Override
    public List<StateDef> getWorkflowStates() {
        return Arrays.asList(
                StateDef.startingState(new SubmitState(myService)),
                StateDef.nonStartingState(new VerifyState(myService))
        );
    }

    @Override
    public List<PersistenceFieldDef> getPersistenceSchema() {
        return Arrays.asList(
                DataAttributeDef.create(SignupForm.class, DA_FORM),
                DataAttributeDef.create(String.class, DA_Status)
        );
    }

    @Override
    public List<CommunicationMethodDef> getCommunicationSchema() {
        return Arrays.asList(
                InternalChannelDef.create(Void.class, VERIFY_CHANNEL)
        );
    }

    // Atomically read/write/send message in RPC
    @RPC
    public String verify(Context context, Persistence persistence, Communication communication) {
        String status = persistence.getDataAttribute(DA_Status, String.class);
        if (status == "verified") {
            return "already verified";
        }
        persistence.setDataAttribute(DA_Status, "verified");
        communication.publishInternalChannel(VERIFY_CHANNEL, null);
        return "done";
    }
}

The Python base class has default implementation of all methods. So you can skip if you don't need any of them. For example, if a workflow doesn't need persistence, then just skip the persistenceSchema.

Example in Python:

class UserSignupWorkflow(ObjectWorkflow):
    def get_workflow_states(self) -> StateSchema:
        return StateSchema.with_starting_state(SubmitState(), VerifyState())

    def get_persistence_schema(self) -> PersistenceSchema:
        return PersistenceSchema.create(
            PersistenceField.data_attribute_def(data_attribute_form, Form),
            PersistenceField.data_attribute_def(data_attribute_status, str),
            PersistenceField.data_attribute_def(data_attribute_verified_source, str),
        )

    def get_communication_schema(self) -> CommunicationSchema:
        return CommunicationSchema.create(
            CommunicationMethod.internal_channel_def(verify_channel, None)
        )

    @rpc()
    def verify(
            self, source: str, persistence: Persistence, communication: Communication
    ) -> str:
        status = persistence.get_data_attribute(data_attribute_status)
        if status == "verified":
            return "already verified"
        persistence.set_data_attribute(data_attribute_status, "verified")
        persistence.set_data_attribute(data_attribute_verified_source, source)
        communication.publish_to_internal_channel(verify_channel)
        return "done"

Golang interface doesn't have default method implementation. So to make it "skippable", you just need to add the default implementation iwf.DefaultWorkflowType of all:

type MyWorkflow struct {
	iwf.DefaultWorkflowType
}

Also, Golang doesn't have equivalence to Java's annotation or Python's decorator. An RPC must be registered under CommunicationSchema.

This is an example of a Golang workflow definition:

type OrchestrationWorkflow struct {
	iwf.DefaultWorkflowType

	svc service.MyService
}

func (e OrchestrationWorkflow) GetWorkflowStates() []iwf.StateDef {
	return []iwf.StateDef{
		iwf.StartingStateDef(NewState1(e.svc)),
		iwf.NonStartingStateDef(NewState2(e.svc)),
		iwf.NonStartingStateDef(NewState3(e.svc)),
		iwf.NonStartingStateDef(NewState4(e.svc)),
	}
}

func (e OrchestrationWorkflow) GetPersistenceSchema() []iwf.PersistenceFieldDef {
	return []iwf.PersistenceFieldDef{
		iwf.DataAttributeDef(keyData),
	}
}

func (e OrchestrationWorkflow) GetCommunicationSchema() []iwf.CommunicationMethodDef {
	return []iwf.CommunicationMethodDef{
		iwf.SignalChannelDef(SignalChannelReady),

		iwf.RPCMethodDef(e.Swap, nil),
	}
}