Skip to main content

General

FunctionBase SDK Referansı

C# SDK iç yapısı için derinlemesine rehber: yaşam döngüsü, Circuit Breaker ve Veri Akış hattı.

Proxus.SDK.BaseFunctions.FunctionBase sınıfı, kullanıcı tanımlı tüm mantık için temel aktördür. Otomatik kaynak temizliği, circuit-breaker koruması ve thread-safe veri erişimi sağlar.

1. Yaşam Döngüsü Durum Makinesi

Proxus, fonksiyonunuzu Edge runtime içinde bir Actor olarak yönetir. Doğru kaynak yönetimi için başlatma ve durdurma sırasını bilmek kritik önem taşır.

Başlatma Sırası (OnStarted)

Fonksiyon deploy edildiğinde, base class sizin kodunuzdan önce şu sırayı uygular:

  1. Token Başlatma: Tüm async işlemler için master CancellationTokenSource oluşturur.
  2. Messaging Connection: Messaging context'i kurar.
  3. Policy Yükleme: ExceptionThreshold ve LogPolicy konfigürasyondan yüklenir.
  4. Subscription Kayıt: Subscriptions listenize göre SystemEventStreame bağlanır.
  5. Cache Hijyeni: Arka planda cache temizleyicisini başlatır (aktifse).
protected override void OnStarted() {
    // 1. Subscriptions tanımla
    Subscriptions?.Add(new SubscriptionContext {
        Type = typeof(TransportData),
        Topics = (HashSet<string>) ["*"] // Tüm lokal cihazları dinle
    });

    // 2. Cache ayarı (opsiyonel)
    EnableCacheWithExpirationTime = TimeSpan.FromSeconds(10);

    // 3. Arka plan görevleri
    ExecuteScheduledTask(TimeSpan.FromSeconds(5), MyPeriodicTask);

    base.OnStarted(); // Önemli: Dahili kurulum tetiklenir
}

Kapanış Sırası (OnStopping)

Undeploy tıklanınca veya Gateway yeniden başlatılınca SDK aşağıdaki sırayla zarif kapatma yapar:

  1. Cancellation Sinyali: Master CancellationToken iptal edilir. Bu token'ı izleyen async görevler hemen durur.
  2. Drain Operations: Aktif işlemlerin (örn. DB yazımı) tamamlanması için 2 saniye bekler.
  3. Unsubscribe: Event Stream dinleyicilerini kaldırır.
  4. Kaynakları Kapat:
    • Scheduler'ları durdurur.
    • Messaging consumer'ları kapatır.
    • MQTT Client bağlantısını keser.
  5. Final Cleanup: fIsDisposed = true.

2. Güvenlik Mekanizmaları

Circuit Breaker (Crash Recovery)

SDK, Gateway'i dengesiz kullanıcı kodundan korumak için "Circuit Breaker" uygular.

  • Mantık: OnMessageReceive içinde her unhandled exception fExceptionCount değerini artırır.
  • Eşik: Varsayılan 10. UserFunctions:ExceptionThresholdPerFunction ile değişir.
  • Trip Aksiyonu: Eşik aşılırsa:
    1. Kritik log yazar.
    2. Kendini Undeploy edecek deployfunction komutu yayınlar.
    3. Veritabanında Deployed = false yapar.

Log Throttling

Disk taşmasını önlemek için loglar "Token Bucket" stratejisiyle rate-limit edilir.

  • Varsayılan Policy: Çok sık log atarsanız, sonraki zaman penceresine kadar loglar bastırılır.
  • Göstergesi: Şu uyarıyı görürsünüz: "Maximum log count reached... No more logs will be written until the next time window."

3. Veri Akış Hattı

Ingestion Akışı

Veri SystemEventStream üzerinden gelir. Akış asenkron ama actor başına sıralıdır.

  1. Event Stream: Mesaj gelir (örn. Modbus Driver).
  2. Filter: SDK, SubscriptionContext (Topic/Origin) ile filtreler.
  3. Dispatch: OnMessageReceive(FunctionContext ctx) çağrılır.
    • Not: Diğer fonksiyonlardan gelen mesajlar (Origin.Function) varsayılan olarak yok sayılır (sonsuz loop'u önlemek için).

Persist (Save)

Save() metodu, bağlama göre en verimli yolu seçer:

  • Pipeline Mode (usePipeline = true):
    • Veriyi lokal Event Stream'e geri yayınlar.
    • Mantık zincirlemek için ideal (örn. Rule Engine -> Function -> Rule Engine).
  • Direct Mode (usePipeline = false):
    • Lokal stream'i bypass eder.
    • platformworker_transportdata.savetosystemdb subject'ine direkt publish eder.
  • Fallback: Offline ise uyarı loglanır ve işlem atlanır (bu modda lokal buffer yoktur).

4. Kodlama Kalıpları ve En İyi Uygulamalar

Hex Parsing

Legacy PLC'lerden gelen paketli binary veriler için yerleşik hex parser'ı kullanın.

// Payload Value: "0A1F..."
string? hexPayload = transportData.GetPayloadValueByName<string>("HexData");

if (hexPayload != null) {
    try {
        // İlk 2 byte'tan Signed Short (16-bit)
        var val = hexPayload.ExtractValueFromHexString<short>(2);
        LogInformation($"Parsed Value: {val}");
    } catch (Exception ex) {
        LogError($"Hex parse error: {ex.Message}");
    }
}

Gelişmiş Zamanlama

ExecuteScheduledTask metodu Wall-Clock Alignment destekler. Toplu hesaplamalar için kritiktir (örn. "Saatlik Ortalama").

// Tam saatlerde çalıştır (XX:00:00, XX:05:00, ...)
ExecuteScheduledTask(
    period: TimeSpan.FromMinutes(5),
    action: OnScheduledTaskExecute,
    align: true, // Saatle hizala
    strict: false // Kaçırdıysa hemen çalıştır?
);

LINQ Analizi (Cache)

Veritabanına gitmeden cache üzerinden analiz yapın.

// Son 1 saatte belirli cihaz grubu için ortalama "Pressure"
var avgPressure = Cache
    .AsValueEnumerable()
    .Where(kv => kv.Key >= DateTime.UtcNow.AddHours(-1))
    .Select(kv => kv.Value)
    .GroupBy(t => t.MetaData.FirstOrDefault(m => m.Key == "DeviceName")?.Value)
    .Select(g => new {
        Device = g.Key,
        Avg = g.AsValueEnumerable().SelectMany(t => t.Payload)
            .Where(p => p.Key == "Pressure")
            .Average(p => p.NumericValue)
    })
    .ToList();

Gatewayler Arası Mesajlaşma

Cluster içindeki diğer Proxus node'larına veri gönderin.

var packet = new TransportDataBuilder()
    .WithTopic("Global/Sync")
    .WithPayload("Command", "StopLine")
    .Build();

// Topic: edge_to_edge.{GatewayId}
await PublishToEdgeGateways(packet);