Skip to main content

General

Örnek: Temel İşleme

Abonelik, cache ve analiz için tam sınıf örneği.

Bu örnek, Edge Function'ın temel döngüsünü gösterir: cihazlara abone olma, veriyi bellekte cache'leme ve hesaplama yapma.

using Proxus.SDK.BaseFunctions;
using Proxus.Common.Messages;
using Proxus.Common;
using System;
using System.Collections.Generic;
using ZLinq;
using System.Threading.Tasks;

namespace MyProject.Functions
{
    public class CompressorMonitor : FunctionBase
    {
        // 1. PluginManager için zorunlu constructor
        public CompressorMonitor(object sys, object log, object config) 
            : base(sys, log, config) { }

        protected override void OnStarted()
        {
            // 2. Wildcard ile cihazlara abone ol
            Subscriptions?.Add(new SubscriptionContext {
                Type = typeof(TransportData),
                Topics = (HashSet<string>) ["Compressor_*"] 
            });

            // 3. Dahili Cache (10 dakika)
            EnableCacheWithExpirationTime = TimeSpan.FromMinutes(10);

            base.OnStarted();
        }

        protected override void OnMessageReceive(FunctionContext ctx)
        {
            if (ctx.Message is TransportData data)
            {
                ProcessCompressorData(data);
            }
        }

        private async void ProcessCompressorData(TransportData currentData)
        {
            // 4. Güvenli Veri Çekme
            double currentPressure = currentData.GetPayloadValueByName<double>("Pressure");
            string deviceName = currentData.GetMetaDataValueByKey("DeviceName");

            // 5. Local Cache ile durum analizi
            var avgPressure = Cache
                .AsValueEnumerable()
                .Where(kv => kv.Value.GetMetaDataValueByKey("DeviceName") == deviceName)
                .Select(kv => kv.Value)
                .SelectMany(t => t.Payload)
                .Where(p => p.Key == "Pressure")
                .Select(p => p.NumericValue)
                .DefaultIfEmpty(0)
                .Average();

            // 6. Anomali Mantığı
            if (currentPressure > (avgPressure * 1.2)) 
            {
                LogWarning($"Spike on {deviceName}: {currentPressure} (Avg: {avgPressure:F2})");

                // 7. Ham anomali paketini direkt DB'ye yaz
                await Save(currentData, usePipeline: false);
            }
        }
    }
}