My Learning Notes on Apache Kafka

 

Why Apache Kafka?

  • Distributed
  • Resilient
  • Fault Tolerant
  • Horizontal scalability:
    • Can scale to 100s of brokers
    • Can scale to millions of messages per second
  • High performance (latency of less than 10 ms) – real time
  • High Throughput
  • Open Source

 

Use cases of Kafka:

  • Messaging System – Millions of messages can be sent and received in real time, using Kafka.
  • Activity Tracking – Kafka can be used to aggregate user activity data such as clicks, navigation and searches from an organization's different websites; such activity can be sent to a real-time monitoring system and to a Hadoop system for offline processing.
  • Real Time Stream Processing – Kafka can be used to process a continuous stream of information in real time and pass it to stream processing systems such as Storm.
  • Log Aggregation – Kafka can be used to collect physical log files from multiple systems and store them in a central location such as HDFS.
  • Commit Log Service – Kafka can be used as an external commit log for distributed systems.
  • Event Sourcing – A time ordered sequence of events can be maintained through Kafka.
  • Gather metrics from many different locations
  • Application Logs gathering
  • Stream processing (with the Kafka Streams API or Spark for example)
  • Decoupling of system dependencies
  • Integration with Spark, Flink, Storm, Hadoop and many other Big Data technologies

 

Kafka solves the following problems:

  • How data is transported with different protocols (TCP, HTTP, REST, FTP, JDBC, gRPC, etc)
  • How data is parsed for different data formats (Binary, CSV, JSON, Avro, Thrift, Protocol Buffers, etc..)
  • How data is shaped and how it may change over time.

Decoupling of Data Streams & Systems

Source Target Decoupling

 

Decoupling Kafka

 

 

Apache Kafka Architecture:

 

Kafka Architecture

 

Kafka Big Picture

 

Kafka Messaging

KAFKA JARGON:

BROKERS, PRODUCERS, CONSUMERS, TOPICS, PARTITIONS AND OFFSETS:

Topics:

A topic is a particular stream of data.

  • Similar to a table in a database (without all the constraints)
  • You can have as many topics as you want
  • A topic is identified by its name.
  • Topics are split into Partitions.

Example:

Topics Example

 

Partitions:

  • Partitions are the units a topic is split into, roughly comparable to shards of a table.
  • Each partition is ordered.
  • Each message within a partition gets an incremental id, called offset.
  • You as a user have to specify the number of Partitions for a Topic.
  • The first message written to a partition gets offset 0, and the offset increments with each subsequent message; offsets are unbounded.
  • Each partition can have a different number of messages (and therefore different maximum offsets), since partitions are independent.

Partitions Overview

 

Offsets & Few Gotcha’s:

  • Offsets are like an auto-incrementing primary key: they keep incrementing and cannot be changed or updated.
  • Offsets only have meaning within a specific partition.
    • Which means Partition 0, Offset 0 is different from Partition 1, Offset 0.
  • Ordering is guaranteed only within a partition (not across partitions).
  • Data in Kafka is kept only for a limited time (default retention period is one week)
  • Offsets keep on incrementing, they can never go back to 0.
  • Once data is written to a partition, it can’t be changed (data within a partition is immutable). If you want to write a new message, you append it at the end.
  • Data is assigned randomly to a partition unless a key is provided.

 

Brokers:

  • A Kafka cluster is composed of multiple brokers (servers)
  • Each broker is identified by an integer ID (it cannot be given a name like “My Broker”).
  • Each broker contains certain topic partitions, i.e., some of the data but not all of it, because Kafka is distributed.
  • After connecting to any broker (called a bootstrap broker), you are connected to the entire cluster.
  • A good number to get started is 3 brokers, but some big clusters have over 100 brokers.

Cluster of Brokers Example:

Cluster of Brokers

 

Brokers and Topics

  • There is no relationship between the Broker number and the Partition Number
  • If a topic has more partitions than there are brokers, some brokers will hold more than one partition of that topic.
    • Ex: If Topic-C has 4 partitions across 3 brokers (101, 102, 103), one of the brokers will hold two of its partitions.

Broker and Topic Partitions

Topic Replication Factor:

  • Topics should have a replication factor > 1 (usually between 2 and 3)
  • This way if a broker is down, another broker can serve the data.

 

Topic Replication Factor

Topic Replication Factor Failure
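As a rough illustration of how the partition count and replication factor are specified when a topic is created, here is a sketch using Kafka's Java AdminClient; the topic name, counts and broker address are placeholders, not values from the course.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // placeholder broker address
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 2 (both chosen only for illustration)
            NewTopic topic = new NewTopic("my-topic", 3, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}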

 

Concept of Leader for a Partition

  • At any given time, ONLY ONE broker can be leader for a given partition.
  • ONLY that leader can receive and serve data for a partition
  • The other brokers will synchronize the data.
  • Therefore each partition has one leader and multiple ISRs (in-sync replicas).
  • If Broker 101 is lost, an election happens and the replica of Partition 0 on Broker 102 becomes the leader (because it was an in-sync replica). When Broker 101 comes back alive, it replicates the data from the current leader and can then become the leader again. All of this is handled internally by Kafka.

Leader election and ISR management are handled through Zookeeper.

STAR indicates the LEADER

Leader of a Partition

 

Producers:

  • Producers write data to topics (which is made of partitions)
  • Producers automatically know which broker and partition to write to, so the developer doesn’t have to specify it.
  • In case of Broker failures, Producers will automatically recover.
  • If a producer sends data without a key, the data is distributed round robin, a little to each broker in the cluster (see the producer sketch below).
  • Producer can choose to receive acknowledgement of data writes.
  • There are 3 kinds of Acknowledgement modes
    • ACKS=0 -> Producer won’t wait for acknowledgment (possible data loss)
    • ACKS=1 -> Producer will wait for leader acknowledgment (limited data loss). This is the default.
    • ACKS=all -> Leader + replicas acknowledgment (no data loss)

 

Producers
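Here is a minimal producer sketch in Java showing keyed vs. un-keyed sends and the acks setting; the topic name "truck_gps", the keys and the broker address are made-up placeholders, not values from the course.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");   // wait for leader + in-sync replicas (acks=1 waits for the leader only)

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // With a key, every message for "truck_id_123" lands on the same partition.
            producer.send(new ProducerRecord<>("truck_gps", "truck_id_123", "lat=12.97,long=77.59"));
            // With a null key, messages are spread round robin across partitions.
            producer.send(new ProducerRecord<>("truck_gps", null, "some unkeyed payload"));
        }
    }
}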

 

Producers with Message Keys:

  • Producers can choose to send a KEY with a message. A key can be a string, number, etc..,
  • If key == null, data is sent round robin to partitions across the brokers in the cluster.
  • If a key is sent, then all messages for that key will always go to the same partition.
  • A key is basically sent if you need message ordering for a specific field. (ex: truck_id)
  • Once a key has been assigned to a specific partition, messages with that key will ALWAYS go to that same partition. However, you CANNOT choose in advance which partition a particular key maps to.
    • Ex: if data with key “truck_id_123” is going to partition 0, then the producer sending the data with key truck_id_123 will ALWAYS go to partition 0.
    • Similarly if data with key “truck_id_345” is going to partition 1, then the producer sending the data with key truck_id_345 will ALWAYS go to partition 1.
  • The key-to-partition mapping is guaranteed by key hashing, which depends on the number of partitions (sketched below).

Producers with Message Keys
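The exact hashing is an implementation detail of the client (the Java client's default partitioner uses a murmur2 hash of the serialized key), but the idea can be sketched roughly like this; the method below is a conceptual stand-in, not Kafka's actual code.

// Conceptual sketch only, not Kafka's real implementation.
static int partitionForKey(byte[] serializedKey, int numPartitions) {
    // Hash the serialized key and take it modulo the partition count, so the
    // mapping is stable only while the number of partitions stays the same.
    int hash = java.util.Arrays.hashCode(serializedKey) & 0x7fffffff;
    return hash % numPartitions;
}

Because the partition number comes from hash % numPartitions, adding partitions to a topic changes the mapping, which is why the key-to-partition guarantee only holds while the partition count stays constant.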

 

Consumers

  • Consumers read data from a topic (identified by name)
  • Consumers know which broker to read from
  • In case of broker failures, consumers know how to recover
  • Data is read in order within each partition.
  • There is no ordering guarantee across partitions (see the image above for Partition 1 and Partition 2).

Consumers

 

Consumer Groups

How do consumers read data from all these partitions?

  • Consumers read data in consumer groups.
  • A consumer can be any client: a Java application, a client written in another language, or a command-line utility.
  • Each consumer within a group reads from exclusive partitions
  • If you have more consumers than partitions, some consumers will be inactive.

Consumer Groups

What if you have too many consumers?

  • If you have more consumers than partitions, some consumers will be inactive.
  • In the figure below, if Consumer 3 goes down, Consumer 4 becomes active. Ideally, you have at most as many consumers as partitions.

More Consumers than Partitions

 

Consumer Offsets

  • Kafka stores the offsets at which a consumer group has been reading
  • The committed offsets live in a Kafka topic named “__consumer_offsets” (note the leading double underscore).
  • When a consumer in a group has processed data received from Kafka, it should commit the offsets to the “__consumer_offsets” topic. Kafka does this automatically by default (see the consumer sketch below).
  • If a consumer dies, it will be able to read back from where it left off, thanks to the committed consumer offsets.

Consumer Offsets
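A rough sketch of a Java consumer that joins a consumer group and relies on the default automatic offset commits; the group id, topic name and broker address below are placeholders, not values from the course.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-application");        // the consumer group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");     // where to start when no offset is committed yet

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("my-topic"));
            while (true) {
                // Offsets are committed automatically in the background (enable.auto.commit=true by default).
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.partition() + ":" + record.offset() + " -> " + record.value());
                }
            }
        }
    }
}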

 

Delivery Semantics for Consumers

  • Consumers choose when to commit offsets
  • There are 3 delivery semantics:

At most once:

  • Offsets are committed as soon as the message is received.
  • If the processing goes wrong, the message will be lost (it won’t be read again)

At least once (usually preferred):

  • Offsets are committed only after the message is processed.
  • If the processing goes wrong, the message will be read again.
  • This can result in duplicate processing of messages, so make sure your processing is idempotent (i.e., processing the same message again won’t impact your systems).

Exactly Once:

  • Can be achieved for Kafka-to-Kafka workflows using the Kafka Streams API (and possibly with other stream-processing frameworks such as Spark).
  • For Kafka-to-external-system workflows (like a database), use an idempotent consumer, to make sure there are no duplicates while inserting into the final database.
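One way to see the difference between at-most-once and at-least-once is where the offset commit happens relative to processing. The sketch below assumes auto-commit is disabled (enable.auto.commit=false) and uses a hypothetical process() helper; it is an illustration, not the course's code.

import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DeliverySemanticsSketch {
    // At-least-once (usually preferred): process first, commit after.
    static void atLeastOnce(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            process(record);        // may run again if we crash before the commit below
        }
        consumer.commitSync();      // commit only after processing succeeded
    }

    // At-most-once: commit first, then process.
    static void atMostOnce(KafkaConsumer<String, String> consumer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        consumer.commitSync();      // a crash during processing loses these messages
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
    }

    // Hypothetical (ideally idempotent) processing step.
    static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }
}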

Kafka Broker Discovery

  • Every Kafka broker is also called a “bootstrap server”
  • That means that you only need to connect to one broker, and you will be connected to the entire cluster.
  • Each broker knows about all brokers, topics and partitions (metadata)
  • A Kafka client can therefore connect to any broker and discover the rest of the cluster automatically.

Kafka Broker Discovery

 

ZOOKEEPER

  • Zookeeper manages brokers. It holds the brokers together (keeps a list of them)
  • Zookeeper helps in performing leader election for partitions: when a broker goes down, a replica on another broker becomes the new leader.
  • Zookeeper sends notifications to kafka in case of changes (e.g new topic, broker dies, broker comes up, delete topics, etc…)
  • Kafka cannot work without a Zookeeper. So we first need to start Zookeeper.
  • Zookeeper by design operates with an odd number of servers (3,5,7) in production.
  • Zookeeper also follows the concept of leaders and followers: one Zookeeper server is the leader (handles writes) and the rest are followers (handle reads).
  • Zookeeper does NOT store consumer offsets with Kafka > v0.10

 

Zookeeper for kafka

 

Kafka Guarantees

  • Messages are appended to a topic partition in the order they are sent.
  • Consumers read messages in the order stored in a topic partition
  • With a replication factor of N, producers and consumers can tolerate up to N-1 brokers being down.
  • This is why a replication factor of 3 is a good idea:
    • Allows for one broker to be taken down for maintenance
    • Allows for another broker to go down unexpectedly
  • As long as the number of partitions remains constant for a topic (no new partitions), the same key will always go to the same partition

 

Resources:

https://www.udemy.com/apache-kafka/learn/v4/

https://youtu.be/U4y2R3v9tlY

Source Code:

https://courses.datacumulus.com/kafka-beginners-bu5

http://media.datacumulus.com/kafka-beginners/code.zip

https://github.com/simplesteph/kafka-beginners-course

Some Points about Abstract Classes and Interfaces.

Abstract Classes:

1). A class defined as abstract cannot be instantiated; you cannot simply call new AbstractClass(). A concrete subclass needs to override all the methods that are marked as abstract. (See AbstractMain.)

2). The abstract keyword is a non-access modifier.

3). When a method is marked as abstract, it must not have an implementation (no method body).

4). Abstract methods have to be overridden in the sub classes.

5). Abstract classes NEED NOT have abstract methods. Methods that are not declared abstract must be implemented, i.e., they must have an implementation body right in the abstract class.

6). If a class has at least one abstract method, the class itself must be declared abstract.

7). Classes and Methods marked as private cannot be abstract

8). Classes and Methods marked as static cannot be abstract.

9). Classes and Methods marked as final cannot be abstract.

10). Abstract classes can have default as well as overloaded constructors. When no constructor is defined, the compiler generates a default constructor.

11). Constructors of abstract classes are invoked when a subclass object is created. You can call the superclass constructor by calling super().

12). Abstract classes hide the functionality of the methods that are defined as abstract. The actual functionality of abstract methods is exposed by the implementing subclasses.

13). Overloading of abstract methods (same name, different signatures) is supported in abstract classes.

14). Making a class abstract while also implementing its methods will not give any compile-time error, but gives a runtime error after instantiating the AbstractSubClass.

15). @Override – It is not strictly necessary to provide this annotation; the compiler handles overriding when it sees the extends keyword, but the annotation helps catch mistakes.
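A small sketch pulling several of these points together; Shape, Circle and AbstractDemo are made-up names for illustration (not the AbstractMain/AbstractSubClass samples referenced above):

// Cannot be instantiated directly with new Shape().
abstract class Shape {
    private final String name;

    // Abstract classes can have constructors; they run when a subclass is created.
    Shape(String name) {
        this.name = name;
    }

    // Abstract method: no body here, must be overridden by concrete subclasses.
    abstract double area();

    // Non-abstract method: must have an implementation right here.
    String describe() {
        return name + " with area " + area();
    }
}

class Circle extends Shape {
    private final double radius;

    Circle(double radius) {
        super("circle");        // invokes the abstract superclass constructor
        this.radius = radius;
    }

    @Override                   // optional, but helps the compiler catch mistakes
    double area() {
        return Math.PI * radius * radius;
    }
}

public class AbstractDemo {
    public static void main(String[] args) {
        Shape s = new Circle(2.0);      // new Shape(...) would not compile
        System.out.println(s.describe());
    }
}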

Interfaces:

1). Interfaces do not have any method implementations.

2). All methods in an interface are implicitly public and abstract.

3). Any field declared in an interface is implicitly public, static and final.

4). There is no multiple class inheritance in Java: a Java class cannot extend more than one class at a time. Instead there is multiple interface inheritance: a Java class can implement more than one interface, and an interface can extend more than one interface.

5). Any Java class that implements an interface has an IS-A relationship with that interface.

6). If a method declaration in an interface is abstract, the whole interface need not be declared abstract; this is unlike abstract classes.

7). Java does not support multiple class inheritance because of the Diamond of Death problem.

8). Method overloading is allowed in interfaces: each method signature in an interface is unique, meaning you can have multiple methods with the same name but different arguments.

9). With multiple interface inheritance, an interface can extend any number of interfaces.

Ex: public interface MultipleInterfaceImplementation extends InterfaceExample,AnotherInterface

* here InterfaceExample and AnotherInterface are both interfaces.
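A minimal sketch of these interface rules, reusing the InterfaceExample and AnotherInterface names from the example above (the members and the Car class are made up for illustration):

interface InterfaceExample {
    // Implicitly public static final.
    int MAX_SPEED = 120;

    // Implicitly public abstract.
    void drive();
}

interface AnotherInterface {
    void honk();
}

// An interface can extend several interfaces...
interface MultipleInterfaceImplementation extends InterfaceExample, AnotherInterface {
    // Overloading: same name, different arguments.
    void drive(int speed);
}

// ...and a class can implement several interfaces (IS-A relationship with each).
class Car implements InterfaceExample, AnotherInterface {
    @Override
    public void drive() {
        System.out.println("driving up to " + MAX_SPEED);
    }

    @Override
    public void honk() {
        System.out.println("beep");
    }
}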

Source code: https://github.com/chouhan/CoreJavaBasics

Big O, Omega and Theta Notations.

Big O, Omega and Theta notations are used to describe not only how an algorithm performs but also how it scales as the input grows. They measure the efficiency of an algorithm with respect to the time it takes to run as a function of the input size, and they are used to determine the worst-case, best-case and average-case complexity.

Big Omega notation describes the best case of a running algorithm. In contrast, Big O notation describes the worst case of a running algorithm.

Big O notation is denoted as O(n), also read as “order of n” or “O of n”.

Big Omega Notation is denoted as Ω(n) also termed as Omega of n.

Apart from Big O (O) and Big Omega (Ω), there are Big Theta (Θ), Small O (o) and Small Omega (ω) notations that are used in computer science programming. You can get more information from http://en.wikipedia.org/wiki/Big_O_notation#Big_Omega_notation.

O(1) – Constant Time

This means that the algorithm requires the same fixed number of steps regardless of the size of the task.

Examples:

1). Finding an element in a HashMap is usually constant time, O(1). This is constant time because a hashing function is used to find the element, and computing a hash value does not depend on the number of elements in the HashMap.

2). Push and Pop operations for a stack containing n elements.

3). Insert and remove operations of a queue.

O(log n) – Logarithmic Time

This means that the algorithm requires the Logarithmic amount of time to perform the task.

Examples:

1). Binary search in a sorted list or Array list of n elements.

2). Insert and Find operations for a binary search tree with n nodes.

3). Insert and Remove operations for a heap with n nodes.

4). Insertion, removal and lookup in a TreeMap are O(log n); a TreeMap keeps key/value objects in sorted order using a red-black tree (a balanced tree).
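For instance, here is a sketch of binary search over a sorted array: each step halves the remaining search space, which is what gives the O(log n) bound.

// Iterative binary search over a sorted int array; returns the index or -1.
static int binarySearch(int[] sorted, int target) {
    int lo = 0, hi = sorted.length - 1;
    while (lo <= hi) {
        int mid = lo + (hi - lo) / 2;   // avoids overflow for very large indices
        if (sorted[mid] == target) {
            return mid;
        } else if (sorted[mid] < target) {
            lo = mid + 1;               // discard the lower half
        } else {
            hi = mid - 1;               // discard the upper half
        }
    }
    return -1;
}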

O(n) – Linear Time

This means that the algorithm requires a number of steps directly proportional to the size of the task.

Examples:

1). Traversal or searching of a list (a linked list or an array) with n elements. This is linear because you may have to search the entire list; if the list is twice as big, searching takes twice as long.

2). Finding the maximum or minimum element in a list, or sequential search in an unsorted list of n elements.

3). Traversal of a tree with n nodes.

4). Calculating n-factorial iteratively, or finding the nth Fibonacci number iteratively.

O(n log n) – N times Logarithmic time

This means that the algorithm requires n times logarithmic time to perform the task.

Examples:

1).  Advanced Sorting Algorithms like quick sort and merge sort.

O(n^2) – Quadratic Time

Examples:

1). Simple sorting algorithms, for example a selection sort of n elements.

2). Comparing 2 two dimensional arrays of size n by n.

3). Finding duplicates in an unsorted list of n elements.

Note: If a solution to a problem uses a single iteration (only one for, while or do-while loop, or a single non-branching recursive function), the algorithm is said to run in O(n); if it uses 2 nested loops, it runs in O(n^2); with 3 nested loops, O(n^3); and so on.

O(n^3) – Polynomial Time

Examples:

1). Given an expression 23n^3 + 12n^2 + 9, for large values of n the n^3 term dominates, so the operation takes O(n^3) time.

O(a^n) for a > 1 – Exponential Time

Examples:

1). Recursive Fibonacci implementation

2). Problem to solve Towers of Hanoi

3). Generating all permutations of n symbols.
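To see why the recursive Fibonacci is exponential while the iterative version mentioned under O(n) is linear, here is a quick sketch; each recursive call branches into two more calls, roughly doubling the work per level.

// Exponential time: every call spawns two more calls, roughly O(2^n).
static long fibRecursive(int n) {
    if (n <= 1) return n;
    return fibRecursive(n - 1) + fibRecursive(n - 2);
}

// Linear time: a single loop over n, i.e. O(n).
static long fibIterative(int n) {
    long prev = 0, curr = 1;
    for (int i = 0; i < n; i++) {
        long next = prev + curr;
        prev = curr;
        curr = next;
    }
    return prev;
}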

Here is the ordering of execution times, from fastest to slowest, used to compare worst-case Big O behavior.

O(1) < O(log n) < O(n) < O(n log n) < O(n^2) < O(n^3) < O(a^n)

Constant Time < Logarithmic Time < Linear Time < N times of Logarithmic Time < Quadratic Time < Polynomial Time < Exponential Time.

If an algorithm is __, then its performance is __:

  • o(n) → performance < n
  • O(n) → performance ≤ n
  • Θ(n) → performance = n
  • Ω(n) → performance ≥ n
  • ω(n) → performance > n

4 important Big O rules:

1). If you have 2 different steps in your algorithm, add up those steps

Example:

function something() {
    doStep1();	// O(a)
    doStep2();	// O(b)
}

Overall runtime: O(a+b)

2). Drop Constants

function minMax1(array) {
	min, max = null
	for each e in array
		min = MIN(e, min)	//O(n)
	for each e in array
		max = MAX(e, max)	//O(n)
}

Another example:

function minMax2(array){
	min, max = null
	for each e in array
		min = MIN(e, min)
		max = MAX(e, max)	
}

minMax1 makes two passes over the array and minMax2 does both comparisons in a single pass; either way the work is roughly 2n operations, i.e. O(2n).
So, dropping the constant, the overall runtime equals O(n).

3). Different Arrays is equivalent to (=>) Different Variables.

function getSize(arrayA, arrayB){
	int count = 0;
	for a in arrayA{
		for b in arrayB{
			if(a == b){
				count++;
			}
		}
	}
	return count;
}

Overall runtime is NOT O(n^2), but instead it is O(a * b)

Note that the loops iterate over 2 different arrays.

4). Drop non dominant terms.

function printSize(array){
	max = null;
	
	// runtime for the below loop is O(n)
	for each a in array{
		max = MAX(a, max);
	}
	
	//runtime for the below nested loop is O(n^2)
	for a in array{
		for b in array{
			print a, b;
		}
	}
}

Overall runtime: O(n + n^2)

However, the O(n) term is dominated by the O(n^2) term and can be neglected.
In other words,

O(n^2) <= O(n + n^2) <= O(n^2 + n^2)
O(n^2) <= O(n + n^2) <= O(2 * n^2)

Now, dropping the constants based on Rule 2,

O(n^2) <= O(n + n^2) <= O(n^2)

Since the left and right sides are equivalent, the middle is too.

i.e. O(n + n^2) is equivalent to O(n^2)

Hence, overall runtime is O(n^2)


Sources:

http://www.apcomputerscience.com/apcs/misc/BIG-O.pdf
http://bigocheatsheet.com/

This post was more or less taken from the aforementioned sites, with some changes based on my own knowledge and observations. I will add more examples to the Big O time analyses above as I come across them over the course of my programming career.