Description
This library is intended to handle all interactions between Kafka and CouchDB.
Data Assumptions
It monitors "created" and "updated" messages:
- Created assumes no document exists, and creates a new one
- Updated assumes the document already exists
- Each message has an id field, which is used as the id of the document
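
The assumptions above can be sketched against an in-memory store. This is an illustration only, not the library's implementation: `applyMessage`, `Doc`, and the `Map`-based store are hypothetical names. The README does not say what happens when an assumption is violated or whether an update merges or replaces fields, so the sketch throws on violations and merges on update.

```typescript
// Hypothetical sketch: names, error behaviour, and merge-on-update are assumptions.
type MessageType = "created" | "updated";

interface Doc {
  id: string;
  [key: string]: unknown;
}

function applyMessage(store: Map<string, Doc>, type: MessageType, body: Doc): void {
  const current = store.get(body.id);
  if (type === "created") {
    // "created" assumes no document with this id exists yet.
    if (current !== undefined) {
      throw new Error(`created: document ${body.id} already exists`);
    }
    store.set(body.id, body);
    return;
  }
  // "updated" assumes the document is already present.
  if (current === undefined) {
    throw new Error(`updated: document ${body.id} does not exist`);
  }
  store.set(body.id, { ...current, ...body });
}
```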
# Usage
import { taskSchema } from "bf-legal-task";
import { KafkaCouchWatcher } from "lib-kafkacouch";
const TaskWatcher = new KafkaCouchWatcher({
  subject: "task",
  couch: `workflow-task_management`,
  kafka: { topic: `WORKFLOW.TASK_MANAGEMENT`, groupId: "app-asdf" },
  bodyValidation: taskSchema,
  id: "id",
  filter: (body, type, header) => {
    return true;
  },
  map: (body, type, header) => {
    return body;
  }
});
// After a commit happens
TaskWatcher.on("updated", async (body, type, header) => {
  console.log(body);
});
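
As a rough mental model of the `filter` and `map` options, the sketch below applies them in sequence: a message that fails `filter` is dropped, otherwise `map` produces the body that is passed on. The order of the two hooks, the `runHooks` helper, and the `Header` type are assumptions made for illustration; the README does not document how the library calls them internally.

```typescript
// Hypothetical sketch: runHooks, Header, and the filter-then-map order are assumptions.
type Header = Record<string, unknown>;

interface Hooks<In, Out> {
  filter: (body: In, type: string, header: Header) => boolean;
  map: (body: In, type: string, header: Header) => Out;
}

function runHooks<In, Out>(
  hooks: Hooks<In, Out>,
  body: In,
  type: string,
  header: Header
): Out | undefined {
  // Drop the message when the filter rejects it.
  if (!hooks.filter(body, type, header)) return undefined;
  // Otherwise hand the transformed body to the next stage.
  return hooks.map(body, type, header);
}
```

A filter that only keeps open tasks, paired with a map that strips everything but the id, would then look like `{ filter: (body) => body.status === "open", map: (body) => ({ id: body.id }) }`.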
Without Couch (for commands, etc.)
import { taskSchema } from "bf-legal-task";
import { KafkaCouchWatcher } from "lib-kafkacouch";
const TaskWatcher = new KafkaCouchWatcher({
  subject: "task",
  couch: false,
  kafka: { topic: `WORKFLOW.TASK_MANAGEMENT`, groupId: "app-asdf" },
  bodyValidation: taskSchema,
  filter: (body, type, header) => {
    return true;
  },
  map: (body, type, header) => {
    return body;
  }
});
// After a commit happens
TaskWatcher.on("updated", async (body, type, header) => {
  console.log(body);
});
Configuration
By default, Couch gets its credentials from environment variables:
const COUCH_USER: string = process.env.COUCH_USER || "admin";
const COUCH_PASSWORD: string = process.env.COUCH_PASSWORD || "password";
const COUCH_PORT: string = process.env.COUCH_PORT || "5984";
const COUCH_HOST: string = process.env.COUCH_HOST || "couchdb";
const COUCH_PROTO: string = process.env.COUCH_PROTO || "http";
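
One plausible way these variables combine is into a single connection URL. The helper below is a sketch under that assumption; `couchUrl` and the `Env` type are hypothetical names, and the library may assemble its connection differently.

```typescript
// Hypothetical helper: assumes the variables are joined into a standard URL.
type Env = Record<string, string | undefined>;

function couchUrl(env: Env): string {
  // Same fallbacks as the defaults listed above.
  const user = env.COUCH_USER || "admin";
  const password = env.COUCH_PASSWORD || "password";
  const port = env.COUCH_PORT || "5984";
  const host = env.COUCH_HOST || "couchdb";
  const proto = env.COUCH_PROTO || "http";
  // Encode the credentials so characters such as "@" or ":" do not break the URL.
  const auth = `${encodeURIComponent(user)}:${encodeURIComponent(password)}`;
  return `${proto}://${auth}@${host}:${port}`;
}
```

In a Node process this would be called as `couchUrl(process.env)`.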
KafkaCouchWatcher.kafka
Key | Description | Required |
---|---|---|
topic | Kafka Topic | true |
groupId | Group ID ( for Round Robin Groups ) | true |
host | Kafka Host | false |
sessionTimeout | | false |
protocol | | false |
fromOffset | | false |
commitOffsetsOnFirstJoin | | false |
outOfRangeOffset | | false |
onRebalance | | false |
const k = {
  topic: `WORKFLOW.TASK_MANAGEMENT`,
  host: process.env.ZOOKEEPER_HOST || "zookeeper:2181",
  groupId: process.env.KAFKA_GROUP_ID || "default-groupa",
  sessionTimeout: 15000,
  protocol: ["roundrobin"],
  fromOffset: "earliest",
  commitOffsetsOnFirstJoin: true,
  outOfRangeOffset: "earliest",
  onRebalance: (isAlreadyMember: any, callback: any) => {
    callback();
  } // or null
};
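
To show how the required and optional keys in the table might interact, here is a small sketch that rejects a config without `topic` or `groupId` and fills the remaining keys from fallback values. It assumes the values in the example above act as defaults, which the README does not state; `resolveKafkaOptions`, `KafkaOptions`, and `OPTIONAL_DEFAULTS` are hypothetical names, and `onRebalance` is left out for brevity.

```typescript
// Hypothetical sketch: the defaults and the validation behaviour are assumptions.
interface KafkaOptions {
  topic: string;
  groupId: string;
  host: string;
  sessionTimeout: number;
  protocol: string[];
  fromOffset: string;
  commitOffsetsOnFirstJoin: boolean;
  outOfRangeOffset: string;
}

// Fallbacks copied from the example config above.
const OPTIONAL_DEFAULTS = {
  host: "zookeeper:2181",
  sessionTimeout: 15000,
  protocol: ["roundrobin"],
  fromOffset: "earliest",
  commitOffsetsOnFirstJoin: true,
  outOfRangeOffset: "earliest",
};

function resolveKafkaOptions(input: Partial<KafkaOptions>): KafkaOptions {
  // topic and groupId are marked as required in the table.
  if (!input.topic) throw new Error("kafka.topic is required");
  if (!input.groupId) throw new Error("kafka.groupId is required");
  // Caller-supplied values win over the fallbacks.
  return { ...OPTIONAL_DEFAULTS, ...input, topic: input.topic, groupId: input.groupId };
}
```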