Process Monitoring With Dgraph Cloud and GraphQL

Modern business processes are often distributed across multiple enterprise applications, and keeping track of a particular business process can be a challenge. In this blog, you will review these challenges and implement a GraphQL-based solution. You will learn how to leverage Dgraph components, such as the GraphQL API and Lambdas, to implement a coherent solution for your process monitoring needs.

Dgraph Cloud provides an easy-to-use interface for authoring the schema as well as running queries and mutations. You can begin with the steps mentioned in the Quickstart.

Process Monitoring in the Modern World

Modern business processes, such as those related to Order to Cash and Procure to Pay, tend to run across several enterprise applications. These applications may be deployed on premises or in the cloud, and each application is specialized by its domain. For example, order management, shipping, and payment may each be performed in completely independent enterprise applications.
A sophisticated web of middleware components does the grunt work of synchronizing the business process. Customers also demand integration via a variety of customized mobile apps as well as generic channels such as Facebook Messenger, WhatsApp, or WeChat. It is a challenge to maintain timely visibility of these distributed processes. Delayed visibility results in poor customer experience and unnecessary intervention through intermediaries, such as agents in call centers.

A modern process monitoring solution needs to easily integrate with events and messages coming in from distributed enterprise applications. In this blog, you will apply Dgraph Cloud’s GraphQL capabilities, as well as Dgraph Lambdas, to store and inspect events and to create incidents from them.

Challenges with Process Monitoring

A simple approach to creating a process monitoring solution would be to route all your important logs, events, and message payloads from respective applications into a common storage layer and then use this layer to drive queries. However, there are several challenges involved in this process.

  • The messages coming from various apps are never in a common structure. These structures may be based on fixed-width text, XML, or JSON.
  • The content itself may not be conducive to consistent interpretation. For example, the customer name may never appear in a commonly agreed field.
  • The values themselves may not be standard. For example, for a currency, some apps may send an ISO 4217 code, while others may send the currency name as free text.
  • In the context of a particular process, the keys, such as an order number, may not be the same across applications.

To build an effective monitoring solution that addresses the challenges mentioned above, you will need to involve team members from these distributed apps and find a way to incorporate their knowledge about structure, content, and interpretation in your solution. The methodology explained in this blog uses a GraphQL API running on Dgraph Cloud as a coordinated solution to this problem.
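As a rough illustration of the structure and value problems listed above, the following JavaScript sketch maps two differently shaped messages onto one record. Everything in it is an assumption made for this example: the function names, the field aliases (`customerName`, `custName`, `customer`), the `key=value;key=value` line format, and the small currency table. None of it is part of Dgraph.

```javascript
// Hypothetical lookup from free-text currency names to ISO 4217 codes.
const CURRENCY_ALIASES = {
  "us dollar": "USD",
  "euro": "EUR",
  "indian rupee": "INR",
};

// Return an upper-case code when the value is recognized, otherwise null.
// Assumption: any three-letter value is already an ISO 4217 code.
function normalizeCurrency(value) {
  const text = String(value).trim();
  if (/^[A-Za-z]{3}$/.test(text)) {
    return text.toUpperCase();
  }
  return CURRENCY_ALIASES[text.toLowerCase()] || null;
}

// Accept either a JSON document or a "key=value;key=value" line and
// return one common shape regardless of the source format.
function normalizeMessage(raw) {
  let fields;
  try {
    fields = JSON.parse(raw);
  } catch (err) {
    fields = Object.fromEntries(
      raw.split(";").map((pair) => pair.split("=").map((part) => part.trim()))
    );
  }
  if (fields === null || typeof fields !== "object") {
    fields = {};
  }
  return {
    // Different apps are assumed to use different names for the same field.
    customerName: fields.customerName ?? fields.custName ?? fields.customer ?? null,
    currency: normalizeCurrency(fields.currency ?? ""),
  };
}

console.log(normalizeMessage('{"customer": "Asha", "currency": "usd"}'));
console.log(normalizeMessage("custName=Ravi; currency=Indian Rupee"));
```

The team that owns each application is best placed to maintain its own aliases, which is the kind of knowledge the approach in this blog tries to capture.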

Process Monitoring on Dgraph Cloud

You can leverage GraphQL with Dgraph Cloud to build a robust process monitoring solution. The solution consists of two parts: a GraphQL API for message ingestion and a message processing workflow that processes those messages coherently.

The GraphQL API serves as a standard API for all consumers to interact with the solution. GraphQL allows you to define the interface based on the message content, so you can tune your API to the needs of each participant. For parts of the enterprise that only provide unstructured content (such as application logs), the API interface can be a simple string, while for parts that expect fully structured content (such as an incident), you can serve an appropriately structured interface.

The message processing workflow focuses on the message content and runs on Dgraph Cloud. It consists of small, discrete steps, each implemented as a Lambda on Dgraph Cloud. These Lambdas accomplish different tasks, such as extracting structured content from the messages, validating them and signaling errors, making external calls for message enrichment, and transforming and storing the data.

The Dgraph Cloud-based message processing workflow will produce a graph that will provide near real-time and correlated visibility over your entire enterprise. You can use this graph to drive incident creation (ticketing) as well as extract trends for analytics.

A less understood aspect is that many modern apps and middleware have sophisticated operating capabilities. For example, modern cloud components such as queuing engines are capable of automatically managing issues if the right levers of throttling (https://camel.apache.org/components/3.4.x/eips/throttle-eip.html) or scaling are engaged. This implies that modern business process monitoring solutions need to treat these levers as first-class digital citizens. You can choose to connect the levers available in the enterprise, such as throttling and scaling, directly to this graph. This creates a feedback loop in which the graph can be used to measure and tune the effects of engaging these control levers. For example, you can answer questions like, “Did the throttling applied on the middleware help in reducing the end-to-end processing time on the backend DB?”
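To make the feedback-loop idea concrete, here is a minimal sketch of how the throttling question could be answered once timing data is available. It assumes a hypothetical list of samples, each with a `timestamp` and a `processingMs` value, and a known time at which the throttle was engaged. These names and the data are illustrative only and do not come from the schema used later in this blog.

```javascript
// Average of a list of numbers, or null when the list is empty.
function mean(values) {
  if (values.length === 0) {
    return null;
  }
  return values.reduce((sum, value) => sum + value, 0) / values.length;
}

// Compare mean end-to-end processing time before and after a lever was engaged.
// samples: [{ timestamp: number, processingMs: number }] (hypothetical shape)
function compareThrottleEffect(samples, throttleAppliedAt) {
  const before = samples
    .filter((sample) => sample.timestamp < throttleAppliedAt)
    .map((sample) => sample.processingMs);
  const after = samples
    .filter((sample) => sample.timestamp >= throttleAppliedAt)
    .map((sample) => sample.processingMs);
  return { meanBeforeMs: mean(before), meanAfterMs: mean(after) };
}

// Made-up measurements for illustration.
const samples = [
  { timestamp: 1, processingMs: 400 },
  { timestamp: 2, processingMs: 600 },
  { timestamp: 3, processingMs: 200 },
  { timestamp: 4, processingMs: 300 },
];
console.log(compareThrottleEffect(samples, 3));
```

A comparison of two means is only a starting point; a real analysis would also need to account for load and time-of-day effects.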

Finally, you can use the Dgraph Cloud GraphQL API to drive these visibility and feedback-related processes. You do not incur any overhead provisioning an API for your monitoring teams, because Dgraph Cloud provides a GraphQL API for you out of the box.

The Role of Dgraph Cloud Lambdas and JavaScript

You can think of Dgraph Cloud Lambdas as plug points where you can author functions using JavaScript. Additionally, Dgraph Cloud Lambdas make it easy for you to interact with GraphQL, DQL, as well as make external HTTP calls. JavaScript is a fairly easy-to-use language and allows federated operations; different teams can come in and author their respective lambdas, freeing you up to focus on other critical parts of the solution.

GraphQL Schema For Process Monitoring

In the simplified example in this blog, you are storing, evaluating, and correlating events and messages related to an Order to Cash process. You can model the messages and events in the enterprise as a type Event. You can start with a relaxed model with just the source application and the payload of the event. As events arrive, you will use a lambda, ExtractProcess, to correlate the messages and connect each event to a type Process. You will store the correlation key, in this case the order id, in the Process type. Finally, any error detected by the lambda MessageCheck will result in the creation of an incident. A specific type, Incident, is used to store these incidents.

type Event{
  id: ID!
  applicationName: String! @search
  payload: String
  process: Process @hasInverse(field:associatedEvents)
}

type Process{
  id: ID!
  orderId: String @search(by: [exact])
  associatedEvents : [Event]
}

type Incident{
  id: ID!
  description: String!
  linkedEvent: Event
}

type Mutation {
  ExtractProcess(eventID: String): String @lambda
  MessageCheck(eventID: String) : String @lambda
}

Message Processing Workflow

The lambda functions need to be called in a certain sequence (ExtractProcess, then MessageCheck, and perhaps other cleanup steps). You will step through this sequence manually in this example. You can use any simple orchestration engine, such as Apache Camel, to automate it.
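If you prefer a small script to an orchestration engine, the sequence can be sketched as follows. This is a minimal sketch, not a definitive implementation: `processEvent` is a name chosen for this example, and `runGraphQL` is a hypothetical function that you supply, which sends a GraphQL request to your backend and resolves to the usual `{ data, errors }` response. The two mutation names come from the schema above.

```javascript
// Mutations exposed by the schema in this blog.
const EXTRACT_PROCESS = `mutation ($eventID: String) { ExtractProcess(eventID: $eventID) }`;
const MESSAGE_CHECK = `mutation ($eventID: String) { MessageCheck(eventID: $eventID) }`;

// Run the workflow steps for one event, strictly in order.
// runGraphQL is caller-supplied: (query, variables) => Promise<{ data, errors }>.
async function processEvent(eventID, runGraphQL) {
  const steps = [
    ["ExtractProcess", EXTRACT_PROCESS],
    ["MessageCheck", MESSAGE_CHECK],
  ];
  const results = {};
  for (const [name, mutation] of steps) {
    const response = await runGraphQL(mutation, { eventID });
    // Stop at the first failing step so later steps never see a half-processed event.
    if (response.errors && response.errors.length > 0) {
      throw new Error(`${name} failed: ${response.errors[0].message}`);
    }
    results[name] = response.data[name];
  }
  return results;
}
```

In practice, `runGraphQL` could wrap an HTTP POST to the GraphQL endpoint of your backend. Passing it in as a parameter keeps the sequencing logic easy to test with a stub.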

The ExtractProcess lambda receives an event id argument and looks up the event via a GraphQL query. You will implement any custom parsing and correlation required in this lambda. In this example, the messages are simple comma-separated text. The first field is an order id, while the second field is a text message. You will extract the order id and then check whether a process related to this order id already exists. If one is found, you will link this event to the process object; otherwise, you will create a new process and link the event to it.
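The parsing step described above can be isolated into a small helper, sketched below. `parsePayload` is a hypothetical name introduced for this example and is not used by the lambda in this blog, which splits on every comma instead.

```javascript
// Split a payload of the form "<order id>, <text message>" into its two parts.
// Only the first comma is treated as a separator, so commas inside the
// text message are preserved.
function parsePayload(payload) {
  const separator = payload.indexOf(",");
  if (separator === -1) {
    // No comma: treat the whole payload as the order id.
    return { orderId: payload.trim(), message: "" };
  }
  return {
    orderId: payload.slice(0, separator).trim(),
    message: payload.slice(separator + 1).trim(),
  };
}

console.log(parsePayload("123, Sales order received"));
```

Trimming both fields keeps the correlation key stable even when source applications add stray whitespace around the order id.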

The MessageCheck lambda also receives an event id. You can write all checks required for process monitoring in this lambda. JavaScript has good support for string operations and is a good choice for implementing such checks. For advanced use cases, you can leverage the underlying Dgraph DQL functions, including those related to fuzzy text searches, geospatial functions, aggregates, shortest path functions, and others. In this example, you will write a simple check that tests whether the message reports that a customer’s mobile number is absent. If the mobile number is indeed absent, you will create an Incident using the GraphQL API. This is the end of the message processing workflow in this example.
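As the number of checks grows, one option is to keep them in a table of rules rather than a chain of if statements. The sketch below shows that idea with the single check used in this blog. The names `CHECKS` and `findIncidents` and the shape of the `event` argument are assumptions made for this illustration.

```javascript
// Each rule names the source application, a predicate over the message text,
// and the incident description to raise when the predicate matches.
const CHECKS = [
  {
    applicationName: "LogisticsPartner",
    matches: (message) => message.includes("Mobile number of customer is absent"),
    description: "Customer's mobile number is missing. Please update.",
  },
];

// Return the descriptions of all incidents that should be raised for an event.
// event: { applicationName: string, message: string } (hypothetical shape)
function findIncidents(event) {
  return CHECKS
    .filter((check) => check.applicationName === event.applicationName)
    .filter((check) => check.matches(event.message))
    .map((check) => check.description);
}

console.log(findIncidents({
  applicationName: "LogisticsPartner",
  message: "Mobile number of customer is absent",
}));
```

With this layout, each application team can add its own rules to the table without touching the code that raises incidents.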


async function ExtractProcess({args, graphql}) {
  console.log(args)

  // Fetch the event that needs to be correlated.
  const getEventData = await graphql(`query getEventData($eventId: [ID!]) {
    queryEvent(filter: {id: $eventId}) {
      id
      applicationName
      payload
    }
  }`,
  {"eventId": [args.eventID]})

  const event = getEventData.data.queryEvent[0]
  if (typeof event === 'undefined' || !event.payload) {
    return "event not found or payload is empty"
  }
  const payload = event.payload
  const applicationName = event.applicationName

  // The payload is a comma-separated string: order id first, text message second.
  const payloadData = payload.split(",")
  const orderId = payloadData[0].trim()
  const message = payloadData[1] || ""

  console.log(orderId)
  console.log(message)
  console.log(applicationName)

  // processName is only logged in this example.
  let processName = ""
  if (applicationName == "Sales" && message.includes("order received")) {
    processName = "SalesOrderProcess"
  }
  console.log(processName)

  // Check whether a process already exists for this order id.
  const getLinkedProcess = await graphql(`query getProcess($orderId: String) {
    queryProcess(filter: {orderId: {eq: $orderId}}) {
      id
    }
  }`,
  {"orderId": orderId})
  const linkedProcess = getLinkedProcess.data.queryProcess[0]
  console.log(linkedProcess)

  if (typeof linkedProcess === 'undefined') {
    console.log("linked process does not exist")
    // Create a new process and link this event to it.
    const results = await graphql(`mutation linkProcess($eventId: [ID!], $orderId: String) {
      updateEvent(input: {filter: {id: $eventId}, set: {process: {orderId: $orderId}}}) {
        event {
          process {
            id
          }
        }
      }
    }`,
    {"eventId": [args.eventID], "orderId": orderId})
    console.log(results)
  } else {
    console.log("linking to existing process")
    console.log(linkedProcess.id)
    // Link this event to the process that already exists.
    const results = await graphql(`mutation linkToEventExistingProcess($processId: ID!, $eventId: ID!) {
      updateEvent(input: {filter: {id: [$eventId]}, set: {process: {id: $processId}}}) {
        numUids
      }
    }`,
    {"processId": linkedProcess.id, "eventId": args.eventID})
    console.log(results)
  }

  // Return the raw payload so the caller can see what was processed.
  return payload
}


async function MessageCheck({args, graphql}) {
  console.log(args)

  // Fetch the event that needs to be checked.
  const getEventData = await graphql(`query getEventData($eventId: [ID!]) {
    queryEvent(filter: {id: $eventId}) {
      id
      applicationName
      payload
    }
  }`,
  {"eventId": [args.eventID]})

  const event = getEventData.data.queryEvent[0]
  if (typeof event === 'undefined' || !event.payload) {
    return "event not found or payload is empty"
  }
  const applicationName = event.applicationName

  // The payload is a comma-separated string: order id first, text message second.
  const payloadData = event.payload.split(",")
  const message = payloadData[1] || ""

  // Result returned when no check raises an incident.
  let txt = "Message Checks Processed"

  if (applicationName == "LogisticsPartner" && message.includes("Mobile number of customer is absent")) {
    txt = "Customer's mobile number is missing. Please update."
    // raise incident
    const results = await graphql(`mutation raiseIncident($eventId: ID!, $description: String!) {
      addIncident(input: {description: $description, linkedEvent: {id: $eventId}}) {
        incident {
          id
        }
      }
    }`,
    {"eventId": args.eventID, "description": txt})
    console.log(results)
  }

  // Return either the default status or the description of the raised incident.
  return txt
}

self.addGraphQLResolvers({
    "Mutation.ExtractProcess": ExtractProcess
})

self.addGraphQLResolvers({
    "Mutation.MessageCheck": MessageCheck
})

You can deploy the above schema and lambdas in your Dgraph Cloud instance. You can then begin testing this simple monitoring solution.

Step-by-Step Walkthrough

First, you will generate an event. Let’s assume that you received a message from your sales application. This event or message will contain the application name (Sales) and a payload. In the payload, you will pass a comma-separated string with the order id and the text message from the sales application.

mutation addSalesEvent{
  addEvent(input: {applicationName: "Sales",payload:"123, Sales order received"}){
    event{
      id
    }
  }
}

You can connect this event to a process instance by calling the ExtractProcess lambda. As explained previously, this lambda is responsible for correlating the incoming event to a process, and it uses the order id as the correlation key. This process will become the reference point for all future events related to this particular order id. Let’s assume that the id of the event is 0x3e.

mutation ExtractProcess {
  ExtractProcess(eventID: "0x3e")
}

[Screenshot: Extracting a Process to Monitor]

In the screenshot above, you can observe that the order is extracted and attached to a process object. The correlation key, the order id, is now stored in the type Process. Let’s conclude the message processing workflow for this sales event at this juncture.

Now let’s assume that the order has been processed and is being delivered through a logistics partner. At this point, you might receive a message that the customer’s mobile number is not available in the order. This is worthy of raising an incident. You will now generate this logistics partner-related event. Let’s assume that this event is created with the id 0x40.

mutation addLogisticsEvent{
  addEvent(input: {applicationName: "LogisticsPartner",payload:"123, Mobile number of customer is absent"}){
    event{
      id
    }
  }
}

As previously done, you will call the ExtractProcess lambda to connect to a Process. In this case, since a process object already exists, the newly created logistics partner event will connect with this existing process object.

mutation ExtractProcess {
  ExtractProcess(eventID: "0x40")
}

[Screenshot: Correlating and Linking]

You will now run a message check. This message check will observe that the payload of the event contains the text “Mobile number of customer is absent”, and will raise an incident for it.

mutation MessageCheck {
  MessageCheck(eventID: "0x40")
}

[Screenshot: Incident Creation]

You can query for created incidents, and you will find an incident created in Dgraph as above. This can be queried by middleware or other tools and an actual ticket can be created.
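A middleware component could retrieve the incidents with a query along the following lines. The `queryIncident` operation is generated by Dgraph from the Incident type in the schema above. The constant name `INCIDENT_QUERY` and the helper `buildRequestBody` are assumptions made for this sketch, which only shows how to build the body of a standard GraphQL-over-HTTP POST request.

```javascript
// Fetch every incident together with the event that triggered it and the
// order id of the process that event belongs to.
const INCIDENT_QUERY = `
  query {
    queryIncident {
      id
      description
      linkedEvent {
        id
        applicationName
        payload
        process {
          orderId
        }
      }
    }
  }`;

// Build the JSON body of a GraphQL-over-HTTP POST request.
function buildRequestBody(query, variables = {}) {
  return JSON.stringify({ query, variables });
}

console.log(buildRequestBody(INCIDENT_QUERY));
```

The resulting string can be sent as the body of a POST request, with a `Content-Type: application/json` header, to the GraphQL endpoint of your Dgraph Cloud backend.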

Conclusion

In this blog, you walked through a simplified example of process monitoring. You leveraged the GraphQL API to describe events, messages, processes as well as incidents. You also used lambdas to enforce message checks using simple JavaScript functions. Finally, you created incident objects that can in turn be used to create actual incidents in your enterprise.

You can review some additional blogs for related information. This blog on Entity Resolution will help you deal with advanced correlation needs. This blog will help you build customized experiences for agents working on incidents using Domain Driven Design (DDD) techniques.

Interested in getting started with Dgraph? You can get started here. Please say hello to our community here; we will be glad to help you in your journey!