# /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/ruby-kafka-0.4.2/lib/kafka/consumer.rb

require "kafka/consumer_group" require "kafka/offset_manager" require "kafka/fetch_operation" module Kafka # A client that consumes messages from a Kafka cluster in coordination with # other clients. # # A Consumer subscribes to one or more Kafka topics; all consumers with the # same *group id* then agree on who should read from the individual topic # partitions. When group members join or leave, the group synchronizes, # making sure that all partitions are assigned to a single member, and that # all members have some partitions to read from. # # ## Example # # A simple producer that simply writes the messages it consumes to the # console. # # require "kafka" # # kafka = Kafka.new(seed_brokers: ["kafka1:9092", "kafka2:9092"]) # # # Create a new Consumer instance in the group `my-group`: # consumer = kafka.consumer(group_id: "my-group") # # # Subscribe to a Kafka topic: # consumer.subscribe("messages") # # # Loop forever, reading in messages from all topics that have been # # subscribed to. # consumer.each_message do |message| # puts message.topic # puts message.partition # puts message.key # puts message.value # puts message.offset # end # class Consumer def initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, session_timeout:, heartbeat:) @cluster = cluster @logger = logger @instrumenter = instrumenter @group = group @offset_manager = offset_manager @session_timeout = session_timeout @heartbeat = heartbeat # A list of partitions that have been paused, per topic. @paused_partitions = {} # Whether or not the consumer is currently consuming messages. @running = false # The maximum number of bytes to fetch from a single partition, by topic. @max_bytes = {} end # Subscribes the consumer to a topic. # # Typically you either want to start reading messages from the very # beginning of the topic's partitions or you simply want to wait for new # messages to be written. In the former case, set `start_from_beginning` # to true (the default); in the latter, set it to false. # # @param topic [String] the name of the topic to subscribe to. # @param default_offset [Symbol] whether to start from the beginning or the # end of the topic's partitions. Deprecated. # @param start_from_beginning [Boolean] whether to start from the beginning # of the topic or just subscribe to new messages being produced. This # only applies when first consuming a topic partition – once the consumer # has checkpointed its progress, it will always resume from the last # checkpoint. # @param max_bytes_per_partition [Integer] the maximum amount of data fetched # from a single partition at a time. # @return [nil] def subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576) default_offset ||= start_from_beginning ? :earliest : :latest @group.subscribe(topic) @offset_manager.set_default_offset(topic, default_offset) @max_bytes[topic] = max_bytes_per_partition nil end # Stop the consumer. # # The consumer will finish any in-progress work and shut down. # # @return [nil] def stop @running = false end # Pause processing of a specific topic partition. # # When a specific message causes the processor code to fail, it can be a good # idea to simply pause the partition until the error can be resolved, allowing # the rest of the partitions to continue being processed. # # If the `timeout` argument is passed, the partition will automatically be # resumed when the timeout expires. 
    # Stop the consumer.
    #
    # The consumer will finish any in-progress work and shut down.
    #
    # @return [nil]
    def stop
      @running = false
    end

    # Pause processing of a specific topic partition.
    #
    # When a specific message causes the processor code to fail, it can be a good
    # idea to simply pause the partition until the error can be resolved, allowing
    # the rest of the partitions to continue being processed.
    #
    # If the `timeout` argument is passed, the partition will automatically be
    # resumed when the timeout expires.
    #
    # @param topic [String]
    # @param partition [Integer]
    # @param timeout [Integer] the number of seconds to pause the partition for,
    #   or `nil` if the partition should not be automatically resumed.
    # @return [nil]
    def pause(topic, partition, timeout: nil)
      @paused_partitions[topic] ||= {}
      @paused_partitions[topic][partition] = timeout && Time.now + timeout
    end

    # Resume processing of a topic partition.
    #
    # @see #pause
    # @param topic [String]
    # @param partition [Integer]
    # @return [nil]
    def resume(topic, partition)
      paused_partitions = @paused_partitions.fetch(topic, {})
      paused_partitions.delete(partition)
    end

    # Whether the topic partition is currently paused.
    #
    # @see #pause
    # @param topic [String]
    # @param partition [Integer]
    # @return [Boolean] true if the partition is paused, false otherwise.
    def paused?(topic, partition)
      partitions = @paused_partitions.fetch(topic, {})

      if partitions.key?(partition)
        # Users can set an optional timeout, after which the partition is
        # automatically resumed. When pausing, the timeout is translated to an
        # absolute point in time.
        timeout = partitions.fetch(partition)

        if timeout.nil?
          true
        elsif Time.now < timeout
          true
        else
          @logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired"
          resume(topic, partition)
          false
        end
      end
    end
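    # A sketch of the pause flow described above, assuming a consumer built as
    # in the class-level example; `process` stands in for hypothetical
    # application code.
    #
    #     consumer.each_message do |message|
    #       begin
    #         process(message)
    #       rescue
    #         # Let the other partitions keep flowing while this one recovers;
    #         # it will be resumed automatically after 30 seconds.
    #         consumer.pause(message.topic, message.partition, timeout: 30)
    #       end
    #     end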
    # Fetches and enumerates the messages in the topics that the consumer group
    # subscribes to.
    #
    # Each message is yielded to the provided block. If the block returns
    # without raising an exception, the message will be considered successfully
    # processed. At regular intervals the offset of the most recent successfully
    # processed message in each partition will be committed to the Kafka
    # offset store. If the consumer crashes or leaves the group, the group member
    # that is tasked with taking over processing of these partitions will resume
    # at the last committed offsets.
    #
    # @param min_bytes [Integer] the minimum number of bytes to read before
    #   returning messages from the server; if `max_wait_time` is reached, this
    #   is ignored.
    # @param max_wait_time [Integer, Float] the maximum duration of time to wait before
    #   returning messages from the server, in seconds.
    # @param automatically_mark_as_processed [Boolean] whether to automatically
    #   mark a message as successfully processed when the block returns
    #   without an exception. Once marked successful, the offsets of processed
    #   messages can be committed to Kafka.
    # @yieldparam message [Kafka::FetchedMessage] a message fetched from Kafka.
    # @raise [Kafka::ProcessingError] if there was an error processing a message.
    #   The original exception will be returned by calling `#cause` on the
    #   {Kafka::ProcessingError} instance.
    # @return [nil]
    def each_message(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true)
      consumer_loop do
        batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time)

        batches.each do |batch|
          batch.messages.each do |message|
            @instrumenter.instrument("process_message.consumer") do |notification|
              notification.update(
                topic: message.topic,
                partition: message.partition,
                offset: message.offset,
                offset_lag: batch.highwater_mark_offset - message.offset - 1,
                key: message.key,
                value: message.value,
              )

              begin
                yield message
              rescue => e
                location = "#{message.topic}/#{message.partition} at offset #{message.offset}"
                backtrace = e.backtrace.join("\n")
                @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

                raise ProcessingError.new(message.topic, message.partition, message.offset)
              end
            end

            mark_message_as_processed(message) if automatically_mark_as_processed
            @offset_manager.commit_offsets_if_necessary

            @heartbeat.send_if_necessary

            return if !@running
          end
        end

        # We may not have received any messages, but it's still a good idea to
        # commit offsets if we've processed messages in the last set of batches.
        @offset_manager.commit_offsets_if_necessary
      end
    end
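    # When `automatically_mark_as_processed` is false, the block must mark
    # progress itself before offsets can be committed. A minimal sketch;
    # `store_in_database` is a hypothetical application method:
    #
    #     consumer.each_message(automatically_mark_as_processed: false) do |message|
    #       store_in_database(message.value)
    #       consumer.mark_message_as_processed(message)
    #     end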
    # Fetches and enumerates the messages in the topics that the consumer group
    # subscribes to.
    #
    # Each batch of messages is yielded to the provided block. If the block returns
    # without raising an exception, the batch will be considered successfully
    # processed. At regular intervals the offset of the most recent successfully
    # processed message batch in each partition will be committed to the Kafka
    # offset store. If the consumer crashes or leaves the group, the group member
    # that is tasked with taking over processing of these partitions will resume
    # at the last committed offsets.
    #
    # @param min_bytes [Integer] the minimum number of bytes to read before
    #   returning messages from the server; if `max_wait_time` is reached, this
    #   is ignored.
    # @param max_wait_time [Integer, Float] the maximum duration of time to wait before
    #   returning messages from the server, in seconds.
    # @param automatically_mark_as_processed [Boolean] whether to automatically
    #   mark a batch's messages as successfully processed when the block returns
    #   without an exception. Once marked successful, the offsets of processed
    #   messages can be committed to Kafka.
    # @yieldparam batch [Kafka::FetchedBatch] a message batch fetched from Kafka.
    # @return [nil]
    def each_batch(min_bytes: 1, max_wait_time: 5, automatically_mark_as_processed: true)
      consumer_loop do
        batches = fetch_batches(min_bytes: min_bytes, max_wait_time: max_wait_time)

        batches.each do |batch|
          unless batch.empty?
            @instrumenter.instrument("process_batch.consumer") do |notification|
              notification.update(
                topic: batch.topic,
                partition: batch.partition,
                offset_lag: batch.offset_lag,
                highwater_mark_offset: batch.highwater_mark_offset,
                message_count: batch.messages.count,
              )

              begin
                yield batch
              rescue => e
                offset_range = (batch.first_offset..batch.last_offset)
                location = "#{batch.topic}/#{batch.partition} in offset range #{offset_range}"
                backtrace = e.backtrace.join("\n")

                @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

                raise ProcessingError.new(batch.topic, batch.partition, offset_range)
              end
            end

            mark_message_as_processed(batch.messages.last) if automatically_mark_as_processed
          end

          @offset_manager.commit_offsets_if_necessary

          @heartbeat.send_if_necessary

          return if !@running
        end
      end
    end

    # Move the consumer's position in a topic partition to the specified offset.
    #
    # Note that this has to be done prior to calling {#each_message} or {#each_batch}
    # and only has an effect if the consumer is assigned the partition. Typically,
    # you will want to do this in every consumer group member in order to make sure
    # that the member that's assigned the partition knows where to start.
    #
    # @param topic [String]
    # @param partition [Integer]
    # @param offset [Integer]
    # @return [nil]
    def seek(topic, partition, offset)
      @offset_manager.seek_to(topic, partition, offset)
    end
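    # A sketch combining {#seek} with {#each_batch}; the topic, partition, and
    # offset below are illustrative placeholders:
    #
    #     # Start partition 0 of `events` at offset 42, then consume in batches:
    #     consumer.seek("events", 0, 42)
    #
    #     consumer.each_batch do |batch|
    #       puts "Got #{batch.messages.count} messages from #{batch.topic}/#{batch.partition}"
    #     end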
    def commit_offsets
      @offset_manager.commit_offsets
    end

    def mark_message_as_processed(message)
      @offset_manager.mark_as_processed(message.topic, message.partition, message.offset)
    end

    def send_heartbeat_if_necessary
      @heartbeat.send_if_necessary
    end

    private

    def consumer_loop
      @running = true

      while @running
        begin
          yield
        rescue HeartbeatError, OffsetCommitError
          join_group
        rescue FetchError, NotLeaderForPartition, UnknownTopicOrPartition
          @cluster.mark_as_stale!
        rescue LeaderNotAvailable => e
          @logger.error "Leader not available; waiting 1s before retrying"
          @cluster.mark_as_stale!
          sleep 1
        rescue SignalException => e
          @logger.warn "Received signal #{e.message}, shutting down"
          @running = false
        end
      end
    ensure
      # In order to quickly have the consumer group re-balance itself, it's
      # important that members explicitly tell Kafka when they're leaving.
      make_final_offsets_commit!
      @group.leave rescue nil
      @running = false
    end

    def make_final_offsets_commit!(attempts = 3)
      @offset_manager.commit_offsets
    rescue ConnectionError, EOFError
      # It's important that the final offsets commit succeeds; otherwise any
      # messages processed since the last auto-commit will be processed again,
      # and that may be a huge number of messages.
      return if attempts.zero?

      @logger.error "Retrying to make final offsets commit (#{attempts} attempts left)"
      sleep(0.1)
      make_final_offsets_commit!(attempts - 1)
    end

    def join_group
      old_generation_id = @group.generation_id

      @group.join

      if old_generation_id && @group.generation_id != old_generation_id + 1
        # We've been out of the group for at least an entire generation; no
        # sense in trying to hold on to offset data.
        @offset_manager.clear_offsets
      else
        # After rejoining the group we may have been assigned a new set of
        # partitions. Keeping the old offset commits around forever would risk
        # having the consumer go back and reprocess messages if it's assigned
        # a partition it used to be assigned to way back. For that reason, we
        # only keep commits for the partitions that we're still assigned.
        @offset_manager.clear_offsets_excluding(@group.assigned_partitions)
      end
    end

    def fetch_batches(min_bytes:, max_wait_time:)
      join_group unless @group.member?

      subscribed_partitions = @group.subscribed_partitions

      @heartbeat.send_if_necessary

      raise NoPartitionsAssignedError if subscribed_partitions.empty?

      operation = FetchOperation.new(
        cluster: @cluster,
        logger: @logger,
        min_bytes: min_bytes,
        max_wait_time: max_wait_time,
      )

      subscribed_partitions.each do |topic, partitions|
        partitions.each do |partition|
          offset = @offset_manager.next_offset_for(topic, partition)
          max_bytes = @max_bytes.fetch(topic)

          if paused?(topic, partition)
            @logger.warn "Partition #{topic}/#{partition} is currently paused, skipping"
          else
            @logger.debug "Fetching batch from #{topic}/#{partition} starting at offset #{offset}"
            operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)
          end
        end
      end

      operation.execute
    rescue OffsetOutOfRange => e
      @logger.error "Invalid offset for #{e.topic}/#{e.partition}, resetting to default offset"

      @offset_manager.seek_to_default(e.topic, e.partition)

      retry
    rescue ConnectionError => e
      @logger.error "Connection error while fetching messages: #{e}"

      raise FetchError, e
    end
  end
end