Apache Hadoop is an open-source software framework for storage and large-scale distributed computing on top of commodity hardware.
It provides two main things:
- A distributed filesystem called HDFS (Hadoop Distributed File System)
- A framework and API for building and running MapReduce jobs
Key Features
- HDFS stores files in blocks typically at least 64 MB in size, much larger than the 4-32 KB seen in most filesystems.
- HDFS is optimized for throughput over latency; it is very efficient at streaming read requests for large files but poor at seek requests for many small ones.
- HDFS is optimized for workloads that are generally of the write-once and read-many type.
- Each storage node runs a process called a DataNode that manages the blocks on that host, and these are coordinated by a master NameNode process running on a separate host.
- Instead of handling disk failures with physical redundancy in disk arrays or similar strategies, HDFS uses replication (the NameNode takes care of replication and failover).
- NameNode constantly monitors reports sent by each DataNode to ensure that failures have not dropped any block below the desired replication factor. If this does happen, it schedules the addition of another copy within the cluster.
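As a rough illustration of the block size and replication points above, here is a minimal Python sketch. The function names, the sample data and the replication target of 3 are assumptions made for the example; this is not Hadoop code.

```python
BLOCK_SIZE = 64 * 1024 * 1024   # 64 MB, the block size quoted above
REPLICATION_FACTOR = 3          # assumed target; configurable in a real cluster


def block_count(file_size_bytes, block_size=BLOCK_SIZE):
    """Number of blocks needed to hold a file (integer ceiling division)."""
    return (file_size_bytes + block_size - 1) // block_size


def under_replicated(block_locations, target=REPLICATION_FACTOR):
    """Return {block_id: missing_copies} for every block below the target."""
    return {
        block: target - len(nodes)
        for block, nodes in block_locations.items()
        if len(nodes) < target
    }


# Hypothetical cluster state: blk_2 has lost one of its three replicas.
locations = {
    "blk_1": {"dn1", "dn2", "dn3"},
    "blk_2": {"dn1", "dn3"},
}

print(block_count(1024 ** 3))        # a 1 GB file needs 16 blocks
print(under_replicated(locations))   # {'blk_2': 1}
```

The second function mirrors what the NameNode does with the DataNode reports: any block that shows up with fewer copies than the target is a candidate for re-replication.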
Key Components
- DataNode - This represents the actual node where HDFS stores the data.
A few facts about the DataNode:
- The data node is where the actual data resides.
- All datanodes send a heartbeat message to the namenode every 3 seconds to say that they are alive. If the namenode does not receive a heartbeat from a particular data node for 10 minutes, then it considers that data node to be dead/out of service and initiates replication of blocks which were hosted on that data node to be hosted on some other data node.
- The data nodes can talk to each other to rebalance data, move and copy data around and keep the replication high.
- When the datanode stores a block of information, it maintains a checksum for it as well. The data nodes update the namenode with the block information periodically, and verify the checksums before each update. If the checksum is incorrect for a particular block, i.e. there is disk-level corruption for that block, the datanode skips that block while reporting the block information to the namenode. In this way, the namenode becomes aware of the disk-level corruption on that datanode and takes steps accordingly.
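The heartbeat rule described above can be sketched as follows. The 3-second interval and 10-minute timeout come from the text; the function name and the timestamps are made up for the example.

```python
HEARTBEAT_INTERVAL_S = 3     # datanodes report every 3 seconds (from the text)
DEAD_AFTER_S = 10 * 60       # silence longer than this means "dead" (from the text)


def dead_datanodes(last_heartbeat, now):
    """Return the datanodes whose last heartbeat is older than the timeout.

    last_heartbeat maps a datanode name to the time (in seconds) at which
    the namenode last heard from it.
    """
    return sorted(
        node for node, seen in last_heartbeat.items()
        if now - seen > DEAD_AFTER_S
    )


# Hypothetical timestamps: dn3 has been silent for 700 seconds.
last_heartbeat = {"dn1": 1597.0, "dn2": 1300.0, "dn3": 900.0}
print(dead_datanodes(last_heartbeat, now=1600.0))   # ['dn3']
```

Once a node lands in this list, the namenode would schedule new replicas for every block that node was hosting.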
- NameNode - The MASTER node for all datanodes. It controls all the metadata for the cluster, e.g. which blocks make up a file and which datanodes those blocks are stored on.
A few facts about the NameNode:
- It is the equivalent of the file allocation table in a traditional filesystem.
- The namenode is the node that stores the filesystem metadata, i.e. which file maps to which block locations and which blocks are stored on which datanode.
- NameNode contains a registry of all of the blocks in HDFS. This information is located in an image file called fsimage. The namenode maintains two in-memory tables, one which maps the blocks to datanodes (one block maps to 3 datanodes for a replication value of 3) and a datanode to block number mapping.
- Whenever a datanode reports a disk corruption of a particular block, the first table gets updated and whenever a datanode is detected to be dead (because of a node/network failure) both the tables get updated.
- Failover semantics: The secondary namenode regularly connects to the primary namenode and keeps snapshotting the filesystem metadata into local/remote storage.
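The two in-memory tables mentioned above could look roughly like this. The BlockMap class and its method names are hypothetical; the update rules follow the description in the text (a corruption report touches only the first table, a dead node touches both).

```python
from collections import defaultdict


class BlockMap:
    """Toy version of the namenode's two in-memory tables."""

    def __init__(self):
        self.block_to_nodes = defaultdict(set)   # table 1: block -> datanodes
        self.node_to_blocks = defaultdict(set)   # table 2: datanode -> blocks

    def add_replica(self, block, node):
        self.block_to_nodes[block].add(node)
        self.node_to_blocks[node].add(block)

    def report_corrupt(self, block, node):
        # Disk corruption of one block: only the first table is updated.
        self.block_to_nodes[block].discard(node)

    def mark_dead(self, node):
        # Dead datanode: drop it from both tables.
        for block in self.node_to_blocks.pop(node, set()):
            self.block_to_nodes[block].discard(node)


bm = BlockMap()
for node in ("dn1", "dn2", "dn3"):
    bm.add_replica("blk_1", node)
bm.add_replica("blk_2", "dn1")

bm.report_corrupt("blk_1", "dn2")   # dn2 holds a bad copy of blk_1
bm.mark_dead("dn1")                 # dn1 stops sending heartbeats

print(sorted(bm.block_to_nodes["blk_1"]))   # ['dn3']
```

After both events only dn3 holds a good copy of blk_1, so the block is well below a replication value of 3 and would need new replicas.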
- fsimage
- The NameNode process stores two data structures to disk, the fsimage file and the edits log of changes to it. The fsimage file holds the key filesystem attributes mentioned in the previous section: the name and details of each file and directory on the filesystem and the mapping of the blocks that correspond to each.
- If the fsimage file is lost, we have a series of nodes holding blocks of data without any knowledge of which blocks correspond to which part of which file. In fact, we won't even know which files are supposed to be constructed in the first place.
- Loss of the fsimage file leaves you with all the filesystem data but renders it effectively useless.
- The fsimage file is read by the NameNode process at startup and is held and manipulated in memory for performance reasons. To avoid changes to the filesystem being lost, any modifications made are written to the edits log throughout NameNode's uptime.
- The next time the NameNode restarts, it looks for this log at startup and uses it to update the fsimage file, which it then reads into memory.
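The relationship between fsimage and the edits log can be illustrated with a small sketch. The in-memory formats used here (a dict for the image, tuples for the log entries) are assumptions chosen for readability, not the real on-disk formats.

```python
def apply_edit(namespace, edit):
    """Apply one logged change to the in-memory namespace."""
    op, path, *rest = edit
    if op == "create":
        namespace[path] = list(rest[0])   # rest[0] is the file's block list
    elif op == "delete":
        namespace.pop(path, None)
    else:
        raise ValueError("unknown edit operation: " + op)


def load_namespace(fsimage, edits):
    """Rebuild the current state: start from the image, then replay the log."""
    namespace = {path: list(blocks) for path, blocks in fsimage.items()}
    for edit in edits:
        apply_edit(namespace, edit)
    return namespace


# Hypothetical image and log contents.
fsimage = {"/logs/a.log": ["blk_1", "blk_2"]}
edits = [
    ("create", "/logs/b.log", ["blk_3"]),
    ("delete", "/logs/a.log"),
]

namespace = load_namespace(fsimage, edits)
print(namespace)   # {'/logs/b.log': ['blk_3']}
```

The longer the log, the more entries have to be replayed at startup, which is why merging the log into the image from time to time matters.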
- startup
- NameNode enters Safemode (note that Replication does not occur in Safemode)
- Each Datanode sends Heartbeat
- Each Datanode sends Blockreport (Lists all HDFS data blocks)
- NameNode verifies that each block has acceptable number of replicas
- NameNode creates Blockmap from Blockreports
- NameNode exits Safemode
- Replicate any under-replicated blocks
- Safemode for the NameNode is essentially a read-only mode for the HDFS cluster, where it does not allow any modifications to file system or blocks.
- If required, HDFS can be placed in Safemode explicitly using the bin/hadoop dfsadmin -safemode command with enter, leave, get or wait. (I once faced a case where the NameNode would not leave Safemode because of low disk space; changing the Hadoop tmp dir to a directory with more disk space fixed it.)
- SecondaryNameNode - This is NOT a backup namenode, but a separate service that keeps a copy of both the edit logs and the filesystem image, merging them periodically to keep the size reasonable (it is deprecated in favor of the backup node and the checkpoint node in Hadoop 2.x, which provide similar functionality).
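The startup sequence above can be approximated with the sketch below: block reports are folded into a blockmap, and Safemode ends once enough blocks have an acceptable number of replicas. The function names, the minimum of 1 replica and the 0.999 threshold are assumptions for the example; check your cluster configuration for the actual values.

```python
def build_blockmap(block_reports):
    """Turn {datanode: [block, ...]} reports into {block: {datanode, ...}}."""
    blockmap = {}
    for node, blocks in block_reports.items():
        for block in blocks:
            blockmap.setdefault(block, set()).add(node)
    return blockmap


def can_leave_safemode(known_blocks, blockmap, min_replicas=1, threshold=0.999):
    """True once the fraction of sufficiently replicated blocks is high enough."""
    if not known_blocks:
        return True
    safe = sum(
        1 for block in known_blocks
        if len(blockmap.get(block, ())) >= min_replicas
    )
    return safe / len(known_blocks) >= threshold


known_blocks = ["blk_1", "blk_2"]                 # blocks listed in the metadata
reports = {"dn1": ["blk_1"], "dn2": ["blk_1"]}    # nobody has reported blk_2 yet
print(can_leave_safemode(known_blocks, build_blockmap(reports)))   # False

reports["dn3"] = ["blk_2"]                        # a third datanode checks in
print(can_leave_safemode(known_blocks, build_blockmap(reports)))   # True
```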
A few facts about the SecondaryNameNode:
- The term "secondary name-node" is misleading here. It is not a name-node in the sense that data-nodes cannot connect to the secondary name-node, and in no event can it replace the primary name-node in case of its failure.
- The only purpose of the secondary name-node is to perform periodic checkpoints. The secondary name-node periodically downloads current name-node image and edits log files, joins them into new image and uploads the new image back to the (primary and the only) name-node.
- If the name-node fails and we can restart it on the same physical node, then there is no need to shut down the data-nodes; only the name-node needs to be restarted. If we cannot use the old node anymore, we will need to copy the latest image somewhere else. The latest image can be found either on the node that used to be the primary before the failure, if available, or on the secondary name-node. The latter will be the latest checkpoint without the subsequent edits logs, that is, the most recent namespace modifications may be missing there.
- To have a Secondary NN in a cluster is not mandatory.
- The namenode stores the HDFS filesystem information in a file named fsimage. Updates to the filesystem (adding/removing blocks) do not update the fsimage file, but are instead logged into a file, so the I/O is fast, append-only streaming as opposed to random file writes. When restarting, the namenode reads the fsimage and then applies all the changes from the log file to bring the filesystem state up to date in memory. This process takes time.
- The secondarynamenode's job is not to be a secondary to the name node, but only to periodically read the filesystem changes log and apply the changes to the fsimage file, thus bringing it up to date. This allows the namenode to start up faster next time.
HDFS Architecture
Mainly HDFS data is accessed either by using the Java API, or the Hadoop command line client.
Let's suppose a client (an HDFS client) wants to read a file from HDFS. The steps involved in reading the file are:
- First, the client opens the file by calling the open() method on the FileSystem object, which for HDFS is an instance of the DistributedFileSystem class.
- DistributedFileSystem calls the namenode, using RPC, to determine the locations of the first few blocks of the file. For each block, the namenode returns the addresses of all the datanodes that have a copy of that block. The DistributedFileSystem returns an FSDataInputStream (an input stream that supports file seeks) to the client for it to read data from. FSDataInputStream in turn wraps a DFSInputStream, which manages the datanode and namenode I/O.
- The client then calls read() on the stream. DFSInputStream, which has stored the datanode addresses for the first few blocks in the file, then connects to the closest datanode for the first block in the file.
- Data is streamed from the datanode back to the client, which calls read() repeatedly on the stream.
- When the end of the block is reached, DFSInputStream will close the connection to the datanode, then find the best datanode for the next block. This happens transparently to the client, which from its point of view is just reading a continuous stream.
- Blocks are read in order, with the DFSInputStream opening new connections to datanodes as the client reads through the stream. It will also call the namenode to retrieve the datanode locations for the next batch of blocks as needed. When the client has finished reading, it calls close() on the FSDataInputStream.
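The read steps above can be simulated in a few lines of Python. This is only a model of the flow (ask for block locations, pick the closest datanode per block, stream the blocks in order); it does not use the real Java API, and the data structures and the distance table are made up for the example.

```python
def read_file(block_locations, datanodes, distance):
    """Read a file block by block, always from the closest replica.

    block_locations: ordered list of (block_id, [datanode, ...]), i.e. what
                     the namenode would return for the file.
    datanodes:       {datanode: {block_id: bytes}}, the stored block data.
    distance:        {datanode: number}, smaller means closer to the client.
    """
    data = bytearray()
    nodes_used = []
    for block_id, replicas in block_locations:
        closest = min(replicas, key=lambda node: distance[node])
        data += datanodes[closest][block_id]   # "stream" the block contents
        nodes_used.append(closest)
    return bytes(data), nodes_used


block_locations = [("blk_1", ["dn1", "dn2"]), ("blk_2", ["dn2", "dn3"])]
datanodes = {
    "dn1": {"blk_1": b"hello "},
    "dn2": {"blk_1": b"hello ", "blk_2": b"world"},
    "dn3": {"blk_2": b"world"},
}
distance = {"dn1": 4, "dn2": 2, "dn3": 0}

print(read_file(block_locations, datanodes, distance))
# (b'hello world', ['dn2', 'dn3'])
```

From the caller's point of view the result is one continuous byte stream, even though each block came from a different datanode.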
Communication Error & Reporting
- In case of a communication error with a datanode, the DFSInputStream will try to fetch the data from the next closest datanode for that block and temporarily blacklist the failed datanode for further reads. Likewise, if a corrupted block is found, it is reported to the namenode before the DFSInputStream attempts to read a replica of the block from another datanode.
- NameNode periodically receives a Heartbeat and a Block report from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly.
- A Blockreport contains a list of all blocks on a DataNode. When the NameNode notices that it has not received a heartbeat message from a data node after a certain amount of time, the data node is marked as dead. Since its blocks will now be under-replicated, the system begins replicating the blocks that were stored on the dead DataNode.
- The NameNode orchestrates the replication of data blocks from one DataNode to another. The replication data transfer happens directly between DataNode and the data never passes through the NameNode.
- The reports from each DataNode also include a count of the corrupt blocks. When a block is first stored, a hidden file containing checksums for the block is written alongside it. By default there is a checksum for each 512-byte chunk within the block.
- Whenever a client reads a block, it also retrieves the list of checksums and compares these to the checksums it generates on the block data it has read. If there is a checksum mismatch, the block on that particular DataNode will be marked as corrupt and the client will retrieve a different replica.
- On learning of the corrupt block, the NameNode will schedule a new replica to be made from one of the existing uncorrupted replicas.
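Here is a minimal sketch of the per-chunk checksum verification described above. It uses CRC-32 from Python's zlib module as a stand-in checksum, and the function names and sample data are assumptions made for the example.

```python
import zlib

BYTES_PER_CHECKSUM = 512   # one checksum per 512-byte chunk, as noted above


def chunk_checksums(data, chunk=BYTES_PER_CHECKSUM):
    """Checksum every chunk of a block (CRC-32 is an assumed stand-in)."""
    return [zlib.crc32(data[i:i + chunk]) for i in range(0, len(data), chunk)]


def read_verified(replicas, expected):
    """Try replicas in order and return the first one that verifies.

    replicas: ordered list of (datanode, block_bytes).
    Returns (block_bytes, corrupt_nodes); corrupt_nodes is what the client
    would report back to the namenode.
    """
    corrupt = []
    for node, data in replicas:
        if chunk_checksums(data) == expected:
            return data, corrupt
        corrupt.append(node)
    raise IOError("no uncorrupted replica available")


good = bytes(range(256)) * 5                 # 1280 bytes, so 3 chunks
bad = good[:700] + b"\x00" + good[701:]      # one byte damaged on disk
expected = chunk_checksums(good)             # stored when the block was written

data, corrupt = read_verified([("dn1", bad), ("dn2", good)], expected)
print(len(expected), corrupt)                # 3 ['dn1']
```

The client falls through to dn2 transparently, and the corrupt replica on dn1 is what the namenode would then replace with a fresh copy.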
Failures in HDFS
NameNode Failures
- HADOOP-1.x (recovery only, no HA as such)
- In Hadoop 1.x we have the concept of the SecondaryNameNode, which holds a copy of the NameNode metadata. If the NameNode goes down, we can take the metadata copy stored with the SecondaryNameNode and use it to resume client work once the NameNode is back again.
- HADOOP-2.x (HA, active/standby configuration)
- With Hadoop 2.x (HA) we can have multiple NameNodes. If the primary NameNode goes down, the redundant NameNode can take over (either manually or automatically) so that the cluster doesn't stop working. In this implementation there is a pair of NameNodes in an active/standby configuration: if the active fails, the standby takes over its duties and continues servicing client requests.
- Before Hadoop 2, the NameNode was a single point of failure, so if it failed the cluster became unusable. Even the SecondaryNameNode didn't help in that case, since it was used only for saving checkpoints, not as a backup NameNode. Hence the NameNode had to be restarted manually.
- Since Hadoop 2, we have a better way to handle NameNode failures. We can run 2 redundant NameNodes in parallel, so that if one of them fails, the cluster quickly fails over to the standby NameNode.
DataNode Failures
- If a datanode fails while data is being written to it, the following actions are taken:
- First, the pipeline is closed, and any packets in the ack queue are added to the front of the data queue so that datanodes that are downstream from the failed node will not miss any packets.
- The current block on the good datanodes is given a new identity, which is communicated to the namenode, so that the partial block on the failed datanode will be deleted if the failed datanode recovers later on.
- The failed datanode is removed from the pipeline, and the remainder of the block’s data is written to the two good datanodes in the pipeline.
- The namenode notices that the block is under-replicated, and it arranges for a further replica to be created on another node. Subsequent blocks are then treated as normal.
- If one of the DataNodes fails during a read, i.e. the read from the DataNode times out, the client is responsible for retrieving the desired block from the next DataNode in the replication sequence.
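The write-pipeline recovery steps above (move unacknowledged packets back to the front of the data queue, then drop the failed datanode) can be sketched with two queues. The function name and the packet and node labels are hypothetical.

```python
from collections import deque


def recover_pipeline(data_queue, ack_queue, pipeline, failed_node):
    """Recover from a datanode failure in the middle of writing a block.

    Packets still waiting for an acknowledgement are put back at the FRONT
    of the data queue, in their original order, so that no datanode misses
    any of them. Returns the pipeline without the failed datanode.
    """
    # extendleft() pushes items one at a time, which reverses their order,
    # so the ack queue is fed in reversed to keep the original order.
    data_queue.extendleft(reversed(ack_queue))
    ack_queue.clear()
    return [node for node in pipeline if node != failed_node]


data_queue = deque(["p3", "p4"])   # packets not yet sent
ack_queue = deque(["p1", "p2"])    # packets sent but not yet acknowledged
pipeline = ["dn1", "dn2", "dn3"]

pipeline = recover_pipeline(data_queue, ack_queue, pipeline, failed_node="dn2")
print(list(data_queue))   # ['p1', 'p2', 'p3', 'p4']
print(pipeline)           # ['dn1', 'dn3']
```

The rest of the block is then written to the two remaining datanodes, and the namenode later arranges a third replica elsewhere.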
Super Users in HDFS
- The super-user is the user with the same identity as the name node process itself. Loosely, if you started the name node, then you are the super-user.
- The super-user can do anything, in that permission checks never fail for the super-user. There is no persistent notion of who was the super-user; when the name node is started, the process identity determines who is the super-user for now.
- The HDFS super-user does not have to be the super-user of the name node host, nor is it necessary that all clusters have the same super-user. Also, an experimenter running HDFS on a personal workstation conveniently becomes that installation's super-user without any configuration.
- In addition, the administrator may identify a distinguished group using a configuration parameter. If set, members of this group are also super-users.
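The super-user rules above boil down to a small check. This is a sketch only; the function and parameter names are made up, and the user and group names are examples.

```python
def is_superuser(user, user_groups, namenode_user, supergroup=None):
    """Decide whether a user is an HDFS super-user.

    namenode_user: identity of the running name node process.
    supergroup:    the optional distinguished group; None means "not set".
    """
    if user == namenode_user:
        return True
    return supergroup is not None and supergroup in user_groups


# The name node in this example was started by the user "hdfs".
print(is_superuser("hdfs", [], "hdfs"))                        # True
print(is_superuser("alice", ["admins"], "hdfs", "admins"))     # True
print(is_superuser("bob", ["users"], "hdfs", "admins"))        # False
```

Because the first check depends only on who started the process, restarting the name node as a different user changes who the super-user is.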
Hadoop 2 Checkpoint Node
- The Checkpoint Node was introduced to address a drawback of the NameNode: changes are only written to edits and are not merged into fsimage at runtime. If the NameNode runs for a while, the edit logs get huge and the next startup takes even longer, because more changes have to be applied to determine the last state of the metadata.
- The Checkpoint Node periodically fetches fsimage and edits from the NameNode and merges them. The resulting state is called a checkpoint. After this, it uploads the result to the NameNode.
- There was also a similar type of node called "SecondaryNamenode", but it doesn't have the "upload to NameNode" feature, so the NameNode needs to fetch the state from the SecondaryNamenode. It was also confusing because the name suggests that the SecondaryNamenode takes over requests if the NameNode fails, which is NOT the case.
- The Checkpoint node periodically creates checkpoints of the namespace. It downloads fsimage and edits from the active NameNode, merges them locally, and uploads the new image back to the active NameNode.
- The Checkpoint node usually runs on a different machine than the NameNode, since its memory requirements are on the same order as the NameNode's. The Checkpoint node is started by bin/hdfs namenode -checkpoint on the node specified in the configuration file.
- The start of the checkpoint process on the Checkpoint node is controlled by two configuration parameters:
- dfs.namenode.checkpoint.period, set to 1 hour by default, specifies the maximum delay between two consecutive checkpoints.
- dfs.namenode.checkpoint.txns, set to 1 million by default, defines the number of uncheckpointed transactions on the NameNode that will force an urgent checkpoint even if the checkpoint period has not been reached.
- The Checkpoint node stores the latest checkpoint in a directory that is structured the same as the NameNode's directory. This allows the checkpointed image to always be available for reading by the NameNode if necessary.
- Multiple checkpoint nodes may be specified in the cluster configuration file.
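The two checkpoint triggers described above combine as a simple "whichever comes first" rule. The sketch below uses the default values quoted in the text; the function name is hypothetical.

```python
CHECKPOINT_PERIOD_S = 60 * 60   # dfs.namenode.checkpoint.period default: 1 hour
CHECKPOINT_TXNS = 1_000_000     # dfs.namenode.checkpoint.txns default: 1 million


def should_checkpoint(seconds_since_last, uncheckpointed_txns,
                      period=CHECKPOINT_PERIOD_S, txns=CHECKPOINT_TXNS):
    """A checkpoint is due when EITHER limit has been reached."""
    return seconds_since_last >= period or uncheckpointed_txns >= txns


print(should_checkpoint(600, 50_000))        # False: neither limit reached
print(should_checkpoint(3600, 50_000))       # True: the period has elapsed
print(should_checkpoint(600, 2_000_000))     # True: too many pending transactions
```

The transaction limit is what keeps the edits log from growing without bound on a busy cluster, even when the hourly period has not elapsed yet.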
Hadoop Communication (Protocols Involved)
- HTTP
- Web UIs - The Hadoop daemons provide Web UIs for users and administrators to monitor jobs and the status of the cluster. The Web UIs use the HTTP protocol.
- FSImage operations - These are metadata transfers between the NameNode and the Secondary NameNode. They are done using the HTTP protocol.
- MapReduce shuffle - The shuffle part of a MapReduce job is the process of transferring data from the Map tasks to the Reducer tasks. As this transfer is typically between different nodes in the cluster, the shuffle is done using the HTTP protocol.
- Direct TCP/IP
- HDFS data transfers are done using TCP/IP sockets directly. It involves reading/writing data to HDFS:
- By clients directly.
- By MapReduce jobs.
- Among Hadoop services.
- Hadoop RPC Calls
- Clients using the Hadoop API.
- MapReduce jobs.
- Among Hadoop services (JobTracker, TaskTrackers, NameNode, DataNodes).
For installing and running HDFS + MapReduce, see the next post.