Uncategorized

Remote Hadoop Command Fails

ISSUE:

30000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/1.2.3.4:50010]

SOLUTION:

Set the following client property locally; it should resolve the problem. The root cause is IP resolution: the client is handed the datanode's internal IP address, which it cannot reach, so it has to connect by hostname instead (see the sketch below the property).

dfs.client.use.datanode.hostname=true
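For reference, a minimal sketch of setting the property programmatically on the client with the Hadoop Configuration API (it can equally be added to the client's hdfs-site.xml); the path listed below is only a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RemoteHdfsClient {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Force the client to connect to datanodes by hostname instead of the
        // (possibly unreachable) IP address returned by the namenode.
        conf.setBoolean("dfs.client.use.datanode.hostname", true);

        FileSystem fs = FileSystem.get(conf);
        // Placeholder path: list a directory to verify the datanode connection works.
        System.out.println(fs.listStatus(new Path("/tmp")).length + " entries under /tmp");
    }
}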

Kafka

Kafka Standards

ReadMe

This document intends to capture the high-level standards.
Note: Certain standards and guidelines may not apply, depending on the versions of the tools on the stack.

Kafka

Interface Standards

Producer API

    • Batching/Buffering
      • Establish a performance benchmark with the kafka-producer-perf-test utility.
      • Set linger.ms for the batching time window. The default is 0; set it to a value based on the peak load the producer can experience.
      • Set batch.size to a value that can accommodate the amount of data expected within one linger.ms interval.
      • Set buffer.memory (the default is around 32 MB per producer) to a value that can hold and send the messages to the broker within max.block.ms; if that limit is expected to be exceeded, handle the resulting TimeoutException with retries.
      • Set block.on.buffer.full=true (on older clients; newer clients replace this setting with max.block.ms).
      • Set retries to Integer.MAX_VALUE (the config is an int) so transient failures are retried rather than dropped.
      • Every producer should set a client id, managed centrally by the development team.
      • Flush the buffer before closing the producer.
      • Formula to decide the number of partitions per topic:
        • Required # of partitions = max(T/P, T/C)
        • P = throughput from the producer to a single partition
        • C = throughput from a single partition to a consumer
        • T = target throughput
        • For example, with T = 200 MB/s, P = 20 MB/s and C = 50 MB/s, max(200/20, 200/50) = 10 partitions.
    • Compression
      • Always set the producer and broker compression type to snappy.
      • Producers should use compression combined with batching.
      • Any custom serialization library must provide matching deserialization methods and should be managed centrally.
    • Retries/Timeouts
      • Set retries = Integer.MAX_VALUE to ensure that you never lose a message.
      • Set request.timeout.ms to 15000; adjust the value upward if network latency is higher.
      • Set retry.backoff.ms to 1000 ms between retries.
      • Set max.in.flight.requests.per.connection to 1 to maintain ordering within a partition.
    • Partitioners
      • Topics should be partitioned to achieve higher throughput.
      • Choose the partitioning logic carefully: custom partitioners must not create hotspots and should distribute data reasonably evenly across partitions.
      • As a rule of thumb, around 1000 partitions per leader broker on a 3-node cluster adds roughly 20 ms of latency, so limit the number of partitions per broker and add brokers to increase throughput.
      • As the number of partitions grows, scale the producer's memory as a multiple of the partition count and the buffer/batch size.
    • Guaranteed Delivery
      • For guaranteed delivery, set the producer ACKS_CONFIG to "all" (equivalent to "-1").
      • To manage this effectively, always use Producer.metrics() to collect the client metrics and push them into Elasticsearch for Kibana dashboards and monitoring.
      • Implement the ProducerInterceptor interface to add onSend/onAcknowledgement/close hooks into the producer. A consolidated producer configuration sketch covering these settings follows this list.
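A minimal producer configuration sketch pulling the above settings together is shown below. The broker address, client id and topic name are placeholders, and the linger.ms/batch.size values are only illustrative; tune them from the kafka-producer-perf-test results.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");     // placeholder broker address
        props.put("client.id", "exampleProducerClientId");  // centrally managed client id (placeholder)
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // Batching/buffering
        props.put("linger.ms", "5");             // illustrative; size from the perf-test results
        props.put("batch.size", "65536");        // bytes expected within one linger.ms window
        props.put("buffer.memory", "33554432");  // ~32 MB producer buffer
        // Compression
        props.put("compression.type", "snappy");
        // Delivery guarantees and ordering
        props.put("acks", "all");
        props.put("retries", String.valueOf(Integer.MAX_VALUE));
        props.put("retry.backoff.ms", "1000");
        props.put("request.timeout.ms", "15000");
        props.put("max.in.flight.requests.per.connection", "1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            producer.send(new ProducerRecord<>("exampleTopic", "key", "value"));
        } finally {
            producer.flush();  // flush buffered records before closing
            producer.close();
        }
    }
}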

Consumer API

    • To load-balance consumption of a topic, always run more than one consumer in a consumer group (within a group, at most one consumer is assigned to each partition).
    • If more than one parallel consumer is needed per topic partition, use more than one consumer group.
    • Establish the performance benchmark with the kafka-consumer-perf-test utility.
    • When a consumer starts from a defined offset, ensure the offset is still available within the partition and set the retention accordingly.
    • Every consumer should have a group.id managed centrally during development.
    • Manage offsets externally (for example every 10-20 minutes) in addition to Kafka's internal offset storage so that the recovery plan stays valid.
    • Disable automatic offset commits to avoid issues at ZooKeeper.
    • Use Kafka for committing offsets instead of ZooKeeper.
    • Set max.in.flight.requests.per.connection to 1 (a producer-side setting) if you need to preserve message order.
    • Commit offsets with commitSync() only from the active consumer thread in the consumer group, and do not call it too often, to avoid hammering the offsets backend.
    • Each consumer poll has to happen within max.poll.interval.ms, otherwise the consumer is considered dead; in other words, the processing time between polls must stay within max.poll.interval.ms.
    • Set max.poll.records to a value that can be processed within max.poll.interval.ms.
    • Avoid assign(), i.e. manually managing the consumer-to-partition mapping; prefer subscribe().
    • Always commit after the event has been processed. When exactly-once delivery is necessary, deduplicate through an external key/value store such as Elasticsearch.
    • Use the new Kafka client API, via the Maven dependency below, for both consumers and producers; a minimal consumer sketch follows the dependency snippet.

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka-version}</version>
</dependency>
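A minimal consumer sketch following the above guidelines (manual commits after processing, subscribe() rather than assign(), bounded poll sizes) is shown below. It assumes a 2.x or later kafka-clients version where poll(Duration) is available; the broker address, group id and topic name are placeholders.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");      // placeholder broker address
        props.put("group.id", "CGExampleApplication");       // centrally managed group id (placeholder)
        props.put("client.id", "exampleConsumerClientId");   // placeholder client id
        props.put("enable.auto.commit", "false");            // commit manually after processing
        props.put("max.poll.records", "500");                // must be processable within max.poll.interval.ms
        props.put("max.poll.interval.ms", "300000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("exampleTopic"));  // prefer subscribe() over assign()
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // process the record here before committing its offset
                }
                consumer.commitSync();  // commit only after the batch has been processed
            }
        }
    }
}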

Security

    • Secure the producer and consumer channels with SSL using CA-signed certificates.
    • Restrict producer and consumer nodes by IP address using Ranger policies.
    • All producers and consumers should use the new client API to connect to the cluster. A client-side SSL configuration sketch follows this list.
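A client-side sketch of the SSL settings (the same keys apply to both producers and consumers); the keystore/truststore paths and passwords are placeholders for the CA-signed certificates:

import java.util.Properties;

public class SslClientConfigSketch {
    // Builds the SSL-related client properties; merge these into the
    // producer or consumer Properties object before creating the client.
    public static Properties sslProperties() {
        Properties props = new Properties();
        props.put("security.protocol", "SSL");
        // Truststore holding the signing CA certificate (placeholder path/password).
        props.put("ssl.truststore.location", "/etc/kafka/ssl/client.truststore.jks");
        props.put("ssl.truststore.password", "changeit");
        // Keystore with the client certificate, if client authentication is enabled.
        props.put("ssl.keystore.location", "/etc/kafka/ssl/client.keystore.jks");
        props.put("ssl.keystore.password", "changeit");
        props.put("ssl.key.password", "changeit");
        return props;
    }
}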

Configuration

    • All Kafka brokers (and ZooKeeper nodes) should run on dedicated nodes.
    • A higher number of partitions helps throughput, but it also leads to too many open file handles.
    • All brokers and topics should have min.insync.replicas set to 2 so that the cluster can survive a single node failure.
    • Set num.replica.fetchers on the brokers to 2 or more so that replication is faster.
    • Set offsets.topic.replication.factor on the brokers to 3.
    • Disable unclean leader election.
    • The retention for each topic should be stated explicitly, with a minimum of one day.
    • Set offsets.retention.minutes to the application's maximum SLA, because once the committed offsets expire the consumer has to start from the beginning.
    • Leave enough memory for the OS page cache so that the broker can hold data in the page cache before flushing to disk, which increases broker write throughput.
    • Minimise swapping: set vm.swappiness to 1.
    • Use Java 7 or later with the G1 collector (-XX:+UseG1GC) to better handle large heap sizes. A topic-creation sketch applying the replication-related settings above follows this list.
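The broker-level settings above (num.replica.fetchers, offsets.topic.replication.factor, unclean leader election) live in the broker configuration, but the topic-level ones can also be enforced at creation time. Below is a sketch using the AdminClient (available in kafka-clients 0.11+); the broker address, topic name, partition count and replication factor are placeholders:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreationSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder broker address

        Map<String, String> topicConfigs = new HashMap<>();
        topicConfigs.put("min.insync.replicas", "2");                 // survive a single node failure
        topicConfigs.put("retention.ms", "86400000");                 // one day minimum retention
        topicConfigs.put("unclean.leader.election.enable", "false");  // disable unclean leader election

        // 6 partitions and a replication factor of 3 are placeholders; size the
        // partition count with the max(T/P, T/C) formula from the Producer API section.
        NewTopic topic = new NewTopic("exampleTopic", 6, (short) 3).configs(topicConfigs);

        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}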

Monitoring & Alert

     Options:

    • All Kafka components, producers, and consumers expose JMX metrics, which can be monitored using jConsole.
    • For central dashboarding and alerting, the preferred approach is to ship the JMX metrics in real time to Elasticsearch and build Kibana dashboards on top of them.
    • Kafka Manager can be used to monitor Kafka only.
    • Prometheus (open source) and Datadog (commercial) are alternative options for cluster monitoring.

Note: Many tools solve the same problem; the choice usually comes down to what the environment has standardised on. Topics, client IDs, and group IDs have to be centrally managed to identify reuse and enable better monitoring.

High Availability

    • Most HA principles are already addressed when the standards in the other sections are met.
    • Use multiple instances of the kafka-mirror-maker utility to replicate data between Kafka clusters in real time, and set --num.streams to the required number of consumer threads.
      • Recommendations:
        • Run Kafka MirrorMaker on a dedicated node.
        • Set a high socket buffer size and consumer fetch size.
        • Offsets cannot be managed across Kafka clusters.
        • Monitor the mirrors through consumer lag.

Naming Standards

Consumer and Producer

    • Consumer groups should be prefixed with CG<ConsumerApplicationName> to better manage the consumer groups and consumer IDs.
    • Consumers and producers should have a client ID that is managed centrally.

Topic Names

    • Topic names should be in camelCase (e.g. customerOrderEvents) instead of using '.', '-' or '_' as separators.
    • Specifically, avoid '.' in the name of any Kafka entity, as the JMX metrics exposed by Apache Kafka have issues with it in certain versions.
HDFS, HIVE

HIVE Data-Copy – Import/Export

HIVE Data Copy:

 

With Hive, one good option to copy the Hive data and metadata from cluster A to cluster B is Hive EXPORT/IMPORT.

 

NOTE: In the Hive CLI, exporting the data directly to the target cluster works fine because the path can be resolved, but Beeline will not allow copying the data directly to the target cluster. To do it via Beeline: export the table to HDFS on the source cluster, copy the exported directory to the target cluster (e.g. with distcp), then connect to the target HiveServer2 and run the IMPORT there.

 

  • EXPORT TABLE tablename [PARTITION (part_column="value"[, ...])]
    TO 'export_target_path' [FOR replication('eventid')]

  • IMPORT [[EXTERNAL] TABLE new_or_original_tablename [PARTITION (part_column="value"[, ...])]]
    FROM 'source_path'
    [LOCATION 'import_target_path']

 

 

Reference:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ImportExport

HIVE

Query JDBC datasource Directly from HIVE


There are many options to retrieve data from an RDBMS, and every dump we receive has to be validated again. For a use case where the source can give you access to fetch the full data, one can use a Hive JDBC storage handler to query the JDBC RDBMS data source directly from Hive.

Below are the URLs for further reading.

 

Official:

https://issues.apache.org/jira/browse/HIVE-1555

https://community.hortonworks.com/articles/4671/sparksql-jdbc-federation.html

 

Forks:

 

https://github.com/qubole/Hive-JDBC-Storage-Handler

 

https://github.com/myui/HiveJdbcStorageHandler

 

https://github.com/QubitProducts/hive-jdbc-storage-handler

 

HIVE

Basic HQL CheckList

We can start with a simple one: these are general things to check while writing HQL.

 

General HIVE Query:

  • Avoid inline sub-queries in the WHERE clause.
  • Make all the joins explicit in the FROM clause instead of the WHERE clause.
  • FULL OUTER JOINs are very expensive in Hive.
  • If possible, generate statistics for each table before running the query.
  • Order the joins from the smaller to the larger tables.
  • Any partitioned table must have its partition range filtered before being joined.
  • When joining smaller tables (up to about 1 GB of data), add a MAPJOIN hint for the smaller table.
  • If a join needs an additional filter, put it in the join condition in the FROM clause instead of in the WHERE clause.
  • Use the key columns to make the joins.
  • If a query has multiple layers of inner queries and is reused by many other queries, create a physical table that can be refreshed and queried as and when needed.
FALCON

Falcon CLI Commands

OPTIONS:
<entity> => delete, update, submit and schedule
<instance> => handling of scheduled process instances
<admin> => status, stack, url
<graph> => get the direction, edges, vertices, etc.

Example:
Admin:
falcon admin -<status/stack/url etc.>

Entity:
falcon entity -type <process/feed> -file <path to file> -<submit/submitAndSchedule>
falcon entity -type <process/feed> -name <process or feed name> -<status/update/delete etc.>

Instance:
falcon instance -type <process/feed> -start <date> -end <date> -<suspend/resume/rerun/continue/kill etc.>

Reference:

https://falcon.apache.org/falconcli/FalconCLI.html

https://falcon.apache.org/FalconCLI.html