Azure Service Bus and Logic App Integration Pattern Using PeekLock

This blog entry describes a best practice for reading a message from Azure Service Bus in a Logic App using the peek-lock mechanism.

Azure Integration: Azure Service Bus

The central capability of a message broker such as Service Bus is to accept messages into a queue or topic and hold them available for later retrieval. When the broker transfers a message to a client, the broker and the client need a shared understanding of whether the message was processed successfully and can therefore be removed, or whether delivery or processing failed and the message may have to be delivered again. There are two delivery modes:

  • Receive-and-Delete
  • Peek-Lock

The Receive-and-Delete mode tells the broker to consider every message it sends to the receiving client as settled when sent. That means the message is considered consumed as soon as the broker has put it onto the wire. If the message transfer or the subsequent processing fails, the message is lost. This is not acceptable for our use case, so the rest of this post concentrates on the Peek-Lock mode.
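The difference between the two modes can be illustrated with a small in-memory sketch. This is not the Service Bus API: `ToyQueue`, `flaky_handler` and the two `consume_*` helpers are hypothetical names invented for this illustration, and the "lock" is reduced to simply leaving the message in the queue.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a queue; not the Service Bus API."""

    def __init__(self, messages):
        self._messages = deque(messages)

    def receive_and_delete(self):
        # The message is settled the moment it leaves the queue.
        return self._messages.popleft()

    def peek_lock(self):
        # The message stays in the queue until it is explicitly completed.
        return self._messages[0]

    def complete(self):
        self._messages.popleft()

    def __len__(self):
        return len(self._messages)


def flaky_handler(message):
    # Simulates a consumer that always fails while processing.
    raise RuntimeError(f"could not process {message!r}")


def consume_receive_and_delete(queue, handler):
    message = queue.receive_and_delete()
    try:
        handler(message)
    except RuntimeError:
        pass  # Too late: the queue has already removed the message.


def consume_peek_lock(queue, handler):
    message = queue.peek_lock()
    try:
        handler(message)
    except RuntimeError:
        return  # Not completed, so the message remains available.
    queue.complete()


lossy = ToyQueue(["order-1"])
consume_receive_and_delete(lossy, flaky_handler)
print(len(lossy))  # 0: the message is gone although processing failed

safe = ToyQueue(["order-1"])
consume_peek_lock(safe, flaky_handler)
print(len(safe))  # 1: the message can be delivered again
```

With the failing handler, the Receive-and-Delete queue ends up empty, while the Peek-Lock queue still holds the message for another attempt.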

Peek-Lock (Non-Destructive Read) using Logic App

Peek Lock azure integration pattern for Logic App to handle Message Locks

The Logic App atomically retrieves and locks a message from a queue or subscription for processing. The message is then guaranteed not to be delivered to other receivers during the lock duration specified in the queue or subscription description. When the lock expires, the message becomes available to other receivers again. To complete processing, the receiver must settle the message by issuing a delete (complete) command with the lock token it received from this operation. To give up on the message and unlock it for other receivers, the receiver issues an unlock (abandon) command; alternatively, it can simply let the lock duration expire. This operation should be used in applications that require At-Least-Once delivery assurances: if the receiver does not delete the message before the lock expires, another receiver is able to attempt processing once the lock duration has passed.
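The lock lifecycle described above (lock on receive, renew, expire, complete, abandon) can be modelled in a few lines. The following is only a toy model under stated assumptions: `PeekLockQueue` and its methods are hypothetical names, time is passed in explicitly as `now` (in seconds) instead of reading a real clock, and nothing here calls Service Bus.

```python
import uuid


class PeekLockQueue:
    """Toy model of peek-lock semantics; all names are illustrative only."""

    def __init__(self, lock_duration=60.0):
        self.lock_duration = lock_duration
        self._entries = []

    def send(self, body):
        self._entries.append({"body": body, "token": None, "locked_until": 0.0})

    def receive(self, now):
        """Lock and return (token, body) for the first unlocked message, or None."""
        for entry in self._entries:
            if entry["locked_until"] <= now:
                entry["token"] = str(uuid.uuid4())
                entry["locked_until"] = now + self.lock_duration
                return entry["token"], entry["body"]
        return None

    def _find(self, token, now):
        for entry in self._entries:
            if entry["token"] == token and entry["locked_until"] > now:
                return entry
        raise KeyError("lock token is unknown or has expired")

    def complete(self, token, now):
        # Settles the message: it is removed from the queue for good.
        self._entries.remove(self._find(token, now))

    def abandon(self, token, now):
        # Releases the lock so that another receiver can pick the message up.
        entry = self._find(token, now)
        entry["token"], entry["locked_until"] = None, 0.0

    def renew(self, token, now):
        # Extends the lock by one full lock duration from the current time.
        self._find(token, now)["locked_until"] = now + self.lock_duration


queue = PeekLockQueue(lock_duration=60.0)
queue.send("order-1")

token, body = queue.receive(now=0.0)      # locked until t=60
hidden = queue.receive(now=30.0)          # None: still locked
queue.renew(token, now=30.0)              # lock now runs until t=90
still_hidden = queue.receive(now=75.0)    # None: the renewal kept the lock alive
redelivered = queue.receive(now=95.0)     # lock expired, message is handed out again
```

Once the lock has expired and the message has been redelivered, the first receiver's token is stale: in this model, calling `queue.complete(token, now=96.0)` raises `KeyError`. This is the situation the renewal loop in the Logic App is meant to avoid.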

In the Logic App we use a variable, MessageState, to track the status of the message. The message can be in one of the following states, and based on the state we perform the related action (the Handle Message Lock loop in the image below is the one performing these actions):

  • Processing: The message is still being processed. If processing takes longer than the lock duration, the Logic App must renew the message lock. The default lock duration is 60 seconds, so the loop renews the lock and then waits 30 seconds (half the lock duration) before checking again.
  • Processed: The message has been processed successfully by the Logic App, so we need to tell Service Bus to complete the message on the queue.
  • Error: The message could not be processed, so we need to tell Service Bus to abandon the message.

Peek Lock azure integration pattern for Logic App to handle Message Locks

When the message is processed successfully (i.e. at the end of the Try processing the Message block), we set MessageState to Processed. In the Catch the Exception block, we set MessageState to Error. Note that Handle Message Lock is linked to the rest of the operation: both run in parallel inside the same scope and communicate through the MessageState variable. We use the Until loop action from Logic App to monitor the message state and perform the relevant action. After completing a Processed message the loop sets the state to Done, and after abandoning an Error message it sets the state to Terminate. The loop ends when the state is either Done or Terminate. A final scope then terminates the run as Failed if the state is Terminate.
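The control flow of the Handle Message Lock loop can be sketched as a small simulation. This is a simplified model rather than Logic App code: `run_lock_loop` and `make_wait` are hypothetical helpers, the Service Bus calls are replaced by entries appended to a list, and the parallel Try/Catch branch is simulated by a `wait` callback that changes the state after a given number of waits.

```python
def run_lock_loop(shared, calls, wait, max_iterations=60):
    """Act on MessageState until it becomes Done or Terminate."""
    for _ in range(max_iterations):
        state = shared["MessageState"]
        if state == "Processing":
            calls.append("renew")
            wait()  # the parallel branch may change the state meanwhile
        elif state == "Processed":
            calls.append("complete")
            shared["MessageState"] = "Done"
        elif state == "Error":
            calls.append("abandon")
            shared["MessageState"] = "Terminate"
        # An Until loop evaluates its exit condition after each iteration.
        if shared["MessageState"] in ("Done", "Terminate"):
            break
    return shared["MessageState"]


def make_wait(shared, finishes_after, outcome):
    """Hypothetical stand-in for the parallel Try/Catch branch."""
    counter = {"waits": 0}

    def wait():
        counter["waits"] += 1
        if counter["waits"] == finishes_after:
            shared["MessageState"] = outcome

    return wait


# Processing finishes successfully during the third wait.
ok_state = {"MessageState": "Processing"}
ok_calls = []
ok_final = run_lock_loop(ok_state, ok_calls, make_wait(ok_state, 3, "Processed"))
print(ok_final, ok_calls)    # Done ['renew', 'renew', 'renew', 'complete']

# Processing fails during the first wait.
err_state = {"MessageState": "Processing"}
err_calls = []
err_final = run_lock_loop(err_state, err_calls, make_wait(err_state, 1, "Error"))
print(err_final, err_calls)  # Terminate ['renew', 'abandon']
```

The point of the two intermediate states (Processed and Error) is that only the loop talks to Service Bus; the processing branch merely reports its outcome through the variable.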

The following is the JSON definition of a Logic App that handles the lock correctly.

{
    "definition": {
        "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
        "actions": {
            "Finally": {
                "actions": {
                    "Condition": {
                        "actions": {
                            "Terminate": {
                                "inputs": {
                                    "runError": {
                                        "code": "500",
                                        "message": "Message processing encountered an error!"
                                    },
                                    "runStatus": "Failed"
                                },
                                "runAfter": {},
                                "type": "Terminate"
                            }
                        },
                        "expression": {
                            "and": [
                                {
                                    "equals": [
                                        "@variables('MessageState')",
                                        "Terminate"
                                    ]
                                }
                            ]
                        },
                        "runAfter": {},
                        "type": "If"
                    }
                },
                "runAfter": {
                    "Process_the_Message": [
                        "Succeeded"
                    ]
                },
                "type": "Scope"
            },
            "Initialize_MessageState": {
                "inputs": {
                    "variables": [
                        {
                            "name": "MessageState",
                            "type": "string",
                            "value": "Processing"
                        }
                    ]
                },
                "runAfter": {},
                "type": "InitializeVariable"
            },
            "Process_the_Message": {
                "actions": {
                    "Catch_the_Exception": {
                        "actions": {
                            "Set_variable": {
                                "inputs": {
                                    "name": "MessageState",
                                    "value": "Error"
                                },
                                "runAfter": {},
                                "type": "SetVariable"
                            }
                        },
                        "runAfter": {
                            "Try_processing_the_Message": [
                                "Failed"
                            ]
                        },
                        "type": "Scope"
                    },
                    "Handle_Message_Lock": {
                        "actions": {
                            "Switch": {
                                "cases": {
                                    "Abandon_the_Message": {
                                        "actions": {
                                            "Abandon_the_message_in_a_queue": {
                                                "inputs": {
                                                    "host": {
                                                        "connection": {
                                                            "name": "@parameters('$connections')['servicebus']['connectionId']"
                                                        }
                                                    },
                                                    "method": "post",
                                                    "path": "/@{encodeURIComponent(encodeURIComponent(parameters('WebFromCEQueuName')))}/messages/abandon",
                                                    "queries": {
                                                        "lockToken": "@triggerBody()?['LockToken']",
                                                        "queueType": "Main",
                                                        "sessionId": ""
                                                    }
                                                },
                                                "runAfter": {},
                                                "type": "ApiConnection"
                                            },
                                            "Set_the_MessageState_to_Error": {
                                                "inputs": {
                                                    "name": "MessageState",
                                                    "value": "Terminate"
                                                },
                                                "runAfter": {
                                                    "Abandon_the_message_in_a_queue": [
                                                        "Succeeded"
                                                    ]
                                                },
                                                "type": "SetVariable"
                                            }
                                        },
                                        "case": "Error"
                                    },
                                    "Complete_the_Message": {
                                        "actions": {
                                            "Complete_the_message_in_a_queue": {
                                                "inputs": {
                                                    "host": {
                                                        "connection": {
                                                            "name": "@parameters('$connections')['servicebus']['connectionId']"
                                                        }
                                                    },
                                                    "method": "delete",
                                                    "path": "/@{encodeURIComponent(encodeURIComponent(parameters('WebFromCEQueuName')))}/messages/complete",
                                                    "queries": {
                                                        "lockToken": "@triggerBody()?['LockToken']",
                                                        "queueType": "Main",
                                                        "sessionId": ""
                                                    }
                                                },
                                                "runAfter": {},
                                                "type": "ApiConnection"
                                            },
                                            "Set_the_Message_State_to_Done": {
                                                "inputs": {
                                                    "name": "MessageState",
                                                    "value": "Done"
                                                },
                                                "runAfter": {
                                                    "Complete_the_message_in_a_queue": [
                                                        "Succeeded"
                                                    ]
                                                },
                                                "type": "SetVariable"
                                            }
                                        },
                                        "case": "Processed"
                                    },
                                    "Renew_the_Message_lock": {
                                        "actions": {
                                            "Renew_lock_on_the_message_in_a_queue": {
                                                "inputs": {
                                                    "host": {
                                                        "connection": {
                                                            "name": "@parameters('$connections')['servicebus']['connectionId']"
                                                        }
                                                    },
                                                    "method": "post",
                                                    "path": "/@{encodeURIComponent(encodeURIComponent(parameters('WebFromCEQueuName')))}/messages/renewlock",
                                                    "queries": {
                                                        "lockToken": "@triggerBody()?['LockToken']",
                                                        "queueType": "Main"
                                                    }
                                                },
                                                "runAfter": {},
                                                "type": "ApiConnection"
                                            },
                                            "Wait_half_the_standard_queue_lock_time": {
                                                "inputs": {
                                                    "interval": {
                                                        "count": 30,
                                                        "unit": "Second"
                                                    }
                                                },
                                                "runAfter": {
                                                    "Renew_lock_on_the_message_in_a_queue": [
                                                        "Succeeded"
                                                    ]
                                                },
                                                "type": "Wait"
                                            }
                                        },
                                        "case": "Processing"
                                    }
                                },
                                "default": {
                                    "actions": {}
                                },
                                "expression": "@variables('MessageState')",
                                "runAfter": {},
                                "type": "Switch"
                            }
                        },
                        "expression": "@or(equals(variables('MessageState'), 'Done'),equals(variables('MessageState'), 'Terminate'))",
                        "limit": {
                            "count": 60,
                            "timeout": "PT1H"
                        },
                        "runAfter": {},
                        "type": "Until"
                    },
                    "Try_processing_the_Message": {
                        "actions": {
                            "Set_the_Message_State_to_Processed": {
                                "inputs": {
                                    "name": "MessageState",
                                    "value": "Processed"
                                },
                                "runAfter": {},
                                "type": "SetVariable"
                            }
                        },
                        "runAfter": {},
                        "type": "Scope"
                    }
                },
                "runAfter": {
                    "Initialize_MessageState": [
                        "Succeeded"
                    ]
                },
                "type": "Scope"
            }
        },
        "contentVersion": "1.0.0.0",
        "outputs": {},
        "parameters": {
            "$connections": {
                "defaultValue": {},
                "type": "Object"
            },
            "WebFromCEQueuName": {
                "defaultValue": "webform-ce-queue-dev",
                "type": "String"
            }
        },
        "triggers": {
            "When_a_message_is_received_in_a_queue_(peek-lock)": {
                "inputs": {
                    "host": {
                        "connection": {
                            "name": "@parameters('$connections')['servicebus']['connectionId']"
                        }
                    },
                    "method": "get",
                    "path": "/@{encodeURIComponent(encodeURIComponent(parameters('WebFromCEQueuName')))}/messages/head/peek",
                    "queries": {
                        "queueType": "Main",
                        "sessionId": "None"
                    }
                },
                "recurrence": {
                    "frequency": "Minute",
                    "interval": 3
                },
                "type": "ApiConnection"
            }
        }
    },
    "parameters": {
        "$connections": {
            "value": {
                "servicebus": {
                    "connectionId": "/subscriptions/SubscriptionID/resourceGroups/RG/providers/Microsoft.Web/connections/servicebus",
                    "connectionName": "servicebus",
                    "id": "/subscriptions/SubscriptionID/providers/Microsoft.Web/locations/westeurope/managedApis/servicebus"
                }
            }
        }
    }
}
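The limits in the definition above bound how long the lock can be kept alive. The arithmetic below is a rough estimate under stated assumptions: the queue uses a 60-second lock duration, and the time taken by the renew calls themselves is ignored.

```python
from datetime import timedelta

lock_duration = timedelta(seconds=60)  # lock duration assumed in this post
renew_wait = timedelta(seconds=30)     # Wait_half_the_standard_queue_lock_time
until_count = 60                       # Until loop "limit.count"
until_timeout = timedelta(hours=1)     # Until loop "limit.timeout" (PT1H)

# Each "Processing" iteration renews once and then waits, so renewals stop
# after at most until_count waits or when the timeout elapses.
covered_by_count = until_count * renew_wait
covered = min(covered_by_count, until_timeout)

# Time left on the lock when the next renewal is due.
margin = lock_duration - renew_wait

print(covered)  # 0:30:00
print(margin)   # 0:00:30
```

Under these assumptions the loop keeps the lock alive for roughly 30 minutes of processing, with about 30 seconds of slack before each renewal. If processing can take longer, or the queue uses a different lock duration, the wait interval and the loop limits would need to be adjusted together.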

Published by Poojith Jain

Poojith Jain is an Azure Architect with good experience in software design and development. He has a thorough knowledge of Azure Integration, and he is passionate about solving complex and challenging problems in the field of Azure.
