适用于:Azure 逻辑应用(消耗)
如果需要按特定顺序发送相关消息,则可在通过
Azure 服务总线连接器
使用
Azure 逻辑应用
时按
“顺序保护”模式
操作。 相关消息有一个用于定义这些消息之间的关系的属性,例如服务总线中的
会话
的 ID。
例如,假设你有 10 条消息用于名为“会话 1”的会话,有 5 条消息用于名为“会话 2”的会话,且所有这些消息都发送到同一
服务总线队列
。 你可以创建一个逻辑应用,用于处理来自队列的消息,使来自“会话 1”的所有消息都由单个触发器运行处理,使来自“会话 2”的所有消息则都由下一个触发器运行处理。
本文介绍了如何通过“使用服务总线会话按顺序传送相关消息”模板来创建实现此模式的逻辑应用。 此模板定义了一个逻辑应用工作流,该工作流从服务总线连接器的“队列中收到消息时(扫视锁定)”触发器开始,而该触发器从
服务总线队列
接收消息。 下面是此逻辑应用执行的大致步骤:
根据触发器从服务总线队列读取的消息初始化会话。
在当前工作流运行期间,读取并处理来自队列中同一会话的所有消息。
若要查看此模板的 JSON 文件,请参阅
GitHub:service-bus-sessions.json
。
有关详细信息,请参阅
顺序保护模式 - Azure 体系结构云设计模式
。
Azure 订阅。 如果没有订阅,可以
注册免费的 Azure 帐户
。
服务总线命名空间和
服务总线队列
,后者是将在逻辑应用中使用的消息传送实体。 这些项和你的逻辑应用需使用同一 Azure 订阅。 在创建队列时,请确保选择“启用会话”。 如果没有这些项,请了解
如何创建服务总线命名空间和队列
。
当你同时选择具有相同连接器类型的触发器和操作并使用它们处理相同的实体(如消息队列或主题订阅)时,请务必谨慎。
这种组合可以创建无限循环,从而导致逻辑应用永不结束。
有关如何创建逻辑应用的基础知识。 如果你不熟悉 Azure 逻辑应用,请尝试
在多租户 Azure 逻辑应用中创建示例消耗逻辑应用工作流的快速入门
。
检查对服务总线命名空间的访问权限
如果不确定逻辑应用是否有权访问服务总线命名空间,请确认这些权限。
登录到
Azure 门户
。 找到并选择你的服务总线命名空间。
在命名空间菜单上的“设置”下,选择“共享访问策略”。 在“声明”下,检查你是否有该命名空间的“管理”权限。
现在,获取你的服务总线命名空间的连接字符串。 稍后在创建从逻辑应用到命名空间的连接时可以使用此字符串。
在“共享访问策略”窗格中,在“策略”下,选择“RootManageSharedAccessKey” 。
在主连接字符串旁边选择复制按钮。 保存连接字符串供以后使用。
创建逻辑应用
在此部分中,你将使用“使用服务总线会话按顺序传送相关消息”模板来创建逻辑应用,该模板包括用于实现此工作流模式的触发器和操作。 另外,请创建一个到服务总线命名空间的连接,并指定要使用的服务总线队列的名称。
在
Azure 门户
中,创建一个空白逻辑应用。 在 Azure 主页中,选择“创建资源”>“集成”>“逻辑应用”。
在模板库出现后,滚动经过视频和常用触发器部分。 从“模板”部分中,选择“使用服务总线会话按顺序传送相关消息”模板。
在确认框出现后,选择“使用此模板”。
在逻辑应用设计器中,在“服务总线”形状中,选择“继续”,然后选择形状中显示的加号 (
+
)。
现在,通过选择以下任一选项来创建服务总线连接:
若要使用之前从服务总线命名空间复制的连接字符串,请执行以下步骤:
选择“手动输入连接信息”。
对于“连接名称”,请为连接提供一个名称。 对于“连接字符串”,请粘贴你的命名空间连接字符串,然后选择“创建”,例如:
如果你没有此连接字符串,请了解如何
查找并复制服务总线命名空间连接字符串
。
完成后,选择“继续”。
逻辑应用设计器现在会显示“使用服务总线会话按顺序传送相关消息”模板,该模板包含一个预先填充了触发器和操作的工作流,其中包括的两个作用域实现了遵循
Try-Catch
模式的错误处理。
现在,你可以了解有关模板中的触发器和操作的详细信息,也可以直接跳到
为逻辑应用模板提供值
。
下面是折叠详细信息后,“使用服务总线会话按顺序传送相关消息”模板中的大致工作流:
When a message is received in a queue (peek-lock)
此服务总线触发器会根据指定的重复周期检查指定的服务总线队列中是否有任何消息。 如果队列中存在消息,则触发器将触发,这将创建并运行一个工作流实例。
“扫视锁定”这一术语是指触发器发送从队列中检索消息的请求。 如果有消息存在,则触发器会检索并锁定该消息,防止在锁定期限到期之前对该消息进行其他处理。 有关详细信息,请参阅
初始化会话
。
Init isDone
此
初始化变量
操作
创建一个设置为
false
的布尔变量,并指示何时满足以下条件:
- 会话中没有剩余可供读取的消息。
- 不再需要续订会话锁,就能完成当前工作流实例。
有关详细信息,请参阅
初始化会话
。
此
“作用域”操作
包含处理消息时要运行的操作。 如果
Try
范围中出现问题,则后续的
Catch
范围操作会处理该问题。 有关详细信息,请参阅
“Try”作用域
。
Catch
此
“作用域”操作
包含在上述
Try
作用域中出现问题时将运行的操作。 有关详细信息,请参阅
“Catch”作用域
。
Send initial message to topic
可以将此操作替换对来自队列中会话的第一条消息进行处理时需要执行的任何操作。 会话 ID 用于指定会话。
对于此模板,服务总线操作会将第一条消息发送到一个服务总线主题。 有关详细信息,请参阅
处理初始消息
。
(并行分支)
此
并行分支操作
创建两个路径:
- 分支 1:继续处理消息。 有关详细信息,请参阅
分支 1:完成队列中的初始消息
。
- 分支 2:如果出现问题,则放弃该消息,将其释放,供另一个触发器运行来拾取。 有关详细信息,请参阅
分支 2:放弃队列中的初始消息
。
这两个路径稍后会在“关闭队列中的会话并成功”操作中汇合,详见下一行中的说明。
Close a session in a queue and succeed
此服务总线操作将前面所述的分支汇合,并在发生以下事件之一后关闭队列中的会话:
- 工作流完成了对队列中可用消息的处理。
- 工作流因为某个错误放弃了初始消息。
有关详细信息,请参阅
关闭队列中的会话并成功
。
Complete initial message in queue
此服务总线操作会将成功检索的消息标记为完成,然后从队列中删除该消息,防止重新处理。 有关详细信息,请参阅
处理初始消息
。
While there are more messages for the session in the queue
此
Until
循环
会在有消息存在时或一小时过后继续获取消息。 有关此循环中的操作的详细信息,请参阅
当队列中的会话存在剩余消息时
。
Set isDone = true
当不存在剩余消息时,此
“设置变量”操作
会将
isDone
设置为
true
。
Renew session lock until cancelled
此
Until
循环
可确保此逻辑应用在有消息存在时或一小时过后持有会话锁。 有关此循环中的操作的详细信息,请参阅
续订会话锁直至取消
。
分支 2:放弃队列中的初始消息
如果处理第一条消息的操作失败,则服务总线操作“放弃队列中的初始消息”会释放此消息,供另一个工作流实例运行来拾取并处理。 有关详细信息,请参阅
处理初始消息
。
“Catch”作用域
如果
Try
作用域中的操作失败,则逻辑应用仍必须关闭会话。
Catch
范围操作
会在
Try
范围操作导致
Failed
、
Skipped
或
TimedOut
状态时运行。 此作用域会返回一条错误消息(其中包含发生问题的会话 ID)并终止逻辑应用。
下面是折叠详细信息后
Catch
作用域操作中的大致流:
Close a session in a queue and fail
此服务总线操作会关闭队列中的会话,使会话锁不再保持打开状态。 有关详细信息,请参阅
关闭队列中的会话并失败
。
Find failure msg from 'Try' block
此
“筛选数组”操作
按照指定的条件基于
Try
作用域内的所有操作的输入和输出创建一个数组。 在这种情况下,此操作会返回导致
Failed
状态的操作的输出。 有关详细信息,请参阅
从“Try”块查找失败消息
。
Select error details
此
“选择”操作
根据指定的条件创建包含 JSON 对象的数组。 这些 JSON 对象是根据上一操作(
Find failure msg from 'Try' block
)创建的数组中的值生成的。 在这种情况下,此操作会返回一个数组,其中包含根据从上一操作返回的错误详细信息创建的 JSON 对象。 有关详细信息,请参阅
“选择”错误详细信息
。
Terminate
此
“终止”操作
会停止工作流的运行,取消正在进行的任何操作,跳过任何剩余的操作,并返回指定的状态、会话 ID 以及来自“
Select error details
”操作的错误结果。 有关详细信息,请参阅
终止逻辑应用
。
下一个可用
此选项会根据服务总线队列中消息的会话 ID 为每个触发器运行获取一个会话。 此会话也被锁定,以防止其他逻辑应用或其他客户端处理与此会话相关的消息。 工作流的后续操作将处理与该会话关联的所有消息,如本文稍后所述。
下面是有关其他
会话 ID
选项的详细信息:
-
无
:默认选项,会导致没有会话,不能用于实现顺序保护模式。
-
输入自定义值
:如果你知道要使用的会话 ID,并且始终要为该会话 ID 运行触发器,请使用此选项。
注意
:服务总线连接器一次可以将 Azure 服务总线的有限数量的唯一会话保存到连接器缓存。 如果会话计数超过此限制,则将从缓存中删除旧会话。 有关详细信息,请参阅
使用 Azure 逻辑应用和 Azure 服务总线在云中交换消息
。
<时间间隔数>
在重复检查是否存在消息之前,在两个重复周期之间间隔的时间单位数。
“秒”、“分钟”、“小时”、“天”、“周”或“月”
在检查是否存在消息时要使用的重复周期的时间单位。
提示
:若要添加“时区”或“开始时间”,请从“添加新参数”列表中选择这些属性。
处理初始消息
第一个操作是占位符服务总线操作“将初始消息发送到主题”,你可以将其替换为对队列中会话的第一条消息进行处理时需要执行的任何操作。 会话 ID 指定产生消息的会话。
此占位符服务总线操作会将第一条消息发送到由“会话 ID”属性指定的服务总线主题。 这样一来,与特定会话关联的所有消息都将移至同一主题。 此模板中的后续操作的所有“会话 ID”属性均使用同一会话 ID 值。
在服务总线操作“完成队列中的初始消息”中,提供你的服务总线队列的名称,并保留操作中的所有其他默认属性值。
在服务总线操作“放弃队列中的初始消息”中,提供你的服务总线队列的名称,并保留操作中的所有其他默认属性值。
接下来,你将为“完成队列中的初始消息”操作后面的操作提供必要信息。 你将从“当队列中的会话存在剩余消息时”循环中的操作开始。
当队列中的会话存在剩余消息时
此
Until
循环
会在队列中有消息存在时或一小时过后运行这些操作。 若要更改循环的时间限制,请编辑循环的“超时”属性值。
当有消息存在时从队列中获取其他消息。
检查剩余消息的数目。 如果仍然有消息存在,则继续处理消息。 如果不存在剩余消息,则工作流会将
isDone
变量设置为
true
,并退出循环。
在
If false
部分中,每个
For each
循环按先进先出顺序 (FIFO) 处理每条消息。 在循环的“设置”中,“并发控制”设置设定为
1
,因此一次只处理一条消息。 。
对于服务总线操作“完成队列中的消息”和“放弃队列中的消息” ,请提供你的服务总线队列的名称。
“当队列中的会话存在剩余消息时”循环完成后,工作流会将
isDone
变量设置为
true
。
接下来,你将为“续订会话锁直至取消”循环中的操作提供必要信息。
续订会话锁直至取消
此
Until
循环
可通过运行这些操作确保此逻辑应用在有消息存在于队列中时或一小时过后持有会话锁。 若要更改循环的时间限制,请编辑循环的“超时”属性值。
延迟 25 秒或延迟比正在处理的队列的锁超时持续时间小的某个时间。 最小的锁持续时间为 30 秒,因此默认值就足够了。 不过,可以通过适当调整来优化循环的运行次数。
检查
isDone
变量是否已设置为
true
。
如果
isDone
设置为
true
,工作流仍在处理消息,则工作流会继续锁定队列中的会话,并再次检查循环条件。
你需要在服务总线操作
续订队列中的会话锁
中提供你的服务总线队列的名称。
如果
isDone
设置为
true
,则工作流不会续订队列中的会话锁,而是退出循环。
从“Try”块查找失败消息
此
“筛选数组”操作
会按照
result()
函数
指定的条件创建一个数组,该数组包含
Try
作用域内所有操作的输入和输出。 在这种情况下,此操作会通过使用
equals()
函数
和
item()
函数
返回状态为
Failed
的操作的输出。
下面是此操作的 JSON 定义:
"Find_failure_msg_from_'Try'_block": {
"inputs": {
"from": "@Result('Try')",
"where": "@equals(item()['status'], 'Failed')"
"runAfter": {
"Close_the_session_in_the_queue_and_fail": [
"Succeeded"
"type": "Query"
接下来,工作流创建一个包含 JSON 对象的数组,该对象包含从“Find failure msg from 'Try' block”操作返回的数组中的错误信息。
“选择”错误详细信息
此“选择”操作根据输入数组(上面的操作“Find failure msg from 'Try' block”的输出)创建包含 JSON 对象的数组。 具体说来,此操作会返回一个数组,该数组只包含输入数组中每个对象的指定属性。 在此示例中,数组包含操作名称和错误结果属性。
下面是此操作的 JSON 定义:
"Select_error_details": {
"inputs": {
"from": "@body('Find_failure_msg_from_''Try''_block')[0]['outputs']",
"select": {
"action": "@item()['name']",
"errorResult": "@item()"
"runAfter": {
"Find_failure_msg_from_'Try'_block": [
"Succeeded"
"type": "Select"
接下来,工作流会停止逻辑应用运行,并返回运行状态以及已发生的错误或失败的详细信息。
终止逻辑应用运行
此“终止”操作会停止逻辑应用运行,并返回 Failed(作为逻辑应用的运行状态)以及会话 ID 和来自“Select error details”操作的错误结果。
下面是此操作的 JSON 定义:
"Terminate": {
"description": "This Failure Termination only runs if the Close Session upon Failure action runs - otherwise the LA will be terminated as Success",
"inputs": {
"runError": {
"code": "",
"message": "There was an error processing messages for Session ID @{triggerBody()?['SessionId']}. The following error(s) occurred: @{body('Select_error_details')['errorResult']}"
"runStatus": "Failed"
"runAfter": {
"Select_error_details": [
"Succeeded"
"type": "Terminate"
保存并运行逻辑应用
完成模板后,可以保存你的逻辑应用。 在设计器工具栏上选择“保存”。
若要测试逻辑应用,请向服务总线队列发送消息。
详细了解服务总线连接器的触发器和操作