Proxus.SDK.BaseFunctions.FunctionBase sınıfı, kullanıcı tanımlı tüm mantık için temel aktördür. Otomatik kaynak temizliği, circuit-breaker koruması ve thread-safe veri erişimi sağlar.
1. Yaşam Döngüsü Durum Makinesi
Proxus, fonksiyonunuzu Edge runtime içinde bir Actor olarak yönetir. Doğru kaynak yönetimi için başlatma ve durdurma sırasını bilmek kritik önem taşır.
Başlatma Sırası (OnStarted)
Fonksiyon deploy edildiğinde, base class sizin kodunuzdan önce şu sırayı uygular:
- Token Başlatma: Tüm async işlemler için master
CancellationTokenSourceoluşturur. - Messaging Connection: Messaging context'i kurar.
- Policy Yükleme:
ExceptionThresholdveLogPolicykonfigürasyondan yüklenir. - Subscription Kayıt:
Subscriptionslistenize göreSystemEventStreame bağlanır. - Cache Hijyeni: Arka planda cache temizleyicisini başlatır (aktifse).
protected override void OnStarted() {
// 1. Subscriptions tanımla
Subscriptions?.Add(new SubscriptionContext {
Type = typeof(TransportData),
Topics = (HashSet<string>) ["*"] // Tüm lokal cihazları dinle
});
// 2. Cache ayarı (opsiyonel)
EnableCacheWithExpirationTime = TimeSpan.FromSeconds(10);
// 3. Arka plan görevleri
ExecuteScheduledTask(TimeSpan.FromSeconds(5), MyPeriodicTask);
base.OnStarted(); // Önemli: Dahili kurulum tetiklenir
}Kapanış Sırası (OnStopping)
Undeploy tıklanınca veya Gateway yeniden başlatılınca SDK aşağıdaki sırayla zarif kapatma yapar:
- Cancellation Sinyali: Master
CancellationTokeniptal edilir. Bu token'ı izleyen async görevler hemen durur. - Drain Operations: Aktif işlemlerin (örn. DB yazımı) tamamlanması için 2 saniye bekler.
- Unsubscribe: Event Stream dinleyicilerini kaldırır.
- Kaynakları Kapat:
- Scheduler'ları durdurur.
- Messaging consumer'ları kapatır.
- MQTT Client bağlantısını keser.
- Final Cleanup:
fIsDisposed = true.
2. Güvenlik Mekanizmaları
Circuit Breaker (Crash Recovery)
SDK, Gateway'i dengesiz kullanıcı kodundan korumak için "Circuit Breaker" uygular.
- Mantık:
OnMessageReceiveiçinde her unhandled exceptionfExceptionCountdeğerini artırır. - Eşik: Varsayılan 10.
UserFunctions:ExceptionThresholdPerFunctionile değişir. - Trip Aksiyonu: Eşik aşılırsa:
- Kritik log yazar.
- Kendini Undeploy edecek
deployfunctionkomutu yayınlar. - Veritabanında
Deployed = falseyapar.
Log Throttling
Disk taşmasını önlemek için loglar "Token Bucket" stratejisiyle rate-limit edilir.
- Varsayılan Policy: Çok sık log atarsanız, sonraki zaman penceresine kadar loglar bastırılır.
- Göstergesi: Şu uyarıyı görürsünüz: "Maximum log count reached... No more logs will be written until the next time window."
3. Veri Akış Hattı
Ingestion Akışı
Veri SystemEventStream üzerinden gelir. Akış asenkron ama actor başına sıralıdır.
- Event Stream: Mesaj gelir (örn. Modbus Driver).
- Filter: SDK,
SubscriptionContext(Topic/Origin) ile filtreler. - Dispatch:
OnMessageReceive(FunctionContext ctx)çağrılır.- Not: Diğer fonksiyonlardan gelen mesajlar (
Origin.Function) varsayılan olarak yok sayılır (sonsuz loop'u önlemek için).
- Not: Diğer fonksiyonlardan gelen mesajlar (
Persist (Save)
Save() metodu, bağlama göre en verimli yolu seçer:
- Pipeline Mode (
usePipeline = true):- Veriyi lokal Event Stream'e geri yayınlar.
- Mantık zincirlemek için ideal (örn. Rule Engine -> Function -> Rule Engine).
- Direct Mode (
usePipeline = false):- Lokal stream'i bypass eder.
platformworker_transportdata.savetosystemdbsubject'ine direkt publish eder.
- Fallback: Offline ise uyarı loglanır ve işlem atlanır (bu modda lokal buffer yoktur).
4. Kodlama Kalıpları ve En İyi Uygulamalar
Hex Parsing
Legacy PLC'lerden gelen paketli binary veriler için yerleşik hex parser'ı kullanın.
// Payload Value: "0A1F..."
string? hexPayload = transportData.GetPayloadValueByName<string>("HexData");
if (hexPayload != null) {
try {
// İlk 2 byte'tan Signed Short (16-bit)
var val = hexPayload.ExtractValueFromHexString<short>(2);
LogInformation($"Parsed Value: {val}");
} catch (Exception ex) {
LogError($"Hex parse error: {ex.Message}");
}
}Gelişmiş Zamanlama
ExecuteScheduledTask metodu Wall-Clock Alignment destekler. Toplu hesaplamalar için kritiktir (örn. "Saatlik Ortalama").
// Tam saatlerde çalıştır (XX:00:00, XX:05:00, ...)
ExecuteScheduledTask(
period: TimeSpan.FromMinutes(5),
action: OnScheduledTaskExecute,
align: true, // Saatle hizala
strict: false // Kaçırdıysa hemen çalıştır?
);LINQ Analizi (Cache)
Veritabanına gitmeden cache üzerinden analiz yapın.
// Son 1 saatte belirli cihaz grubu için ortalama "Pressure"
var avgPressure = Cache
.AsValueEnumerable()
.Where(kv => kv.Key >= DateTime.UtcNow.AddHours(-1))
.Select(kv => kv.Value)
.GroupBy(t => t.MetaData.FirstOrDefault(m => m.Key == "DeviceName")?.Value)
.Select(g => new {
Device = g.Key,
Avg = g.AsValueEnumerable().SelectMany(t => t.Payload)
.Where(p => p.Key == "Pressure")
.Average(p => p.NumericValue)
})
.ToList();Gatewayler Arası Mesajlaşma
Cluster içindeki diğer Proxus node'larına veri gönderin.
var packet = new TransportDataBuilder()
.WithTopic("Global/Sync")
.WithPayload("Command", "StopLine")
.Build();
// Topic: edge_to_edge.{GatewayId}
await PublishToEdgeGateways(packet);