C’est un sujet assez récurent dans les demandes que j’ai pu traiter récemment : Comment réussir à acquérir des données d’automate à fréquences assez élevées.
L’idée ici est simple : Si je réussis à avoir à une fréquence assez élevée des données en provenance de mon automate de production, je pourrais inclure des systèmes d’analyses me permettant de comprendre à quel moment et pourquoi le cycle de production de mon automate n’est pas celui que j’attends.
Aujourd’hui je vais vous montrer comment avec IoT Edge je réussi à lire 12 000 points de données par secondes en Edge et à les exploiter dans le cloud…
Architecture

L’architecture présentée ci-dessus, n’est certainement pas une architecture complète de ce que l’on déploierait en production, mais on retrouve les briques essentielles de la solution à mettre en oeuvre.
Je vais donc parcourir avec vous les différentes briques spécifiques que j’ai dû réaliser pour arriver à ce résultat…
Acquisition
Le module essentiel dans cette cartographie sera le module Azure IoT Edge d’acquisition. En effet, c’est celui-ci qui permettra de lire les différentes valeurs dans la mémoire automate.
Idéalement, je souhaitais utiliser un module du marché effectuant ces opérations. Un module qui prendrait en charge différents types de PLC (Siemens, Control Logix, …). Cependant, dans cette démonstration, j’ai réalisé un module d’acquisition en .NET Core se basant sur la librairie libplctag (github.com) fournissant ainsi le support des PLC de type Control Logix avec le protocol Modbus en prime.
Le code de l’acquisition de données PLC est plutôt simple :
// Instantiate the tag with the appropriate mapper and datatype
var myTag = new TagDint()
{
//Name is the full path to tag.
Name = "PROGRAM:SomeProgram.SomeDINT",
//Gateway is the IP Address of the PLC or communication module.
Gateway = "10.10.10.10",
//Path is the location in the control plane of the CPU. Almost always "1,0".
Path = "1,0",
PlcType = PlcType.ControlLogix,
Protocol = Protocol.ab_eip,
Timeout = TimeSpan.FromSeconds(5)
};
// Read the value from the PLC
int output = myTag.Read();
La difficulté d’implémentation dans ce genre de cas, concerne la possibilité d’exécuter un polling régulier des données à des fréquences stables.
Pour ce faire, j’ai réalisé mon module en deux parties :
- Acquisition des données dans un thread dédié à chaque tag
ThreadPool.QueueUserWorkItem(new WaitCallback(async c =>
{
using var timer = new PeriodicTimer(TimeSpan.FromMilliseconds(tag.PollingInterval));
this.runningTasks.Add(new object());
this.logger.LogTrace($"Starting polling for tag {tag.TagName}");
using var plcTag = this.tagFactory.Create(tag);
plcTag.ReadCacheMillisecondDuration = 0;
do
{
try
{
await timer.WaitForNextTickAsync(cancellationToken);
this.logger.LogTrace("Polling tag {0}", tag.TagName);
plcTag.Read();
this.logger.LogTrace("Polling tag {0} completed ({1})", tag.TagName, plcTag.GetStatus());
if (plcTag.GetStatus() != Status.Ok)
{
this.logger.LogWarning($"Tag {tag.TagName} is not OK - {plcTag.GetStatus()}");
continue;
}
this.messagesQueue.Enqueue(new EnqueuedMessage
{
Data = plcTag.Value,
Tag = tag
});
this.logger.LogTrace("Message queued for tag {0}", tag.TagName);
}
catch (Exception e)
when (e is TaskCanceledException || e is OperationCanceledException)
{
logger.LogInformation("Polling for tag {0} cancelled", tag.TagName);
return;
}
catch (Exception e)
when (e is LibPlcTagException)
{
this.logger.LogError(e, "Error while polling tag");
return;
}
finally
{
this.runningTasks.TryTake(out _);
}
}
while (!cancellationToken.IsCancellationRequested);
}));
- Envoie des données collectées dans un thread dédié
ThreadPool.QueueUserWorkItem(async (state) =>
{
do
{
List<EnqueuedMessage> currentMessages = new List<EnqueuedMessage>();
do
{
if (!this.messagesQueue.TryDequeue(out var item))
{
break;
}
currentMessages.Add(item);
} while (true);
var messages = currentMessages.Select(item =>
{
var jsonTagValue = JsonConvert.SerializeObject(item.Data);
this.logger.LogTrace($"handling message for tag {item.Tag.Name} with value {jsonTagValue}");
JObject jsonEvent = JObject.Parse($"{{\"{item.Tag.Name}\":{jsonTagValue}}}");
jsonEvent.Add("timestamp", item.EnqueuedTime.ToString("o"));
var jsonData = jsonEvent.ToString();
this.logger.LogTrace("{1}", jsonData);
return new Message(Encoding.UTF8.GetBytes(jsonData))
{
CreationTimeUtc = item.EnqueuedTime.UtcDateTime,
};
}).ToArray();
if (!messages.Any())
continue;
this.logger.LogDebug("Sending {0} messages", messages.Count());
await this.moduleClient.SendEventBatchAsync("tag", messages);
} while (true);
});
De cette manière mon module Edge exécutera autant de threads que de tags à acquérir et un supplémentaire pour l’envoie des données. Cela permettra ainsi d’utiliser au mieux les capacités de notre hardware IoT Edge pour capter ces données.
Nous utiliserons au mieux les capacités hardware de notre IoT Edge. Cependant, en fonction des fréquences que vous aurez choisis, du nombre de tags, et de PLCs connectés, l’impact sur les automates et sur les modules en aval sera important et le rapport ne sera pas totalement linéaire … Il vous appartient de bien mesurer les effets de ces mesures.
Le code du module est disponible sur mon github : kbeaugrand-org/iotedge-plc-publisher: This project aims to provide an Azure IoT Edge module for publishing PLC Tags values to Azure IoT Edge and Azure IoT Hub. (github.com).
Vous noterez que ce module possède également des méthodes permettant d’interroger l’appareil depuis le cloud et d’effectuer des opérations de read (le write arrivera plus tard).
Aggregation
Lors de mes tentatives, j’ai très vite été confronté à un souci : La vitesse de l’acquisition impacte la fréquence d’émission des messages sur notre IoT Hub.
Vous allez me dire : « Ben oui, c’est logique !« , et vous aurez raison ! Mais comment réaliser cela ?
J’ai donc créé un second module dédié à cette agrégation avant d’envoyer les données dans le Upstream (Azure IoT Hub). Ce module est développé en Javascript pour des raisons de rapidité, mais cela aurait aussi bien fait l’affaire en .NET…
var bag = [];
// Act on input messages to the module.
client.on('inputMessage', function (inputName, msg) {
var msgString = msg.getBytes().toString('utf8');
var message = JSON.parse(msgString);
if (message) {
bag.push(message);
}
});
setInterval(() => {
sendMessages(client);
}, process.env.OUTPUT_FREQUENCY || 1000);
Ici je m’abonne aux messages en entrée de mon module et pour chaque message je stocke (en mémoire) le message dans un tableau. Puis dans un setInterval je déclenche la fonction d’envoie des messages (par défaut toutes les secondes).
function sendMessages(client) {
var items = [];
while (bag.length > 0) {
items.push(bag.shift());
}
if(items.length > 0) {
var result = JSON.stringify(items);
gzip(result)
.then((compressed) => {
var outputMsg = new Message(compressed.toString('base64'));
client.sendOutputEvent('output', outputMsg);
});
}
La fonction sendMessages est alors en charge de réccupérer tous les messages présents dans mon tableau et d’en créer un tableau JSON. En revanche, avec la quantité de données que je générais, il m’a fallut ajouter une couche de compression de mon message puisque celui-ci dépassait allègrement les 256Kb autorisés sur IoT Hub.
Le batch de messages est donc compressé à la volée en gzip avant d’être encodé en base64 pour qu’il puisse être pris en charge par IoT Hub.
Le code du module d’agrégation est disponible sur mon github : iotedge-event-aggregator/src/AggregateMessages at main · kbeaugrand-org/iotedge-event-aggregator (github.com)
Passons maintenant à l’ingestion…
Décompression et split
Nous venons de le voir, pour que l’ensemble des données puisse transiter par IoT Hub et limiter la facturation, j’ai dû réaliser une couche de compression des données, une fois arrivé dans le cloud, comment exploiter les données ? Simplement en réalisant la chaîne inverse : Décompression des données split des messages …
Pour simplifier les choses, j’ai réalisé une Azure Function, une fois de plus en NodeJS, déclenchée sur les évènements de IoT Hub et publiant les résultats dans Azure Event Hub :
const {ungzip} = require('node-gzip');
module.exports = async function (context, eventHubMessages) {
eventHubMessages.forEach((message, index) => {
ungzip(Buffer.from(message, 'base64'))
.then(uncompressed => {
var event = JSON.parse(uncompressed);
if (!Array.isArray(event))
return event;
context.bindings.outputEventHubMessage = [];
event.forEach(item => {
context.bindings.outputEventHubMessage.push(JSON.stringify(item));
});
});
});
context.done();
};
Le code de la fonction Azure est disponible sur mon github : iotedge-event-aggregator/src/SplitMessages at main · kbeaugrand-org/iotedge-event-aggregator (github.com)
Enfin, je déploie l’intégralité sur le cloud et en Edge.
Manifest de déploiement IoT Edge
Le manifest de déploiement est très standard, mais je vous propose de voir les briques principales mises en oeuvre.
Modules
"modules": {
"aggregator": {
"env": {
"OUTPUT_FREQUENCY": {
"value": 1000
}
},
"imagePullPolicy": "on-create",
"restartPolicy": "always",
"settings": {
"image": "ghcr.io/kbeaugrand-org/azure-iotedge-aggregationmodule:1.0.0"
},
"startupOrder": 100,
"status": "running",
"type": "docker"
},
"plcPublisher": {
"env": {
"Logging__LogLevel__Default": {
"value": "Information"
}
},
"imagePullPolicy": "on-create",
"restartPolicy": "always",
"settings": {
"image": "ghcr.io/kbeaugrand-org/azure-iotedge-plc-publisher:1.0.0"
},
"startupOrder": 200,
"status": "running",
"type": "docker"
}
}
edgeHub Routes
"routes": {
"plc-to-aggregator": {
"priority": 0,
"route": "FROM /messages/modules/plcPublisher/outputs/* INTO BrokeredEndpoint(\"/modules/aggregator/inputs/inputMessage\")"
},
"aggregator-to-upstream": {
"priority": 1,
"route": "FROM /messages/modules/aggregator/outputs/* INTO $upstream"
}
}
plcPublisher Module Twin
Enfin, nous pouvons configurer les tags à lire au travers du jumeau numérique du module :
"properties.desired": {
"tags": [
{
"gateway": "192.168.2.55",
"path": "1,0",
"plcType": "ControlLogix",
"tagType": "ARRAY_DINT",
"tagName": "P01_G",
"arrayLength": 60,
"pollingInterval": 100
}
]
}
Dans cet exemple, je vais lire un tableau de 60 DINT à une fréquence de 10Hz (soit toutes les 100ms).
Azure Data Explorer
Dans cette démonstration, j’ai choisi d’utiliser Azure Data Explorer pour visualiser mes données et les exploiter (aussi, je l’avoue pour valider la qualité de l’acquisition de mes données).
Brancher Azure Data Explorer sur un stream de données en provenance de Event Hub est très facile dans la partie DataManagement, vous serez ainsi guidé dans la configuration de votre source de données et sur le mapping :

Sélectionnez Ingest from Event Hub.

Cliquez sur Next

Cliquer sur Next

Par défaut Azure Data Explorer propose d’ingérer des données textuelles, mais vous pouvez lui spécifier que la source de données est JSON. Ainsi il découvrira le schéma de vos données et vous le proposera ainsi que le mapping de vos données avec votre table.

Après avoir validé la création de votre ingestion, vous pourrez commencer à exploiter les données. (Il faudra attendre cependant quelques minutes que votre ingestion ne se mette en place et que vous puissiez utiliser les premières données).

Enfin, Azure Data Explorer nous permet, comme vous pouvez le voir ci-dessus, de suivre la latence de l’ingestion de vos données depuis votre Event Hub.
Exploitation des données
Azure Data Explorer permet de consulter et d’exploiter les données au travers d’un moteur de requête utilant le language Kusto. Language peu commun pour nous les développeurs, mais rudement efficace ex :
let lag = 1.5m;
let timeWindow = 0.2sec;
telemetry
| where timestamp > ago(lag) and timestamp < ago(lag - timeWindow)
| project timestamp, value = Item
| order by timestamp desc
| render timechart with(title="Values by time");
Cette requête me permet ici d’afficher la courbe d’évolution d’une donnée enregistrée sur il y a 1min et 30 sec. D’afficher ses valeurs sur une période de 0.2sec dans un graphique d’évolution de celle-ci par rapport au temps :

On remarque ici les différentes valeurs dans l’évolution de ma donnée. Les données sont lues à une fréquence de 100Hz dans cet exemple.
Résultats obtenus
La solution m’a permi de réaliser une captation de 60 points de données par automate et ceux sur 2 automates, à une fréquence de 100Hz.
Un petit calcul rapide ? 60*2*100 = 12 000 !
Héé oui, 12 000 données par secondes lues sur des automates en parallèle.
Mitigation
Effectivement, cela parait impressionnant dit comme cela, mais je vais attirer votre attention sur des points importants dans cette réalisation.
Performance de la lecture : Effectuer une demande à un automate nécessite une communication (par le réseau éthernet). Et il n’y a là, rien d’instantané, initier une connection TCP peut prendre quelques µs voir ms et ceci est donc du temps que vous perdez lors de l’acquisition. Afin d’augmenter la performance de votre acquisition il faudra limiter le nombre de requêtes de lectures au minimum.
Lors de mes tests, j’effectue ainsi une lecture d’un tableau de données que je reformate en JSON avant d’envoyer dans le EdgeHub. Cette opération est réalisée dans le thread de publication pour ne pas impacter les threads de lecture. J’utilise un module permettant le mapping JSON utilisant le language JMESPath (kbeaugrand-org/iotedge-plc-publisher: This project aims to provide an Azure IoT Edge module for publishing PLC Tags values to Azure IoT Edge and Azure IoT Hub. (github.com))
Précision sur la fréquence : Le module d’acquisition est exécuté sur un système nativement non temps réel et cela a une importance capitale dans l’exécution d’une acquisition. En effet, la fréquence de l’acquisition dépend de la charge de travail de votre IoT Edge, et il se peut qu’à quelques millisecondes prêt le déclenchement de la lecture ne soit pas exactement le même.
Ainsi sur mes tests, j’ai pu mettre en évidence que cette précision sur une fréquence de 100Hz était de ±5ms dans les 90 percentils. C’est effectivement important puisqu’ il y a un risque d’erreur de 50% sur cette fréquence d’acquisition.
Impacte sur la production de l’automate : l’automate est quant à lui nativement temps réel. Chaque opération que nous allons lui demander impliquera un temps d’exécution dans son cycle dédié à la communication, et ce sera du temps en moins pour l’exécution de son programme. Ainsi plus vous allez le solliciter, plus vous risquez de ralentir sa production.
Lors de mes tests, j’ai effectué les mesures sur des automates non reliés à de la production et qui n’effectuaient que des modifications de données en mémoire. Il vous appartient donc de valider l’impact ou non de la lecture des données sur le cycle de votre automate.
Sollicitation de edgeHub : Dans ces essais, edgeHub est un point central dans cette captation. En premier lieu, il est en charge d’ingérer les messages et de les diffuser aux consommateurs (au travers des routes). Mais il est également en charge de mettre en tampon les messages non distribués pour augmenter la résilience (coupure du lien WAN par exemple).
J’ai pu constater qu’il est effectivement très sollicité :

Il y a un article très bien détaillé sur la communauté technique de Microsoft expliquant en détails les différents paramètres de edgeHub à utiliser pour optimiser les performances et les risques de pertes de données que vous pouvez espérer avec cette solution : Stretching the IoT Edge performance limits – Microsoft Tech Community
Et d’autres que je n’ai pas encore exploité.
Retenez donc que cette fréquence d’acquisition n’est certainement pas adaptée à tous les usages et peu s’avérer quelques peu faussé si vous vous attendez à un résultat précis.
En revanche, donner la capacité à un IoT Edge de capter des données à des fréquences importantes > 10Hz est un levier important dans la transformation digitale en donnant une solution de plus dans la convergence OT/IT …
Un commentaire sur “Acquisition de données haute fréquence sur son PLC Control Logix”