KAFKASTRM-LL down while bulk loading (3.1)

I’m noticing that after bulk loading CSVs for a while, I see the following status:

+--------------------+-------------------------+-------------------------+
|    Service Name    |     Service Status      |      Process State      |
+--------------------+-------------------------+-------------------------+
|       ADMIN        |         Online          |         Running         |
|        CTRL        |         Online          |         Running         |
|        DICT        |         Online          |         Running         |
|        ETCD        |         Online          |         Running         |
|        GPE         |         Online          |         Running         |
|        GSE         |         Online          |         Running         |
|        GSQL        |         Online          |         Running         |
|        GUI         |         Online          |         Running         |
|        IFM         |         Online          |         Running         |
|       KAFKA        |         Online          |         Running         |
|     KAFKACONN      |         Online          |         Running         |
|    KAFKASTRM-LL    |          Down           |         Stopped         |
|       NGINX        |         Online          |         Running         |
|       RESTPP       |         Online          |         Running         |
|        TS3         |         Online          |         Running         |
|      TS3SERV       |         Online          |         Running         |
|         ZK         |         Online          |         Running         |
+--------------------+-------------------------+-------------------------+

In the logs for this service I see the following error:

[2020-12-10 15:14:57,777] INFO stream-thread [log-aggregation-e76da75d-e559-416b-9b7f-de107b1b3b65-StreamThread-1] Shutdown complete (org.apache.kafka.streams.processor.internals.StreamThread:1150)
[2020-12-10 15:14:57,777] ERROR Log aggregation stream stopped due to unexpected exception (com.tigergraph.kafka.stream.loadinglog.LogAggregation:168)
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (timestamp 1607611403192) to topic log-aggregation-loading-progress-changelog due to org.apache.kafka.common.errors.RecordTooLargeException: The message is 1056245 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:959)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:181)
        at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
        at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
        at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
        at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
        at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
        at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$flush$7(MeteredKeyValueStore.java:192)
        at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:192)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:285)
        at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177)
        at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:554)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:490)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:478)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226)
        at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:543)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:977)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:823)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1056245 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.
[2020-12-10 15:14:57,779] INFO stream-client [log-aggregation-e76da75d-e559-416b-9b7f-de107b1b3b65] State transition from ERROR to PENDING_SHUTDOWN (org.apache.kafka.streams.KafkaStreams:287)
[2020-12-10 15:14:57,782] INFO stream-thread [log-aggregation-e76da75d-e559-416b-9b7f-de107b1b3b65-StreamThread-1] Informed to shut down (org.apache.kafka.streams.processor.internals.StreamThread:1116)
[2020-12-10 15:14:57,785] INFO stream-client [log-aggregation-e76da75d-e559-416b-9b7f-de107b1b3b65] State transition from PENDING_SHUTDOWN to NOT_RUNNING (org.apache.kafka.streams.KafkaStreams:287)
[2020-12-10 15:14:57,785] INFO stream-client [log-aggregation-e76da75d-e559-416b-9b7f-de107b1b3b65] Streams client stopped completely (org.apache.kafka.streams.KafkaStreams:989)
[2020-12-10 15:14:57,793] INFO Stopped ServerConnector@183ec003{HTTP/1.1,[http/1.1]}{0.0.0.0:30004} (org.eclipse.jetty.server.AbstractConnector:380)
[2020-12-10 15:14:57,794] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:158)
[2020-12-10 15:14:57,804] INFO Stopped o.e.j.s.ServletContextHandler@417ad4f3{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:1016)
[2020-12-10 15:14:57,805] INFO Log aggregation stream stopped (com.tigergraph.kafka.stream.loadinglog.LogAggregation:179)
[2020-12-10 15:14:57,806] INFO REST server stopped (com.tigergraph.kafka.stream.loadinglog.LogAggregation:180)

Is it a problem that this service is down? What does it do? Loading seems to continue. What can I do to resolve this?

The error message says that the serialized size of the message (1056245 bytes) exceeds the configured Kafka limit (1048576 bytes, the value of max.request.size). You can check the current setting with:

gadmin config get GPE.Kafka.MsgMaxBytes

You can increase that value in the config for your loading job:

gadmin config set GPE.Kafka.MsgMaxBytes new_val

and then apply your new config value:

gadmin config apply
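
Putting it together, here is a minimal sketch of the full sequence. The value 2097152 (2 MB) and the restart step are assumptions for illustration; pick a size comfortably larger than the failing record (1056245 bytes in the log above), and restart the stopped service so it picks up the new limit.

# check the current limit (assumed to govern the log-aggregation stream)
gadmin config get GPE.Kafka.MsgMaxBytes

# raise it above the failing record size (2097152 = 2 MB is an example value)
gadmin config set GPE.Kafka.MsgMaxBytes 2097152

# apply the change and restart the stopped service
gadmin config apply -y
gadmin restart kafkastrm-ll -y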

Hope this helps!
