The Proxus.SDK.BaseFunctions.FunctionBase class is the foundational actor for all user-defined logic. It provides a managed environment with automatic resource cleanup, circuit-breaker protection, and thread-safe data access.
1. Lifecycle State Machine
Proxus manages your function as an Actor within the Edge runtime. Understanding the exact initialization and teardown sequence is critical for resource management.
Startup Sequence (OnStarted)
When a function is deployed, the base class executes this sequence before your code runs:
- Token Initialization: Creates a master
CancellationTokenSourcefor all async operations. - Messaging Connection: Establishes the context for messaging.
- Policy Loading: Loads
ExceptionThresholdandLogPolicyfrom configuration. - Subscription Registration: Binds to the local
SystemEventStreambased on yourSubscriptionslist. - Cache Hygiene: Starts the background cache cleaner task (if enabled).
protected override void OnStarted() {
// 1. Define Subscriptions
Subscriptions?.Add(new SubscriptionContext {
Type = typeof(TransportData),
Topics = (HashSet<string>) ["*"] // Listen to all local devices
});
// 2. Configure Cache (Optional)
EnableCacheWithExpirationTime = TimeSpan.FromSeconds(10);
// 3. Schedule Background Tasks
ExecuteScheduledTask(TimeSpan.FromSeconds(5), MyPeriodicTask);
base.OnStarted(); // Important: Triggers the internal setup
}Shutdown Sequence (OnStopping)
When you click Undeploy or the Gateway restarts, the SDK performs a graceful shutdown in this specific order:
- Signal Cancellation: Cancels the master
CancellationToken. All async tasks (likeSave()) observing this token will abort immediately. - Drain Operations: Waits up to 2 seconds for active operations (e.g., database writes) to complete.
- Unsubscribe: Removes listeners from the Event Stream.
- Resource Disposal:
- Stops all Schedulers.
- Closes messaging consumers.
- Disconnects the MQTT Client.
- Final Cleanup: Sets
fIsDisposed = true.
2. Safety Mechanisms
Circuit Breaker (Crash Recovery)
The SDK implements a "Circuit Breaker" pattern to protect the Gateway from unstable user code.
- Logic: Every unhandled exception in
OnMessageReceiveincrements an internal counter (fExceptionCount). - Threshold: Defaults to 10 exceptions. Configurable via
UserFunctions:ExceptionThresholdPerFunction. - Trip Action: If the threshold is reached:
- Logs a critical warning.
- Publishes a
deployfunctioncommand to Undeploy itself. - Updates the database status to
Deployed = false.
Log Throttling
To prevent disk flooding, logging is rate-limited using a "Token Bucket" strategy.
- Default Policy: If you log too frequently (e.g., inside a tight loop), subsequent logs are suppressed until the next time window.
- Indicator: You will see a warning: "Maximum log count reached... No more logs will be written until the next time window."
3. Data Flow Pipeline
Ingestion Flow
Data arrives via the SystemEventStream. The flow is Asynchronous but sequential per actor.
- Event Stream: A message arrives (e.g., from Modbus Driver).
- Filter: The SDK filters messages based on your
SubscriptionContext(Topic/Origin). - Dispatch: Calls
OnMessageReceive(FunctionContext ctx).- Note: Messages from other functions (
Origin.Function) are ignored by default to prevent infinite loops.
- Note: Messages from other functions (
Persistence (Save)
The Save() method chooses the most efficient path based on the context:
- Pipeline Mode (
usePipeline = true):- Publishes data back to the local Event Stream.
- Useful for chaining logic (e.g., Rule Engine -> Function -> Rule Engine).
- Direct Mode (
usePipeline = false):- Bypasses the local stream.
- Publishes directly to the internal subject
platformworker_transportdata.savetosystemdb.
- Fallback: If offline, the operation logs a warning and skips (Data is NOT buffered locally in this mode).
4. Coding Patterns & Best Practices
Hex Parsing
For legacy PLCs sending packed binary data, use the built-in hex parser.
// Payload Value: "0A1F..."
string? hexPayload = transportData.GetPayloadValueByName<string>("HexData");
if (hexPayload != null) {
try {
// Extract Signed Short (16-bit) from the first 2 bytes
var val = hexPayload.ExtractValueFromHexString<short>(2);
LogInformation($"Parsed Value: {val}");
} catch (Exception ex) {
LogError($"Hex parse error: {ex.Message}");
}
}Advanced Scheduling
The ExecuteScheduledTask method supports Wall-Clock Alignment. This is critical for aggregations (e.g., "Hourly Average").
// Run exactly at XX:00:00, XX:05:00, etc.
ExecuteScheduledTask(
period: TimeSpan.FromMinutes(5),
action: OnScheduledTaskExecute,
align: true, // Align to clock
strict: false // If missed, run immediately?
);LINQ Aggregation (Cache Analysis)
Perform complex analytics on cached data without hitting the database.
// Calculate average 'Pressure' for a specific device group over the last hour
var avgPressure = Cache
.Where(kv => kv.Key >= DateTime.UtcNow.AddHours(-1)) // Filter Time
.Select(kv => kv.Value)
.GroupBy(t => t.MetaData.FirstOrDefault(m => m.Key == "DeviceName")?.Value)
.Select(g => new {
Device = g.Key,
Avg = g.SelectMany(t => t.Payload)
.Where(p => p.Key == "Pressure")
.Average(p => p.NumericValue)
})
.ToList();Inter-Gateway Messaging
Send data to other Proxus nodes in the cluster.
var packet = new TransportDataBuilder()
.WithTopic("Global/Sync")
.WithPayload("Command", "StopLine")
.Build();
// Broadcasts to topic: edge_to_edge.{GatewayId}
await PublishToEdgeGateways(packet);