Incapture Technologies

Inside the Cloud

Pipeline and Workflow

Published: November 6, 2014

This blog post covers two related areas in Rapture – Pipelines and Workflows. A pipeline in Rapture is an abstraction around a message queue to coordinate activity both within a Rapture platform and to external systems connected to Rapture through a queue. A pipeline uses common concepts such as an “exchange” and a “queue” to manage the various topologies needed. A workflow is a set of steps, which can be in the form of a decision tree, that can be executed in a distributed Rapture environment. Workflows use pipelines for coordination. Each step can be either a custom piece of Java code or a Reflex script, and context is collected and passed to each step as a workflow executes.

Pipeline

A pipeline in Rapture isolates the underlying implementation of a messaging system from the API used to interact with it.

Domain

At the top level of this isolation is the concept of an exchange “domain”: a named definition of an implementation of a messaging system. On startup, Rapture has (in its configuration) the concept of a “standard” domain, one that Rapture uses internally for its messaging, but developers can use the pipeline API to create others.

The format of a domain is shown in this sample call to set up an exchange domain:

#pipeline.registerExchangeDomain("test", "EXCHANGE {} USING RABBITMQ {}");

The configuration has the keyword “EXCHANGE” followed by an implementation – in this case we are using RabbitMQ as the underlying messaging system.

Exchange

Within a domain there is the concept of an exchange, a common feature of messaging implementations: a switching point that routes inbound messages on a queue to one or a set of outbound queues. The call to define an exchange takes a more complex data structure that defines the name of the exchange and its behavior. The behavior can be either “DIRECT” (a direct connection between an inbound queue and a given outbound queue) or “FANOUT” (a message on an inbound queue is sent to all outbound queues). An example piece of code setting up an exchange is shown below:

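       // Names of the exchange domain and of the exchange to create within it
       String domainName = "test";
       String exchangeName = "kernel";

       // Describe the exchange: its name, its domain, and its routing behavior
       RaptureExchange exchange = new RaptureExchange();
       exchange.setName(exchangeName);
       exchange.setDomain(domainName);
       exchange.setExchangeType(RaptureExchangeType.FANOUT);

       // Register the exchange through the pipeline API
       pipelineApi.registerPipelineExchange(exchangeName, exchange);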

Tasks and categories

For internal message coordination on an exchange, Rapture has the concept of a “category”. A server registers itself as a listener on a category by binding to an exchange with that named category. In this way, servers providing a common set of tasks can all share the same category, and when tasks are published on that category, each server (in the case of FANOUT) or one server (in the case of DIRECT) will receive the task to execute. This category concept is reused in the Workflow implementation described later.
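
The difference between the two delivery modes can be sketched with a small in-memory model. This is not the Rapture API: the class `CategoryExchangeSketch`, its `bind` and `publish` methods, and the round-robin choice for DIRECT are all assumptions made for illustration. Only the category idea and the FANOUT/DIRECT behavior come from the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of category delivery; not the Rapture API.
class CategoryExchangeSketch {
    enum ExchangeType { DIRECT, FANOUT }

    private final ExchangeType type;
    // category name -> servers bound to that category, in binding order
    private final Map<String, List<String>> bindings = new HashMap<>();
    // category name -> index of the next server to use for DIRECT delivery
    private final Map<String, Integer> nextIndex = new HashMap<>();

    CategoryExchangeSketch(ExchangeType type) {
        this.type = type;
    }

    // A server registers itself as a listener on a category.
    void bind(String category, String serverName) {
        bindings.computeIfAbsent(category, k -> new ArrayList<>()).add(serverName);
    }

    // Returns the servers that would receive a task published on the category.
    List<String> publish(String category) {
        List<String> servers = bindings.getOrDefault(category, new ArrayList<>());
        if (servers.isEmpty() || type == ExchangeType.FANOUT) {
            // FANOUT: every bound server receives the task.
            return new ArrayList<>(servers);
        }
        // DIRECT: exactly one server receives the task.
        // Round-robin selection is an assumption of this sketch.
        int index = nextIndex.getOrDefault(category, 0);
        nextIndex.put(category, (index + 1) % servers.size());
        List<String> chosen = new ArrayList<>();
        chosen.add(servers.get(index));
        return chosen;
    }

    public static void main(String[] args) {
        CategoryExchangeSketch fanout = new CategoryExchangeSketch(ExchangeType.FANOUT);
        fanout.bind("blue", "server-1");
        fanout.bind("blue", "server-2");
        System.out.println(fanout.publish("blue")); // [server-1, server-2]

        CategoryExchangeSketch direct = new CategoryExchangeSketch(ExchangeType.DIRECT);
        direct.bind("blue", "server-1");
        direct.bind("blue", "server-2");
        System.out.println(direct.publish("blue")); // [server-1]
        System.out.println(direct.publish("blue")); // [server-2]
    }
}
```

In a real deployment the selection of a single receiver would be left to the underlying messaging system rather than to application code.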

A task that can be published to a category contains both a mime type and the content for that type, and a specific set of mime types has very specific meanings within Rapture. The code below shows an example of such a task publication:

        import java.util.ArrayList;
        import java.util.List;

        // Wrap an embedded Reflex script as the mime content of a task
        MimeReflexScript reflexScript = new MimeReflexScript();
        reflexScript.setReflexScript("println('Hello from the Reflex Script');");
        RapturePipelineTask task2 = new RapturePipelineTask();
        task2.addMimeObject(reflexScript);

        // Target servers that are members of the "blue" category
        task2.setPriority(1);
        List<String> categories = new ArrayList<String>();
        categories.add("blue");
        task2.setCategoryList(categories);
        task2.setContentType("application/vnd.rapture.reflex.script");

        // Publish the task on the exchange
        pipelineApi.publishPipelineMessage(exchangeName, task2);

In this example the mime task defines an actual embedded Reflex script that should be executed on a server that is a member of the category provided.

A developer can use their own tasks as long as the receiving server can understand the content. Rapture has a number of pre-built tasks, such as running a Reflex script, sending a message on a queue, or starting a workflow. Each of these is simply a format for the message content, paired with a part of Rapture that listens for such messages and acts upon them.
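
One way to picture the receiving side is a lookup from content type to handler. The sketch below is an assumption about the general shape, not Rapture's implementation: `MimeDispatchSketch`, `register`, and `dispatch` are hypothetical names, and the `application/vnd.example.custom` type is invented for the example. Only the Reflex script content type is taken from the code above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical receiver-side dispatch on content type; not Rapture's implementation.
class MimeDispatchSketch {
    // content type -> handler that processes the message content
    private final Map<String, Function<String, String>> handlers = new HashMap<>();

    // Declare that this server understands a given content type.
    void register(String contentType, Function<String, String> handler) {
        handlers.put(contentType, handler);
    }

    // Route the content to the matching handler, or report that none exists.
    String dispatch(String contentType, String content) {
        Function<String, String> handler = handlers.get(contentType);
        if (handler == null) {
            return "ignored: no handler for " + contentType;
        }
        return handler.apply(content);
    }

    public static void main(String[] args) {
        MimeDispatchSketch server = new MimeDispatchSketch();
        // Stand-in for a pre-built task type: the handler only labels the script.
        server.register("application/vnd.rapture.reflex.script",
                script -> "would run script: " + script);
        // Stand-in for a developer-defined task type (invented for this example).
        server.register("application/vnd.example.custom",
                body -> "custom task received: " + body);

        System.out.println(server.dispatch("application/vnd.rapture.reflex.script",
                "println('Hello');"));
        System.out.println(server.dispatch("text/plain", "hello"));
    }
}
```

A server that has not registered a handler for a content type cannot act on the message, which is why the sender and receiver have to agree on the format.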

Workflows

Workflows in Rapture are constructs that define a set of tasks that need to be performed in some order. The workflow can branch and rejoin – the choice of branch to take at any given point can be determined by the return “state” of a task of the workflow.

Each task in a workflow can be one of two things: a Reflex script that is simply run and whose return value determines what to do next in the workflow, or an explicit piece of Java code that is invoked to execute the task.
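
The idea that a task's return state selects the next task can be sketched as a transition table. The names here (`WorkflowBranchSketch`, `addStep`, `addTransition`, `run`) and the state strings are hypothetical; the sketch runs everything in one process and leaves out the distribution that Rapture handles.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical single-process model of branching on a step's return state.
class WorkflowBranchSketch {
    // step name -> code that runs the step and returns its state
    private final Map<String, Supplier<String>> steps = new HashMap<>();
    // "stepName:state" -> name of the next step
    private final Map<String, String> transitions = new HashMap<>();

    void addStep(String name, Supplier<String> body) {
        steps.put(name, body);
    }

    void addTransition(String from, String state, String to) {
        transitions.put(from + ":" + state, to);
    }

    // Runs steps from the start step until no transition matches the
    // returned state; returns the names of the steps visited, in order.
    List<String> run(String start) {
        List<String> visited = new ArrayList<>();
        String current = start;
        while (current != null && steps.containsKey(current)) {
            visited.add(current);
            String state = steps.get(current).get();
            current = transitions.get(current + ":" + state);
        }
        return visited;
    }

    public static void main(String[] args) {
        WorkflowBranchSketch workflow = new WorkflowBranchSketch();
        workflow.addStep("check", () -> "fail");
        workflow.addStep("approve", () -> "done");
        workflow.addStep("reject", () -> "done");
        workflow.addTransition("check", "ok", "approve");
        workflow.addTransition("check", "fail", "reject");

        System.out.println(workflow.run("check")); // [check, reject]
    }
}
```

The sketch does not guard against cyclic transitions; a definition that loops back on itself would run indefinitely.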

Switching from one task to another in a workflow is handled by Rapture sending a specific mime message on an internal pipeline. Any Rapture application that can take part in a workflow could potentially receive the message and execute the task, with Rapture handling the activity involved in choosing the next step to execute. Furthermore, each task can be associated with a category, and only Rapture servers that are members of that category are eligible to receive the message to execute the task. In this way, custom Rapture servers containing custom Java code can be placed into a category with the guarantee that they are able to execute those tasks.

Workflows can contain state that is passed to (and can be updated by) each task in the workflow. It’s a convenient way for context to be passed along the workflow as it runs. Finally, workflows can be initiated using a specific API for that purpose or attached to an event (such as writing a document to a repository).

As a final example, consider the following workflow that performs some checking on a trading order:

[Figure: Workflow]

In this example, a trading order document has been saved in Rapture by some means. Rapture automatically fires a data update event, and in this case we have attached a workflow to that data update. The original document id and the content of the saved document form part of the context of this running workflow.

In our example the tasks in green (the early tasks) are simple Reflex scripts that analyze the content of the order, perhaps looking up other documents (limits by trader by asset class) and making a decision based on that information. Other scripts decide whether the order can be automatically sent to Bloomberg. The last step in the workflow (colored differently) is a specific piece of custom Java code (a service if you like) that formats the order and sends it to Bloomberg.
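
How the context travels through such a workflow can be sketched as follows. Everything in this block is illustrative: the class `OrderWorkflowSketch`, the context keys, the simple quantity-against-limit rule, and the "held" outcome are assumptions, and the final step only records an outcome instead of talking to any external system.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the order-checking workflow; names and rules are illustrative only.
class OrderWorkflowSketch {

    // Early step: compare the order quantity with a limit held in the context.
    static String checkLimit(Map<String, Object> context) {
        int quantity = (Integer) context.get("quantity");
        int limit = (Integer) context.get("traderLimit");
        context.put("limitChecked", true); // steps may update the shared context
        return quantity <= limit ? "ok" : "breach";
    }

    // Last step: stands in for the custom Java code that formats and sends the order.
    static String send(Map<String, Object> context) {
        context.put("outcome", "sent " + context.get("docId"));
        return "done";
    }

    // Entry point attached to a simulated data update event for a saved order.
    static Map<String, Object> onOrderSaved(String docId, int quantity, int traderLimit) {
        // The document id and content form the initial context of the workflow.
        Map<String, Object> context = new HashMap<>();
        context.put("docId", docId);
        context.put("quantity", quantity);
        context.put("traderLimit", traderLimit);

        if (checkLimit(context).equals("ok")) {
            send(context);
        } else {
            context.put("outcome", "held " + docId);
        }
        return context;
    }

    public static void main(String[] args) {
        System.out.println(onOrderSaved("order/1", 100, 500).get("outcome")); // sent order/1
        System.out.println(onOrderSaved("order/2", 900, 500).get("outcome")); // held order/2
    }
}
```

Each step reads what earlier steps left in the context and may add to it, so the final step has everything it needs without reloading the original document.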

Workflows in Rapture can be simple and quick (running very often and in milliseconds) or more complex and slow (an overnight batch-style job running for hours). Rapture installations have used them to perform complex data capture from remote sources, to run quantitative research processes, and to make discrete position and attribution updates in real time.

Finally, for a complex system with many workflows running simultaneously, there are APIs in Rapture to manage the activity and status of workflows, and these are often pulled together into UIs for operational purposes. The screenshot below gives an example of one of the common displays in use within Incapture.

[Screenshot: workflow activity and status display]

