Bu örnek, Edge Function'ın temel döngüsünü gösterir: cihazlara abone olma, veriyi bellekte cache'leme ve hesaplama yapma.
using Proxus.SDK.BaseFunctions;
using Proxus.Common.Messages;
using Proxus.Common;
using System;
using System.Collections.Generic;
using ZLinq;
using System.Threading.Tasks;
namespace MyProject.Functions
{
public class CompressorMonitor : FunctionBase
{
// 1. PluginManager için zorunlu constructor
public CompressorMonitor(object sys, object log, object config)
: base(sys, log, config) { }
protected override void OnStarted()
{
// 2. Wildcard ile cihazlara abone ol
Subscriptions?.Add(new SubscriptionContext {
Type = typeof(TransportData),
Topics = (HashSet<string>) ["Compressor_*"]
});
// 3. Dahili Cache (10 dakika)
EnableCacheWithExpirationTime = TimeSpan.FromMinutes(10);
base.OnStarted();
}
protected override void OnMessageReceive(FunctionContext ctx)
{
if (ctx.Message is TransportData data)
{
ProcessCompressorData(data);
}
}
private async void ProcessCompressorData(TransportData currentData)
{
// 4. Güvenli Veri Çekme
double currentPressure = currentData.GetPayloadValueByName<double>("Pressure");
string deviceName = currentData.GetMetaDataValueByKey("DeviceName");
// 5. Local Cache ile durum analizi
var avgPressure = Cache
.AsValueEnumerable()
.Where(kv => kv.Value.GetMetaDataValueByKey("DeviceName") == deviceName)
.Select(kv => kv.Value)
.SelectMany(t => t.Payload)
.Where(p => p.Key == "Pressure")
.Select(p => p.NumericValue)
.DefaultIfEmpty(0)
.Average();
// 6. Anomali Mantığı
if (currentPressure > (avgPressure * 1.2))
{
LogWarning($"Spike on {deviceName}: {currentPressure} (Avg: {avgPressure:F2})");
// 7. Ham anomali paketini direkt DB'ye yaz
await Save(currentData, usePipeline: false);
}
}
}
}