This example demonstrates the core loop of an Edge Function: subscribing to devices, caching data in memory, and performing calculations.
using Proxus.SDK.BaseFunctions;
using Proxus.Common.Messages;
using Proxus.Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace MyProject.Functions
{
public class CompressorMonitor : FunctionBase
{
// 1. Boilerplate constructor required by PluginManager
public CompressorMonitor(object sys, object log, object config)
: base(sys, log, config) { }
protected override void OnStarted()
{
// 2. Subscribe to devices using Wildcards
Subscriptions?.Add(new SubscriptionContext {
Type = typeof(TransportData),
Topics = (HashSet<string>) ["Compressor_*"]
});
// 3. Configure built-in Cache (Retain data for 10 minutes)
EnableCacheWithExpirationTime = TimeSpan.FromMinutes(10);
base.OnStarted();
}
protected override void OnMessageReceive(FunctionContext ctx)
{
if (ctx.Message is TransportData data)
{
ProcessCompressorData(data);
}
}
private async void ProcessCompressorData(TransportData currentData)
{
// 4. Safe Data Extraction
double currentPressure = currentData.GetPayloadValueByName<double>("Pressure");
string deviceName = currentData.GetMetaDataValueByKey("DeviceName");
// 5. Stateful Analysis using Local Cache
// Calculate average pressure for THIS specific device over the cache window
var avgPressure = Cache
.Where(kv => kv.Value.GetMetaDataValueByKey("DeviceName") == deviceName)
.Select(kv => kv.Value)
.SelectMany(t => t.Payload)
.Where(p => p.Key == "Pressure")
.Select(p => p.NumericValue)
.DefaultIfEmpty(0)
.Average();
// 6. Anomaly Logic
if (currentPressure > (avgPressure * 1.2))
{
LogWarning($"Spike on {deviceName}: {currentPressure} (Avg: {avgPressure:F2})");
// 7. Persist the raw anomaly packet directly to DB
await Save(currentData, usePipeline: false);
}
}
}
}