Skip to main content

General

FunctionBase SDK Reference

In-depth guide to the C# SDK internals: Lifecycle state machine, Circuit Breaker patterns, and Data Flow pipeline.

The Proxus.SDK.BaseFunctions.FunctionBase class is the foundational actor for all user-defined logic. It provides a managed environment with automatic resource cleanup, circuit-breaker protection, and thread-safe data access.

1. Lifecycle State Machine

Proxus manages your function as an Actor within the Edge runtime. Understanding the exact initialization and teardown sequence is critical for resource management.

Startup Sequence (OnStarted)

When a function is deployed, the base class executes this sequence before your code runs:

  1. Token Initialization: Creates a master CancellationTokenSource for all async operations.
  2. Messaging Connection: Establishes the context for messaging.
  3. Policy Loading: Loads ExceptionThreshold and LogPolicy from configuration.
  4. Subscription Registration: Binds to the local SystemEventStream based on your Subscriptions list.
  5. Cache Hygiene: Starts the background cache cleaner task (if enabled).
protected override void OnStarted() {
    // 1. Define Subscriptions
    Subscriptions?.Add(new SubscriptionContext {
        Type = typeof(TransportData),
        Topics = (HashSet<string>) ["*"] // Listen to all local devices
    });

    // 2. Configure Cache (Optional)
    EnableCacheWithExpirationTime = TimeSpan.FromSeconds(10);

    // 3. Schedule Background Tasks
    ExecuteScheduledTask(TimeSpan.FromSeconds(5), MyPeriodicTask);

    base.OnStarted(); // Important: Triggers the internal setup
}

Shutdown Sequence (OnStopping)

When you click Undeploy or the Gateway restarts, the SDK performs a graceful shutdown in this specific order:

  1. Signal Cancellation: Cancels the master CancellationToken. All async tasks (like Save()) observing this token will abort immediately.
  2. Drain Operations: Waits up to 2 seconds for active operations (e.g., database writes) to complete.
  3. Unsubscribe: Removes listeners from the Event Stream.
  4. Resource Disposal:
    • Stops all Schedulers.
    • Closes messaging consumers.
    • Disconnects the MQTT Client.
  5. Final Cleanup: Sets fIsDisposed = true.

2. Safety Mechanisms

Circuit Breaker (Crash Recovery)

The SDK implements a "Circuit Breaker" pattern to protect the Gateway from unstable user code.

  • Logic: Every unhandled exception in OnMessageReceive increments an internal counter (fExceptionCount).
  • Threshold: Defaults to 10 exceptions. Configurable via UserFunctions:ExceptionThresholdPerFunction.
  • Trip Action: If the threshold is reached:
    1. Logs a critical warning.
    2. Publishes a deployfunction command to Undeploy itself.
    3. Updates the database status to Deployed = false.

Log Throttling

To prevent disk flooding, logging is rate-limited using a "Token Bucket" strategy.

  • Default Policy: If you log too frequently (e.g., inside a tight loop), subsequent logs are suppressed until the next time window.
  • Indicator: You will see a warning: "Maximum log count reached... No more logs will be written until the next time window."

3. Data Flow Pipeline

Ingestion Flow

Data arrives via the SystemEventStream. The flow is Asynchronous but sequential per actor.

  1. Event Stream: A message arrives (e.g., from Modbus Driver).
  2. Filter: The SDK filters messages based on your SubscriptionContext (Topic/Origin).
  3. Dispatch: Calls OnMessageReceive(FunctionContext ctx).
    • Note: Messages from other functions (Origin.Function) are ignored by default to prevent infinite loops.

Persistence (Save)

The Save() method chooses the most efficient path based on the context:

  • Pipeline Mode (usePipeline = true):
    • Publishes data back to the local Event Stream.
    • Useful for chaining logic (e.g., Rule Engine -> Function -> Rule Engine).
  • Direct Mode (usePipeline = false):
    • Bypasses the local stream.
    • Publishes directly to the internal subject platformworker_transportdata.savetosystemdb.
  • Fallback: If offline, the operation logs a warning and skips (Data is NOT buffered locally in this mode).

4. Coding Patterns & Best Practices

Hex Parsing

For legacy PLCs sending packed binary data, use the built-in hex parser.

// Payload Value: "0A1F..."
string? hexPayload = transportData.GetPayloadValueByName<string>("HexData");

if (hexPayload != null) {
    try {
        // Extract Signed Short (16-bit) from the first 2 bytes
        var val = hexPayload.ExtractValueFromHexString<short>(2);
        LogInformation($"Parsed Value: {val}");
    } catch (Exception ex) {
        LogError($"Hex parse error: {ex.Message}");
    }
}

Advanced Scheduling

The ExecuteScheduledTask method supports Wall-Clock Alignment. This is critical for aggregations (e.g., "Hourly Average").

// Run exactly at XX:00:00, XX:05:00, etc.
ExecuteScheduledTask(
    period: TimeSpan.FromMinutes(5),
    action: OnScheduledTaskExecute,
    align: true, // Align to clock
    strict: false // If missed, run immediately?
);

LINQ Aggregation (Cache Analysis)

Perform complex analytics on cached data without hitting the database.

// Calculate average 'Pressure' for a specific device group over the last hour
var avgPressure = Cache
    .Where(kv => kv.Key >= DateTime.UtcNow.AddHours(-1)) // Filter Time
    .Select(kv => kv.Value)
    .GroupBy(t => t.MetaData.FirstOrDefault(m => m.Key == "DeviceName")?.Value)
    .Select(g => new {
        Device = g.Key,
        Avg = g.SelectMany(t => t.Payload)
            .Where(p => p.Key == "Pressure")
            .Average(p => p.NumericValue)
    })
    .ToList();

Inter-Gateway Messaging

Send data to other Proxus nodes in the cluster.

var packet = new TransportDataBuilder()
    .WithTopic("Global/Sync")
    .WithPayload("Command", "StopLine")
    .Build();

// Broadcasts to topic: edge_to_edge.{GatewayId}
await PublishToEdgeGateways(packet);