Skip to main content

Adding a New Writer

Writers consume the validated ParsedRecord stream and route records to a destination β€” Kafka, webhook, database, S3, etc.

Writer Lifecycle​

Available Destinations​

Steps​

1. Create the writer file​

platform-core/src/main/kotlin/com/transformplatform/core/writers/WebhookRecordWriter.kt

2. Implement RecordWriter​

@Component
class WebhookRecordWriter(private val webClient: WebClient) : RecordWriter {

override val writerName = "WEBHOOK_WRITER"

override fun supports(type: DestinationType) = type == DestinationType.WEBHOOK

override suspend fun write(record: ParsedRecord, request: PipelineRequest) {
val payload = record.fields // serialize as needed
webClient.post()
.uri(request.destination.url!!)
.bodyValue(payload)
.retrieve()
.awaitBodilessEntity()
}

override suspend fun flush(request: PipelineRequest) {
// Called once after all records β€” use for batching, connection cleanup, etc.
}
}

3. Add the destination type enum value (if new)​

enum class DestinationType {
KAFKA, FILE, DATABASE, WEBHOOK
}

4. Write tests​

class WebhookRecordWriterTest : FunSpec({

val mockWebClient = mockk<WebClient>()
val writer = WebhookRecordWriter(mockWebClient)

test("supports WEBHOOK destination") {
writer.supports(DestinationType.WEBHOOK) shouldBe true
}

test("does not support KAFKA destination") {
writer.supports(DestinationType.KAFKA) shouldBe false
}
})

Checklist​

  • Writer implements RecordWriter and is annotated @Component
  • supports() matches exactly one DestinationType
  • write() is suspend β€” uses coroutines for I/O
  • flush() implemented if batching or connection cleanup is needed
  • Tests cover supports(), write(), and flush()
  • AGENTS.md Β§6 updated with the new writer