EnsLib.Kafka.Service, performance issues when working with topics that contain many messages
I need to build an integration solution that reads messages from a Kafka topic. The topic has 3 partitions and contains several million messages.
For certain reasons, I can only use the standard EnsLib.Kafka.Service class and cannot use either KafkaClient or Python.
To measure performance and collect statistics, I created a simple key + timestamp table with no indexes (so it is unlikely to be a bottleneck). Next, I started an instance of EnsLib.Kafka.Service. In the OnProcessInput method, I receive a message, extract the key from it, get the current time, and write the row to the table.
The statistics show that performance degrades by the minute as messages are read. In the first minute, the business service processes about 25,000 messages. Throughput then falls steadily, to roughly 3,000-4,000 messages per minute after 10 minutes, and it keeps sliding toward 2,000 after that.
Pool Size = 1, Call Interval = 5
First run
2025-10-11 19:01:00 25880
2025-10-11 19:02:00 12468
2025-10-11 19:03:00 8013
2025-10-11 19:04:00 6626
2025-10-11 19:05:00 5023
2025-10-11 19:06:00 4947
2025-10-11 19:07:00 3912
2025-10-11 19:08:00 3539
2025-10-11 19:09:00 3529
2025-10-11 19:10:00 3169
2025-10-11 19:11:00 2955
2025-10-11 19:12:00 2914
2025-10-11 19:13:00 2771
2025-10-11 19:14:00 2624
2025-10-11 19:15:00 2446
2025-10-11 19:16:00 2754
2025-10-11 19:17:00 2545
2025-10-11 19:18:00 2350
2025-10-11 19:19:00 2314
2025-10-11 19:20:00 2274

Pool Size = 1, Call Interval = 5
Second run
2025-10-11 19:22:00 22892
2025-10-11 19:23:00 15239
2025-10-11 19:24:00 11489
2025-10-11 19:25:00 8267
2025-10-11 19:26:00 6351
2025-10-11 19:27:00 5268
2025-10-11 19:28:00 4779
2025-10-11 19:29:00 4502
2025-10-11 19:30:00 3854
2025-10-11 19:31:00 4048
2025-10-11 19:32:00 3675
2025-10-11 19:33:00 3434
2025-10-11 19:34:00 2981
2025-10-11 19:35:00 3101
2025-10-11 19:36:00 2869
2025-10-11 19:37:00 2343
I tried adjusting other configuration properties (CallInterval, ReceiveSettings = { "pollInterval": 1000 }), but that didn't help and even caused a Java OutOfMemory error.
How can this problem be solved?
Comments
Can you show me the OnProcessInput source code?
Sure, here it is:
Class User.TestService Extends EnsLib.Kafka.Service
{
    Method OnProcessInput(pInput As %Net.Remote.Object, Output pOutput As %RegisteredObject) As %Status {
        Set tSC = $$$OK
        Try {
            Set tMsg = ##class(EnsLib.Kafka.Message).%New()
            Do tMsg.FromRemoteObject(pInput)
            #Dim row As Test.Stat
            Set row = ##class(Test.Stat).%New()
            Set row.Topic = tMsg.topic
            Set row.Key = tMsg.key
            // row.Ts - generated automatically in the Test.Stat constructor
            Do row.%Save()
            $$$LOGINFO("Saved")
        }
        Catch (ex) {
            Set tSC = ex.AsStatus()
        }
        Quit tSC
    }
}
InterSystems' Kafka component is Java-based, so you need to tune the JVM behind the Java Gateway that your service uses.
For Java 8, here are some widely recommended JVM parameters and best practices, particularly focusing on memory management and Garbage Collection (GC):
1. Heap Size Configuration
This is the most fundamental step.
Fixed Heap Size: Set the initial and maximum heap sizes to the same value to prevent the JVM from resizing the heap dynamically during runtime. This reduces GC overhead and improves application stability.
-Xms<size>: Initial Java heap size.
-Xmx<size>: Maximum Java heap size.
Example: -Xms4G -Xmx4G (sets initial and max heap to 4 Gigabytes)
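Sizing: The value should be determined by monitoring your application's memory usage under load. On a host dedicated to the JVM, a common rule of thumb is to set -Xmx to about 75-80% of the available physical RAM, leaving room for the operating system and JVM overhead (Metaspace, thread stacks, etc.). If the Java Gateway shares the machine with IRIS, size the heap well below that so IRIS keeps the memory it needs.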
2. Garbage Collector (GC) Selection
In Java 8, the default GC is the Parallel Collector (for server-class machines), which focuses on high throughput (less time spent in GC overall, but potentially longer pause times).
For most modern applications, especially those requiring lower latency, the G1 (Garbage-First) Collector is generally recommended. It is fully supported in Java 8 and became the default collector in Java 9.
Use G1 Collector:
-XX:+UseG1GC
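To check that the flags above are actually picked up, a small standalone program can print what the JVM itself reports. This is only a sketch: the class name JvmSettingsCheck is made up for illustration, and it inspects the JVM it runs in, not the Java Gateway process. For the gateway you would still need to confirm that the same arguments appear on its command line.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class (not part of IRIS or Kafka): prints the heap
// limits and garbage collectors of the JVM it is running in.
public class JvmSettingsCheck {

    // Maximum heap the JVM will try to use, in MiB (driven by -Xmx).
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    // Heap currently reserved by the JVM, in MiB (starts near -Xms).
    static long committedHeapMiB() {
        return Runtime.getRuntime().totalMemory() / (1024L * 1024L);
    }

    // Names of the active garbage collectors as reported by the JVM.
    static List<String> collectorNames() {
        List<String> names = new ArrayList<>();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            names.add(gc.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Max heap (MiB):       " + maxHeapMiB());
        System.out.println("Committed heap (MiB): " + committedHeapMiB());
        System.out.println("Collectors:           " + collectorNames());
        // The raw JVM flags this process was started with.
        System.out.println("JVM arguments:        "
                + ManagementFactory.getRuntimeMXBean().getInputArguments());
    }
}
```

Compile it with javac JvmSettingsCheck.java and run it with the same flags, for example java -Xms4G -Xmx4G -XX:+UseG1GC JvmSettingsCheck. With G1 enabled, the collector names should start with "G1"; with the Java 8 default Parallel Collector they typically start with "PS".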
Thanks Yuri, I've tweaked Java a bit according to your recommendations. I will report on the results later (right now the production hangs and I'm unable to terminate the Java processes :)
Correct me if I'm wrong, but the latest IRIS distributions come with Java 11, which rarely requires fine-tuning.
Java 1.8