In Apache Stratos, data publishing over Thrift is done via streaming. It uses WSO2 Carbon’s Data Bridge to serialize data into a stream and publish it to a given IP address and port. The use of DataBridge is explained here.
Stream Definition
To publish data as a stream, the stream must first be defined. This is done using the org.wso2.carbon.databridge.commons.StreamDefinition class. A StreamDefinition object lists the attributes that will be written to the stream and is assigned to a particular DataPublisher.
One notable aspect of the StreamDefinition class is its three separate attribute lists for metaData, correlationData and payloadData, in that order. This separation is a logical divide that has no significant meaning on the server side when it comes to WSO2 CEP and WSO2 BAM. It lets the publisher make sense of the various types of data that are published and later analyzed. This scenario in the WSO2 CEP context is explained here.
A StreamDefinition should minimally have a stream ID and a stream name. Additionally, a nickname and a description can be added. The stream ID is generated by the receiver when the StreamDefinition is associated with a particular connection, and it consists of the stream name and the stream version. When publishing data, this stream ID is used to retrieve the stream definition at the receiver’s end. In fact, the first two attributes published with each event are the stream ID, of type STRING, and the timestamp of the event in milliseconds, of type LONG.
The attributes describing the published data have to be added to the StreamDefinition in the order in which they are to be published. The order is important in a data stream because it determines how the data is read on the receiver’s side.
For each attribute added, the type of the data should be defined. WSO2 DataBridge provides readable attribute types that represent the type system in Thrift: INT, LONG, FLOAT, DOUBLE, STRING and BOOL.
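To make the shape of a stream definition concrete, here is a minimal sketch in plain Java. It does not use the WSO2 classes: SketchStreamDefinition, SketchAttribute and SketchAttributeType are hypothetical stand-ins, the version string used in the usage note is an assumed value, and joining name and version with a colon is only an assumed ID format. In the real setup the stream ID comes from the receiver, while here it is computed locally for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-in for the attribute types listed above; not the WSO2 enum itself.
enum SketchAttributeType { INT, LONG, FLOAT, DOUBLE, STRING, BOOL }

// One named, typed attribute of a stream.
final class SketchAttribute {
    final String name;
    final SketchAttributeType type;

    SketchAttribute(String name, SketchAttributeType type) {
        this.name = name;
        this.type = type;
    }
}

// Hypothetical, simplified model of a stream definition.
final class SketchStreamDefinition {
    private final String name;
    private final String version;
    // Three logically separate lists; each preserves insertion order.
    private final List<SketchAttribute> metaData = new ArrayList<>();
    private final List<SketchAttribute> correlationData = new ArrayList<>();
    private final List<SketchAttribute> payloadData = new ArrayList<>();

    SketchStreamDefinition(String name, String version) {
        this.name = name;
        this.version = version;
    }

    SketchStreamDefinition addMetaData(String attribute, SketchAttributeType type) {
        metaData.add(new SketchAttribute(attribute, type));
        return this;
    }

    SketchStreamDefinition addCorrelationData(String attribute, SketchAttributeType type) {
        correlationData.add(new SketchAttribute(attribute, type));
        return this;
    }

    SketchStreamDefinition addPayloadData(String attribute, SketchAttributeType type) {
        payloadData.add(new SketchAttribute(attribute, type));
        return this;
    }

    // Assumed ID format: stream name and version joined by ':'.
    String streamId() {
        return name + ":" + version;
    }

    List<SketchAttribute> metaData() { return Collections.unmodifiableList(metaData); }
    List<SketchAttribute> correlationData() { return Collections.unmodifiableList(correlationData); }
    List<SketchAttribute> payloadData() { return Collections.unmodifiableList(payloadData); }
}
```

A definition would then be built by chaining calls, for example new SketchStreamDefinition("example_stream", "1.0.0").addPayloadData("value", SketchAttributeType.DOUBLE). Because the lists keep insertion order, the order of the add calls is the order in which a receiver would expect the values.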
Data
Data is written into the stream as Events. An org.wso2.carbon.databridge.commons.Event object stores the data for a particular event. When the time comes to serialize the data onto the wire, org.wso2.carbon.databridge.agent.thrift.internal.utils.ThriftEventConverter gathers all the data and assigns the proper Thrift type to each value. This logic is implemented in the Carbon platform’s DataBridge.
Apache Stratos publishes data over Thrift in two scenarios, both of which involve managing cartridges based on their performance. The Cartridge Agent initiates and manages the streaming of data from the cartridge to the monitoring server or complex event processing server. The addresses and credentials of the data aggregation servers should be provided by Stratos to the Cartridge Agent.
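The idea of walking an event's values and assigning each one a type can be sketched as follows. This is not the ThriftEventConverter code, and the actual wire layout used by DataBridge is not shown here. EventTypingSketch is a hypothetical helper that only illustrates the ordering described earlier (stream ID, timestamp, then meta, correlation and payload values) and a plausible mapping from Java values to the attribute type names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; illustrates typing and ordering only, not the real converter.
final class EventTypingSketch {

    // Maps a Java value to the name of the matching attribute type.
    static String typeOf(Object value) {
        if (value instanceof Integer) return "INT";
        if (value instanceof Long) return "LONG";
        if (value instanceof Float) return "FLOAT";
        if (value instanceof Double) return "DOUBLE";
        if (value instanceof Boolean) return "BOOL";
        if (value instanceof String) return "STRING";
        throw new IllegalArgumentException("Unsupported attribute value: " + value);
    }

    // Lists the types of one event's values in the assumed order:
    // stream ID, timestamp, then meta, correlation and payload data.
    static List<String> typesInOrder(String streamId, long timestampMillis,
                                     Object[] metaData, Object[] correlationData,
                                     Object[] payloadData) {
        List<String> types = new ArrayList<>();
        types.add(typeOf(streamId));
        types.add(typeOf(timestampMillis));
        for (Object[] group : new Object[][] {metaData, correlationData, payloadData}) {
            if (group == null) {
                continue; // a group may be absent for a given stream
            }
            for (Object value : group) {
                types.add(typeOf(value));
            }
        }
        return types;
    }
}
```

An unsupported value fails fast with an exception here, which is a design choice of the sketch rather than documented DataBridge behavior.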
Health Statistics Publishing
Health statistics of a particular cartridge instance are used by a real-time processing engine to declare events that manage the cartridge life cycle. For example, based on the memory load and the CPU usage, the CEP can publish events that the AutoScaler picks up in order to spawn new instances. Another example is the AutoScaler terminating faulty cartridges based on events published by the CEP when health statistics publishing stops for a certain timeout period. The role of the AutoScaler along with a CEP is described here.
The following values should be passed to the Cartridge Agent via the Puppet configuration, which sets them as system properties when the Cartridge Agent starts up. They are then used to establish the connection and publish the data to the Thrift receiver.
- IP address (thrift.receiver.ip)
- Port (thrift.receiver.port); the default value is 7711
- Username (thrift.server.admin.username)
- Password (thrift.server.admin.password)
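As a rough illustration of how these four values could be picked up at startup, the sketch below reads them from a java.util.Properties object, falling back to 7711 when no port is given. ThriftReceiverSettings is a hypothetical class written for this post, not the Cartridge Agent's own configuration code, and rejecting a missing IP address is an assumption about sensible behavior.

```java
import java.util.Properties;

// Hypothetical holder for the Thrift receiver connection settings.
final class ThriftReceiverSettings {
    static final int DEFAULT_PORT = 7711;

    final String ip;
    final int port;
    final String username;
    final String password;

    private ThriftReceiverSettings(String ip, int port, String username, String password) {
        this.ip = ip;
        this.port = port;
        this.username = username;
        this.password = password;
    }

    // Reads the settings using the property names listed above.
    static ThriftReceiverSettings from(Properties props) {
        String ip = props.getProperty("thrift.receiver.ip");
        if (ip == null || ip.trim().isEmpty()) {
            // Assumption: without an address there is nothing to connect to.
            throw new IllegalStateException("thrift.receiver.ip is not set");
        }
        String rawPort = props.getProperty("thrift.receiver.port", String.valueOf(DEFAULT_PORT));
        int port = Integer.parseInt(rawPort.trim());
        String username = props.getProperty("thrift.server.admin.username");
        String password = props.getProperty("thrift.server.admin.password");
        return new ThriftReceiverSettings(ip.trim(), port, username, password);
    }
}
```

Since the values arrive as system properties, a caller could pass System.getProperties() to ThriftReceiverSettings.from.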
The Cartridge Agent publishes several events to the message broker after it starts, two key events being the InstanceStartedEvent and the InstanceActivatedEvent. The InstanceStartedEvent is published after the Topology is confirmed as consistent. The InstanceActivatedEvent is published after the InstanceStartedEvent, but this can happen along two code paths. If there is no repository-based artifact management to be done, the InstanceActivatedEvent is published immediately by the Cartridge Agent. If the Cartridge Agent has to wait for the ArtifactUpdatedEvent before it starts managing the artifacts related to a repository, the InstanceActivatedEvent is published after the artifacts are synchronized to the instance. These events are published from the Cartridge Agent to the /instance/status topic in the message broker. In both cases, the health statistics publishing job starts after the InstanceActivatedEvent is published. The interval at which the health statistics publishing job runs can be adjusted by specifying the stats.notifier.interval property of the Cartridge Agent. By default this is set to 15 seconds.
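The two code paths can be summarized in a small sketch. AgentStartupSketch is a hypothetical class that only lists the steps in order; it does not talk to a message broker. Reading stats.notifier.interval from a Properties object as a whole number of seconds is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical outline of the startup sequence described above.
final class AgentStartupSketch {
    static final int DEFAULT_STATS_INTERVAL_SECONDS = 15;

    // Returns the ordered steps for an instance with or without
    // repository-based artifacts to manage.
    static List<String> startupSteps(boolean hasRepositoryArtifacts) {
        List<String> steps = new ArrayList<>();
        steps.add("publish InstanceStartedEvent");
        if (hasRepositoryArtifacts) {
            steps.add("wait for ArtifactUpdatedEvent");
            steps.add("synchronize artifacts");
        }
        steps.add("publish InstanceActivatedEvent");
        // In both paths the statistics job starts only after activation.
        steps.add("start health statistics publishing job");
        return steps;
    }

    // Assumption: the property holds a number of seconds.
    static int statsIntervalSeconds(Properties props) {
        String raw = props.getProperty("stats.notifier.interval");
        return raw == null ? DEFAULT_STATS_INTERVAL_SECONDS : Integer.parseInt(raw.trim());
    }
}
```

Both paths end with the same two steps, which is why the health statistics job can be described independently of how the artifacts were handled.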
The health statistics are published with the stream name cartridge_agent_health_stats. The stream definition contains only the payload data and the types of the added payload data are as follows.
- “cluster_id” => STRING
- “network_partition_id” => STRING
- “member_id” => STRING
- “partition_id” => STRING
- “health_description” => STRING
- “value” => DOUBLE
Two types of health statistics are published by the Cartridge Agent: the percentage of free memory and the load average of the CPU. The health_description field uses the values memory_consumption and load_average respectively for these two readings.
Health statistics publishing can be disabled by setting the cep.stats.publisher.enabled property of the Cartridge Agent to false.
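The payload of a single health statistics event could be assembled roughly as shown below, with the values placed in the same order as the attributes listed above. HealthStatPayloadSketch is a hypothetical helper, and reading the load average through the JVM's OperatingSystemMXBean is just one way to obtain such a reading, not necessarily how the Cartridge Agent does it.

```java
import java.lang.management.ManagementFactory;

// Hypothetical helper that builds the payload array for one reading.
final class HealthStatPayloadSketch {
    static final String MEMORY_CONSUMPTION = "memory_consumption";
    static final String LOAD_AVERAGE = "load_average";

    // Order follows the payload attributes: cluster_id, network_partition_id,
    // member_id, partition_id, health_description, value.
    static Object[] payload(String clusterId, String networkPartitionId,
                            String memberId, String partitionId,
                            String healthDescription, double value) {
        return new Object[] {
            clusterId, networkPartitionId, memberId, partitionId, healthDescription, value
        };
    }

    // One possible source for the reading; returns a negative value
    // when the platform cannot report a load average.
    static double currentLoadAverage() {
        return ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
    }
}
```

Each publishing cycle would produce two such payloads, one with MEMORY_CONSUMPTION and one with LOAD_AVERAGE as the health_description.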
Log Publishing
The application logs are published to a monitoring server by the cartridge agent for data analysis. For this the property enable.data.publisher should be set to true.
The log publishing thread starts as soon as the Cartridge Agent initialization is complete. For it to connect to the Thrift receiver, the following fields have to be passed to the Cartridge Agent.
- IP address (monitoring.server.ip)
- Port (monitoring.server.port)
- Secure Port (monitoring.server.secure.port)
- Username (monitoring.server.admin.username)
- Password (monitoring.server.admin.password)
The stream definition for the log publishing is as follows. Note that one metaData attribute is added to the definition by the publisher. This contains the member ID of the particular instance that is publishing the data.
MetaData
- “memberId” => STRING
PayloadData
- “tenantID” => STRING
- “serverName” => STRING
- “appName” => STRING
- “logTime” => LONG
- “priority” => STRING
- “message” => STRING
- “logger” => STRING
- “ip” => STRING
- “instance” => STRING
- “stacktrace” => STRING
The log publisher stream is defined with a name in the format logs.{tenantId}.{cartridgeAlias}.yyyy.MM.dd.
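The naming format can be reproduced with a few lines of Java. LogStreamNameSketch is a hypothetical helper, and the tenant ID and cartridge alias used in the usage note are made-up values.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that builds a log stream name for a given day.
final class LogStreamNameSketch {
    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    // Produces logs.{tenantId}.{cartridgeAlias}.yyyy.MM.dd
    static String streamName(String tenantId, String cartridgeAlias, LocalDate date) {
        return "logs." + tenantId + "." + cartridgeAlias + "." + DATE.format(date);
    }
}
```

For tenant 1, alias php and the date October 10, 2014, this yields logs.1.php.2014.10.10. Because the date is part of the name, logs from different days end up in differently named streams.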
For each log file listed under the log.file.paths parameter, a separate org.apache.stratos.cartridge.agent.data.publisher.log.FileBasedLogPublisher thread is started.
The FileBasedLogPublisher scans the log file for new lines and, for each new line added, publishes a new Event to the Thrift receiver. Consequently, the log publisher has no defined publishing interval.
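Scanning a file for newly appended lines can be sketched by remembering the byte offset reached on the previous read. LogTailSketch below is a hypothetical illustration of that technique and is not the FileBasedLogPublisher implementation. It only returns the new lines, whereas the real publisher would turn each one into an Event. Resetting the offset when the file shrinks is an assumption about handling log rotation.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical tail-style reader that returns lines appended since the last call.
final class LogTailSketch {
    private final String path;
    private long offset; // byte position reached by the previous read

    LogTailSketch(String path) {
        this.path = path;
    }

    // Note: a line that is still being written may be returned in two pieces;
    // a production reader would need to buffer incomplete lines.
    List<String> readNewLines() throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            if (file.length() < offset) {
                offset = 0; // assumption: the file was truncated or rotated
            }
            file.seek(offset);
            String raw;
            while ((raw = file.readLine()) != null) {
                // readLine maps each byte to one char, so re-decode as UTF-8.
                byte[] bytes = raw.getBytes(StandardCharsets.ISO_8859_1);
                lines.add(new String(bytes, StandardCharsets.UTF_8));
            }
            offset = file.getFilePointer();
        }
        return lines;
    }
}
```

Calling readNewLines() in a loop, one reader per log file, mirrors the one-thread-per-file arrangement described above. How often the loop polls is left open here.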
Originally published at chamilad.github.io on October 10, 2014.
Written on October 10, 2014 by chamila de alwis.
Originally published on Medium