Adding a New Writer
Writers consume the validated ParsedRecord stream and route records to a destination β Kafka, webhook, database, S3, etc.
Writer Lifecycleβ
Available Destinationsβ
Stepsβ
1. Create the writer fileβ
platform-core/src/main/kotlin/com/transformplatform/core/writers/WebhookRecordWriter.kt
2. Implement RecordWriterβ
@Component
class WebhookRecordWriter(private val webClient: WebClient) : RecordWriter {
override val writerName = "WEBHOOK_WRITER"
override fun supports(type: DestinationType) = type == DestinationType.WEBHOOK
override suspend fun write(record: ParsedRecord, request: PipelineRequest) {
val payload = record.fields // serialize as needed
webClient.post()
.uri(request.destination.url!!)
.bodyValue(payload)
.retrieve()
.awaitBodilessEntity()
}
override suspend fun flush(request: PipelineRequest) {
// Called once after all records β use for batching, connection cleanup, etc.
}
}
3. Add the destination type enum value (if new)β
enum class DestinationType {
KAFKA, FILE, DATABASE, WEBHOOK
}
4. Write testsβ
class WebhookRecordWriterTest : FunSpec({
val mockWebClient = mockk<WebClient>()
val writer = WebhookRecordWriter(mockWebClient)
test("supports WEBHOOK destination") {
writer.supports(DestinationType.WEBHOOK) shouldBe true
}
test("does not support KAFKA destination") {
writer.supports(DestinationType.KAFKA) shouldBe false
}
})
Checklistβ
- Writer implements
RecordWriterand is annotated@Component -
supports()matches exactly oneDestinationType -
write()issuspendβ uses coroutines for I/O -
flush()implemented if batching or connection cleanup is needed - Tests cover
supports(),write(), andflush() -
AGENTS.mdΒ§6 updated with the new writer