Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .github/workflows/build-samples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ jobs:
- name: Build DF - LargePayloadFanOutFanIn
run: dotnet build samples/durable-functions/dotnet/LargePayloadFanOutFanIn/LargePayloadFanOutFanIn.csproj

- name: Build DF - WorkItemFiltering
run: dotnet build samples/durable-functions/dotnet/WorkItemFiltering/WorkItemFiltering.csproj

- name: Build DF - WorkItemFiltering.AppB
run: dotnet build samples/durable-functions/dotnet/WorkItemFiltering.AppB/AppB.csproj

# Durable Functions Aspire samples (net10.0)
- name: Build DF - AzureFunctionsAndDtsWithAspire
run: dotnet build samples/durable-functions/dotnet/AzureFunctionsAndDtsWithAspire/AspireHost/AspireHost.csproj
Expand Down Expand Up @@ -209,6 +215,12 @@ jobs:
- name: Install DF - pdf-summarizer
run: pip install -r samples/durable-functions/python/pdf-summarizer/requirements.txt

- name: Install DF - work-item-filtering
run: pip install -r samples/durable-functions/python/work-item-filtering/requirements.txt

- name: Install DF - work-item-filtering-app-b
run: pip install -r samples/durable-functions/python/work-item-filtering-app-b/requirements.txt
Comment thread
bachuv marked this conversation as resolved.

# Syntax check all Python files
- name: Syntax check Python files
run: find samples -name "*.py" -not -path "*/__pycache__/*" -exec python -m py_compile {} +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
<OutputType>Exe</OutputType>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<RootNamespace>WorkItemFiltering.AppB</RootNamespace>
<AssemblyName>WorkItemFiltering.AppB</AssemblyName>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="2.51.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.16.5" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged" Version="1.8.1" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.3.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.7" OutputItemType="Analyzer" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Net;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;

namespace WorkItemFiltering.AppB;

// =============================================================================
// App B — registers an entirely DIFFERENT set of functions from App A.
// Both apps share the same DTS task hub ("default"). Work item filtering ensures
// each app only receives work items for the functions it has registered.
//
// App A owns: GreetingOrchestration, FanOutOrchestration, ParentOrchestration,
// CounterOrchestration, SayHello activity, CounterEntity
// App B owns: OrdersOrchestration, ShipOrder activity
//
// Either app's client endpoint can SCHEDULE any orchestration name. The
// scheduler routes the work item to the app whose filter matches.
// =============================================================================

public static class OrdersOrchestration
{
[Function(nameof(OrdersOrchestration))]
public static async Task<string> Run([OrchestrationTrigger] TaskOrchestrationContext ctx)
{
var logger = ctx.CreateReplaySafeLogger(nameof(OrdersOrchestration));
logger.LogInformation("OrdersOrchestration started on App B");

string orderId = ctx.GetInput<string>() ?? $"order-{ctx.NewGuid():N}";
string shipResult = await ctx.CallActivityAsync<string>(nameof(ShipOrder), orderId);
return shipResult;
}

[Function(nameof(OrdersOrchestration) + "_Start")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "orchestrators/orders")] HttpRequestData req,
[DurableClient] DurableTaskClient client)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
nameof(OrdersOrchestration), input: "order-42");
return client.CreateCheckStatusResponse(req, instanceId);
}
}

public static class ShipOrder
{
[Function(nameof(ShipOrder))]
public static string Run([ActivityTrigger] string orderId, FunctionContext ctx)
{
ctx.GetLogger(nameof(ShipOrder)).LogInformation("App B shipping {OrderId}", orderId);
return $"Shipped {orderId} from App B";
}
}

// Generic starter so you can schedule ANY orchestration name from App B's port too.
public static class GenericStarter
{
[Function("AppB_StartOrchestration")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "start/{name}")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
string name)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(name);
return client.CreateCheckStatusResponse(req, instanceId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using Microsoft.Azure.Functions.Worker.Builder;
using Microsoft.Extensions.Hosting;

FunctionsApplicationBuilder builder = FunctionsApplication.CreateBuilder(args);
builder.Build().Run();
22 changes: 22 additions & 0 deletions samples/durable-functions/dotnet/WorkItemFiltering.AppB/host.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"version": "2.0",
"logging": {
"logLevel": {
"default": "Information",
"DurableTask.AzureStorage": "Warning",
"DurableTask.Core": "Warning",
"Microsoft.DurableTask": "Information",
"Host.Triggers.DurableTask": "Information"
}
},
"extensions": {
"durableTask": {
"hubName": "default",
"storageProvider": {
"type": "azureManaged",
"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING",
"workItemFilteringEnabled": true
}
}
}
}
161 changes: 161 additions & 0 deletions samples/durable-functions/dotnet/WorkItemFiltering/Functions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Entities;
using Microsoft.Extensions.Logging;

namespace WorkItemFiltering;

// =============================================================================
// Orchestrations
// =============================================================================

/// <summary>
/// A simple orchestration that calls an activity and returns the result.
/// With work item filtering enabled, DTS will only dispatch this orchestration
/// to workers that have it registered.
/// </summary>
public static class GreetingOrchestration
{
[Function(nameof(GreetingOrchestration))]
public static async Task<string> Run([OrchestrationTrigger] TaskOrchestrationContext ctx)
{
ctx.CreateReplaySafeLogger(nameof(GreetingOrchestration)).LogInformation("GreetingOrchestration started");
return await ctx.CallActivityAsync<string>(nameof(SayHello), "World");
}

[Function(nameof(GreetingOrchestration) + "_Start")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "orchestrators/greeting")] HttpRequestData req,
[DurableClient] DurableTaskClient client)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(GreetingOrchestration));
return client.CreateCheckStatusResponse(req, instanceId);
}
}

/// <summary>
/// A fan-out/fan-in orchestration that calls the same activity in parallel.
/// Demonstrates that activity work items are also filtered.
/// </summary>
public static class FanOutOrchestration
{
[Function(nameof(FanOutOrchestration))]
public static async Task<string[]> Run([OrchestrationTrigger] TaskOrchestrationContext ctx)
{
ctx.CreateReplaySafeLogger(nameof(FanOutOrchestration)).LogInformation("FanOutOrchestration: fanning out to 3 activities");
return await Task.WhenAll(
ctx.CallActivityAsync<string>(nameof(SayHello), "Tokyo"),
ctx.CallActivityAsync<string>(nameof(SayHello), "London"),
ctx.CallActivityAsync<string>(nameof(SayHello), "Seattle"));
}

[Function(nameof(FanOutOrchestration) + "_Start")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "orchestrators/fanout")] HttpRequestData req,
[DurableClient] DurableTaskClient client)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(FanOutOrchestration));
return client.CreateCheckStatusResponse(req, instanceId);
}
}

/// <summary>
/// A parent orchestration that calls a child orchestration.
/// Sub-orchestration dispatch is also governed by work item filters.
/// </summary>
public static class ParentOrchestration
{
[Function(nameof(ParentOrchestration))]
public static async Task<string> Run([OrchestrationTrigger] TaskOrchestrationContext ctx)
{
ctx.CreateReplaySafeLogger(nameof(ParentOrchestration)).LogInformation("Calling sub-orchestration");
string result = await ctx.CallSubOrchestratorAsync<string>(nameof(GreetingOrchestration));
return $"Parent received: {result}";
}

[Function(nameof(ParentOrchestration) + "_Start")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "orchestrators/parent")] HttpRequestData req,
[DurableClient] DurableTaskClient client)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(ParentOrchestration));
return client.CreateCheckStatusResponse(req, instanceId);
}
}

/// <summary>
/// An orchestration that interacts with a durable entity.
/// Entity work items are also filtered.
/// </summary>
public static class CounterOrchestration
{
[Function(nameof(CounterOrchestration))]
public static async Task<int> Run([OrchestrationTrigger] TaskOrchestrationContext ctx)
{
var logger = ctx.CreateReplaySafeLogger(nameof(CounterOrchestration));
var entityId = new EntityInstanceId(nameof(CounterEntity), "sample-counter");

await ctx.Entities.CallEntityAsync(entityId, "Add", 10);
await ctx.Entities.CallEntityAsync(entityId, "Add", 20);
int value = await ctx.Entities.CallEntityAsync<int>(entityId, "Get");

logger.LogInformation("Counter value = {Value}", value);
return value;
}

[Function(nameof(CounterOrchestration) + "_Start")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "orchestrators/counter")] HttpRequestData req,
[DurableClient] DurableTaskClient client)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(nameof(CounterOrchestration));
return client.CreateCheckStatusResponse(req, instanceId);
}
}

// =============================================================================
// Activities
// =============================================================================

public static class SayHello
{
[Function(nameof(SayHello))]
public static string Run([ActivityTrigger] string name) => $"Hello, {name}!";
}

// =============================================================================
// Entities
// =============================================================================

public class CounterEntity : TaskEntity<int>
{
public void Add(int amount) => this.State += amount;
public void Reset() => this.State = 0;
public int Get() => this.State;

[Function(nameof(CounterEntity))]
public static Task Dispatch([EntityTrigger] TaskEntityDispatcher dispatcher)
=> dispatcher.DispatchAsync<CounterEntity>();
}

// =============================================================================
// Generic starter (for cross-app filter isolation tests)
// =============================================================================

public static class GenericStarter
{
[Function("StartOrchestration")]
public static async Task<HttpResponseData> Start(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "start/{name}")] HttpRequestData req,
[DurableClient] DurableTaskClient client,
string name)
{
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(name);
return client.CreateCheckStatusResponse(req, instanceId);
}
}
5 changes: 5 additions & 0 deletions samples/durable-functions/dotnet/WorkItemFiltering/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using Microsoft.Azure.Functions.Worker.Builder;
using Microsoft.Extensions.Hosting;

FunctionsApplicationBuilder builder = FunctionsApplication.CreateBuilder(args);
builder.Build().Run();
Loading