1. 首页
  2. IT资讯

Kafka 基本原理

“u003Cpu003EJava后端,选择u003Cu002Fpu003Eu003Cpu003E优质文章,及时送达u003Cu002Fpu003Eu003Cimg src=”http:u002Fu002Fp1.pstatp.comu002Flargeu002Fpgc-imageu002FRcsUnd92eoboDR” img_width=”640″ img_height=”29″ alt=”Kafka 基本原理” inline=”0″u003Eu003Cpu003E作者 | 阿凡卢u003Cu002Fpu003Eu003Cpu003E链接 | cnblogsu003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-6″u003E.comu003Cu002Fiu003Eu002Fluu003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-1″u003Exiu003Cu002Fiu003Eaoxunu002Fpu002F5492u003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-2″u003E64u003Cu002Fiu003E6.htmlu003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003E简介u003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003EApache Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003EKafkau003Cu002Fstrongu003Eu003Cstrongu003E架构u003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E它的架构包括以下组件:u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003E话题(Topic):u003Cu002Fstrongu003E是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003E生产者(Producer):u003Cu002Fstrongu003E是能够发布消息到话题的任何对象。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003E服务代理(Broker):u003Cu002Fstrongu003E已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003E消费者(Consumer):u003Cu002Fstrongu003E可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。u003Cu002Fpu003Eu003Cimg src=”http:u002Fu002Fp1.pstatp.comu002Flargeu002Fpgc-imageu002FRfz0KORCg1IyBN” img_width=”640″ img_height=”417″ alt=”Kafka 基本原理” inline=”0″u003Eu003Cpu003Eu003Cstrongu003EKafkau003Cu002Fstrongu003Eu003Cstrongu003E存储策略u003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E1)kafka以topic来进行消息管理,每个topic包含多个partition,每个partition对应一个逻辑log,有多个segment组成。u003Cu002Fpu003Eu003Cpu003E2)每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。u003Cu002Fpu003Eu003Cpu003E3)每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。u003Cu002Fpu003Eu003Cpu003E4)发布者发到某个topic的消息会被均匀的分布到多个partition上(或根据用户指定的路由规则进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。u003Cu002Fpu003Eu003Cpu003Etips: 欢迎u003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-1″u003E关注u003Cu002Fiu003Eu003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-1″u003E微信u003Cu002Fiu003E公众号:Java后端,获取更多推送。u003Cu002Fpu003Eu003Cimg src=”http:u002Fu002Fp1.pstatp.comu002Flargeu002Fpgc-imageu002FRfz0KOm2JpYx5V” img_width=”538″ img_height=”497″ alt=”Kafka 基本原理” inline=”0″u003Eu003Cpu003Eu003Cstrongu003EKafka数据保留策略u003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E1)N天前的删除。u003Cu002Fpu003Eu003Cpu003E2)保留最近的多少Size数据。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003EKafka brokeru003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E与其它消息系统不同,Kafka broker是无状态的。这意味着消费者必须维护已消费的状态信息。这些信息由消费者自己维护,broker完全不管(有offset managerbroker管理)。u003Cu002Fpu003Eu003Culu003Eu003Cliu003Eu003Cpu003E从代理删除消息变得很棘手,因为代理并不知道消费者是否已经使用了该消息。Kafka创新性地解决了这个问题,它将一个简单的基于时间的SLA应用于保留策略。当消息在代理中超过一定时间后,将会被自动删除。u003Cu002Fpu003Eu003Cu002Fliu003Eu003Cliu003Eu003Cpu003E这种创新设计有很大的好处,消费者可以故意倒回到老的偏移量再次消费数据。这违反了队列的常见约定,但被证明是许多消费者的基本特征。u003Cu002Fpu003Eu003Cu002Fliu003Eu003Cu002Fulu003Eu003Cpu003E以下摘抄自kafka官方文档:u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003EKafka Designu003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E目标u003Cu002Fpu003Eu003Cpu003E1) 高吞吐量来支持高容量的事件流处理u003Cu002Fpu003Eu003Cpu003E2) 支持从离线系统加载数据u003Cu002Fpu003Eu003Cpu003E3) 低延迟的消息系统u003Cu002Fpu003Eu003Cpu003E持久化u003Cu002Fpu003Eu003Cpu003E1) 依赖文件系统,持久化到本地u003Cu002Fpu003Eu003Cpu003E2) 数据持久化到logu003Cu002Fpu003Eu003Cpu003E效率u003Cu002Fpu003Eu003Cpu003E1) 解决”small IO problem“:u003Cu002Fpu003Eu003Cpu003E使用”message set“组合消息。u003Cu002Fpu003Eu003Cpu003Eserver使用”chunks of messages“写到log。u003Cu002Fpu003Eu003Cpu003Econsumer一次获取大的消息块。u003Cu002Fpu003Eu003Cpu003E2)解决”byte copying“:u003Cu002Fpu003Eu003Cpu003E在producer、broker和consumer之间使用统一的binary message format。u003Cu002Fpu003Eu003Cpu003E使用系统的page cache。u003Cu002Fpu003Eu003Cpu003E使用sendfile传输log,避免拷贝。u003Cu002Fpu003Eu003Cpu003E端到端的批量压缩(End-to-end Batch Compression)u003Cu002Fpu003Eu003Cpu003EKafka支持GZIP和Snappy压缩协议。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003E复制(Replication)u003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E1)一个partition的复制个数(replication factor)包括这个partition的leader本身。u003Cu002Fpu003Eu003Cpu003E2)所有对partition的读和写都通过leader。u003Cu002Fpu003Eu003Cpu003E3)Followers通过pull获取leader上log(message和offset)u003Cu002Fpu003Eu003Cpu003E4)如果一个follower挂掉、卡住或者同步太慢,leader会把这个follower从”in sync replicas“(ISR)列表中删除。u003Cu002Fpu003Eu003Cpu003E5)当所有的”in sync replicas“的follower把一个消息写入到自己的log中时,这个消息才被认为是”committed“的。u003Cu002Fpu003Eu003Cpu003E6)如果针对某个partition的所有复制节点都挂了,Kafka默认选择最先复活的那个节点作为leader(这个节点不一定在ISR里)。u003Cu002Fpu003Eu003Cpu003Eu003Cu002Fpu003Eu003Ch1 toutiao-origin=”h3″u003ELeader选举u003Cu002Fh1u003Eu003Cpu003EKafka在Zookeeper中为每一个partition动态的维护了一个ISR,这个ISR里的所有replica都跟上了leader,只有ISR里的成员才能有被选为leader的可能(unclean.leader.election.enable=false)。u003Cu002Fpu003Eu003Cpu003E在这种模式下,对于f+1个副本,一个Kafka topic能在保证不丢失已经commit消息的前提下容忍f个副本的失败,在大多数使用场景下,这种模式是十分有利的。事实上,为了容忍f个副本的失败,“少数服从多数”的方式和ISR在commit前需要等待的副本的数量是一样的,但是ISR需要的总的副本的个数几乎是“少数服从多数”的方式的一半。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003EThe Produceru003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E发送确认u003Cu002Fpu003Eu003Cpu003E通过request.required.acks来设置,选择是否等待消息commit(是否等待所有的”in sync replicas“都成功复制了数据)u003Cu002Fpu003Eu003Cpu003EProducer可以通过u003Ccodeu003Eacksu003Cu002Fcodeu003E参数指定最少需要多少个Replica确认收到该消息才视为该消息发送成功。u003Ccodeu003Eacksu003Cu002Fcodeu003E的默认值是1,即Leader收到该消息后立即告诉Producer收到该消息,此时如果在ISR中的消息复制完该消息前Leader宕机,那该条消息会丢失。u003Cu002Fpu003Eu003Cpu003E推荐的做法是,将u003Ccodeu003Eacksu003Cu002Fcodeu003E设置为u003Ccodeu003Eallu003Cu002Fcodeu003E或者u003Ccodeu003E-1u003Cu002Fcodeu003E,此时只有ISR中的所有Replica都收到该数据(也即该消息被Commit),Leader才会告诉Producer该消息发送成功,从而保证不会有未知的数据丢失。u003Cu002Fpu003Eu003Cpu003E负载均衡u003Cu002Fpu003Eu003Cpu003E1)producer可以自定义发送到哪个partition的路由规则。默认路由规则:hash(key)%numPartitions,如果key为则随机选择一个partition。u003Cu002Fpu003Eu003Cpu003E2)自定义路由:如果key是一个user id,可以把同一个user的消息发送到同一个partition,这时consumer就可以从同一个partition读取同一个user的消息。u003Cu002Fpu003Eu003Cpu003E异步批量发送u003Cu002Fpu003Eu003Cpu003E批量发送:配置不多于固定消息数目一起发送并且等待时间小于一个固定延迟的数据。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003EThe Consumeru003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003Econsumer控制消息的读取。u003Cu002Fpu003Eu003Cpu003EPush vs Pullu003Cu002Fpu003Eu003Cpu003E1) producer push data to broker,consumer pull data from brokeru003Cu002Fpu003Eu003Cpu003E2) consumer pull的优点:consumer自己控制消息的读取速度和数量。u003Cu002Fpu003Eu003Cpu003E3) consumer pull的缺点:如果broker没有数据,则可能要pull多次忙等待,Kafka可以配置consumer long pull一直等到有数据。u003Cu002Fpu003Eu003Cpu003EConsumer Positionu003Cu002Fpu003Eu003Cpu003E1) 大部分消息系统由broker记录哪些消息被消费了,但Kafka不是。u003Cu002Fpu003Eu003Cpu003E2) Kafka由consumer控制消息的消费,consumer甚至可以回到一个old offset的位置再次消费消息。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003EConsumer groupu003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E每一个consumer实例都属于一个consumer group。u003Cu002Fpu003Eu003Cpu003E每一条消息只会被同一个consumer group里的一个consumer实例消费。u003Cu002Fpu003Eu003Cpu003E不同consumer group可以同时消费同一条消息。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003EConsumer Rebalanceu003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003EKafka consumer high level API:u003Cu002Fpu003Eu003Cpu003E如果某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据。u003Cu002Fpu003Eu003Cpu003E如果consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据。u003Cu002Fpu003Eu003Cpu003E如果consumer的数量多于partition的数量时,会有部分consumer无法消费该topic下任何一条消息。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003EMessage Delivery Semanticsu003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E三种:u003Cu002Fpu003Eu003Cpu003EAt most once—Messages may be lost but are never redelivered.u003Cu002Fpu003Eu003Cpu003EAt least once—Messages are never lost but may be redelivered.u003Cu002Fpu003Eu003Cpu003EExactly once—this is what people actually want, each message is delivered once and only once.u003Cu002Fpu003Eu003Cpu003EProducer:有个”acks“配置可以控制接收的leader的在什么情况下就回应producer消息写入成功。u003Cu002Fpu003Eu003Cpu003EConsumer:u003Cu002Fpu003Eu003Cpu003E* 读取消息,写log,处理消息。如果处理消息失败,log已经写入,则无法再次处理失败的消息,对应”At most once“。u003Cu002Fpu003Eu003Cpu003E* 读取消息,处理消息,写log。如果消息处理成功,写log失败,则消息会被处理两次,对应”At least once“。u003Cu002Fpu003Eu003Cpu003E* 读取消息,同时处理消息并把result和log同时写入。这样保证result和log同时更新或同时失败,对应”Exactly once“。u003Cu002Fpu003Eu003Cpu003EKafka默认保证at-least-once delivery,容许用户实现at-most-once语义,exactly-once的实现取决于目的存储系统,kafka提供了读取offset,实现也没有问题。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003EDistributionu003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003EConsumer Offset Trackingu003Cu002Fpu003Eu003Cpu003E1)High-level consumer记录每个partition所消费的mau003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-1″u003Exiu003Cu002Fiu003Emum offset,并定期commit到offset manager(broker)。u003Cu002Fpu003Eu003Cpu003E2)Simple consumer需要手动管理offset。现在的Simple consumer Java API只支持commit offset到zookeeper。u003Cu002Fpu003Eu003Cpu003EConsumers and Consumer Groupsu003Cu002Fpu003Eu003Cpu003E1)consumer注册到zookeeperu003Cu002Fpu003Eu003Cpu003E2)属于同一个group的consumer(group id一样)平均分配partition,每个partition只会被一个consumer消费。u003Cu002Fpu003Eu003Cpu003E3)当broker或同一个group的其他consumer的状态发生变化的时候,consumer rebalance就会发生。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003EZookeeper协调控制u003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E1)管理broker与consumer的动态加入与离开。u003Cu002Fpu003Eu003Cpu003E2)触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载平衡。u003Cu002Fpu003Eu003Cpu003E3)维护消费关系及每个partition的消费信息。u003Cu002Fpu003Eu003Cpu003Eu003Cstrongu003E日志压缩(Log Compaction)u003Cu002Fstrongu003Eu003Cu002Fpu003Eu003Cpu003E1)针对一个topic的partition,压缩使得Kafka至少知道每个key对应的最后一个值。u003Cu002Fpu003Eu003Cpu003E2)压缩不会重排序消息。u003Cu002Fpu003Eu003Cpu003E3)消息的offset是不会变的。u003Cu002Fpu003Eu003Cpu003E4)消息的offset是顺序的。u003Cu002Fpu003Eu003Cpu003E5)压缩发送和接收能降低网络负载。u003Cu002Fpu003Eu003Cpu003E6)以压缩后的形式持久化到磁盘。u003Cu002Fpu003Eu003Cpu003E生产者代码示例:u003Cu002Fpu003Eu003Cpreu003Eu003Ccodeu003Epublic class TestProducer {u003Cbru003Epublic static void main(String[] args) {u003Cbru003Elong events = Long.parseLong(args[0]);u003Cbru003ERandom rnd = new Random;u003Cbru003Eu003Cbru003EProperties props = new Properties;u003Cbru003Eprops.put(“metadata.broker.list”, “broker1:9092,broker2:9092 “);u003Cbru003Eprops.put(“serializer.class”, “kafka.serializer.StringEncoder”);u003Cbru003Eprops.put(“partitioner.class”, “example.producer.SimplePartitioner”);u003Cbru003Eprops.put(“request.required.acks”, “1”);u003Cbru003Eu003Cbru003EProducerConfig config = new ProducerConfig(props);u003Cbru003Eu003Cbru003EProducer<String, String> producer = new Producer<String, String>(config);u003Cbru003Eu003Cbru003Efor (long nEvents = 0; nEvents < events; nEvents++) {u003Cbru003Elong runtime = new Date.getTime;u003Cbru003EString ip = “192.168.2.” + rnd.nextInt(255);u003Cbru003EString msg = runtime + “,u003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-2″u003Ewwwu003Cu002Fiu003E.exampleu003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-6″u003E.comu003Cu002Fiu003E,” + ip;u003Cbru003EKeyedMessage<String, String> data = new KeyedMessage<String, String>(“page_visits”, ip, msg);u003Cbru003Eproducer.send(data);u003Cbru003E}u003Cbru003Eproducer.close;u003Cbru003E}u003Cbru003E}u003Cu002Fcodeu003Eu003Cu002Fpreu003Eu003Cpu003EPartitioning Code:u003Cu002Fpu003Eu003Cpreu003Eu003Ccodeu003Epublic u003Cstrong toutiao-origin=”span”u003Eclassu003Cu002Fstrongu003Eu003Cstrong toutiao-origin=”span”u003ESimplePartitioneru003Cu002Fstrongu003Eu003Cstrong toutiao-origin=”span”u003Eimplementsu003Cu002Fstrongu003Eu003Cstrong toutiao-origin=”span”u003EPartitioneru003Cu002Fstrongu003E{u003Cbru003Eu003Cstrong toutiao-origin=”span”u003Epublicu003Cu002Fstrongu003Eu003Cstrong toutiao-origin=”span”u003ESimplePartitioneru003Cu002Fstrongu003E(VerifiableProperties props) {u003Cbru003Eu003Cbru003E}u003Cbru003Eu003Cbru003Eu003Cstrong toutiao-origin=”span”u003Epublicu003Cu002Fstrongu003Eu003Cstrong toutiao-origin=”span”u003Eintu003Cu002Fstrongu003Eu003Cstrong toutiao-origin=”span”u003Epartitionu003Cu002Fstrongu003E(Object key, int a_numPartitions) {u003Cbru003Eint partition = 0;u003Cbru003EString stringKey = (String) key;u003Cbru003Eint offset = stringKey.lastIndexOf(‘.’);u003Cbru003Eif (offset > 0) {u003Cbru003Epartition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;u003Cbru003E}u003Cbru003Ereturn partition;u003Cbru003E}u003Cbru003Eu003Cbru003E}u003Cu002Fcodeu003Eu003Cu002Fpreu003Eu003Cpu003E消费者代码示例:u003Cu002Fpu003Eu003Cpreu003Eu003Ccodeu003Epublic class ConsumerGroupExample {u003Cbru003Eprivate final ConsumerConnector consumer;u003Cbru003Eprivate final String topic;u003Cbru003Eprivate ExecutorService executor;u003Cbru003Eu003Cbru003Epublic ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {u003Cbru003Econsumer = kafka.consumer.Consumer.createJavaConsumerConnector(u003Cbru003EcreateConsumerConfig(a_zookeeper, a_groupId));u003Cbru003Ethis.topic = a_topic;u003Cbru003E}u003Cbru003Eu003Cbru003Epublic void shutdown {u003Cbru003Eif (consumer != ) consumer.shutdown;u003Cbru003Eif (executor != ) executor.shutdown;u003Cbru003Etry {u003Cbru003Eif (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {u003Cbru003ESystem.out.println(“Timed out waiting for consumer threads to shut down, eu003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-1″u003Exiu003Cu002Fiu003Eting uncleanly”);u003Cbru003E}u003Cbru003E} catch (InterruptedException e) {u003Cbru003ESystem.out.println(“Interrupted during shutdown, eu003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-1″u003Exiu003Cu002Fiu003Eting uncleanly”);u003Cbru003E}u003Cbru003E}u003Cbru003Eu003Cbru003Epublic void run(int a_numThreads) {u003Cbru003EMap<String, Integer> topicCountMap = new HashMap<String, Integer>;u003Cbru003EtopicCountMap.put(topic, new Integer(a_numThreads));u003Cbru003EMap<String, List<KafkaStream<byte, byte>>> consumerMap = consumer.createMessageStreams(topicCountMap);u003Cbru003EList<KafkaStream<byte, byte>> streams = consumerMap.get(topic);u003Cbru003Eu003Cbru003Eu002Fu002F now launch all the threadsu003Cbru003Eu002Fu002Fu003Cbru003Eexecutor = Executors.newFixedThreadPool(a_numThreads);u003Cbru003Eu003Cbru003Eu002Fu002F now create an object to consume the messagesu003Cbru003Eu002Fu002Fu003Cbru003Eint threadNumber = 0;u003Cbru003Efor (final KafkaStream stream : streams) {u003Cbru003Eexecutor.submit(new ConsumerTest(stream, threadNumber));u003Cbru003EthreadNumber++;u003Cbru003E}u003Cbru003E}u003Cbru003Eu003Cbru003Eprivate static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {u003Cbru003EProperties props = new Properties;u003Cbru003Eprops.put(“zookeeper.connect”, a_zookeeper);u003Cbru003Eprops.put(“group.id”, a_groupId);u003Cbru003Eprops.put(“zookeeper.session.timeout.ms”, “400”);u003Cbru003Eprops.put(“zookeeper.sync.time.ms”, “200”);u003Cbru003Eprops.put(“autou003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-6″u003E.comu003Cu002Fiu003Emit.interval.ms”, “1000”);u003Cbru003Eu003Cbru003Ereturn new ConsumerConfig(props);u003Cbru003E}u003Cbru003Eu003Cbru003Epublic static void main(String[] args) {u003Cbru003EString zooKeeper = args[0];u003Cbru003EString groupId = args[1];u003Cbru003EString topic = args[2];u003Cbru003Eint threads = Integer.parseInt(args[3]);u003Cbru003Eu003Cbru003EConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);u003Cbru003Eexample.run(threads);u003Cbru003Eu003Cbru003Etry {u003Cbru003EThread.sleep(10000);u003Cbru003E} catch (InterruptedException ie) {u003Cbru003Eu003Cbru003E}u003Cbru003Eexample.shutdown;u003Cbru003E}u003Cbru003E}u003Cu002Fcodeu003Eu003Cu002Fpreu003Eu003Cpreu003Eu003Ccodeu003Epublic class ConsumerTest implements Runnable {u003Cbru003Eprivate KafkaStream m_stream;u003Cbru003Eprivate int m_threadNumber;u003Cbru003Eu003Cbru003Epublic ConsumerTest(KafkaStream a_stream, int a_threadNumber) {u003Cbru003Em_threadNumber = a_threadNumber;u003Cbru003Em_stream = a_stream;u003Cbru003E}u003Cbru003Eu003Cbru003Epublic void run {u003Cbru003EConsumerIterator<byte, byte> it = m_stream.iterator;u003Cbru003Ewhile (it.hasNext)u003Cbru003ESystem.out.println(“Thread ” + m_threadNumber + “: ” + new String(it.next.message));u003Cbru003ESystem.out.println(“Shutting down Thread: ” + m_threadNumber);u003Cbru003E}u003Cbru003E}u003Cu002Fcodeu003Eu003Cu002Fpreu003Eu003Cpu003E关于Consumer的一个细节说明:u003Cu002Fpu003Eu003Cpreu003Eu003Ccodeu003EtopicCountMap.put(topic, new Integer(a_numThreads));u003Cu002Fcodeu003Eu003Cu002Fpreu003Eu003Cpu003E这里,如果提供的thread数目(a_numThreads)大于这个topic的partition的数目,有些thread会永远读不到消息。u003Cu002Fpu003Eu003Cpu003E如果如果提供的thread数目(a_numThreads)小于这个topic的partition的数目,有些thread会从多个partition读到消息。u003Cu002Fpu003Eu003Cpu003E如果一个线程从多个partition读取消息,无法保证的消息的顺序,只能保证从同一个partition读取到的消息是顺序的。u003Cu002Fpu003Eu003Cpu003E增加更多的进程u002F线程消费消息,会导致Kafka re-balance,可能会改变Partition和消费Thread的对应关系。u003Cu002Fpu003Eu003Cpu003E开发环境搭建:u003Cu002Fpu003Eu003Cpu003Ehttps:u002Fu002Fcwiki.apache.orgu002Fconfluenceu002Fdisplayu002FKAFKAu002FDeveloper+Setupu003Cu002Fpu003Eu003Cpu003E一些example:u003Cu002Fpu003Eu003Cpu003Ehttps:u002Fu002Fcwiki.apache.orgu002Fconfluenceu002Fdisplayu002FKAFKAu002F0.8.0+Producer+Exampleu003Cu002Fpu003Eu003Cpu003E自己管理offset:u003Cu002Fpu003Eu003Cpu003Ehttps:u002Fu002Fcwiki.apache.orgu002Fconfluenceu002Fdisplayu002FKAFKAu002FCommitting+and+fetching+consumer+offsets+in+Kafkau003Cu002Fpu003Eu003Cpu003E参考:u003Cu002Fpu003Eu003Cpu003EKafka深度解析:http:u002Fu002Fu003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-2″u003Ewwwu003Cu002Fiu003E.jasongju003Ci class=”chrome-extension-mutihighlight chrome-extension-mutihighlight-style-6″u003E.comu003Cu002Fiu003Eu002F2015u002F01u002F02u002FKafkau003Cu002Fpu003Eu003Cpu003Ehttps:u002Fu002Fkafka.apache.orgu002Fdocumentation.htmlu003Cu002Fpu003Eu003Cpu003Ehttps:u002Fu002Fcwiki.apache.orgu002Fconfluenceu002Fdisplayu002FKAFKAu002FIndex u003Cu002Fpu003Eu003Cp class=”pgc-end-source”u003E-END-u003Cu002Fpu003E”

原文始发于:Kafka 基本原理

主题测试文章,只做测试使用。发布者:逗乐男神i,转转请注明出处:http://www.cxybcw.com/17864.html

联系我们

13687733322

在线咨询:点击这里给我发消息

邮件:1877088071@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code