Kafka 訊息佇列 Java版

2021-09-19 17:42:17 字數 3803 閱讀 4900

apache kafka工具類,消費者consumer類

public class consumer 

/** * 初始化接收器

*/private void init()

@override

public void onpartitionsassigned(collectionpartitions)

} else }}

});start();

} else

}public void start() catch (exception e) }}

}try catch (exception e)

}consumer.close();

}public void stop()

}

消費者配置consumerconfig類

public class consumerconfig 

/** * 建立消費者配置

* @param bootstrapservers 伺服器配合 格式為伺服器ip:埠號,集群用逗號分隔 例如 192.168.1.1:9092,192.168.1.2:9092

* @param groupid groupid

* @param autocommitinterval 自動提交時間單位毫秒, 預設1000

* @param sessiontimeout 超時時間單位毫秒 , 預設30000

* @param topiclist topiclist列表

* @param processbeforedata 是否處理啟動之前的資料,該開關需要配置consumerhandler的跨步儲存使用

* @param polltime 每次獲取資料等待時間單位毫秒,預設100毫秒

*/public consumerconfig(string bootstrapservers, string groupid, int autocommitinterval, int sessiontimeout

,listtopiclist,boolean processbeforedata,long polltime)

public string getbootstrapservers()

public void setbootstrapservers(string bootstrapservers)

public string getgroupid()

public void setgroupid(string groupid)

public int getautocommitinterval()

public void setautocommitinterval(int autocommitinterval)

public int getsessiontimeout()

public void setsessiontimeout(int sessiontimeout)

public listgettopiclist()

public void settopiclist(listtopiclist)

public boolean isprocessbeforedata() 

public void setprocessbeforedata(boolean processbeforedata)

public long getpolltime()

public void setpolltime(long polltime)

}

消費者處理consumerhandler類

public inte***ce consumerhandler
kafka生產者,工具producer類

public class producer 

private void init()

/**

* 傳送訊息

* @param topic 要傳送的topic

* @param msg

*/public void sendmessage(string topic,string msg) catch (interruptedexception e) catch (executionexception e)

producer.flush();

}public void close()

}

kafka生產者配置producerconfig類

public class producerconfig 

/** * 建立生產者配置檔案

* @param bootstrapservers 伺服器配合 格式為伺服器ip:埠號,集群用逗號分隔 例如 192.168.1.1:9092,192.168.1.2:9092

* @param retries

* @param batchsize

* @param lingerms

* @param buffermemory

*/public producerconfig(string bootstrapservers,int retries, int batchsize, int lingerms, int buffermemory)

public string getbootstrapservers()

public void setbootstrapservers(string bootstrapservers)

public string gettopic()

public void settopic(string topic)

public int getretries()

public void setretries(int retries)

public int getbatchsize()

public void setbatchsize(int batchsize)

public int getlingerms()

public void setlingerms(int lingerms)

public int getbuffermemory()

public void setbuffermemory(int buffermemory)

}

消費者處理實現consumerhandlerimpl類

public class consumerhandlerimpl implements consumerhandler

/** * 獲取跨步

* @param topic 接受訊息的topic

* @param partition 接受訊息的partition

* @return 當前topic,partition下的seek

*/public long getseek(string topic , int partition)

}

main方法類

訊息佇列 訊息佇列 kafka

kafka是乙個分布式的基於發布 訂閱模式的訊息佇列,主要用於大資料實時處理領域。要理解kafka首先要有分布式的概念,要有訊息佇列的概念。分布式系統最大的優勢就是解耦和削峰,這種情況下,a系統生成了乙個訊息,b系統非同步獲取,那麼就需要乙個存放訊息的訊息佇列 mq 相比較傳統的訊息佇列,訊息被消費...

訊息佇列 Kafka學習

kafka是乙個分布式的訊息佇列,學習見apache kafka文件,中文翻譯見kafka分享,乙個簡單的入門例子見kafka 入門例項。本文只針對自己感興趣的點記錄下。producer consumer 訊息的生成者和使用者。broker kafka server充當broker角色,起到訊息佇列...

訊息佇列 Kafka學習

kafka是乙個分布式的訊息佇列,學習見apache kafka文件,中文翻譯見kafka分享,乙個簡單的入門例子見kafka 入門例項。本文只針對自己感興趣的點記錄下。producer consumer 訊息的生成者和使用者。broker kafka server充當broker角色,起到訊息佇列...