您的位置:首页 > 财经 > 金融 > 全网精选小程序_邢台做网站的价格究竟多少钱?_广告留电话号的网站_品牌传播方案

全网精选小程序_邢台做网站的价格究竟多少钱?_广告留电话号的网站_品牌传播方案

2025/1/10 13:22:55 来源:https://blog.csdn.net/qq_43071699/article/details/144857392  浏览:    关键词:全网精选小程序_邢台做网站的价格究竟多少钱?_广告留电话号的网站_品牌传播方案
全网精选小程序_邢台做网站的价格究竟多少钱?_广告留电话号的网站_品牌传播方案

连接不上kafka,报下边的错

org.apache.kafka.common.KafkaException: Producer is closed forcefully.at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer756","payerAcc":"payer_acc756","payeeName":"payee756","payeeAcc":"payee...' to topic Fraud_acc:org.apache.kafka.common.KafkaException: Producer is closed forcefully.at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer757","payerAcc":"payer_acc757","payeeName":"payee757","payeeAcc":"payee...' to topic Fraud_acc:org.apache.kafka.common.KafkaException: Producer is closed forcefully.at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer758","payerAcc":"payer_acc758","payeeName":"payee758","payeeAcc":"payee...' to topic Fraud_acc:org.apache.kafka.common.KafkaException: Producer is closed forcefully.at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer759","payerAcc":"payer_acc759","payeeName":"payee759","payeeAcc":"payee...' to topic Fraud_acc:org.apache.kafka.common.KafkaException: Producer is closed forcefully.at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer760","payerAcc":"payer_acc760","payeeName":"payee760","payeeAcc":"payee...' to topic Fraud_acc:org.apache.kafka.common.KafkaException: Producer is closed forcefully.at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer761","payerAcc":"payer_acc761","payeeName":"payee761","payeeAcc":"payee...' to topic Fraud_acc:org.apache.kafka.common.KafkaException: Producer is closed forcefully.at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]
解决方法

cat server.properties 文件中配置一下下边的参数, 当初是localhost所以访问丢失
在这里插入图片描述

2025-01-01 09:18:04.991  WARN 6240 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2025-01-01 09:18:05.736  INFO 6240 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2025-01-01 09:18:05.736  INFO 6240 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2025-01-01 09:18:05.737  INFO 6240 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2025-01-01 09:18:05.843  INFO 6240 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: acks = -1batch.size = 16384bootstrap.servers = [192.168.1.112:9092]buffer.memory = 33554432client.dns.lookup = use_all_dns_ipsclient.id = producer-1compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = trueinterceptor.classes = []key.serializer = class org.apache.kafka.common.serialization.StringSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576metadata.max.age.ms = 300000metadata.max.idle.ms = 300000metric.reporters = []metrics.num.samples = 2metrics.recording.level = INFOmetrics.sample.window.ms = 30000partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitionerreceive.buffer.bytes = 32768reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 30000retries = 2147483647retry.backoff.ms = 100sasl.client.callback.handler.class = nullsasl.jaas.config = nullsasl.kerberos.kinit.cmd = /usr/bin/kinitsasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = nullsasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.login.callback.handler.class = nullsasl.login.class = nullsasl.login.refresh.buffer.seconds = 300sasl.login.refresh.min.period.seconds = 60sasl.login.refresh.window.factor = 0.8sasl.login.refresh.window.jitter = 0.05sasl.mechanism = GSSAPIsecurity.protocol = PLAINTEXTsecurity.providers = nullsend.buffer.bytes = 131072socket.connection.setup.timeout.max.ms = 30000socket.connection.setup.timeout.ms = 10000ssl.cipher.suites = nullssl.enabled.protocols = [TLSv1.2]ssl.endpoint.identification.algorithm = httpsssl.engine.factory.class = nullssl.key.password = nullssl.keymanager.algorithm = SunX509ssl.keystore.certificate.chain = nullssl.keystore.key = nullssl.keystore.location = nullssl.keystore.password = nullssl.keystore.type = JKSssl.protocol = TLSv1.2ssl.provider = nullssl.secure.random.implementation = nullssl.trustmanager.algorithm = PKIXssl.truststore.certificates = nullssl.truststore.location = nullssl.truststore.password = nullssl.truststore.type = JKStransaction.timeout.ms = 60000transactional.id = nullvalue.serializer = class org.apache.kafka.common.serialization.StringSerializer2025-01-01 09:18:05.862  INFO 6240 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2025-01-01 09:18:05.882  INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.0.2
2025-01-01 09:18:05.882  INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 25b1aea02e37da14
2025-01-01 09:18:05.882  INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1735694285881
2025-01-01 09:18:06.127  INFO 6240 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition my-topic-0 to 0 since the associated topicId changed from null to ZiipuoTKS22oBX6HbBpMbQ
2025-01-01 09:18:06.128  INFO 6240 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: DioEcCfQQNi6Ea50_-07Ag
2025-01-01 09:18:06.160  INFO 6240 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 16 with epoch 0
2025-01-01 09:18:08.127  WARN 6240 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

kafka消费者报错

2025-01-01 17:04:38.425 ERROR 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Offset commit failed on partition FraudAcc-0 at offset 25: The coordinator is not aware of this member.
2025-01-01 17:04:38.425  INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2025-01-01 17:04:38.425  INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2025-01-01 17:04:38.425  INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2025-01-01 17:04:38.425  WARN 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Asynchronous auto-commit of offsets {FraudAcc-0=OffsetAndMetadata{offset=25, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

解决方法

根据你提供的日志信息,错误的根本原因在于Kafka消费者在尝试提交偏移量(offset commit)时遇到了协调者(coordinator)不知道的成员(member)。这通常发生在以下几种情况下:

  1. 消费者组重新平衡:当消费者组中的一个或多个消费者加入或离开时,Kafka会触发一次重新平衡操作。在此期间,所有消费者都会暂时失去对分区的所有权,并且需要重新加入组以获得新的分配。如果你的消费者在重新平衡后没有正确地重新加入,它将无法提交偏移量。

  2. max.poll.interval.ms超时:这个配置项定义了消费者可以花费多长时间处理来自poll()调用的消息而不返回到Kafka进行心跳检测。如果消费者花费的时间超过了这个值,Kafka认为该消费者已经死亡并触发重新平衡。这可能是由于消息处理时间过长或者消费者卡住未能及时调用poll()导致的。

  3. session.timeout.ms超时:这是另一个与消费者健康检查有关的参数,它决定了Kafka等待消费者发送心跳的最大时间。如果超过这个时间没有收到心跳,Kafka也会认为该消费者已经死亡并触发重新平衡。

  4. max.poll.records设置过高:如果你的max.poll.records配置得太高,那么每次poll()调用可能会返回大量的记录,从而增加了处理这些记录所需的时间,可能导致max.poll.interval.ms超时。

解决方案

你可以通过调整以下几个配置来解决这个问题:

  • 增加max.poll.interval.ms:如果知道你的消费者可能需要更多时间来处理一批消息,可以适当增加这个值。默认是5分钟(300,000毫秒),你可以根据实际情况调整为更长的时间。
    spring.kafka.consumer.max-poll-interval=600000  # 例如设置为10分钟
    
  • 减少max.poll.records:降低每次poll()调用返回的最大记录数,可以减少单次处理的负担,使消费者更快地完成处理并返回心跳。
    spring.kafka.consumer.max-poll-records=100  # 根据具体情况调整
    
  • 优化消息处理逻辑:确保你的消息处理逻辑尽可能高效,避免长时间阻塞的操作,如数据库查询、网络请求等。考虑使用异步处理或其他方式来加速处理过程。
  • 检查消费者的健壮性:确保消费者不会因为异常情况而卡住或崩溃。添加适当的异常处理和监控机制可以帮助快速发现和解决问题。
  • 确保消费者组ID唯一:确认每个消费者使用的消费者组ID是唯一的,除非它们确实属于同一个消费者组并且共享相同的订阅主题。

日志分析

从日志中可以看到,消费者consumer-my-group-49试图提交偏移量,但协调者并不知道这个成员的存在,因此提交失败。然后消费者重置了它的生成代(generation),并请求加入组。紧接着的日志警告指出了问题的核心——消费者在两次poll()之间的间隔超过了max.poll.interval.ms,这通常意味着消息处理时间过长。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com