Kafunk: F# Kafka client
F# Shell Batchfile
Latest commit 280f7ec Aug 17, 2016 @strmpnk strmpnk committed on GitHub Merge pull request #56 from haf/feature/logging
Provide nice logging API

README.md

Kafunk - F# Kafka client

Kafunk is a Kafka client written in F#.

See the home page for details.

Please also join the F# Open Source Group

Status

This is a work in progress and not yet published as a package. See the issue list for the remaining items. At this point, the public API and internals may undergo several changes before stabilizing. Still, contributions are welcome!

Feature Status
Wire Protocol Complete
Base API Complete
Compression GZip
Routing Partial
Consumer Groups Partial
Producers Partial
Fault Tolerance Partial
v0.9 Partial
v0.10 Partial

Hello World

open Kafunk

// Create the Kafka connection that every call below reuses.
// "existentialhost" is a placeholder: replace it with your own Kafka host.
let conn = Kafka.connHost "existentialhost"


// Request metadata for "absurd-topic" over `conn` and block the calling
// thread until the asynchronous reply arrives.
let metadata = 
  Kafka.metadata conn (Metadata.Request([|"absurd-topic"|])) 
  |> Async.RunSynchronously

// Print one line per broker listed in the metadata response.
for b in metadata.brokers do
  printfn "broker|host=%s port=%i nodeId=%i" b.host b.port b.nodeId

// Print one line per topic (name and error code), followed by one line for
// each partition of that topic.
for t in metadata.topicMetadata do
  printfn "topic|topic_name=%s topic_error_code=%i" t.topicName t.topicErrorCode
  for p in t.partitionMetadata do
    printfn "topic|topic_name=%s|partition|partition_id=%i" t.topicName p.partitionId



// Producer configuration for "absurd-topic".
// NOTE(review): `Partitioner.konst 0` presumably routes every message to
// partition 0, and `RequiredAcks.Local` presumably waits only for the
// partition leader's acknowledgement - confirm against the Kafunk API.
let producerCfg = 
  ProducerCfg.create ([|"absurd-topic"|], Partitioner.konst 0, requiredAcks=RequiredAcks.Local)

// Create the producer on `conn`, blocking until the asynchronous setup completes.
let producer = 
  Producer.createAsync conn producerCfg 
  |> Async.RunSynchronously

// Produce a single message to "absurd-topic" and block for the response.
// "hello world"B is an F# byte-array literal, so the payload is raw bytes.
let prodRes =
  Producer.produceSingle producer ("absurd-topic", [| ProducerMessage.ofBytes ("hello world"B) |])
  |> Async.RunSynchronously

// For each topic in the produce response, print the topic name and then the
// partition, error code and offset of every entry reported for it.
for (tn,offsets) in prodRes.topics do
  printfn "topic_name=%s" tn
  for (p,ec,offset) in offsets do
    printfn "partition=%i error_code=%i offset=%i" p ec offset



// Issue a fetch request for "absurd-topic" and block for the response.
// NOTE(review): the positional arguments (0 0L 0 0 1000) are not named here;
// presumably partition, starting offset and fetch limits - confirm their
// order and meaning against FetchRequest.ofTopicPartition.
let fetchRes = 
  Kafka.fetch conn (FetchRequest.ofTopicPartition "absurd-topic" 0 0L 0 0 1000) 
  |> Async.RunSynchronously

// Print topic, partition and error code for every partition in the response.
// hmo, mss and ms are bound but not used in this example (presumably the
// high-watermark offset, message-set size and message set - TODO confirm).
for (tn,pmds) in fetchRes.topics do
  for (p,ec,hmo,mss,ms) in pmds do
    printfn "topic=%s partition=%i error=%i" tn p ec



// Consumer configuration: group "consumer-group" subscribed to "absurd-topic".
let consumerCfg = 
  Consumer.ConsumerConfig.create ("consumer-group", [|"absurd-topic"|])

// Consume from the group and print every message received. The final
// Async.RunSynchronously blocks the calling thread until the outer sequence
// has been fully iterated.
Consumer.consume conn consumerCfg
|> AsyncSeq.iterAsync (fun (generationId,memberId,topics) ->
  // the outer AsyncSeq yields on every generation of the consumer group protocol
  // (generationId and memberId are bound but not used in this example)
  topics
  |> Seq.map (fun (topic,partition,stream) ->
    // the inner AsyncSeqs correspond to individual topic-partitions
    stream
    |> AsyncSeq.iterAsync (fun (ms,commit) -> async {
      // print every message in this batch; the second tuple element is ignored
      for (offset,_,msg) in ms.messages do          
        printfn "processing topic=%s partition=%i offset=%i key=%s" topic partition offset (Message.keyString msg)
      // run the commit computation supplied with the batch, only after the
      // whole batch has been printed
      do! commit }))
  // run the per-partition computations in parallel and discard their results
  |> Async.Parallel
  |> Async.Ignore)
|> Async.RunSynchronously

Maintainer(s)

License

This project is subject to the Apache License, Version 2.0. A copy of the license can be found in LICENSE.txt at the root of this repo.

Code of Conduct

This project has adopted the code of conduct defined by the Contributor Covenant to clarify expected behavior in our community. This code of conduct has been adopted by many other projects. For more information see the Code of conduct.