Skip to main content

General

Example: Basic Processing

Complete class demonstrating subscription, caching, and LINQ analytics.

This example demonstrates the core loop of an Edge Function: subscribing to devices, caching data in memory, and performing calculations.

using Proxus.SDK.BaseFunctions;
using Proxus.Common.Messages;
using Proxus.Common;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace MyProject.Functions
{
    public class CompressorMonitor : FunctionBase
    {
        // 1. Boilerplate constructor required by PluginManager
        public CompressorMonitor(object sys, object log, object config) 
            : base(sys, log, config) { }

        protected override void OnStarted()
        {
            // 2. Subscribe to devices using Wildcards
            Subscriptions?.Add(new SubscriptionContext {
                Type = typeof(TransportData),
                Topics = (HashSet<string>) ["Compressor_*"] 
            });

            // 3. Configure built-in Cache (Retain data for 10 minutes)
            EnableCacheWithExpirationTime = TimeSpan.FromMinutes(10);

            base.OnStarted();
        }

        protected override void OnMessageReceive(FunctionContext ctx)
        {
            if (ctx.Message is TransportData data)
            {
                ProcessCompressorData(data);
            }
        }

        private async void ProcessCompressorData(TransportData currentData)
        {
            // 4. Safe Data Extraction
            double currentPressure = currentData.GetPayloadValueByName<double>("Pressure");
            string deviceName = currentData.GetMetaDataValueByKey("DeviceName");

            // 5. Stateful Analysis using Local Cache
            // Calculate average pressure for THIS specific device over the cache window
            var avgPressure = Cache
                .Where(kv => kv.Value.GetMetaDataValueByKey("DeviceName") == deviceName)
                .Select(kv => kv.Value)
                .SelectMany(t => t.Payload)
                .Where(p => p.Key == "Pressure")
                .Select(p => p.NumericValue)
                .DefaultIfEmpty(0)
                .Average();

            // 6. Anomaly Logic
            if (currentPressure > (avgPressure * 1.2)) 
            {
                LogWarning($"Spike on {deviceName}: {currentPressure} (Avg: {avgPressure:F2})");

                // 7. Persist the raw anomaly packet directly to DB
                await Save(currentData, usePipeline: false);
            }
        }
    }
}